From 6609cc571476f9be33da3a6d5dd703bfcb26a377 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 15 Jan 2020 17:17:19 -0800 Subject: [PATCH] PYTHON-2098 Publish server/topology events when reseting a server due to an application error --- pymongo/database.py | 4 +- pymongo/mongo_client.py | 22 ++++---- pymongo/topology.py | 18 +++---- test/test_sdam_monitoring_spec.py | 89 ++++++++++++++++++++++++++++++- test/test_topology.py | 6 +-- test/utils.py | 8 +++ 6 files changed, 120 insertions(+), 27 deletions(-) diff --git a/pymongo/database.py b/pymongo/database.py index 701e55221..50a5d1fa3 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -29,6 +29,7 @@ from pymongo.command_cursor import CommandCursor from pymongo.errors import (CollectionInvalid, ConfigurationError, InvalidName, + NotMasterError, OperationFailure) from pymongo.message import _first_batch from pymongo.read_preferences import ReadPreference @@ -1135,7 +1136,8 @@ class Database(common.BaseObject): # doing so already. primary = self.__client.primary if primary: - self.__client._reset_server_and_request_check(primary) + self.__client._reset_server_and_request_check( + primary, NotMasterError(error_msg, error)) return error def last_status(self): diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index edee1afb6..8294af14e 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -1490,13 +1490,9 @@ class MongoClient(common.BaseObject): with self._tmp_session(session) as s: return self._retry_with_session(retryable, func, s, None) - def _reset_server(self, address): - """Clear our connection pool for a server and mark it Unknown.""" - self._topology.reset_server(address) - - def _reset_server_and_request_check(self, address): + def _reset_server_and_request_check(self, address, error): """Clear our pool for a server, mark it Unknown, and check it soon.""" - self._topology.reset_server_and_request_check(address) + self._topology.reset_server_and_request_check(address, error) def __eq__(self, other): if isinstance(other, self.__class__): @@ -2168,7 +2164,7 @@ class _MongoClientErrorHandler(object): self._client = client self._server_address = server_address self._session = session - self._max_wire_version = None + self._max_wire_version = common.MIN_WIRE_VERSION def contribute_socket(self, sock_info): """Provide socket information to the error handler.""" @@ -2205,22 +2201,22 @@ class _MongoClientErrorHandler(object): # Unknown and request an immediate check of the server. err_code = exc_val.details.get('code', -1) is_shutting_down = err_code in helpers._SHUTDOWN_CODES - if (is_shutting_down or (self._max_wire_version is None) or - (self._max_wire_version <= 7)): + if is_shutting_down or (self._max_wire_version <= 7): # Clear the pool, mark server Unknown and request check. self._client._reset_server_and_request_check( - self._server_address) + self._server_address, exc_val) else: self._client._topology.mark_server_unknown_and_request_check( - self._server_address) + self._server_address, exc_val) elif issubclass(exc_type, ConnectionFailure): # "Client MUST replace the server's description with type Unknown # ... MUST NOT request an immediate check of the server." - self._client._reset_server(self._server_address) + self._client._topology.reset_server(self._server_address, exc_val) if self._session: self._session._server_session.mark_dirty() elif issubclass(exc_type, OperationFailure): # Do not request an immediate check since the server is likely # shutting down. if exc_val.code in helpers._RETRYABLE_ERROR_CODES: - self._client._reset_server(self._server_address) + self._client._topology.reset_server( + self._server_address, exc_val) diff --git a/pymongo/topology.py b/pymongo/topology.py index a3cfe1e79..7cad22cd9 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -37,6 +37,7 @@ from pymongo.errors import ServerSelectionTimeoutError, ConfigurationError from pymongo.monitor import SrvMonitor from pymongo.monotonic import time as _time from pymongo.server import Server +from pymongo.server_description import ServerDescription from pymongo.server_selectors import (any_server_selector, arbiter_server_selector, secondary_server_selector, @@ -407,24 +408,24 @@ class Topology(object): if server: server.pool.reset() - def reset_server(self, address): + def reset_server(self, address, error): """Clear our pool for a server and mark it Unknown. Do *not* request an immediate check. """ with self._lock: - self._reset_server(address, reset_pool=True) + self._reset_server(address, reset_pool=True, error=error) - def reset_server_and_request_check(self, address): + def reset_server_and_request_check(self, address, error): """Clear our pool for a server, mark it Unknown, and check it soon.""" with self._lock: - self._reset_server(address, reset_pool=True) + self._reset_server(address, reset_pool=True, error=error) self._request_check(address) - def mark_server_unknown_and_request_check(self, address): + def mark_server_unknown_and_request_check(self, address, error): """Mark a server Unknown, and check it soon.""" with self._lock: - self._reset_server(address, reset_pool=False) + self._reset_server(address, reset_pool=False, error=error) self._request_check(address) def update_pool(self): @@ -537,7 +538,7 @@ class Topology(object): for server in itervalues(self._servers): server.open() - def _reset_server(self, address, reset_pool): + def _reset_server(self, address, reset_pool, error): """Mark a server Unknown and optionally reset it's pool. Hold the lock when calling this. Does *not* request an immediate check. @@ -550,8 +551,7 @@ class Topology(object): server.reset() # Mark this server Unknown. - self._description = self._description.reset_server(address) - self._update_servers() + self._process_change(ServerDescription(address, error=error)) def _request_check(self, address): """Wake one monitor. Hold the lock when calling this.""" diff --git a/test/test_sdam_monitoring_spec.py b/test/test_sdam_monitoring_spec.py index cb5ebcd5e..a3a843570 100644 --- a/test/test_sdam_monitoring_spec.py +++ b/test/test_sdam_monitoring_spec.py @@ -24,15 +24,18 @@ sys.path[0:0] = [""] from bson.json_util import object_hook from pymongo import monitoring from pymongo import periodic_executor +from pymongo.errors import (ConnectionFailure, + NotMasterError) from pymongo.ismaster import IsMaster from pymongo.monitor import Monitor from pymongo.read_preferences import MovingAverage from pymongo.server_description import ServerDescription from pymongo.server_type import SERVER_TYPE from pymongo.topology_description import TOPOLOGY_TYPE -from test import unittest, client_context, client_knobs +from test import unittest, client_context, client_knobs, IntegrationTest from test.utils import (ServerAndTopologyEventListener, single_client, + rs_or_single_client, wait_until) # Location of JSON test specifications. @@ -278,5 +281,89 @@ def create_tests(): create_tests() + +class TestSdamMonitoring(IntegrationTest): + + @classmethod + @client_context.require_failCommand_fail_point + def setUpClass(cls): + super(TestSdamMonitoring, cls).setUpClass() + # Speed up the tests by decreasing the event publish frequency. + cls.knobs = client_knobs(events_queue_frequency=0.1) + cls.knobs.enable() + cls.listener = ServerAndTopologyEventListener() + retry_writes = client_context.supports_transactions() + cls.test_client = rs_or_single_client( + event_listeners=[cls.listener], retryWrites=retry_writes) + cls.coll = cls.test_client[cls.client.db.name].test + cls.coll.insert_one({}) + + @classmethod + def tearDownClass(cls): + cls.test_client.close() + cls.knobs.disable() + super(TestSdamMonitoring, cls).tearDownClass() + + def setUp(self): + self.listener.reset() + + def _test_app_error(self, fail_command_opts, expected_error): + address = self.test_client.address + + # Test that an application error causes a ServerDescriptionChangedEvent + # to be published. + data = {'failCommands': ['insert']} + data.update(fail_command_opts) + fail_insert = { + 'configureFailPoint': 'failCommand', + 'mode': {'times': 1}, + 'data': data, + } + with self.fail_point(fail_insert): + if self.test_client.retry_writes: + self.coll.insert_one({}) + else: + with self.assertRaises(expected_error): + self.coll.insert_one({}) + self.coll.insert_one({}) + + def marked_unknown(event): + return ( + isinstance(event, monitoring.ServerDescriptionChangedEvent) + and event.server_address == address + and not event.new_description.is_server_type_known) + + def discovered_node(event): + return ( + isinstance(event, monitoring.ServerDescriptionChangedEvent) + and event.server_address == address + and not event.previous_description.is_server_type_known + and event.new_description.is_server_type_known) + + def marked_unknown_and_rediscovered(): + return (len(self.listener.matching(marked_unknown)) >= 1 and + len(self.listener.matching(discovered_node)) >= 1) + + # Topology events are published asynchronously + wait_until(marked_unknown_and_rediscovered, 'rediscover node') + + # Expect a single ServerDescriptionChangedEvent for the network error. + marked_unknown_events = self.listener.matching(marked_unknown) + self.assertEqual(len(marked_unknown_events), 1) + self.assertIsInstance( + marked_unknown_events[0].new_description.error, expected_error) + + def test_network_error_publishes_events(self): + self._test_app_error({'closeConnection': True}, ConnectionFailure) + + def test_not_master_error_publishes_events(self): + self._test_app_error({'errorCode': 10107, 'closeConnection': False}, + NotMasterError) + + def test_shutdown_error_publishes_events(self): + self._test_app_error({'errorCode': 91, 'closeConnection': False}, + NotMasterError) + + if __name__ == "__main__": unittest.main() diff --git a/test/test_topology.py b/test/test_topology.py index 31c7b0ce1..017951b57 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -423,7 +423,7 @@ class TestMultiServerTopology(TopologyTest): 'setName': 'rs', 'hosts': ['a', 'b']}) - t.reset_server(('a', 27017)) + t.reset_server(('a', 27017), None) self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'a')) self.assertEqual(SERVER_TYPE.RSSecondary, get_type(t, 'b')) self.assertEqual('rs', t.description.replica_set_name) @@ -440,7 +440,7 @@ class TestMultiServerTopology(TopologyTest): self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, t.description.topology_type) - t.reset_server(('b', 27017)) + t.reset_server(('b', 27017), None) self.assertEqual(SERVER_TYPE.RSPrimary, get_type(t, 'a')) self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'b')) self.assertEqual('rs', t.description.replica_set_name) @@ -451,7 +451,7 @@ class TestMultiServerTopology(TopologyTest): t = create_mock_topology(replica_set_name='rs') # No error resetting a server not in the TopologyDescription. - t.reset_server(('b', 27017)) + t.reset_server(('b', 27017), None) # Server was *not* added as type Unknown. self.assertFalse(t.has_server(('b', 27017))) diff --git a/test/utils.py b/test/utils.py index 95c088585..77e08fc4e 100644 --- a/test/utils.py +++ b/test/utils.py @@ -167,6 +167,14 @@ class ServerAndTopologyEventListener(monitoring.ServerListener, def closed(self, event): self.results.append(event) + def matching(self, matcher): + """Return the matching events.""" + results = self.results[:] + return [event for event in results if matcher(event)] + + def reset(self): + self.results = [] + class HeartbeatEventListener(monitoring.ServerHeartbeatListener): """Listens to only server heartbeat events."""