From 77804b1aed3d65db369f5b6ecb41df64ed66616a Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Fri, 9 Feb 2018 15:48:26 -0800 Subject: [PATCH] PYTHON-1422 Don't update a closed topology --- pymongo/topology.py | 59 +++++++++++++++++++++-------------- test/test_topology.py | 37 ++++++++++++++++------ test/utils_selection_tests.py | 2 ++ 3 files changed, 65 insertions(+), 33 deletions(-) diff --git a/pymongo/topology.py b/pymongo/topology.py index 2ece443df..3891777d5 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -246,38 +246,49 @@ class Topology(object): server_selection_timeout, address) + def _process_change(self, server_description): + """Process a new ServerDescription on an opened topology. + + Hold the lock when calling this. + """ + td_old = self._description + if self._publish_server: + old_server_description = td_old._server_descriptions[ + server_description.address] + self._events.put(( + self._listeners.publish_server_description_changed, + (old_server_description, server_description, + server_description.address, self._topology_id))) + + self._description = updated_topology_description( + self._description, server_description) + + self._update_servers() + self._receive_cluster_time_no_lock(server_description.cluster_time) + + if self._publish_tp: + self._events.put(( + self._listeners.publish_topology_description_changed, + (td_old, self._description, self._topology_id))) + + # Wake waiters in select_servers(). + self._condition.notify_all() + def on_change(self, server_description): """Process a new ServerDescription after an ismaster call completes.""" # We do no I/O holding the lock. with self._lock: + # Monitors may continue working on ismaster calls for some time + # after a call to Topology.close, so this method may be called at + # any time. Ensure the topology is open before processing the + # change. # Any monitored server was definitely in the topology description # once. Check if it's still in the description or if some state- # change removed it. E.g., we got a host list from the primary # that didn't include this server. - if self._description.has_server(server_description.address): - td_old = self._description - if self._publish_server: - old_server_description = td_old._server_descriptions[ - server_description.address] - self._events.put(( - self._listeners.publish_server_description_changed, - (old_server_description, server_description, - server_description.address, self._topology_id))) - - self._description = updated_topology_description( - self._description, server_description) - - self._update_servers() - self._receive_cluster_time_no_lock( - server_description.cluster_time) - - if self._publish_tp: - self._events.put(( - self._listeners.publish_topology_description_changed, - (td_old, self._description, self._topology_id))) - - # Wake waiters in select_servers(). - self._condition.notify_all() + if (self._opened and + self._description.has_server(server_description.address)): + self._process_change(server_description) def get_server_by_address(self, address): """Get a Server or None. diff --git a/test/test_topology.py b/test/test_topology.py index 76732a9f2..11c726324 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -75,15 +75,16 @@ class MockMonitor(object): def __init__(self, server_description, topology, pool, topology_settings): self._server_description = server_description self._topology = topology + self.opened = False def open(self): - pass + self.opened = True def request_check(self): pass def close(self): - pass + self.opened = False class SetNameDiscoverySettings(TopologySettings): @@ -122,9 +123,16 @@ def disconnected(topology, server_address): topology.on_change(ServerDescription(server_address)) +def get_server(topology, hostname): + return topology.get_server_by_address((hostname, 27017)) + + def get_type(topology, hostname): - description = topology.get_server_by_address((hostname, 27017)).description - return description.server_type + return get_server(topology, hostname).description.server_type + + +def get_monitor(topology, hostname): + return get_server(topology, hostname)._monitor class TopologyTest(unittest.TestCase): @@ -398,6 +406,8 @@ class TestMultiServerTopology(TopologyTest): self.assertEqual(SERVER_TYPE.RSPrimary, get_type(t, 'a')) self.assertEqual(SERVER_TYPE.RSSecondary, get_type(t, 'b')) + self.assertTrue(get_monitor(t, 'a').opened) + self.assertTrue(get_monitor(t, 'b').opened) self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, t.description.topology_type) @@ -405,19 +415,28 @@ class TestMultiServerTopology(TopologyTest): self.assertEqual(2, len(t.description.server_descriptions())) self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'a')) self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'b')) + self.assertFalse(get_monitor(t, 'a').opened) + self.assertFalse(get_monitor(t, 'b').opened) self.assertEqual('rs', t.description.replica_set_name) self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, t.description.topology_type) + # A closed topology should not be updated when receiving an isMaster. got_ismaster(t, ('a', 27017), { 'ok': 1, 'ismaster': True, 'setName': 'rs', - 'hosts': ['a', 'b']}) + 'hosts': ['a', 'b', 'c']}) - self.assertEqual(SERVER_TYPE.RSPrimary, get_type(t, 'a')) + self.assertEqual(2, len(t.description.server_descriptions())) + self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'a')) self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'b')) - self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, + self.assertFalse(get_monitor(t, 'a').opened) + self.assertFalse(get_monitor(t, 'b').opened) + # Server c should not have been added. + self.assertEqual(None, get_server(t, 'c')) + self.assertEqual('rs', t.description.replica_set_name) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, t.description.topology_type) def test_reset_server(self): @@ -480,7 +499,7 @@ class TestMultiServerTopology(TopologyTest): self.assertEqual(t.description.replica_set_name, None) self.assertEqual(t.description.topology_type, TOPOLOGY_TYPE.ReplicaSetNoPrimary) - + t.open() got_ismaster(t, address, { 'ok': 1, 'ismaster': True, @@ -516,7 +535,7 @@ class TestMultiServerTopology(TopologyTest): self.assertEqual(t.description.replica_set_name, None) self.assertEqual(t.description.topology_type, TOPOLOGY_TYPE.ReplicaSetNoPrimary) - + t.open() got_ismaster(t, address, { 'ok': 1, 'ismaster': False, diff --git a/test/utils_selection_tests.py b/test/utils_selection_tests.py index 58655e165..767655266 100644 --- a/test/utils_selection_tests.py +++ b/test/utils_selection_tests.py @@ -159,12 +159,14 @@ def create_test(scenario_def): # the set of servers matching both the ReadPreference's mode # and tag sets. top_latency = Topology(TopologySettings(**settings)) + top_latency.open() # "In latency window" is defined in the server selection # spec as the subset of suitable_servers that falls within the # allowable latency window. settings['local_threshold_ms'] = 1000000 top_suitable = Topology(TopologySettings(**settings)) + top_suitable.open() # Update topologies with server descriptions. for server in scenario_def['topology_description']['servers']: