PYTHON-2534 Avoid race in test_pool_paused_error_is_retryable (#704)

This commit is contained in:
Shane Harvey 2021-08-16 10:27:37 -07:00 committed by GitHub
parent 65aa7c86d5
commit 6a18027db8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 38 deletions

View File

@ -154,7 +154,7 @@ class client_knobs(object):
self.old_min_heartbeat_interval = None
self.old_kill_cursor_frequency = None
self.old_events_queue_frequency = None
self._enabled = True
self._enabled = False
self._stack = None
def enable(self):
@ -776,8 +776,8 @@ class ClientContext(object):
"""
return self._require(
lambda: (self.test_commands_enabled and (
(not self.is_mongos and self.version >= (4, 2, 9))) or
(self.is_mongos and self.version >= (4, 4))),
(not self.is_mongos and self.version >= (4, 2, 9)) or
(self.is_mongos and self.version >= (4, 4)))),
"failCommand blockConnection is not supported",
func=func)

View File

@ -163,23 +163,33 @@ class TestPoolPausedError(IntegrationTest):
maxPoolSize=1,
event_listeners=[cmap_listener, cmd_listener])
self.addCleanup(client.close)
threads = [FindThread(client.pymongo_test.test) for _ in range(2)]
fail_command = {
'mode': {'times': 1},
'data': {
'failCommands': ['find'],
'blockConnection': True,
'blockTimeMS': 1000,
'errorCode': 91,
},
}
with self.fail_point(fail_command):
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for thread in threads:
self.assertTrue(thread.passed)
for _ in range(10):
cmap_listener.reset()
cmd_listener.reset()
threads = [FindThread(client.pymongo_test.test) for _ in range(2)]
fail_command = {
'mode': {'times': 1},
'data': {
'failCommands': ['find'],
'blockConnection': True,
'blockTimeMS': 1000,
'errorCode': 91,
},
}
with self.fail_point(fail_command):
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for thread in threads:
self.assertTrue(thread.passed)
# It's possible that SDAM can rediscover the server and mark the
# pool ready before the thread in the wait queue has a chance
# to run. Repeat the test until the thread actually encounters
# a PoolClearedError.
if cmap_listener.event_count(ConnectionCheckOutFailedEvent):
break
# Via CMAP monitoring, assert that the first check out succeeds.
cmap_events = cmap_listener.events_by_type((

View File

@ -515,24 +515,33 @@ class TestPoolPausedError(IntegrationTest):
maxPoolSize=1,
event_listeners=[cmap_listener, cmd_listener])
self.addCleanup(client.close)
threads = [InsertThread(client.pymongo_test.test) for _ in range(2)]
fail_command = {
'mode': {'times': 1},
'data': {
'failCommands': ['insert'],
'blockConnection': True,
'blockTimeMS': 1000,
'errorCode': 91,
'errorLabels': ['RetryableWriteError'],
},
}
with self.fail_point(fail_command):
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for thread in threads:
self.assertTrue(thread.passed)
for _ in range(10):
cmap_listener.reset()
cmd_listener.reset()
threads = [InsertThread(client.pymongo_test.test) for _ in range(2)]
fail_command = {
'mode': {'times': 1},
'data': {
'failCommands': ['insert'],
'blockConnection': True,
'blockTimeMS': 1000,
'errorCode': 91,
'errorLabels': ['RetryableWriteError'],
},
}
with self.fail_point(fail_command):
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for thread in threads:
self.assertTrue(thread.passed)
# It's possible that SDAM can rediscover the server and mark the
# pool ready before the thread in the wait queue has a chance
# to run. Repeat the test until the thread actually encounters
# a PoolClearedError.
if cmap_listener.event_count(ConnectionCheckOutFailedEvent):
break
# Via CMAP monitoring, assert that the first check out succeeds.
cmap_events = cmap_listener.events_by_type((