diff --git a/pymongo/topology.py b/pymongo/topology.py index e08c12ab0..340e504d1 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -125,7 +125,7 @@ class Topology(object): executor = periodic_executor.PeriodicExecutor( interval=common.EVENTS_QUEUE_FREQUENCY, - min_interval=0.5, + min_interval=common.MIN_HEARTBEAT_INTERVAL, target=target, name="pymongo_events_thread") diff --git a/test/__init__.py b/test/__init__.py index 54761fb55..8099cc51f 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -191,6 +191,16 @@ class client_knobs(object): def __exit__(self, exc_type, exc_val, exc_tb): self.disable() + def __call__(self, func): + def make_wrapper(f): + @wraps(f) + def wrap(*args, **kwargs): + with self: + return f(*args, **kwargs) + return wrap + + return make_wrapper(func) + def __del__(self): if self._enabled: msg = ( @@ -761,6 +771,16 @@ class ClientContext(object): "failCommand appName must be supported", func=func) + def require_failCommand_blockConnection(self, func): + """Run a test only if the server supports failCommand blockConnection. + """ + return self._require( + lambda: (self.test_commands_enabled and ( + (not self.is_mongos and self.version >= (4, 2, 9))) or + (self.is_mongos and self.version >= (4, 4))), + "failCommand blockConnection is not supported", + func=func) + def require_tls(self, func): """Run a test only if the client can connect over TLS.""" return self._require(lambda: self.tls, @@ -847,7 +867,6 @@ class ClientContext(object): return (self.version.at_least(4, 0) and self.test_commands_enabled) - @property def requires_hint_with_min_max_queries(self): """Does the server require a hint with min/max queries.""" @@ -930,12 +949,6 @@ class IntegrationTest(PyMongoTestCase): self.addCleanup(patcher.disable) -# Use assertRaisesRegex if available, otherwise use Python 2.7's -# deprecated assertRaisesRegexp, with a 'p'. -if not hasattr(unittest.TestCase, 'assertRaisesRegex'): - unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp - - class MockClientTest(unittest.TestCase): """Base class for TestCases that use MockClient. diff --git a/test/test_cmap.py b/test/test_cmap.py index d70691dcf..3a7b70852 100644 --- a/test/test_cmap.py +++ b/test/test_cmap.py @@ -453,59 +453,6 @@ class TestCMAP(IntegrationTest): with pool.get_socket({}): pass - @client_context.require_version_max(4, 3) # Remove after SERVER-53624. - @client_context.require_retryable_writes - @client_context.require_failCommand_fail_point - def test_pool_paused_error_is_retryable(self): - cmap_listener = CMAPListener() - cmd_listener = OvertCommandListener() - client = rs_or_single_client( - maxPoolSize=1, - heartbeatFrequencyMS=500, - event_listeners=[cmap_listener, cmd_listener]) - self.addCleanup(client.close) - threads = [InsertThread(client.pymongo_test.test) for _ in range(3)] - fail_command = { - 'mode': {'times': 1}, - 'data': { - 'failCommands': ['insert'], - 'blockConnection': True, - 'blockTimeMS': 1000, - 'errorCode': 91 - }, - } - with self.fail_point(fail_command): - for thread in threads: - thread.start() - for thread in threads: - thread.join() - for thread in threads: - self.assertTrue(thread.passed) - - # The two threads in the wait queue fail the initial connection check - # out attempt and then succeed on retry. - self.assertEqual( - 2, cmap_listener.event_count(ConnectionCheckOutFailedEvent)) - - # Connection check out failures are not reflected in command - # monitoring because we only publish command events _after_ checking - # out a connection. - self.assertEqual(4, len(cmd_listener.results['started'])) - self.assertEqual(3, len(cmd_listener.results['succeeded'])) - self.assertEqual(1, len(cmd_listener.results['failed'])) - - -class InsertThread(threading.Thread): - def __init__(self, collection): - super(InsertThread, self).__init__() - self.daemon = True - self.collection = collection - self.passed = False - - def run(self): - self.collection.insert_one({}) - self.passed = True - def create_test(scenario_def, test, name): def run_scenario(self): diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index 1995b5dc3..963ff3e18 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -15,15 +15,28 @@ """Test retryable reads spec.""" import os +import pprint import sys +import threading sys.path[0:0] = [""] from pymongo.mongo_client import MongoClient +from pymongo.monitoring import (ConnectionCheckedOutEvent, + ConnectionCheckOutFailedEvent, + ConnectionCheckOutFailedReason, + PoolClearedEvent) from pymongo.write_concern import WriteConcern -from test import unittest, client_context, PyMongoTestCase -from test.utils import TestCreator +from test import (client_context, + client_knobs, + IntegrationTest, + PyMongoTestCase, + unittest) +from test.utils import (CMAPListener, + OvertCommandListener, + rs_or_single_client, + TestCreator) from test.utils_spec_runner import SpecRunner @@ -123,5 +136,77 @@ def create_test(scenario_def, test, name): test_creator = TestCreator(create_test, TestSpec, _TEST_PATH) test_creator.create_tests() + +class FindThread(threading.Thread): + def __init__(self, collection): + super().__init__() + self.daemon = True + self.collection = collection + self.passed = False + + def run(self): + self.collection.find_one({}) + self.passed = True + + +class TestPoolPausedError(IntegrationTest): + # Pools don't get paused in load balanced mode. + RUN_ON_LOAD_BALANCER = False + RUN_ON_SERVERLESS = False + + @client_context.require_failCommand_blockConnection + @client_knobs(heartbeat_frequency=.05, min_heartbeat_interval=.05) + def test_pool_paused_error_is_retryable(self): + cmap_listener = CMAPListener() + cmd_listener = OvertCommandListener() + client = rs_or_single_client( + maxPoolSize=1, + event_listeners=[cmap_listener, cmd_listener]) + self.addCleanup(client.close) + threads = [FindThread(client.pymongo_test.test) for _ in range(2)] + fail_command = { + 'mode': {'times': 1}, + 'data': { + 'failCommands': ['find'], + 'blockConnection': True, + 'blockTimeMS': 1000, + 'errorCode': 91, + }, + } + with self.fail_point(fail_command): + for thread in threads: + thread.start() + for thread in threads: + thread.join() + for thread in threads: + self.assertTrue(thread.passed) + + # Via CMAP monitoring, assert that the first check out succeeds. + cmap_events = cmap_listener.events_by_type(( + ConnectionCheckedOutEvent, + ConnectionCheckOutFailedEvent, + PoolClearedEvent)) + msg = pprint.pformat(cmap_listener.events) + self.assertIsInstance(cmap_events[0], ConnectionCheckedOutEvent, msg) + self.assertIsInstance(cmap_events[1], PoolClearedEvent, msg) + self.assertIsInstance( + cmap_events[2], ConnectionCheckOutFailedEvent, msg) + self.assertEqual(cmap_events[2].reason, + ConnectionCheckOutFailedReason.CONN_ERROR, + msg) + self.assertIsInstance(cmap_events[3], ConnectionCheckedOutEvent, msg) + + # Connection check out failures are not reflected in command + # monitoring because we only publish command events _after_ checking + # out a connection. + started = cmd_listener.results['started'] + msg = pprint.pformat(cmd_listener.results) + self.assertEqual(3, len(started), msg) + succeeded = cmd_listener.results['succeeded'] + self.assertEqual(2, len(succeeded), msg) + failed = cmd_listener.results['failed'] + self.assertEqual(1, len(failed), msg) + + if __name__ == "__main__": unittest.main() diff --git a/test/test_retryable_writes.py b/test/test_retryable_writes.py index 0368f97a6..464ff39ac 100644 --- a/test/test_retryable_writes.py +++ b/test/test_retryable_writes.py @@ -16,13 +16,14 @@ import copy import os +import pprint import sys +import threading sys.path[0:0] = [""] from bson.codec_options import DEFAULT_CODEC_OPTIONS from bson.int64 import Int64 -from bson.objectid import ObjectId from bson.raw_bson import RawBSONDocument from bson.son import SON @@ -32,6 +33,10 @@ from pymongo.errors import (ConnectionFailure, ServerSelectionTimeoutError, WriteConcernError) from pymongo.mongo_client import MongoClient +from pymongo.monitoring import (ConnectionCheckedOutEvent, + ConnectionCheckOutFailedEvent, + ConnectionCheckOutFailedReason, + PoolClearedEvent) from pymongo.operations import (InsertOne, DeleteMany, DeleteOne, @@ -40,10 +45,15 @@ from pymongo.operations import (InsertOne, UpdateOne) from pymongo.write_concern import WriteConcern -from test import unittest, client_context, IntegrationTest, SkipTest, client_knobs -from test.utils import (rs_or_single_client, +from test import (client_context, + client_knobs, + IntegrationTest, + SkipTest, + unittest) +from test.utils import (CMAPListener, DeprecationFilter, OvertCommandListener, + rs_or_single_client, TestCreator) from test.utils_spec_runner import SpecRunner from test.version import Version @@ -153,6 +163,7 @@ class TestRetryableWritesMMAPv1(IgnoreDeprecationsTest): def tearDownClass(cls): cls.knobs.disable() cls.client.close() + super(TestRetryableWritesMMAPv1, cls).tearDownClass() @client_context.require_version_min(3, 5) @client_context.require_no_standalone @@ -477,6 +488,79 @@ class TestWriteConcernError(IntegrationTest): self.assertIn('RetryableWriteError', result['errorLabels']) +class InsertThread(threading.Thread): + def __init__(self, collection): + super().__init__() + self.daemon = True + self.collection = collection + self.passed = False + + def run(self): + self.collection.insert_one({}) + self.passed = True + + +class TestPoolPausedError(IntegrationTest): + # Pools don't get paused in load balanced mode. + RUN_ON_LOAD_BALANCER = False + RUN_ON_SERVERLESS = False + + @client_context.require_failCommand_blockConnection + @client_context.require_retryable_writes + @client_knobs(heartbeat_frequency=.05, min_heartbeat_interval=.05) + def test_pool_paused_error_is_retryable(self): + cmap_listener = CMAPListener() + cmd_listener = OvertCommandListener() + client = rs_or_single_client( + maxPoolSize=1, + event_listeners=[cmap_listener, cmd_listener]) + self.addCleanup(client.close) + threads = [InsertThread(client.pymongo_test.test) for _ in range(2)] + fail_command = { + 'mode': {'times': 1}, + 'data': { + 'failCommands': ['insert'], + 'blockConnection': True, + 'blockTimeMS': 1000, + 'errorCode': 91, + 'errorLabels': ['RetryableWriteError'], + }, + } + with self.fail_point(fail_command): + for thread in threads: + thread.start() + for thread in threads: + thread.join() + for thread in threads: + self.assertTrue(thread.passed) + + # Via CMAP monitoring, assert that the first check out succeeds. + cmap_events = cmap_listener.events_by_type(( + ConnectionCheckedOutEvent, + ConnectionCheckOutFailedEvent, + PoolClearedEvent)) + msg = pprint.pformat(cmap_listener.events) + self.assertIsInstance(cmap_events[0], ConnectionCheckedOutEvent, msg) + self.assertIsInstance(cmap_events[1], PoolClearedEvent, msg) + self.assertIsInstance( + cmap_events[2], ConnectionCheckOutFailedEvent, msg) + self.assertEqual(cmap_events[2].reason, + ConnectionCheckOutFailedReason.CONN_ERROR, + msg) + self.assertIsInstance(cmap_events[3], ConnectionCheckedOutEvent, msg) + + # Connection check out failures are not reflected in command + # monitoring because we only publish command events _after_ checking + # out a connection. + started = cmd_listener.results['started'] + msg = pprint.pformat(cmd_listener.results) + self.assertEqual(3, len(started), msg) + succeeded = cmd_listener.results['succeeded'] + self.assertEqual(2, len(succeeded), msg) + failed = cmd_listener.results['failed'] + self.assertEqual(1, len(failed), msg) + + # TODO: Make this a real integration test where we stepdown the primary. class TestRetryableWritesTxnNumber(IgnoreDeprecationsTest): @client_context.require_version_min(3, 6)