Add more tests

This commit is contained in:
Noah Stapp 2026-03-10 12:38:02 -04:00
parent 4926c3aae2
commit af54ac3b56
7 changed files with 2335 additions and 357 deletions

View File

@ -2946,6 +2946,7 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]):
session,
retryable=not cmd._performs_write,
operation=_Op.AGGREGATE,
is_aggregate_write=cmd._performs_write,
)
async def aggregate(

View File

@ -2010,6 +2010,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
retryable: bool = False,
operation_id: Optional[int] = None,
is_run_command: bool = False,
is_aggregate_write: bool = False,
) -> T:
"""Internal retryable helper for all client transactions.
@ -2022,6 +2023,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
:param read_pref: Topology of read operation, defaults to None
:param retryable: If the operation should be retried once, defaults to None
:param is_run_command: If this is a runCommand operation, defaults to False
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
:return: Output of the calling func()
"""
@ -2037,6 +2039,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
retryable=retryable,
operation_id=operation_id,
is_run_command=is_run_command,
is_aggregate_write=is_aggregate_write,
).run()
async def _retryable_read(
@ -2049,6 +2052,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
retryable: bool = True,
operation_id: Optional[int] = None,
is_run_command: bool = False,
is_aggregate_write: bool = False,
) -> T:
"""Execute an operation with consecutive retries if possible
@ -2065,6 +2069,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
:param retryable: if we should attempt retries
(may not always be supported even if supplied), defaults to False
:param is_run_command: If this is a runCommand operation, defaults to False.
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
"""
# Ensure that the client supports retrying on reads and there is no session in
@ -2084,6 +2089,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
retryable=retryable,
operation_id=operation_id,
is_run_command=is_run_command,
is_aggregate_write=is_aggregate_write,
)
async def _retryable_write(
@ -2754,6 +2760,7 @@ class _ClientConnectionRetryable(Generic[T]):
retryable: bool = False,
operation_id: Optional[int] = None,
is_run_command: bool = False,
is_aggregate_write: bool = False,
):
self._last_error: Optional[Exception] = None
self._retrying = False
@ -2777,6 +2784,7 @@ class _ClientConnectionRetryable(Generic[T]):
self._operation_id = operation_id
self._attempt_number = 0
self._is_run_command = is_run_command
self._is_aggregate_write = is_aggregate_write
async def run(self) -> T:
"""Runs the supplied func() and attempts a retry
@ -2822,6 +2830,9 @@ class _ClientConnectionRetryable(Generic[T]):
self._client.options.retry_reads and self._client.options.retry_writes
):
raise
if self._is_aggregate_write and not self._client.options.retry_writes:
raise
# Execute specialized catch on read
if self._is_read:
if isinstance(exc, (ConnectionFailure, OperationFailure)):
@ -2870,9 +2881,9 @@ class _ClientConnectionRetryable(Generic[T]):
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
# Always retry abortTransaction and commitTransaction up to once
if not (self._client.options.retry_writes and self._retryable) and (
not always_retryable
and self._operation not in ["abortTransaction", "commitTransaction"]
if self._operation not in ["abortTransaction", "commitTransaction"] and (
not self._client.options.retry_writes
or not (self._retryable or always_retryable)
):
raise
if retryable_write_label or always_retryable:

View File

@ -2939,6 +2939,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
session,
retryable=not cmd._performs_write,
operation=_Op.AGGREGATE,
is_aggregate_write=cmd._performs_write,
)
def aggregate(

View File

@ -2006,6 +2006,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
retryable: bool = False,
operation_id: Optional[int] = None,
is_run_command: bool = False,
is_aggregate_write: bool = False,
) -> T:
"""Internal retryable helper for all client transactions.
@ -2018,6 +2019,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
:param read_pref: Topology of read operation, defaults to None
:param retryable: If the operation should be retried once, defaults to None
:param is_run_command: If this is a runCommand operation, defaults to False
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
:return: Output of the calling func()
"""
@ -2033,6 +2035,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
retryable=retryable,
operation_id=operation_id,
is_run_command=is_run_command,
is_aggregate_write=is_aggregate_write,
).run()
def _retryable_read(
@ -2045,6 +2048,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
retryable: bool = True,
operation_id: Optional[int] = None,
is_run_command: bool = False,
is_aggregate_write: bool = False,
) -> T:
"""Execute an operation with consecutive retries if possible
@ -2061,6 +2065,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
:param retryable: if we should attempt retries
(may not always be supported even if supplied), defaults to False
:param is_run_command: If this is a runCommand operation, defaults to False.
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
"""
# Ensure that the client supports retrying on reads and there is no session in
@ -2080,6 +2085,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
retryable=retryable,
operation_id=operation_id,
is_run_command=is_run_command,
is_aggregate_write=is_aggregate_write,
)
def _retryable_write(
@ -2744,6 +2750,7 @@ class _ClientConnectionRetryable(Generic[T]):
retryable: bool = False,
operation_id: Optional[int] = None,
is_run_command: bool = False,
is_aggregate_write: bool = False,
):
self._last_error: Optional[Exception] = None
self._retrying = False
@ -2767,6 +2774,7 @@ class _ClientConnectionRetryable(Generic[T]):
self._operation_id = operation_id
self._attempt_number = 0
self._is_run_command = is_run_command
self._is_aggregate_write = is_aggregate_write
def run(self) -> T:
"""Runs the supplied func() and attempts a retry
@ -2812,6 +2820,9 @@ class _ClientConnectionRetryable(Generic[T]):
self._client.options.retry_reads and self._client.options.retry_writes
):
raise
if self._is_aggregate_write and not self._client.options.retry_writes:
raise
# Execute specialized catch on read
if self._is_read:
if isinstance(exc, (ConnectionFailure, OperationFailure)):
@ -2860,9 +2871,9 @@ class _ClientConnectionRetryable(Generic[T]):
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
# Always retry abortTransaction and commitTransaction up to once
if not (self._client.options.retry_writes and self._retryable) and (
not always_retryable
and self._operation not in ["abortTransaction", "commitTransaction"]
if self._operation not in ["abortTransaction", "commitTransaction"] and (
not self._client.options.retry_writes
or not (self._retryable or always_retryable)
):
raise
if retryable_write_label or always_retryable:

File diff suppressed because it is too large Load Diff

View File

@ -20,6 +20,9 @@
"commandStartedEvent",
"commandSucceededEvent",
"commandFailedEvent"
],
"ignoreCommandMonitoringEvents": [
"killCursors"
]
}
},
@ -89,8 +92,8 @@
}
},
{
"object": "client",
"name": "listDatabases",
"object": "client",
"arguments": {
"filter": {}
},
@ -193,8 +196,8 @@
}
},
{
"object": "client",
"name": "listDatabaseNames",
"object": "client",
"expectError": {
"isError": true,
"isClientError": false
@ -294,8 +297,8 @@
}
},
{
"object": "client",
"name": "createChangeStream",
"object": "client",
"arguments": {
"pipeline": []
},
@ -403,8 +406,8 @@
}
},
{
"object": "client",
"name": "clientBulkWrite",
"object": "client",
"arguments": {
"models": [
{
@ -517,8 +520,8 @@
}
},
{
"object": "database",
"name": "aggregate",
"object": "database",
"arguments": {
"pipeline": [
{
@ -628,8 +631,8 @@
}
},
{
"object": "database",
"name": "listCollections",
"object": "database",
"arguments": {
"filter": {}
},
@ -732,8 +735,8 @@
}
},
{
"object": "database",
"name": "listCollectionNames",
"object": "database",
"arguments": {
"filter": {}
},
@ -836,8 +839,8 @@
}
},
{
"object": "database",
"name": "runCommand",
"object": "database",
"arguments": {
"command": {
"ping": 1
@ -943,8 +946,8 @@
}
},
{
"object": "database",
"name": "createChangeStream",
"object": "database",
"arguments": {
"pipeline": []
},
@ -1047,8 +1050,8 @@
}
},
{
"object": "collection",
"name": "aggregate",
"object": "collection",
"arguments": {
"pipeline": []
},
@ -1151,8 +1154,8 @@
}
},
{
"object": "collection",
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {}
},
@ -1255,8 +1258,8 @@
}
},
{
"object": "collection",
"name": "estimatedDocumentCount",
"object": "collection",
"expectError": {
"isError": true,
"isClientError": false
@ -1356,8 +1359,8 @@
}
},
{
"object": "collection",
"name": "distinct",
"object": "collection",
"arguments": {
"fieldName": "x",
"filter": {}
@ -1461,8 +1464,8 @@
}
},
{
"object": "collection",
"name": "find",
"object": "collection",
"arguments": {
"filter": {}
},
@ -1565,8 +1568,8 @@
}
},
{
"object": "collection",
"name": "findOne",
"object": "collection",
"arguments": {
"filter": {}
},
@ -1669,8 +1672,8 @@
}
},
{
"object": "collection",
"name": "listIndexes",
"object": "collection",
"expectError": {
"isError": true,
"isClientError": false
@ -1770,8 +1773,8 @@
}
},
{
"object": "collection",
"name": "listIndexNames",
"object": "collection",
"expectError": {
"isError": true,
"isClientError": false
@ -1871,8 +1874,8 @@
}
},
{
"object": "collection",
"name": "createChangeStream",
"object": "collection",
"arguments": {
"pipeline": []
},
@ -1975,8 +1978,8 @@
}
},
{
"object": "collection",
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 2,
@ -2082,8 +2085,8 @@
}
},
{
"object": "collection",
"name": "insertMany",
"object": "collection",
"arguments": {
"documents": [
{
@ -2191,8 +2194,8 @@
}
},
{
"object": "collection",
"name": "deleteOne",
"object": "collection",
"arguments": {
"filter": {}
},
@ -2295,8 +2298,8 @@
}
},
{
"object": "collection",
"name": "deleteMany",
"object": "collection",
"arguments": {
"filter": {}
},
@ -2399,8 +2402,8 @@
}
},
{
"object": "collection",
"name": "replaceOne",
"object": "collection",
"arguments": {
"filter": {},
"replacement": {
@ -2506,8 +2509,8 @@
}
},
{
"object": "collection",
"name": "updateOne",
"object": "collection",
"arguments": {
"filter": {},
"update": {
@ -2615,8 +2618,8 @@
}
},
{
"object": "collection",
"name": "updateMany",
"object": "collection",
"arguments": {
"filter": {},
"update": {
@ -2724,8 +2727,8 @@
}
},
{
"object": "collection",
"name": "findOneAndDelete",
"object": "collection",
"arguments": {
"filter": {}
},
@ -2828,8 +2831,8 @@
}
},
{
"object": "collection",
"name": "findOneAndReplace",
"object": "collection",
"arguments": {
"filter": {},
"replacement": {
@ -2935,8 +2938,8 @@
}
},
{
"object": "collection",
"name": "findOneAndUpdate",
"object": "collection",
"arguments": {
"filter": {},
"update": {
@ -3044,8 +3047,8 @@
}
},
{
"object": "collection",
"name": "bulkWrite",
"object": "collection",
"arguments": {
"requests": [
{
@ -3157,8 +3160,8 @@
}
},
{
"object": "collection",
"name": "createIndex",
"object": "collection",
"arguments": {
"keys": {
"x": 11
@ -3264,8 +3267,8 @@
}
},
{
"object": "collection",
"name": "dropIndex",
"object": "collection",
"arguments": {
"name": "x_11"
},
@ -3368,8 +3371,8 @@
}
},
{
"object": "collection",
"name": "dropIndexes",
"object": "collection",
"expectError": {
"isError": true,
"isClientError": false
@ -3443,6 +3446,114 @@
]
}
]
},
{
"description": "collection.aggregate retries at most maxAttempts=5 times",
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "fail_point_client",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": [
"aggregate"
],
"errorLabels": [
"RetryableError",
"SystemOverloadedError"
],
"errorCode": 2
}
}
}
},
{
"name": "aggregate",
"object": "collection",
"arguments": {
"pipeline": [
{
"$out": "output"
}
]
},
"expectError": {
"isError": true,
"isClientError": false
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"commandName": "aggregate"
}
},
{
"commandFailedEvent": {
"commandName": "aggregate"
}
},
{
"commandStartedEvent": {
"commandName": "aggregate"
}
},
{
"commandFailedEvent": {
"commandName": "aggregate"
}
},
{
"commandStartedEvent": {
"commandName": "aggregate"
}
},
{
"commandFailedEvent": {
"commandName": "aggregate"
}
},
{
"commandStartedEvent": {
"commandName": "aggregate"
}
},
{
"commandFailedEvent": {
"commandName": "aggregate"
}
},
{
"commandStartedEvent": {
"commandName": "aggregate"
}
},
{
"commandFailedEvent": {
"commandName": "aggregate"
}
},
{
"commandStartedEvent": {
"commandName": "aggregate"
}
},
{
"commandFailedEvent": {
"commandName": "aggregate"
}
}
]
}
]
}
]
}

