PYTHON-5751 - Only retry overload errors if retries are enabled
This commit is contained in:
parent
e7a5247bed
commit
95d8f5fee2
@ -952,7 +952,7 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]):
|
||||
)
|
||||
|
||||
return await self._client._retryable_read(
|
||||
inner, read_preference, session, command_name, None, False
|
||||
inner, read_preference, session, command_name, None, False, is_run_command=True
|
||||
)
|
||||
|
||||
@_csot.apply
|
||||
|
||||
@ -2009,6 +2009,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
read_pref: Optional[_ServerMode] = None,
|
||||
retryable: bool = False,
|
||||
operation_id: Optional[int] = None,
|
||||
is_run_command: bool = False,
|
||||
) -> T:
|
||||
"""Internal retryable helper for all client transactions.
|
||||
|
||||
@ -2020,6 +2021,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
:param address: Server Address, defaults to None
|
||||
:param read_pref: Topology of read operation, defaults to None
|
||||
:param retryable: If the operation should be retried once, defaults to None
|
||||
:param is_run_command: If this is a runCommand operation, defaults to False
|
||||
|
||||
:return: Output of the calling func()
|
||||
"""
|
||||
@ -2034,6 +2036,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
address=address,
|
||||
retryable=retryable,
|
||||
operation_id=operation_id,
|
||||
is_run_command=is_run_command,
|
||||
).run()
|
||||
|
||||
async def _retryable_read(
|
||||
@ -2045,6 +2048,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
address: Optional[_Address] = None,
|
||||
retryable: bool = True,
|
||||
operation_id: Optional[int] = None,
|
||||
is_run_command: bool = False,
|
||||
) -> T:
|
||||
"""Execute an operation with consecutive retries if possible
|
||||
|
||||
@ -2060,6 +2064,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
:param address: Optional address when sending a message, defaults to None
|
||||
:param retryable: if we should attempt retries
|
||||
(may not always be supported even if supplied), defaults to False
|
||||
:param is_run_command: If this is a runCommand operation, defaults to False.
|
||||
"""
|
||||
|
||||
# Ensure that the client supports retrying on reads and there is no session in
|
||||
@ -2078,6 +2083,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
read_pref=read_pref,
|
||||
retryable=retryable,
|
||||
operation_id=operation_id,
|
||||
is_run_command=is_run_command,
|
||||
)
|
||||
|
||||
async def _retryable_write(
|
||||
@ -2747,6 +2753,7 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
address: Optional[_Address] = None,
|
||||
retryable: bool = False,
|
||||
operation_id: Optional[int] = None,
|
||||
is_run_command: bool = False,
|
||||
):
|
||||
self._last_error: Optional[Exception] = None
|
||||
self._retrying = False
|
||||
@ -2769,6 +2776,7 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
self._operation = operation
|
||||
self._operation_id = operation_id
|
||||
self._attempt_number = 0
|
||||
self._is_run_command = is_run_command
|
||||
|
||||
async def run(self) -> T:
|
||||
"""Runs the supplied func() and attempts a retry
|
||||
@ -2809,6 +2817,11 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
always_retryable = False
|
||||
overloaded = False
|
||||
exc_to_check = exc
|
||||
|
||||
if self._is_run_command and not (
|
||||
self._client.options.retry_reads and self._client.options.retry_writes
|
||||
):
|
||||
raise
|
||||
# Execute specialized catch on read
|
||||
if self._is_read:
|
||||
if isinstance(exc, (ConnectionFailure, OperationFailure)):
|
||||
@ -2816,11 +2829,15 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
exc_code = getattr(exc, "code", None)
|
||||
overloaded = exc.has_error_label("SystemOverloadedError")
|
||||
always_retryable = exc.has_error_label("RetryableError") and overloaded
|
||||
if not always_retryable and (
|
||||
self._is_not_eligible_for_retry()
|
||||
or (
|
||||
isinstance(exc, OperationFailure)
|
||||
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
|
||||
if (
|
||||
not self._client.options.retry_reads
|
||||
or not always_retryable
|
||||
and (
|
||||
self._is_not_eligible_for_retry()
|
||||
or (
|
||||
isinstance(exc, OperationFailure)
|
||||
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
|
||||
)
|
||||
)
|
||||
):
|
||||
raise
|
||||
@ -2851,7 +2868,11 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
|
||||
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
|
||||
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
|
||||
if not self._retryable and not always_retryable:
|
||||
if (
|
||||
not self._client.options.retry_writes
|
||||
or not self._retryable
|
||||
and not always_retryable
|
||||
):
|
||||
raise
|
||||
if retryable_write_label or always_retryable:
|
||||
assert self._session
|
||||
|
||||
@ -952,7 +952,7 @@ class Database(common.BaseObject, Generic[_DocumentType]):
|
||||
)
|
||||
|
||||
return self._client._retryable_read(
|
||||
inner, read_preference, session, command_name, None, False
|
||||
inner, read_preference, session, command_name, None, False, is_run_command=True
|
||||
)
|
||||
|
||||
@_csot.apply
|
||||
|
||||
@ -2005,6 +2005,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
read_pref: Optional[_ServerMode] = None,
|
||||
retryable: bool = False,
|
||||
operation_id: Optional[int] = None,
|
||||
is_run_command: bool = False,
|
||||
) -> T:
|
||||
"""Internal retryable helper for all client transactions.
|
||||
|
||||
@ -2016,6 +2017,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
:param address: Server Address, defaults to None
|
||||
:param read_pref: Topology of read operation, defaults to None
|
||||
:param retryable: If the operation should be retried once, defaults to None
|
||||
:param is_run_command: If this is a runCommand operation, defaults to False
|
||||
|
||||
:return: Output of the calling func()
|
||||
"""
|
||||
@ -2030,6 +2032,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
address=address,
|
||||
retryable=retryable,
|
||||
operation_id=operation_id,
|
||||
is_run_command=is_run_command,
|
||||
).run()
|
||||
|
||||
def _retryable_read(
|
||||
@ -2041,6 +2044,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
address: Optional[_Address] = None,
|
||||
retryable: bool = True,
|
||||
operation_id: Optional[int] = None,
|
||||
is_run_command: bool = False,
|
||||
) -> T:
|
||||
"""Execute an operation with consecutive retries if possible
|
||||
|
||||
@ -2056,6 +2060,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
:param address: Optional address when sending a message, defaults to None
|
||||
:param retryable: if we should attempt retries
|
||||
(may not always be supported even if supplied), defaults to False
|
||||
:param is_run_command: If this is a runCommand operation, defaults to False.
|
||||
"""
|
||||
|
||||
# Ensure that the client supports retrying on reads and there is no session in
|
||||
@ -2074,6 +2079,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
|
||||
read_pref=read_pref,
|
||||
retryable=retryable,
|
||||
operation_id=operation_id,
|
||||
is_run_command=is_run_command,
|
||||
)
|
||||
|
||||
def _retryable_write(
|
||||
@ -2737,6 +2743,7 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
address: Optional[_Address] = None,
|
||||
retryable: bool = False,
|
||||
operation_id: Optional[int] = None,
|
||||
is_run_command: bool = False,
|
||||
):
|
||||
self._last_error: Optional[Exception] = None
|
||||
self._retrying = False
|
||||
@ -2759,6 +2766,7 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
self._operation = operation
|
||||
self._operation_id = operation_id
|
||||
self._attempt_number = 0
|
||||
self._is_run_command = is_run_command
|
||||
|
||||
def run(self) -> T:
|
||||
"""Runs the supplied func() and attempts a retry
|
||||
@ -2799,6 +2807,11 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
always_retryable = False
|
||||
overloaded = False
|
||||
exc_to_check = exc
|
||||
|
||||
if self._is_run_command and not (
|
||||
self._client.options.retry_reads and self._client.options.retry_writes
|
||||
):
|
||||
raise
|
||||
# Execute specialized catch on read
|
||||
if self._is_read:
|
||||
if isinstance(exc, (ConnectionFailure, OperationFailure)):
|
||||
@ -2806,11 +2819,15 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
exc_code = getattr(exc, "code", None)
|
||||
overloaded = exc.has_error_label("SystemOverloadedError")
|
||||
always_retryable = exc.has_error_label("RetryableError") and overloaded
|
||||
if not always_retryable and (
|
||||
self._is_not_eligible_for_retry()
|
||||
or (
|
||||
isinstance(exc, OperationFailure)
|
||||
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
|
||||
if (
|
||||
not self._client.options.retry_reads
|
||||
or not always_retryable
|
||||
and (
|
||||
self._is_not_eligible_for_retry()
|
||||
or (
|
||||
isinstance(exc, OperationFailure)
|
||||
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
|
||||
)
|
||||
)
|
||||
):
|
||||
raise
|
||||
@ -2841,7 +2858,11 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
|
||||
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
|
||||
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
|
||||
if not self._retryable and not always_retryable:
|
||||
if (
|
||||
not self._client.options.retry_writes
|
||||
or not self._retryable
|
||||
and not always_retryable
|
||||
):
|
||||
raise
|
||||
if retryable_write_label or always_retryable:
|
||||
assert self._session
|
||||
|
||||
@ -227,7 +227,7 @@ class AsyncTestClientBackpressure(AsyncIntegrationTest):
|
||||
self.listener.reset()
|
||||
self.app_name = self.__class__.__name__.lower()
|
||||
self.client = await self.async_rs_or_single_client(
|
||||
event_listeners=[self.listener], retryWrites=False, appName=self.app_name
|
||||
event_listeners=[self.listener], appName=self.app_name
|
||||
)
|
||||
|
||||
@patch("random.random")
|
||||
|
||||
@ -0,0 +1,281 @@
|
||||
{
|
||||
"description": "tests that runCommand only retries when retryReads=true and retryWrites=true",
|
||||
"schemaVersion": "1.3",
|
||||
"runOnRequirements": [
|
||||
{
|
||||
"minServerVersion": "4.4",
|
||||
"topologies": [
|
||||
"replicaset",
|
||||
"sharded",
|
||||
"load-balanced"
|
||||
]
|
||||
}
|
||||
],
|
||||
"createEntities": [
|
||||
{
|
||||
"client": {
|
||||
"id": "client_defaults",
|
||||
"useMultipleMongoses": false,
|
||||
"observeEvents": [
|
||||
"commandStartedEvent",
|
||||
"commandSucceededEvent",
|
||||
"commandFailedEvent"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"database": {
|
||||
"id": "database_defaults",
|
||||
"client": "client_defaults",
|
||||
"databaseName": "backpressure-runCommand-requirements-db"
|
||||
}
|
||||
},
|
||||
{
|
||||
"client": {
|
||||
"id": "client_retryReads_false",
|
||||
"useMultipleMongoses": false,
|
||||
"observeEvents": [
|
||||
"commandStartedEvent",
|
||||
"commandSucceededEvent",
|
||||
"commandFailedEvent"
|
||||
],
|
||||
"uriOptions": {
|
||||
"retryReads": false
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"database": {
|
||||
"id": "database_retryReads_false",
|
||||
"client": "client_retryReads_false",
|
||||
"databaseName": "backpressure-runCommand-requirements-db"
|
||||
}
|
||||
},
|
||||
{
|
||||
"collection": {
|
||||
"id": "retryable-reads-tests",
|
||||
"database": "database_retryReads_false",
|
||||
"collectionName": "coll_retryReads_false"
|
||||
}
|
||||
},
|
||||
{
|
||||
"client": {
|
||||
"id": "client_retryWrites_false",
|
||||
"useMultipleMongoses": false,
|
||||
"observeEvents": [
|
||||
"commandStartedEvent",
|
||||
"commandSucceededEvent",
|
||||
"commandFailedEvent"
|
||||
],
|
||||
"uriOptions": {
|
||||
"retryWrites": false
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"database": {
|
||||
"id": "database_retryWrites_false",
|
||||
"client": "client_retryWrites_false",
|
||||
"databaseName": "backpressure-runCommand-requirements-db"
|
||||
}
|
||||
},
|
||||
{
|
||||
"collection": {
|
||||
"id": "retryable-writes-tests",
|
||||
"database": "database_retryWrites_false",
|
||||
"collectionName": "coll_retryWrites_false"
|
||||
}
|
||||
},
|
||||
{
|
||||
"client": {
|
||||
"id": "internal_client",
|
||||
"useMultipleMongoses": false
|
||||
}
|
||||
}
|
||||
],
|
||||
"tests": [
|
||||
{
|
||||
"description": "database.runCommand retries with defaults (retryReads=true and retryWrites=true)",
|
||||
"operations": [
|
||||
{
|
||||
"name": "failPoint",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"client": "internal_client",
|
||||
"failPoint": {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {
|
||||
"times": 1
|
||||
},
|
||||
"data": {
|
||||
"failCommands": [
|
||||
"ping"
|
||||
],
|
||||
"errorLabels": [
|
||||
"RetryableError",
|
||||
"SystemOverloadedError"
|
||||
],
|
||||
"errorCode": 2
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "runCommand",
|
||||
"object": "database_defaults",
|
||||
"arguments": {
|
||||
"command": {
|
||||
"ping": 1
|
||||
},
|
||||
"commandName": "ping"
|
||||
}
|
||||
}
|
||||
],
|
||||
"expectEvents": [
|
||||
{
|
||||
"client": "client_defaults",
|
||||
"events": [
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "ping"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandFailedEvent": {
|
||||
"commandName": "ping"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "ping"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandSucceededEvent": {
|
||||
"commandName": "ping"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"description": "database.runCommand does not retry when retryReads=false",
|
||||
"operations": [
|
||||
{
|
||||
"name": "failPoint",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"client": "internal_client",
|
||||
"failPoint": {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {
|
||||
"times": 1
|
||||
},
|
||||
"data": {
|
||||
"failCommands": [
|
||||
"ping"
|
||||
],
|
||||
"errorLabels": [
|
||||
"RetryableError",
|
||||
"SystemOverloadedError"
|
||||
],
|
||||
"errorCode": 2
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "runCommand",
|
||||
"object": "database_retryReads_false",
|
||||
"arguments": {
|
||||
"command": {
|
||||
"ping": 1
|
||||
},
|
||||
"commandName": "ping"
|
||||
},
|
||||
"expectError": {
|
||||
"isError": true,
|
||||
"isClientError": false
|
||||
}
|
||||
}
|
||||
],
|
||||
"expectEvents": [
|
||||
{
|
||||
"client": "client_retryReads_false",
|
||||
"events": [
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "ping"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandFailedEvent": {
|
||||
"commandName": "ping"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"description": "database.runCommand does not retry when retryWrites=false",
|
||||
"operations": [
|
||||
{
|
||||
"name": "failPoint",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"client": "internal_client",
|
||||
"failPoint": {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {
|
||||
"times": 1
|
||||
},
|
||||
"data": {
|
||||
"failCommands": [
|
||||
"ping"
|
||||
],
|
||||
"errorLabels": [
|
||||
"RetryableError",
|
||||
"SystemOverloadedError"
|
||||
],
|
||||
"errorCode": 2
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "runCommand",
|
||||
"object": "database_retryWrites_false",
|
||||
"arguments": {
|
||||
"command": {
|
||||
"ping": 1
|
||||
},
|
||||
"commandName": "ping"
|
||||
},
|
||||
"expectError": {
|
||||
"isError": true,
|
||||
"isClientError": false
|
||||
}
|
||||
}
|
||||
],
|
||||
"expectEvents": [
|
||||
{
|
||||
"client": "client_retryWrites_false",
|
||||
"events": [
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "ping"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandFailedEvent": {
|
||||
"commandName": "ping"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
@ -227,7 +227,7 @@ class TestClientBackpressure(IntegrationTest):
|
||||
self.listener.reset()
|
||||
self.app_name = self.__class__.__name__.lower()
|
||||
self.client = self.rs_or_single_client(
|
||||
event_listeners=[self.listener], retryWrites=False, appName=self.app_name
|
||||
event_listeners=[self.listener], appName=self.app_name
|
||||
)
|
||||
|
||||
@patch("random.random")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user