PYTHON-2098 Publish server/topology events when reseting a server due to an application error

This commit is contained in:
Shane Harvey 2020-01-15 17:17:19 -08:00
parent c4b8aef1e8
commit 6609cc5714
6 changed files with 120 additions and 27 deletions

View File

@ -29,6 +29,7 @@ from pymongo.command_cursor import CommandCursor
from pymongo.errors import (CollectionInvalid,
ConfigurationError,
InvalidName,
NotMasterError,
OperationFailure)
from pymongo.message import _first_batch
from pymongo.read_preferences import ReadPreference
@ -1135,7 +1136,8 @@ class Database(common.BaseObject):
# doing so already.
primary = self.__client.primary
if primary:
self.__client._reset_server_and_request_check(primary)
self.__client._reset_server_and_request_check(
primary, NotMasterError(error_msg, error))
return error
def last_status(self):

View File

@ -1490,13 +1490,9 @@ class MongoClient(common.BaseObject):
with self._tmp_session(session) as s:
return self._retry_with_session(retryable, func, s, None)
def _reset_server(self, address):
"""Clear our connection pool for a server and mark it Unknown."""
self._topology.reset_server(address)
def _reset_server_and_request_check(self, address):
def _reset_server_and_request_check(self, address, error):
"""Clear our pool for a server, mark it Unknown, and check it soon."""
self._topology.reset_server_and_request_check(address)
self._topology.reset_server_and_request_check(address, error)
def __eq__(self, other):
if isinstance(other, self.__class__):
@ -2168,7 +2164,7 @@ class _MongoClientErrorHandler(object):
self._client = client
self._server_address = server_address
self._session = session
self._max_wire_version = None
self._max_wire_version = common.MIN_WIRE_VERSION
def contribute_socket(self, sock_info):
"""Provide socket information to the error handler."""
@ -2205,22 +2201,22 @@ class _MongoClientErrorHandler(object):
# Unknown and request an immediate check of the server.
err_code = exc_val.details.get('code', -1)
is_shutting_down = err_code in helpers._SHUTDOWN_CODES
if (is_shutting_down or (self._max_wire_version is None) or
(self._max_wire_version <= 7)):
if is_shutting_down or (self._max_wire_version <= 7):
# Clear the pool, mark server Unknown and request check.
self._client._reset_server_and_request_check(
self._server_address)
self._server_address, exc_val)
else:
self._client._topology.mark_server_unknown_and_request_check(
self._server_address)
self._server_address, exc_val)
elif issubclass(exc_type, ConnectionFailure):
# "Client MUST replace the server's description with type Unknown
# ... MUST NOT request an immediate check of the server."
self._client._reset_server(self._server_address)
self._client._topology.reset_server(self._server_address, exc_val)
if self._session:
self._session._server_session.mark_dirty()
elif issubclass(exc_type, OperationFailure):
# Do not request an immediate check since the server is likely
# shutting down.
if exc_val.code in helpers._RETRYABLE_ERROR_CODES:
self._client._reset_server(self._server_address)
self._client._topology.reset_server(
self._server_address, exc_val)

View File