View File

@ -1,281 +0,0 @@
{
"description": "tests that runCommand only retries when retryReads=true and retryWrites=true",
"schemaVersion": "1.3",
"runOnRequirements": [
{
"minServerVersion": "4.4",
"topologies": [
"replicaset",
"sharded",
"load-balanced"
]
}
],
"createEntities": [
{
"client": {
"id": "client_defaults",
"useMultipleMongoses": false,
"observeEvents": [
"commandStartedEvent",
"commandSucceededEvent",
"commandFailedEvent"
]
}
},
{
"database": {
"id": "database_defaults",
"client": "client_defaults",
"databaseName": "backpressure-runCommand-requirements-db"
}
},
{
"client": {
"id": "client_retryReads_false",
"useMultipleMongoses": false,
"observeEvents": [
"commandStartedEvent",
"commandSucceededEvent",
"commandFailedEvent"
],
"uriOptions": {
"retryReads": false
}
}
},
{
"database": {
"id": "database_retryReads_false",
"client": "client_retryReads_false",
"databaseName": "backpressure-runCommand-requirements-db"
}
},
{
"collection": {
"id": "retryable-reads-tests",
"database": "database_retryReads_false",
"collectionName": "coll_retryReads_false"
}
},
{
"client": {
"id": "client_retryWrites_false",
"useMultipleMongoses": false,
"observeEvents": [
"commandStartedEvent",
"commandSucceededEvent",
"commandFailedEvent"
],
"uriOptions": {
"retryWrites": false
}
}
},
{
"database": {
"id": "database_retryWrites_false",
"client": "client_retryWrites_false",
"databaseName": "backpressure-runCommand-requirements-db"
}
},
{
"collection": {
"id": "retryable-writes-tests",
"database": "database_retryWrites_false",
"collectionName": "coll_retryWrites_false"
}
},
{
"client": {
"id": "internal_client",
"useMultipleMongoses": false
}
}
],
"tests": [
{
"description": "database.runCommand retries with defaults (retryReads=true and retryWrites=true)",
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "internal_client",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"ping"
],
"errorLabels": [
"RetryableError",
"SystemOverloadedError"
],
"errorCode": 2
}
}
}
},
{
"name": "runCommand",
"object": "database_defaults",
"arguments": {
"command": {
"ping": 1
},
"commandName": "ping"
}
}
],
"expectEvents": [
{
"client": "client_defaults",
"events": [
{
"commandStartedEvent": {
"commandName": "ping"
}
},
{
"commandFailedEvent": {
"commandName": "ping"
}
},
{
"commandStartedEvent": {
"commandName": "ping"
}
},
{
"commandSucceededEvent": {
"commandName": "ping"
}
}
]
}
]
},
{
"description": "database.runCommand does not retry when retryReads=false",
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "internal_client",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"ping"
],
"errorLabels": [
"RetryableError",
"SystemOverloadedError"
],
"errorCode": 2
}
}
}
},
{
"name": "runCommand",
"object": "database_retryReads_false",
"arguments": {
"command": {
"ping": 1
},
"commandName": "ping"
},
"expectError": {
"isError": true,
"isClientError": false
}
}
],
"expectEvents": [
{
"client": "client_retryReads_false",
"events": [
{
"commandStartedEvent": {
"commandName": "ping"
}
},
{
"commandFailedEvent": {
"commandName": "ping"
}
}
]
}
]
},
{
"description": "database.runCommand does not retry when retryWrites=false",
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "internal_client",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"ping"
],
"errorLabels": [
"RetryableError",
"SystemOverloadedError"
],
"errorCode": 2
}
}
}
},
{
"name": "runCommand",
"object": "database_retryWrites_false",
"arguments": {
"command": {
"ping": 1
},
"commandName": "ping"
},
"expectError": {
"isError": true,
"isClientError": false
}
}
],
"expectEvents": [
{
"client": "client_retryWrites_false",
"events": [
{
"commandStartedEvent": {
"commandName": "ping"
}
},
{
"commandFailedEvent": {
"commandName": "ping"
}
}
]
}
]
}
]
}