Merge branch 'master' into spec-resync-04-13-2026
This commit is contained in:
commit
792da43c29
45
.github/copilot-instructions.md
vendored
45
.github/copilot-instructions.md
vendored
@ -1,3 +1,44 @@
|
||||
Please see [AGENTS.md](../AGENTS.md).
|
||||
When reviewing code, focus on:
|
||||
|
||||
Follow the repository instructions defined in `AGENTS.md` when working in this codebase.
|
||||
## Security Critical Issues
|
||||
- Check for hardcoded secrets, API keys, or credentials.
|
||||
- Check for instances of potential method call injection, dynamic code execution, symbol injection or other code injection vulnerabilities.
|
||||
|
||||
## Performance Red Flags
|
||||
- Spot inefficient loops and algorithmic issues.
|
||||
- Check for memory leaks and resource cleanup.
|
||||
|
||||
## Code Quality Essentials
|
||||
- Methods should be focused and appropriately sized. If a method is doing too much, suggest refactorings to split it up.
|
||||
- Use clear, descriptive naming conventions.
|
||||
- Avoid encapsulation violations and ensure proper separation of concerns.
|
||||
- All public classes, modules, and methods should have clear documentation in Sphinx format.
|
||||
|
||||
## PyMongo-specific Concerns
|
||||
- Do not review files within `pymongo/synchronous` or files in `test/` that also have a file of the same name in `test/asynchronous` unless the reviewed changes include a `_IS_SYNC` statement. PyMongo generates these files from `pymongo/asynchronous` and `test/asynchronous` using `tools/synchro.py`.
|
||||
- All asynchronous functions must not call any blocking I/O.
|
||||
|
||||
## Review Style
|
||||
- Be specific and actionable in feedback.
|
||||
- Explain the "why" behind recommendations.
|
||||
- Acknowledge good patterns when you see them.
|
||||
- Ask clarifying questions when code intent is unclear.
|
||||
|
||||
Always prioritize security vulnerabilities and performance issues that could impact users.
|
||||
|
||||
Always suggest changes to improve readability and testability. For example, this suggestion seeks to make the code more readable, reusable, and testable:
|
||||
|
||||
```python
|
||||
# Instead of:
|
||||
if user.email and "@" in user.email and len(user.email) > 5:
|
||||
submit_button.enabled = True
|
||||
else:
|
||||
submit_button.enabled = False
|
||||
|
||||
# Consider:
|
||||
def valid_email(email):
|
||||
return email and "@" in email and len(email) > 5
|
||||
|
||||
|
||||
submit_button.enabled = valid_email(user.email)
|
||||
```
|
||||
|
||||
44
AGENTS.md
44
AGENTS.md
@ -1,44 +0,0 @@
|
||||
When reviewing code, focus on:
|
||||
|
||||
## Security Critical Issues
|
||||
- Check for hardcoded secrets, API keys, or credentials.
|
||||
- Check for instances of potential method call injection, dynamic code execution, symbol injection or other code injection vulnerabilities.
|
||||
|
||||
## Performance Red Flags
|
||||
- Spot inefficient loops and algorithmic issues.
|
||||
- Check for memory leaks and resource cleanup.
|
||||
|
||||
## Code Quality Essentials
|
||||
- Methods should be focused and appropriately sized. If a method is doing too much, suggest refactorings to split it up.
|
||||
- Use clear, descriptive naming conventions.
|
||||
- Avoid encapsulation violations and ensure proper separation of concerns.
|
||||
- All public classes, modules, and methods should have clear documentation in Sphinx format.
|
||||
|
||||
## PyMongo-specific Concerns
|
||||
- Do not review files within `pymongo/synchronous` or files in `test/` that also have a file of the same name in `test/asynchronous` unless the reviewed changes include a `_IS_SYNC` statement. PyMongo generates these files from `pymongo/asynchronous` and `test/asynchronous` using `tools/synchro.py`.
|
||||
- All asynchronous functions must not call any blocking I/O.
|
||||
|
||||
## Review Style
|
||||
- Be specific and actionable in feedback.
|
||||
- Explain the "why" behind recommendations.
|
||||
- Acknowledge good patterns when you see them.
|
||||
- Ask clarifying questions when code intent is unclear.
|
||||
|
||||
Always prioritize security vulnerabilities and performance issues that could impact users.
|
||||
|
||||
Always suggest changes to improve readability and testability. For example, this suggestion seeks to make the code more readable, reusable, and testable:
|
||||
|
||||
```python
|
||||
# Instead of:
|
||||
if user.email and "@" in user.email and len(user.email) > 5:
|
||||
submit_button.enabled = True
|
||||
else:
|
||||
submit_button.enabled = False
|
||||
|
||||
# Consider:
|
||||
def valid_email(email):
|
||||
return email and "@" in email and len(email) > 5
|
||||
|
||||
|
||||
submit_button.enabled = valid_email(user.email)
|
||||
```
|
||||
@ -2779,7 +2779,7 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
self._last_error: Optional[Exception] = None
|
||||
self._retrying = False
|
||||
self._always_retryable = False
|
||||
self._multiple_retries = _csot.get_timeout() is not None
|
||||
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
|
||||
self._client = mongo_client
|
||||
self._retry_policy = mongo_client._retry_policy
|
||||
self._func = func
|
||||
@ -2852,6 +2852,8 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
# ConnectionFailures do not supply a code property
|
||||
exc_code = getattr(exc, "code", None)
|
||||
overloaded = exc.has_error_label("SystemOverloadedError")
|
||||
if overloaded:
|
||||
self._max_retries = self._client.options.max_adaptive_retries
|
||||
always_retryable = exc.has_error_label("RetryableError") and overloaded
|
||||
if not self._client.options.retry_reads or (
|
||||
not always_retryable
|
||||
@ -2890,6 +2892,8 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
exc_to_check = exc.error
|
||||
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
|
||||
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
|
||||
if overloaded:
|
||||
self._max_retries = self._client.options.max_adaptive_retries
|
||||
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
|
||||
|
||||
# Always retry abortTransaction and commitTransaction up to once
|
||||
@ -2943,7 +2947,9 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
|
||||
def _is_not_eligible_for_retry(self) -> bool:
|
||||
"""Checks if the exchange is not eligible for retry"""
|
||||
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
|
||||
return not self._retryable or (
|
||||
self._is_retrying() and self._attempt_number >= self._max_retries
|
||||
)
|
||||
|
||||
def _is_retrying(self) -> bool:
|
||||
"""Checks if the exchange is currently undergoing a retry"""
|
||||
|
||||
@ -2769,7 +2769,7 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
self._last_error: Optional[Exception] = None
|
||||
self._retrying = False
|
||||
self._always_retryable = False
|
||||
self._multiple_retries = _csot.get_timeout() is not None
|
||||
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
|
||||
self._client = mongo_client
|
||||
self._retry_policy = mongo_client._retry_policy
|
||||
self._func = func
|
||||
@ -2842,6 +2842,8 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
# ConnectionFailures do not supply a code property
|
||||
exc_code = getattr(exc, "code", None)
|
||||
overloaded = exc.has_error_label("SystemOverloadedError")
|
||||
if overloaded:
|
||||
self._max_retries = self._client.options.max_adaptive_retries
|
||||
always_retryable = exc.has_error_label("RetryableError") and overloaded
|
||||
if not self._client.options.retry_reads or (
|
||||
not always_retryable
|
||||
@ -2880,6 +2882,8 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
exc_to_check = exc.error
|
||||
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
|
||||
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
|
||||
if overloaded:
|
||||
self._max_retries = self._client.options.max_adaptive_retries
|
||||
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
|
||||
|
||||
# Always retry abortTransaction and commitTransaction up to once
|
||||
@ -2933,7 +2937,9 @@ class _ClientConnectionRetryable(Generic[T]):
|
||||
|
||||
def _is_not_eligible_for_retry(self) -> bool:
|
||||
"""Checks if the exchange is not eligible for retry"""
|
||||
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
|
||||
return not self._retryable or (
|
||||
self._is_retrying() and self._attempt_number >= self._max_retries
|
||||
)
|
||||
|
||||
def _is_retrying(self) -> bool:
|
||||
"""Checks if the exchange is currently undergoing a retry"""
|
||||
|
||||
@ -20,8 +20,11 @@ import pprint
|
||||
import sys
|
||||
import threading
|
||||
from test.asynchronous.utils import async_set_fail_point
|
||||
from unittest import mock
|
||||
|
||||
from pymongo.errors import OperationFailure
|
||||
from pymongo import MongoClient
|
||||
from pymongo.common import MAX_ADAPTIVE_RETRIES
|
||||
from pymongo.errors import OperationFailure, PyMongoError
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
@ -38,6 +41,7 @@ from test.utils_shared import (
|
||||
)
|
||||
|
||||
from pymongo.monitoring import (
|
||||
CommandFailedEvent,
|
||||
ConnectionCheckedOutEvent,
|
||||
ConnectionCheckOutFailedEvent,
|
||||
ConnectionCheckOutFailedReason,
|
||||
@ -145,6 +149,19 @@ class TestPoolPausedError(AsyncIntegrationTest):
|
||||
|
||||
|
||||
class TestRetryableReads(AsyncIntegrationTest):
|
||||
async def asyncSetUp(self) -> None:
|
||||
await super().asyncSetUp()
|
||||
self.setup_client = MongoClient(**async_client_context.client_options)
|
||||
self.addCleanup(self.setup_client.close)
|
||||
|
||||
# TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
|
||||
def configure_fail_point_sync(self, command_args, off=False) -> None:
|
||||
cmd = {"configureFailPoint": "failCommand", **command_args}
|
||||
if off:
|
||||
cmd["mode"] = "off"
|
||||
cmd.pop("data", None)
|
||||
self.setup_client.admin.command(cmd)
|
||||
|
||||
@async_client_context.require_multiple_mongoses
|
||||
@async_client_context.require_failCommand_fail_point
|
||||
async def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self):
|
||||
@ -383,6 +400,117 @@ class TestRetryableReads(AsyncIntegrationTest):
|
||||
# 6. Assert that both events occurred on the same server.
|
||||
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id
|
||||
|
||||
@async_client_context.require_failCommand_fail_point
|
||||
@async_client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator]
|
||||
async def test_overload_then_nonoverload_retries_increased_reads(self) -> None:
|
||||
# Create a client.
|
||||
listener = OvertCommandListener()
|
||||
|
||||
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
|
||||
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
|
||||
overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {"times": 1},
|
||||
"data": {
|
||||
"failCommands": ["find"],
|
||||
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
||||
"errorCode": 91,
|
||||
},
|
||||
}
|
||||
|
||||
# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
|
||||
non_overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": "alwaysOn",
|
||||
"data": {
|
||||
"failCommands": ["find"],
|
||||
"errorCode": 91,
|
||||
"errorLabels": ["RetryableError"],
|
||||
},
|
||||
}
|
||||
|
||||
def failed(event: CommandFailedEvent) -> None:
|
||||
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
|
||||
if listener.failed_events:
|
||||
return
|
||||
assert event.failure["code"] == 91
|
||||
self.configure_fail_point_sync(non_overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
listener.failed_events.append(event)
|
||||
|
||||
listener.failed = failed
|
||||
|
||||
client = await self.async_rs_client(event_listeners=[listener])
|
||||
await client.test.test.insert_one({})
|
||||
|
||||
self.configure_fail_point_sync(overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
|
||||
with self.assertRaises(PyMongoError):
|
||||
await client.test.test.find_one()
|
||||
|
||||
started_finds = [e for e in listener.started_events if e.command_name == "find"]
|
||||
self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1)
|
||||
|
||||
@async_client_context.require_failCommand_fail_point
|
||||
@async_client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator]
|
||||
async def test_backoff_is_not_applied_for_non_overload_errors(self):
|
||||
if _IS_SYNC:
|
||||
mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff"
|
||||
else:
|
||||
mock_target = "pymongo.asynchronous.helpers._RetryPolicy.backoff"
|
||||
|
||||
# Create a client.
|
||||
listener = OvertCommandListener()
|
||||
|
||||
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
|
||||
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
|
||||
overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {"times": 1},
|
||||
"data": {
|
||||
"failCommands": ["find"],
|
||||
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
||||
"errorCode": 91,
|
||||
},
|
||||
}
|
||||
|
||||
# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
|
||||
non_overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": "alwaysOn",
|
||||
"data": {
|
||||
"failCommands": ["find"],
|
||||
"errorCode": 91,
|
||||
"errorLabels": ["RetryableError"],
|
||||
},
|
||||
}
|
||||
|
||||
def failed(event: CommandFailedEvent) -> None:
|
||||
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
|
||||
if listener.failed_events:
|
||||
return
|
||||
assert event.failure["code"] == 91
|
||||
self.configure_fail_point_sync(non_overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
listener.failed_events.append(event)
|
||||
|
||||
listener.failed = failed
|
||||
|
||||
client = await self.async_rs_client(event_listeners=[listener])
|
||||
await client.test.test.insert_one({})
|
||||
|
||||
self.configure_fail_point_sync(overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
|
||||
# Perform a findOne operation with coll. Expect the operation to fail.
|
||||
with mock.patch(mock_target, return_value=0) as mock_backoff:
|
||||
with self.assertRaises(PyMongoError):
|
||||
await client.test.test.find_one()
|
||||
|
||||
# Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors.
|
||||
self.assertEqual(mock_backoff.call_count, 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@ -21,6 +21,9 @@ import pprint
|
||||
import sys
|
||||
import threading
|
||||
from test.asynchronous.utils import async_set_fail_point, flaky
|
||||
from unittest import mock
|
||||
|
||||
from pymongo.common import MAX_ADAPTIVE_RETRIES
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
@ -784,6 +787,111 @@ class TestErrorPropagationAfterEncounteringMultipleErrors(AsyncIntegrationTest):
|
||||
# Assert that the error does not contain the error label `NoWritesPerformed`.
|
||||
assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"]
|
||||
|
||||
async def test_overload_then_nonoverload_retries_increased_writes(self) -> None:
|
||||
# Create a client with retryWrites=true.
|
||||
listener = OvertCommandListener()
|
||||
|
||||
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
|
||||
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
|
||||
overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {"times": 1},
|
||||
"data": {
|
||||
"failCommands": ["insert"],
|
||||
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
||||
"errorCode": 91,
|
||||
},
|
||||
}
|
||||
|
||||
# Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and `RetryableWriteError` error labels.
|
||||
non_overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": "alwaysOn",
|
||||
"data": {
|
||||
"failCommands": ["insert"],
|
||||
"errorCode": 91,
|
||||
"errorLabels": ["RetryableError", "RetryableWriteError"],
|
||||
},
|
||||
}
|
||||
|
||||
def failed(event: CommandFailedEvent) -> None:
|
||||
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
|
||||
if listener.failed_events:
|
||||
return
|
||||
assert event.failure["code"] == 91
|
||||
self.configure_fail_point_sync(non_overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
listener.failed_events.append(event)
|
||||
|
||||
listener.failed = failed
|
||||
|
||||
client = await self.async_rs_client(retryWrites=True, event_listeners=[listener])
|
||||
|
||||
self.configure_fail_point_sync(overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
|
||||
with self.assertRaises(PyMongoError):
|
||||
await client.test.test.insert_one({"x": 1})
|
||||
|
||||
started_inserts = [e for e in listener.started_events if e.command_name == "insert"]
|
||||
self.assertEqual(len(started_inserts), MAX_ADAPTIVE_RETRIES + 1)
|
||||
|
||||
async def test_backoff_is_not_applied_for_non_overload_errors(self):
|
||||
if _IS_SYNC:
|
||||
mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff"
|
||||
else:
|
||||
mock_target = "pymongo.asynchronous.helpers._RetryPolicy.backoff"
|
||||
|
||||
# Create a client.
|
||||
listener = OvertCommandListener()
|
||||
|
||||
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
|
||||
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
|
||||
overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {"times": 1},
|
||||
"data": {
|
||||
"failCommands": ["insert"],
|
||||
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
||||
"errorCode": 91,
|
||||
},
|
||||
}
|
||||
|
||||
# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
|
||||
non_overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": "alwaysOn",
|
||||
"data": {
|
||||
"failCommands": ["insert"],
|
||||
"errorCode": 91,
|
||||
"errorLabels": ["RetryableError", "RetryableWriteError"],
|
||||
},
|
||||
}
|
||||
|
||||
def failed(event: CommandFailedEvent) -> None:
|
||||
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
|
||||
if listener.failed_events:
|
||||
return
|
||||
assert event.failure["code"] == 91
|
||||
self.configure_fail_point_sync(non_overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
listener.failed_events.append(event)
|
||||
|
||||
listener.failed = failed
|
||||
|
||||
client = await self.async_rs_client(event_listeners=[listener])
|
||||
|
||||
self.configure_fail_point_sync(overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
|
||||
# Perform a findOne operation with coll. Expect the operation to fail.
|
||||
with mock.patch(mock_target, return_value=0) as mock_backoff:
|
||||
with self.assertRaises(PyMongoError):
|
||||
await client.test.test.insert_one({})
|
||||
|
||||
# Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors.
|
||||
self.assertEqual(mock_backoff.call_count, 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@ -20,8 +20,11 @@ import pprint
|
||||
import sys
|
||||
import threading
|
||||
from test.utils import set_fail_point
|
||||
from unittest import mock
|
||||
|
||||
from pymongo.errors import OperationFailure
|
||||
from pymongo import MongoClient
|
||||
from pymongo.common import MAX_ADAPTIVE_RETRIES
|
||||
from pymongo.errors import OperationFailure, PyMongoError
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
@ -38,6 +41,7 @@ from test.utils_shared import (
|
||||
)
|
||||
|
||||
from pymongo.monitoring import (
|
||||
CommandFailedEvent,
|
||||
ConnectionCheckedOutEvent,
|
||||
ConnectionCheckOutFailedEvent,
|
||||
ConnectionCheckOutFailedReason,
|
||||
@ -145,6 +149,19 @@ class TestPoolPausedError(IntegrationTest):
|
||||
|
||||
|
||||
class TestRetryableReads(IntegrationTest):
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
self.setup_client = MongoClient(**client_context.client_options)
|
||||
self.addCleanup(self.setup_client.close)
|
||||
|
||||
# TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
|
||||
def configure_fail_point_sync(self, command_args, off=False) -> None:
|
||||
cmd = {"configureFailPoint": "failCommand", **command_args}
|
||||
if off:
|
||||
cmd["mode"] = "off"
|
||||
cmd.pop("data", None)
|
||||
self.setup_client.admin.command(cmd)
|
||||
|
||||
@client_context.require_multiple_mongoses
|
||||
@client_context.require_failCommand_fail_point
|
||||
def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self):
|
||||
@ -381,6 +398,117 @@ class TestRetryableReads(IntegrationTest):
|
||||
# 6. Assert that both events occurred on the same server.
|
||||
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id
|
||||
|
||||
@client_context.require_failCommand_fail_point
|
||||
@client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator]
|
||||
def test_overload_then_nonoverload_retries_increased_reads(self) -> None:
|
||||
# Create a client.
|
||||
listener = OvertCommandListener()
|
||||
|
||||
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
|
||||
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
|
||||
overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {"times": 1},
|
||||
"data": {
|
||||
"failCommands": ["find"],
|
||||
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
||||
"errorCode": 91,
|
||||
},
|
||||
}
|
||||
|
||||
# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
|
||||
non_overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": "alwaysOn",
|
||||
"data": {
|
||||
"failCommands": ["find"],
|
||||
"errorCode": 91,
|
||||
"errorLabels": ["RetryableError"],
|
||||
},
|
||||
}
|
||||
|
||||
def failed(event: CommandFailedEvent) -> None:
|
||||
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
|
||||
if listener.failed_events:
|
||||
return
|
||||
assert event.failure["code"] == 91
|
||||
self.configure_fail_point_sync(non_overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
listener.failed_events.append(event)
|
||||
|
||||
listener.failed = failed
|
||||
|
||||
client = self.rs_client(event_listeners=[listener])
|
||||
client.test.test.insert_one({})
|
||||
|
||||
self.configure_fail_point_sync(overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
|
||||
with self.assertRaises(PyMongoError):
|
||||
client.test.test.find_one()
|
||||
|
||||
started_finds = [e for e in listener.started_events if e.command_name == "find"]
|
||||
self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1)
|
||||
|
||||
@client_context.require_failCommand_fail_point
|
||||
@client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator]
|
||||
def test_backoff_is_not_applied_for_non_overload_errors(self):
|
||||
if _IS_SYNC:
|
||||
mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff"
|
||||
else:
|
||||
mock_target = "pymongo.helpers._RetryPolicy.backoff"
|
||||
|
||||
# Create a client.
|
||||
listener = OvertCommandListener()
|
||||
|
||||
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
|
||||
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
|
||||
overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {"times": 1},
|
||||
"data": {
|
||||
"failCommands": ["find"],
|
||||
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
||||
"errorCode": 91,
|
||||
},
|
||||
}
|
||||
|
||||
# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
|
||||
non_overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": "alwaysOn",
|
||||
"data": {
|
||||
"failCommands": ["find"],
|
||||
"errorCode": 91,
|
||||
"errorLabels": ["RetryableError"],
|
||||
},
|
||||
}
|
||||
|
||||
def failed(event: CommandFailedEvent) -> None:
|
||||
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
|
||||
if listener.failed_events:
|
||||
return
|
||||
assert event.failure["code"] == 91
|
||||
self.configure_fail_point_sync(non_overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
listener.failed_events.append(event)
|
||||
|
||||
listener.failed = failed
|
||||
|
||||
client = self.rs_client(event_listeners=[listener])
|
||||
client.test.test.insert_one({})
|
||||
|
||||
self.configure_fail_point_sync(overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
|
||||
# Perform a findOne operation with coll. Expect the operation to fail.
|
||||
with mock.patch(mock_target, return_value=0) as mock_backoff:
|
||||
with self.assertRaises(PyMongoError):
|
||||
client.test.test.find_one()
|
||||
|
||||
# Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors.
|
||||
self.assertEqual(mock_backoff.call_count, 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@ -21,6 +21,9 @@ import pprint
|
||||
import sys
|
||||
import threading
|
||||
from test.utils import flaky, set_fail_point
|
||||
from unittest import mock
|
||||
|
||||
from pymongo.common import MAX_ADAPTIVE_RETRIES
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
@ -780,6 +783,111 @@ class TestErrorPropagationAfterEncounteringMultipleErrors(IntegrationTest):
|
||||
# Assert that the error does not contain the error label `NoWritesPerformed`.
|
||||
assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"]
|
||||
|
||||
def test_overload_then_nonoverload_retries_increased_writes(self) -> None:
|
||||
# Create a client with retryWrites=true.
|
||||
listener = OvertCommandListener()
|
||||
|
||||
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
|
||||
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
|
||||
overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {"times": 1},
|
||||
"data": {
|
||||
"failCommands": ["insert"],
|
||||
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
||||
"errorCode": 91,
|
||||
},
|
||||
}
|
||||
|
||||
# Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and `RetryableWriteError` error labels.
|
||||
non_overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": "alwaysOn",
|
||||
"data": {
|
||||
"failCommands": ["insert"],
|
||||
"errorCode": 91,
|
||||
"errorLabels": ["RetryableError", "RetryableWriteError"],
|
||||
},
|
||||
}
|
||||
|
||||
def failed(event: CommandFailedEvent) -> None:
|
||||
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
|
||||
if listener.failed_events:
|
||||
return
|
||||
assert event.failure["code"] == 91
|
||||
self.configure_fail_point_sync(non_overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
listener.failed_events.append(event)
|
||||
|
||||
listener.failed = failed
|
||||
|
||||
client = self.rs_client(retryWrites=True, event_listeners=[listener])
|
||||
|
||||
self.configure_fail_point_sync(overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
|
||||
with self.assertRaises(PyMongoError):
|
||||
client.test.test.insert_one({"x": 1})
|
||||
|
||||
started_inserts = [e for e in listener.started_events if e.command_name == "insert"]
|
||||
self.assertEqual(len(started_inserts), MAX_ADAPTIVE_RETRIES + 1)
|
||||
|
||||
def test_backoff_is_not_applied_for_non_overload_errors(self):
|
||||
if _IS_SYNC:
|
||||
mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff"
|
||||
else:
|
||||
mock_target = "pymongo.helpers._RetryPolicy.backoff"
|
||||
|
||||
# Create a client.
|
||||
listener = OvertCommandListener()
|
||||
|
||||
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
|
||||
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
|
||||
overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {"times": 1},
|
||||
"data": {
|
||||
"failCommands": ["insert"],
|
||||
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
||||
"errorCode": 91,
|
||||
},
|
||||
}
|
||||
|
||||
# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
|
||||
non_overload_fail_point = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": "alwaysOn",
|
||||
"data": {
|
||||
"failCommands": ["insert"],
|
||||
"errorCode": 91,
|
||||
"errorLabels": ["RetryableError", "RetryableWriteError"],
|
||||
},
|
||||
}
|
||||
|
||||
def failed(event: CommandFailedEvent) -> None:
|
||||
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
|
||||
if listener.failed_events:
|
||||
return
|
||||
assert event.failure["code"] == 91
|
||||
self.configure_fail_point_sync(non_overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
listener.failed_events.append(event)
|
||||
|
||||
listener.failed = failed
|
||||
|
||||
client = self.rs_client(event_listeners=[listener])
|
||||
|
||||
self.configure_fail_point_sync(overload_fail_point)
|
||||
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
|
||||
|
||||
# Perform a findOne operation with coll. Expect the operation to fail.
|
||||
with mock.patch(mock_target, return_value=0) as mock_backoff:
|
||||
with self.assertRaises(PyMongoError):
|
||||
client.test.test.insert_one({})
|
||||
|
||||
# Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors.
|
||||
self.assertEqual(mock_backoff.call_count, 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user