@ -37,6 +37,7 @@ from pymongo.errors import ServerSelectionTimeoutError, ConfigurationError
from pymongo.monitor import SrvMonitor
from pymongo.monotonic import time as _time
from pymongo.server import Server
from pymongo.server_description import ServerDescription
from pymongo.server_selectors import (any_server_selector,
arbiter_server_selector,
secondary_server_selector,
@ -407,24 +408,24 @@ class Topology(object):
if server:
server.pool.reset()
def reset_server(self, address):
def reset_server(self, address, error):
"""Clear our pool for a server and mark it Unknown.
Do *not* request an immediate check.
"""
with self._lock:
self._reset_server(address, reset_pool=True)
self._reset_server(address, reset_pool=True, error=error)
def reset_server_and_request_check(self, address):
def reset_server_and_request_check(self, address, error):
"""Clear our pool for a server, mark it Unknown, and check it soon."""
with self._lock:
self._reset_server(address, reset_pool=True)
self._reset_server(address, reset_pool=True, error=error)
self._request_check(address)
def mark_server_unknown_and_request_check(self, address):
def mark_server_unknown_and_request_check(self, address, error):
"""Mark a server Unknown, and check it soon."""
with self._lock:
self._reset_server(address, reset_pool=False)
self._reset_server(address, reset_pool=False, error=error)
self._request_check(address)
def update_pool(self):
@ -537,7 +538,7 @@ class Topology(object):
for server in itervalues(self._servers):
server.open()
def _reset_server(self, address, reset_pool):
def _reset_server(self, address, reset_pool, error):
"""Mark a server Unknown and optionally reset it's pool.
Hold the lock when calling this. Does *not* request an immediate check.
@ -550,8 +551,7 @@ class Topology(object):
server.reset()
# Mark this server Unknown.
self._description = self._description.reset_server(address)
self._update_servers()
self._process_change(ServerDescription(address, error=error))
def _request_check(self, address):
"""Wake one monitor. Hold the lock when calling this."""

View File

@ -24,15 +24,18 @@ sys.path[0:0] = [""]
from bson.json_util import object_hook
from pymongo import monitoring
from pymongo import periodic_executor
from pymongo.errors import (ConnectionFailure,
NotMasterError)
from pymongo.ismaster import IsMaster
from pymongo.monitor import Monitor
from pymongo.read_preferences import MovingAverage
from pymongo.server_description import ServerDescription
from pymongo.server_type import SERVER_TYPE
from pymongo.topology_description import TOPOLOGY_TYPE
from test import unittest, client_context, client_knobs
from test import unittest, client_context, client_knobs, IntegrationTest
from test.utils import (ServerAndTopologyEventListener,
single_client,
rs_or_single_client,
wait_until)
# Location of JSON test specifications.
@ -278,5 +281,89 @@ def create_tests():
create_tests()
class TestSdamMonitoring(IntegrationTest):
@classmethod
@client_context.require_failCommand_fail_point
def setUpClass(cls):
super(TestSdamMonitoring, cls).setUpClass()
# Speed up the tests by decreasing the event publish frequency.
cls.knobs = client_knobs(events_queue_frequency=0.1)
cls.knobs.enable()
cls.listener = ServerAndTopologyEventListener()
retry_writes = client_context.supports_transactions()
cls.test_client = rs_or_single_client(
event_listeners=[cls.listener], retryWrites=retry_writes)
cls.coll = cls.test_client[cls.client.db.name].test
cls.coll.insert_one({})
@classmethod
def tearDownClass(cls):
cls.test_client.close()
cls.knobs.disable()
super(TestSdamMonitoring, cls).tearDownClass()
def setUp(self):
self.listener.reset()
def _test_app_error(self, fail_command_opts, expected_error):
address = self.test_client.address
# Test that an application error causes a ServerDescriptionChangedEvent
# to be published.
data = {'failCommands': ['insert']}
data.update(fail_command_opts)
fail_insert = {
'configureFailPoint': 'failCommand',
'mode': {'times': 1},
'data': data,
}
with self.fail_point(fail_insert):
if self.test_client.retry_writes:
self.coll.insert_one({})
else:
with self.assertRaises(expected_error):
self.coll.insert_one({})
self.coll.insert_one({})
def marked_unknown(event):
return (
isinstance(event, monitoring.ServerDescriptionChangedEvent)
and event.server_address == address
and not event.new_description.is_server_type_known)
def discovered_node(event):
return (
isinstance(event, monitoring.ServerDescriptionChangedEvent)
and event.server_address == address
and not event.previous_description.is_server_type_known
and event.new_description.is_server_type_known)
def marked_unknown_and_rediscovered():
return (len(self.listener.matching(marked_unknown)) >= 1 and
len(self.listener.matching(discovered_node)) >= 1)
# Topology events are published asynchronously
wait_until(marked_unknown_and_rediscovered, 'rediscover node')
# Expect a single ServerDescriptionChangedEvent for the network error.
marked_unknown_events = self.listener.matching(marked_unknown)
self.assertEqual(len(marked_unknown_events), 1)
self.assertIsInstance(
marked_unknown_events[0].new_description.error, expected_error)
def test_network_error_publishes_events(self):
self._test_app_error({'closeConnection': True}, ConnectionFailure)
def test_not_master_error_publishes_events(self):
self._test_app_error({'errorCode': 10107, 'closeConnection': False},
NotMasterError)
def test_shutdown_error_publishes_events(self):
self._test_app_error({'errorCode': 91, 'closeConnection': False},
NotMasterError)
if __name__ == "__main__":
unittest.main()

View File

@ -423,7 +423,7 @@ class TestMultiServerTopology(TopologyTest):
'setName': 'rs',
'hosts': ['a', 'b']})
t.reset_server(('a', 27017))
t.reset_server(('a', 27017), None)
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'a'))
self.assertEqual(SERVER_TYPE.RSSecondary, get_type(t, 'b'))
self.assertEqual('rs', t.description.replica_set_name)
@ -440,7 +440,7 @@ class TestMultiServerTopology(TopologyTest):
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
t.description.topology_type)
t.reset_server(('b', 27017))
t.reset_server(('b', 27017), None)
self.assertEqual(SERVER_TYPE.RSPrimary, get_type(t, 'a'))
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, 'b'))
self.assertEqual('rs', t.description.replica_set_name)
@ -451,7 +451,7 @@ class TestMultiServerTopology(TopologyTest):
t = create_mock_topology(replica_set_name='rs')
# No error resetting a server not in the TopologyDescription.
t.reset_server(('b', 27017))
t.reset_server(('b', 27017), None)
# Server was *not* added as type Unknown.
self.assertFalse(t.has_server(('b', 27017)))

View File

@ -167,6 +167,14 @@ class ServerAndTopologyEventListener(monitoring.ServerListener,
def closed(self, event):
self.results.append(event)
def matching(self, matcher):
"""Return the matching events."""
results = self.results[:]
return [event for event in results if matcher(event)]
def reset(self):
self.results = []
class HeartbeatEventListener(monitoring.ServerHeartbeatListener):
"""Listens to only server heartbeat events."""