PYTHON-1422 Don't update a closed topology
This commit is contained in:
parent
585d0fbd4f
commit
77804b1aed
@ -246,38 +246,49 @@ class Topology(object):
|
||||
server_selection_timeout,
|
||||
address)
|
||||
|
||||
def _process_change(self, server_description):
|
||||
"""Process a new ServerDescription on an opened topology.
|
||||
|
||||
Hold the lock when calling this.
|
||||
"""
|
||||
td_old = self._description
|
||||
if self._publish_server:
|
||||
old_server_description = td_old._server_descriptions[
|
||||
server_description.address]
|
||||
self._events.put((
|
||||
self._listeners.publish_server_description_changed,
|
||||
(old_server_description, server_description,
|
||||
server_description.address, self._topology_id)))
|
||||
|
||||
self._description = updated_topology_description(
|
||||
self._description, server_description)
|
||||
|
||||
self._update_servers()
|
||||
self._receive_cluster_time_no_lock(server_description.cluster_time)
|
||||
|
||||
if self._publish_tp:
|
||||
self._events.put((
|
||||
self._listeners.publish_topology_description_changed,
|
||||
(td_old, self._description, self._topology_id)))
|
||||
|
||||
# Wake waiters in select_servers().
|
||||
self._condition.notify_all()
|
||||
|
||||
def on_change(self, server_description):
|
||||
"""Process a new ServerDescription after an ismaster call completes."""
|
||||
# We do no I/O holding the lock.
|
||||
with self._lock:
|
||||
# Monitors may continue working on ismaster calls for some time
|
||||
# after a call to Topology.close, so this method may be called at
|
||||
# any time. Ensure the topology is open before processing the
|
||||
# change.
|
||||
# Any monitored server was definitely in the topology description
|
||||
# once. Check if it's still in the description or if some state-
|
||||
# change removed it. E.g., we got a host list from the primary
|
||||
# that didn't include this server.
|
||||
if self._description.has_server(server_description.address):
|
||||
td_old = self._description
|
||||
if self._publish_server:
|
||||
old_server_description = td_old._server_descriptions[
|
||||
server_description.address]
|
||||
self._events.put((
|
||||
self._listeners.publish_server_description_changed,
|
||||
(old_server_description, server_description,
|
||||
server_description.address, self._topology_id)))
|
||||
|
||||
self._description = updated_topology_description(
|
||||
self._description, server_description)
|
||||
|
||||
self._update_servers()
|
||||
self._receive_cluster_time_no_lock(
|
||||
server_description.cluster_time)
|
||||
|
||||
if self._publish_tp:
|
||||
self._events.put((
|
||||
self._listeners.publish_topology_description_changed,
|
||||
(td_old, self._description, self._topology_id)))
|
||||
|
||||
# Wake waiters in select_servers().
|
||||
self._condition.notify_all()
|
||||
if (self._opened and
|
||||
self._description.has_server(server_description.address)):
|
||||
self._process_change(server_description)
|
||||
|
||||
def get_server_by_address(self, address):
|
||||
"""Get a Server or None.
|
||||
|
||||
@ -75,15 +75,16 @@ class MockMonitor(object):
|
||||
def __init__(self, server_description, topology, pool, topology_settings):
|
||||
self._server_description = server_description
|
||||
self._topology = topology
|
||||
self.opened = False
|
||||
|
||||
def open(self):
|
||||
pass
|
||||
self.opened = True
|
||||
|
||||
def request_check(self):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
self.opened = False
|
||||
|
||||
|
||||
class SetNameDiscoverySettings(TopologySettings):
|
||||
@ -122,9 +123,16 @@ def disconnected(topology, server_address):
|
||||
topology.on_change(ServerDescription(server_address))
|
||||
|
||||
|
||||
def get_server(topology, hostname):
|
||||
return topology.get_server_by_address((hostname, 27017))
|
||||
|
||||
|
||||
def get_type(topology, hostname):
|
||||
description = topology.get_server_by_address((hostname, 27017)).description
|
||||
return description.server_type
|
||||
return get_server(topology, hostname).description.server_type
|
||||
|
||||
|
||||
def get_monitor(topology, hostname):
|
||||
return get_server(topology, hostname)._monitor
|
||||
|
||||
|
||||
class TopologyTest(unittest.TestCase):
|
||||
@ -398,6 +406,8 @@ class TestMultiServerTopology(TopologyTest):
|
||||
|
||||
self.assertEqual(SERVER_TYPE.RSPrimary, get_type(t, 'a'))
|
||||
self.assertEqual(SERVER_TYPE.RSSecondary, get_type(t, 'b'))
|
||||
self.assertTrue(get_monitor(t, 'a').opened)
|
||||
self.assertTrue(get_monitor(t, 'b').opened)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
t.description.topology_type)
|
||||
|
||||
@ -405,19 +415,28 @@ class TestMultiServerTopology(TopologyTest):
|
||||
self.assertEqual(2, len(t.description.server_descriptions()))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'a'))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'b'))
|
||||
self.assertFalse(get_monitor(t, 'a').opened)
|
||||
self.assertFalse(get_monitor(t, 'b').opened)
|
||||
self.assertEqual('rs', t.description.replica_set_name)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
t.description.topology_type)
|
||||
|
||||
# A closed topology should not be updated when receiving an isMaster.
|
||||
got_ismaster(t, ('a', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
'setName': 'rs',
|
||||
'hosts': ['a', 'b']})
|
||||
'hosts': ['a', 'b', 'c']})
|
||||
|
||||
self.assertEqual(SERVER_TYPE.RSPrimary, get_type(t, 'a'))
|
||||
self.assertEqual(2, len(t.description.server_descriptions()))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'a'))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'b'))
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
self.assertFalse(get_monitor(t, 'a').opened)
|
||||
self.assertFalse(get_monitor(t, 'b').opened)
|
||||
# Server c should not have been added.
|
||||
self.assertEqual(None, get_server(t, 'c'))
|
||||
self.assertEqual('rs', t.description.replica_set_name)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
t.description.topology_type)
|
||||
|
||||
def test_reset_server(self):
|
||||
@ -480,7 +499,7 @@ class TestMultiServerTopology(TopologyTest):
|
||||
self.assertEqual(t.description.replica_set_name, None)
|
||||
self.assertEqual(t.description.topology_type,
|
||||
TOPOLOGY_TYPE.ReplicaSetNoPrimary)
|
||||
|
||||
t.open()
|
||||
got_ismaster(t, address, {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
@ -516,7 +535,7 @@ class TestMultiServerTopology(TopologyTest):
|
||||
self.assertEqual(t.description.replica_set_name, None)
|
||||
self.assertEqual(t.description.topology_type,
|
||||
TOPOLOGY_TYPE.ReplicaSetNoPrimary)
|
||||
|
||||
t.open()
|
||||
got_ismaster(t, address, {
|
||||
'ok': 1,
|
||||
'ismaster': False,
|
||||
|
||||
@ -159,12 +159,14 @@ def create_test(scenario_def):
|
||||
# the set of servers matching both the ReadPreference's mode
|
||||
# and tag sets.
|
||||
top_latency = Topology(TopologySettings(**settings))
|
||||
top_latency.open()
|
||||
|
||||
# "In latency window" is defined in the server selection
|
||||
# spec as the subset of suitable_servers that falls within the
|
||||
# allowable latency window.
|
||||
settings['local_threshold_ms'] = 1000000
|
||||
top_suitable = Topology(TopologySettings(**settings))
|
||||
top_suitable.open()
|
||||
|
||||
# Update topologies with server descriptions.
|
||||
for server in scenario_def['topology_description']['servers']:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user