PYTHON-1332 - send $clusterTime with isMaster

This commit is contained in:
A. Jesse Jiryu Davis 2017-10-19 12:25:00 +02:00
parent 5705b10d57
commit dcd8f68994
4 changed files with 19 additions and 11 deletions

View File

@ -120,8 +120,10 @@ class Monitor(object):
start = _time()
try:
cluster_time = self._topology.max_cluster_time()
# If the server type is unknown, send metadata with first check.
return self._check_once(metadata=metadata)
return self._check_once(metadata=metadata,
cluster_time=cluster_time)
except ReferenceError:
raise
except Exception as error:
@ -137,9 +139,12 @@ class Monitor(object):
return default
# Try a second and final time. If it fails return original error.
# Always send metadata: this is a new connection.
start = _time()
try:
return self._check_once(metadata=self._pool.opts.metadata)
cluster_time = self._topology.max_cluster_time()
return self._check_once(metadata=self._pool.opts.metadata,
cluster_time=cluster_time)
except ReferenceError:
raise
except Exception as error:
@ -150,7 +155,7 @@ class Monitor(object):
self._avg_round_trip_time.reset()
return default
def _check_once(self, metadata=None):
def _check_once(self, metadata=None, cluster_time=None):
"""A single attempt to call ismaster.
Returns a ServerDescription, or raises an exception.
@ -160,7 +165,7 @@ class Monitor(object):
self._listeners.publish_server_heartbeat_started(address)
with self._pool.get_socket({}) as sock_info:
response, round_trip_time = self._check_with_socket(
sock_info, metadata=metadata)
sock_info, metadata=metadata, cluster_time=cluster_time)
self._avg_round_trip_time.add_sample(round_trip_time)
sd = ServerDescription(
address=address,
@ -172,7 +177,7 @@ class Monitor(object):
return sd
def _check_with_socket(self, sock_info, metadata=None):
def _check_with_socket(self, sock_info, metadata=None, cluster_time=None):
"""Return (IsMaster, round_trip_time).
Can raise ConnectionFailure or OperationFailure.
@ -180,6 +185,8 @@ class Monitor(object):
cmd = SON([('ismaster', 1)])
if metadata is not None:
cmd['client'] = metadata
if cluster_time is not None:
cmd['$clusterTime'] = cluster_time
start = _time()
request_id, msg, max_doc_size = message.query(
0, 'admin.$cmd', 0, -1, cmd,

View File

@ -76,7 +76,7 @@ class MockMonitor(Monitor):
pool,
topology_settings)
def _check_once(self, metadata=None):
def _check_once(self, metadata=None, cluster_time=None):
address = self._server_description.address
response, rtt = self.client.mock_is_master('%s:%d' % address)
return ServerDescription(address, IsMaster(response), rtt)

View File

@ -75,7 +75,7 @@ class TestHeartbeatMonitoring(unittest.TestCase):
min_heartbeat_interval=0.1,
events_queue_frequency=0.1):
class MockMonitor(Monitor):
def _check_with_socket(self, sock_info, metadata=None):
def _check_with_socket(self, *args, **kwargs):
if isinstance(responses[1], Exception):
raise responses[1]
return IsMaster(responses[1]), 99

View File

@ -254,7 +254,7 @@ class TestSingleServerTopology(TopologyTest):
available = True
class TestMonitor(Monitor):
def _check_with_socket(self, sock_info, metadata=None):
def _check_with_socket(self, *args, **kwargs):
if available:
return (IsMaster({'ok': 1, 'maxWireVersion': 6}),
round_trip_time)
@ -619,6 +619,7 @@ def wait_for_master(topology):
thread. In applications this is harmless but it would break some tests,
so we pass server_selection_timeout=0 and poll instead.
"""
def get_master():
try:
return topology.select_server(writable_server_selector, 0)
@ -636,7 +637,7 @@ class TestTopologyErrors(TopologyTest):
ismaster_count = [0]
class TestMonitor(Monitor):
def _check_with_socket(self, sock_info, metadata=None):
def _check_with_socket(self, *args, **kwargs):
ismaster_count[0] += 1
if ismaster_count[0] == 1:
return IsMaster({'ok': 1, 'maxWireVersion': 6}), 0
@ -657,7 +658,7 @@ class TestTopologyErrors(TopologyTest):
ismaster_count = [0]
class TestMonitor(Monitor):
def _check_with_socket(self, sock_info, metadata=None):
def _check_with_socket(self, *args, **kwargs):
ismaster_count[0] += 1
if ismaster_count[0] in (1, 3):
return IsMaster({'ok': 1, 'maxWireVersion': 6}), 0
@ -679,7 +680,7 @@ class TestTopologyErrors(TopologyTest):
exception = AssertionError('internal error')
class TestMonitor(Monitor):
def _check_with_socket(self, sock_info, metadata=None):
def _check_with_socket(self, *args, **kwargs):
raise exception
t = create_mock_topology(monitor_class=TestMonitor)