PYTHON-5208 Add spec test for wait queue timeout errors do not clear the pool (#2199)

Also stop running the ping command to advance session cluster times in the unified tests.
This commit is contained in:
Shane Harvey 2025-04-03 12:05:45 -07:00 committed by GitHub
parent 5177e4ec53
commit 8b668898b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 190 additions and 18 deletions

View File

@ -222,7 +222,6 @@ class EntityMapUtil:
self._listeners: Dict[str, EventListenerUtil] = {}
self._session_lsids: Dict[str, Mapping[str, Any]] = {}
self.test: UnifiedSpecTestMixinV1 = test_class
self._cluster_time: Mapping[str, Any] = {}
def __contains__(self, item):
return item in self._entities
@ -421,13 +420,11 @@ class EntityMapUtil:
# session has been closed.
return self._session_lsids[session_name]
async def advance_cluster_times(self) -> None:
async def advance_cluster_times(self, cluster_time) -> None:
"""Manually synchronize entities when desired"""
if not self._cluster_time:
self._cluster_time = (await self.test.client.admin.command("ping")).get("$clusterTime")
for entity in self._entities.values():
if isinstance(entity, AsyncClientSession) and self._cluster_time:
entity.advance_cluster_time(self._cluster_time)
if isinstance(entity, AsyncClientSession) and cluster_time:
entity.advance_cluster_time(cluster_time)
class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
@ -1044,7 +1041,7 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
async def _testOperation_createEntities(self, spec):
await self.entity_map.create_entities_from_spec(spec["entities"], uri=self._uri)
await self.entity_map.advance_cluster_times()
await self.entity_map.advance_cluster_times(self._cluster_time)
def _testOperation_assertSessionTransactionState(self, spec):
session = self.entity_map[spec["session"]]
@ -1443,11 +1440,12 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
await self.entity_map.create_entities_from_spec(
self.TEST_SPEC.get("createEntities", []), uri=uri
)
self._cluster_time = None
# process initialData
if "initialData" in self.TEST_SPEC:
await self.insert_initial_data(self.TEST_SPEC["initialData"])
self._cluster_time = (await self.client.admin.command("ping")).get("$clusterTime")
await self.entity_map.advance_cluster_times()
self._cluster_time = self.client._topology.max_cluster_time()
await self.entity_map.advance_cluster_times(self._cluster_time)
if "expectLogMessages" in spec:
expect_log_messages = spec["expectLogMessages"]

View File

@ -0,0 +1,176 @@
{
"description": "WaitQueueTimeoutError does not clear the pool",
"schemaVersion": "1.9",
"runOnRequirements": [
{
"minServerVersion": "4.4",
"topologies": [
"single",
"replicaset",
"sharded"
]
}
],
"createEntities": [
{
"client": {
"id": "failPointClient",
"useMultipleMongoses": false
}
},
{
"client": {
"id": "client",
"uriOptions": {
"maxPoolSize": 1,
"appname": "waitQueueTimeoutErrorTest"
},
"useMultipleMongoses": false,
"observeEvents": [
"commandStartedEvent",
"poolClearedEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "test"
}
}
],
"tests": [
{
"description": "WaitQueueTimeoutError does not clear the pool",
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "failPointClient",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"ping"
],
"blockConnection": true,
"blockTimeMS": 500,
"appName": "waitQueueTimeoutErrorTest"
}
}
}
},
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"thread": {
"id": "thread0"
}
}
]
}
},
{
"name": "runOnThread",
"object": "testRunner",
"arguments": {
"thread": "thread0",
"operation": {
"name": "runCommand",
"object": "database",
"arguments": {
"command": {
"ping": 1
},
"commandName": "ping"
}
}
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"commandStartedEvent": {
"commandName": "ping"
}
},
"count": 1
}
},
{
"name": "runCommand",
"object": "database",
"arguments": {
"timeoutMS": 100,
"command": {
"hello": 1
},
"commandName": "hello"
},
"expectError": {
"isTimeoutError": true
}
},
{
"name": "waitForThread",
"object": "testRunner",
"arguments": {
"thread": "thread0"
}
},
{
"name": "runCommand",
"object": "database",
"arguments": {
"command": {
"hello": 1
},
"commandName": "hello"
}
}
],
"expectEvents": [
{
"client": "client",
"eventType": "command",
"events": [
{
"commandStartedEvent": {
"commandName": "ping",
"databaseName": "test",
"command": {
"ping": 1
}
}
},
{
"commandStartedEvent": {
"commandName": "hello",
"databaseName": "test",
"command": {
"hello": 1
}
}
}
]
},
{
"client": "client",
"eventType": "cmap",
"events": []
}
]
}
]
}

View File

@ -221,7 +221,6 @@ class EntityMapUtil:
self._listeners: Dict[str, EventListenerUtil] = {}
self._session_lsids: Dict[str, Mapping[str, Any]] = {}
self.test: UnifiedSpecTestMixinV1 = test_class
self._cluster_time: Mapping[str, Any] = {}
def __contains__(self, item):
return item in self._entities
@ -420,13 +419,11 @@ class EntityMapUtil:
# session has been closed.
return self._session_lsids[session_name]
def advance_cluster_times(self) -> None:
def advance_cluster_times(self, cluster_time) -> None:
"""Manually synchronize entities when desired"""
if not self._cluster_time:
self._cluster_time = (self.test.client.admin.command("ping")).get("$clusterTime")
for entity in self._entities.values():
if isinstance(entity, ClientSession) and self._cluster_time:
entity.advance_cluster_time(self._cluster_time)
if isinstance(entity, ClientSession) and cluster_time:
entity.advance_cluster_time(cluster_time)
class UnifiedSpecTestMixinV1(IntegrationTest):
@ -1035,7 +1032,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
def _testOperation_createEntities(self, spec):
self.entity_map.create_entities_from_spec(spec["entities"], uri=self._uri)
self.entity_map.advance_cluster_times()
self.entity_map.advance_cluster_times(self._cluster_time)
def _testOperation_assertSessionTransactionState(self, spec):
session = self.entity_map[spec["session"]]
@ -1428,11 +1425,12 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
self._uri = uri
self.entity_map = EntityMapUtil(self)
self.entity_map.create_entities_from_spec(self.TEST_SPEC.get("createEntities", []), uri=uri)
self._cluster_time = None
# process initialData
if "initialData" in self.TEST_SPEC:
self.insert_initial_data(self.TEST_SPEC["initialData"])
self._cluster_time = (self.client.admin.command("ping")).get("$clusterTime")
self.entity_map.advance_cluster_times()
self._cluster_time = self.client._topology.max_cluster_time()
self.entity_map.advance_cluster_times(self._cluster_time)
if "expectLogMessages" in spec:
expect_log_messages = spec["expectLogMessages"]