PYTHON-2602 Test that pool paused errors are retryable (#681)

Allow client_knobs to be used as a decorator.
This commit is contained in:
Shane Harvey 2021-07-30 17:56:01 -07:00 committed by GitHub
parent 97a84e199e
commit f541e7731c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 195 additions and 66 deletions

View File

@ -125,7 +125,7 @@ class Topology(object):
executor = periodic_executor.PeriodicExecutor(
interval=common.EVENTS_QUEUE_FREQUENCY,
min_interval=0.5,
min_interval=common.MIN_HEARTBEAT_INTERVAL,
target=target,
name="pymongo_events_thread")

View File

@ -191,6 +191,16 @@ class client_knobs(object):
def __exit__(self, exc_type, exc_val, exc_tb):
self.disable()
def __call__(self, func):
def make_wrapper(f):
@wraps(f)
def wrap(*args, **kwargs):
with self:
return f(*args, **kwargs)
return wrap
return make_wrapper(func)
def __del__(self):
if self._enabled:
msg = (
@ -761,6 +771,16 @@ class ClientContext(object):
"failCommand appName must be supported",
func=func)
def require_failCommand_blockConnection(self, func):
"""Run a test only if the server supports failCommand blockConnection.
"""
return self._require(
lambda: (self.test_commands_enabled and (
(not self.is_mongos and self.version >= (4, 2, 9))) or
(self.is_mongos and self.version >= (4, 4))),
"failCommand blockConnection is not supported",
func=func)
def require_tls(self, func):
"""Run a test only if the client can connect over TLS."""
return self._require(lambda: self.tls,
@ -847,7 +867,6 @@ class ClientContext(object):
return (self.version.at_least(4, 0) and
self.test_commands_enabled)
@property
def requires_hint_with_min_max_queries(self):
"""Does the server require a hint with min/max queries."""
@ -930,12 +949,6 @@ class IntegrationTest(PyMongoTestCase):
self.addCleanup(patcher.disable)
# Use assertRaisesRegex if available, otherwise use Python 2.7's
# deprecated assertRaisesRegexp, with a 'p'.
if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
class MockClientTest(unittest.TestCase):
"""Base class for TestCases that use MockClient.

View File

@ -453,59 +453,6 @@ class TestCMAP(IntegrationTest):
with pool.get_socket({}):
pass
@client_context.require_version_max(4, 3) # Remove after SERVER-53624.
@client_context.require_retryable_writes
@client_context.require_failCommand_fail_point
def test_pool_paused_error_is_retryable(self):
cmap_listener = CMAPListener()
cmd_listener = OvertCommandListener()
client = rs_or_single_client(
maxPoolSize=1,
heartbeatFrequencyMS=500,
event_listeners=[cmap_listener, cmd_listener])
self.addCleanup(client.close)
threads = [InsertThread(client.pymongo_test.test) for _ in range(3)]
fail_command = {
'mode': {'times': 1},
'data': {
'failCommands': ['insert'],
'blockConnection': True,
'blockTimeMS': 1000,
'errorCode': 91
},
}
with self.fail_point(fail_command):
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for thread in threads:
self.assertTrue(thread.passed)
# The two threads in the wait queue fail the initial connection check
# out attempt and then succeed on retry.
self.assertEqual(
2, cmap_listener.event_count(ConnectionCheckOutFailedEvent))
# Connection check out failures are not reflected in command
# monitoring because we only publish command events _after_ checking
# out a connection.
self.assertEqual(4, len(cmd_listener.results['started']))
self.assertEqual(3, len(cmd_listener.results['succeeded']))
self.assertEqual(1, len(cmd_listener.results['failed']))
class InsertThread(threading.Thread):
def __init__(self, collection):
super(InsertThread, self).__init__()
self.daemon = True
self.collection = collection
self.passed = False
def run(self):
self.collection.insert_one({})
self.passed = True
def create_test(scenario_def, test, name):
def run_scenario(self):

View File

@ -15,15 +15,28 @@
"""Test retryable reads spec."""
import os
import pprint
import sys
import threading
sys.path[0:0] = [""]
from pymongo.mongo_client import MongoClient
from pymongo.monitoring import (ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
ConnectionCheckOutFailedReason,
PoolClearedEvent)
from pymongo.write_concern import WriteConcern
from test import unittest, client_context, PyMongoTestCase
from test.utils import TestCreator
from test import (client_context,
client_knobs,
IntegrationTest,
PyMongoTestCase,
unittest)
from test.utils import (CMAPListener,
OvertCommandListener,
rs_or_single_client,
TestCreator)
from test.utils_spec_runner import SpecRunner
@ -123,5 +136,77 @@ def create_test(scenario_def, test, name):
test_creator = TestCreator(create_test, TestSpec, _TEST_PATH)
test_creator.create_tests()
class FindThread(threading.Thread):
def __init__(self, collection):
super().__init__()
self.daemon = True
self.collection = collection
self.passed = False
def run(self):
self.collection.find_one({})
self.passed = True
class TestPoolPausedError(IntegrationTest):
# Pools don't get paused in load balanced mode.
RUN_ON_LOAD_BALANCER = False
RUN_ON_SERVERLESS = False
@client_context.require_failCommand_blockConnection
@client_knobs(heartbeat_frequency=.05, min_heartbeat_interval=.05)
def test_pool_paused_error_is_retryable(self):
cmap_listener = CMAPListener()
cmd_listener = OvertCommandListener()
client = rs_or_single_client(
maxPoolSize=1,
event_listeners=[cmap_listener, cmd_listener])
self.addCleanup(client.close)
threads = [FindThread(client.pymongo_test.test) for _ in range(2)]
fail_command = {
'mode': {'times': 1},
'data': {
'failCommands': ['find'],
'blockConnection': True,
'blockTimeMS': 1000,
'errorCode': 91,
},
}
with self.fail_point(fail_command):
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for thread in threads:
self.assertTrue(thread.passed)
# Via CMAP monitoring, assert that the first check out succeeds.
cmap_events = cmap_listener.events_by_type((
ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
PoolClearedEvent))
msg = pprint.pformat(cmap_listener.events)
self.assertIsInstance(cmap_events[0], ConnectionCheckedOutEvent, msg)
self.assertIsInstance(cmap_events[1], PoolClearedEvent, msg)
self.assertIsInstance(
cmap_events[2], ConnectionCheckOutFailedEvent, msg)
self.assertEqual(cmap_events[2].reason,
ConnectionCheckOutFailedReason.CONN_ERROR,
msg)
self.assertIsInstance(cmap_events[3], ConnectionCheckedOutEvent, msg)
# Connection check out failures are not reflected in command
# monitoring because we only publish command events _after_ checking
# out a connection.
started = cmd_listener.results['started']
msg = pprint.pformat(cmd_listener.results)
self.assertEqual(3, len(started), msg)
succeeded = cmd_listener.results['succeeded']
self.assertEqual(2, len(succeeded), msg)
failed = cmd_listener.results['failed']
self.assertEqual(1, len(failed), msg)
if __name__ == "__main__":
unittest.main()

View File

@ -16,13 +16,14 @@
import copy
import os
import pprint
import sys
import threading
sys.path[0:0] = [""]
from bson.codec_options import DEFAULT_CODEC_OPTIONS
from bson.int64 import Int64
from bson.objectid import ObjectId
from bson.raw_bson import RawBSONDocument
from bson.son import SON
@ -32,6 +33,10 @@ from pymongo.errors import (ConnectionFailure,
ServerSelectionTimeoutError,
WriteConcernError)
from pymongo.mongo_client import MongoClient
from pymongo.monitoring import (ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
ConnectionCheckOutFailedReason,
PoolClearedEvent)
from pymongo.operations import (InsertOne,
DeleteMany,
DeleteOne,
@ -40,10 +45,15 @@ from pymongo.operations import (InsertOne,
UpdateOne)
from pymongo.write_concern import WriteConcern
from test import unittest, client_context, IntegrationTest, SkipTest, client_knobs
from test.utils import (rs_or_single_client,
from test import (client_context,
client_knobs,
IntegrationTest,
SkipTest,
unittest)
from test.utils import (CMAPListener,
DeprecationFilter,
OvertCommandListener,
rs_or_single_client,
TestCreator)
from test.utils_spec_runner import SpecRunner
from test.version import Version
@ -153,6 +163,7 @@ class TestRetryableWritesMMAPv1(IgnoreDeprecationsTest):
def tearDownClass(cls):
cls.knobs.disable()
cls.client.close()
super(TestRetryableWritesMMAPv1, cls).tearDownClass()
@client_context.require_version_min(3, 5)
@client_context.require_no_standalone
@ -477,6 +488,79 @@ class TestWriteConcernError(IntegrationTest):
self.assertIn('RetryableWriteError', result['errorLabels'])
class InsertThread(threading.Thread):
def __init__(self, collection):
super().__init__()
self.daemon = True
self.collection = collection
self.passed = False
def run(self):
self.collection.insert_one({})
self.passed = True
class TestPoolPausedError(IntegrationTest):
# Pools don't get paused in load balanced mode.
RUN_ON_LOAD_BALANCER = False
RUN_ON_SERVERLESS = False
@client_context.require_failCommand_blockConnection
@client_context.require_retryable_writes
@client_knobs(heartbeat_frequency=.05, min_heartbeat_interval=.05)
def test_pool_paused_error_is_retryable(self):
cmap_listener = CMAPListener()
cmd_listener = OvertCommandListener()
client = rs_or_single_client(
maxPoolSize=1,
event_listeners=[cmap_listener, cmd_listener])
self.addCleanup(client.close)
threads = [InsertThread(client.pymongo_test.test) for _ in range(2)]
fail_command = {
'mode': {'times': 1},
'data': {
'failCommands': ['insert'],
'blockConnection': True,
'blockTimeMS': 1000,
'errorCode': 91,
'errorLabels': ['RetryableWriteError'],
},
}
with self.fail_point(fail_command):
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for thread in threads:
self.assertTrue(thread.passed)
# Via CMAP monitoring, assert that the first check out succeeds.
cmap_events = cmap_listener.events_by_type((
ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
PoolClearedEvent))
msg = pprint.pformat(cmap_listener.events)
self.assertIsInstance(cmap_events[0], ConnectionCheckedOutEvent, msg)
self.assertIsInstance(cmap_events[1], PoolClearedEvent, msg)
self.assertIsInstance(
cmap_events[2], ConnectionCheckOutFailedEvent, msg)
self.assertEqual(cmap_events[2].reason,
ConnectionCheckOutFailedReason.CONN_ERROR,
msg)
self.assertIsInstance(cmap_events[3], ConnectionCheckedOutEvent, msg)
# Connection check out failures are not reflected in command
# monitoring because we only publish command events _after_ checking
# out a connection.
started = cmd_listener.results['started']
msg = pprint.pformat(cmd_listener.results)
self.assertEqual(3, len(started), msg)
succeeded = cmd_listener.results['succeeded']
self.assertEqual(2, len(succeeded), msg)
failed = cmd_listener.results['failed']
self.assertEqual(1, len(failed), msg)
# TODO: Make this a real integration test where we stepdown the primary.
class TestRetryableWritesTxnNumber(IgnoreDeprecationsTest):
@client_context.require_version_min(3, 6)