diff --git a/pymongo/monitor.py b/pymongo/monitor.py index e26f93b5a..a0e689455 100644 --- a/pymongo/monitor.py +++ b/pymongo/monitor.py @@ -120,8 +120,10 @@ class Monitor(object): start = _time() try: + cluster_time = self._topology.max_cluster_time() # If the server type is unknown, send metadata with first check. - return self._check_once(metadata=metadata) + return self._check_once(metadata=metadata, + cluster_time=cluster_time) except ReferenceError: raise except Exception as error: @@ -137,9 +139,12 @@ class Monitor(object): return default # Try a second and final time. If it fails return original error. + # Always send metadata: this is a new connection. start = _time() try: - return self._check_once(metadata=self._pool.opts.metadata) + cluster_time = self._topology.max_cluster_time() + return self._check_once(metadata=self._pool.opts.metadata, + cluster_time=cluster_time) except ReferenceError: raise except Exception as error: @@ -150,7 +155,7 @@ class Monitor(object): self._avg_round_trip_time.reset() return default - def _check_once(self, metadata=None): + def _check_once(self, metadata=None, cluster_time=None): """A single attempt to call ismaster. Returns a ServerDescription, or raises an exception. @@ -160,7 +165,7 @@ class Monitor(object): self._listeners.publish_server_heartbeat_started(address) with self._pool.get_socket({}) as sock_info: response, round_trip_time = self._check_with_socket( - sock_info, metadata=metadata) + sock_info, metadata=metadata, cluster_time=cluster_time) self._avg_round_trip_time.add_sample(round_trip_time) sd = ServerDescription( address=address, @@ -172,7 +177,7 @@ class Monitor(object): return sd - def _check_with_socket(self, sock_info, metadata=None): + def _check_with_socket(self, sock_info, metadata=None, cluster_time=None): """Return (IsMaster, round_trip_time). Can raise ConnectionFailure or OperationFailure. @@ -180,6 +185,8 @@ class Monitor(object): cmd = SON([('ismaster', 1)]) if metadata is not None: cmd['client'] = metadata + if cluster_time is not None: + cmd['$clusterTime'] = cluster_time start = _time() request_id, msg, max_doc_size = message.query( 0, 'admin.$cmd', 0, -1, cmd, diff --git a/test/pymongo_mocks.py b/test/pymongo_mocks.py index b01afcdb5..0ec64109b 100644 --- a/test/pymongo_mocks.py +++ b/test/pymongo_mocks.py @@ -76,7 +76,7 @@ class MockMonitor(Monitor): pool, topology_settings) - def _check_once(self, metadata=None): + def _check_once(self, metadata=None, cluster_time=None): address = self._server_description.address response, rtt = self.client.mock_is_master('%s:%d' % address) return ServerDescription(address, IsMaster(response), rtt) diff --git a/test/test_heartbeat_monitoring.py b/test/test_heartbeat_monitoring.py index 5a9d5bb89..53c54abae 100644 --- a/test/test_heartbeat_monitoring.py +++ b/test/test_heartbeat_monitoring.py @@ -75,7 +75,7 @@ class TestHeartbeatMonitoring(unittest.TestCase): min_heartbeat_interval=0.1, events_queue_frequency=0.1): class MockMonitor(Monitor): - def _check_with_socket(self, sock_info, metadata=None): + def _check_with_socket(self, *args, **kwargs): if isinstance(responses[1], Exception): raise responses[1] return IsMaster(responses[1]), 99 diff --git a/test/test_topology.py b/test/test_topology.py index f9328dfa4..4dd19a8ea 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -254,7 +254,7 @@ class TestSingleServerTopology(TopologyTest): available = True class TestMonitor(Monitor): - def _check_with_socket(self, sock_info, metadata=None): + def _check_with_socket(self, *args, **kwargs): if available: return (IsMaster({'ok': 1, 'maxWireVersion': 6}), round_trip_time) @@ -619,6 +619,7 @@ def wait_for_master(topology): thread. In applications this is harmless but it would break some tests, so we pass server_selection_timeout=0 and poll instead. """ + def get_master(): try: return topology.select_server(writable_server_selector, 0) @@ -636,7 +637,7 @@ class TestTopologyErrors(TopologyTest): ismaster_count = [0] class TestMonitor(Monitor): - def _check_with_socket(self, sock_info, metadata=None): + def _check_with_socket(self, *args, **kwargs): ismaster_count[0] += 1 if ismaster_count[0] == 1: return IsMaster({'ok': 1, 'maxWireVersion': 6}), 0 @@ -657,7 +658,7 @@ class TestTopologyErrors(TopologyTest): ismaster_count = [0] class TestMonitor(Monitor): - def _check_with_socket(self, sock_info, metadata=None): + def _check_with_socket(self, *args, **kwargs): ismaster_count[0] += 1 if ismaster_count[0] in (1, 3): return IsMaster({'ok': 1, 'maxWireVersion': 6}), 0 @@ -679,7 +680,7 @@ class TestTopologyErrors(TopologyTest): exception = AssertionError('internal error') class TestMonitor(Monitor): - def _check_with_socket(self, sock_info, metadata=None): + def _check_with_socket(self, *args, **kwargs): raise exception t = create_mock_topology(monitor_class=TestMonitor)