PYTHON-5528 - Token buckets are opt-in only

This commit is contained in:
Noah Stapp 2026-02-20 12:48:47 -05:00
parent 1d219a9002
commit e666b895d6
10 changed files with 216 additions and 60 deletions

View File

@ -79,7 +79,6 @@ def _handle_reauth(func: F) -> F:
_MAX_RETRIES = 5
_BACKOFF_INITIAL = 0.1
_BACKOFF_MAX = 10
# DRIVERS-3240 will determine these defaults.
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
DEFAULT_RETRY_TOKEN_RETURN = 0.1
@ -101,7 +100,6 @@ class _TokenBucket:
):
self.lock = _async_create_lock()
self.capacity = capacity
# DRIVERS-3240 will determine how full the bucket should start.
self.tokens = capacity
self.return_rate = return_rate
@ -123,7 +121,7 @@ class _TokenBucket:
class _RetryPolicy:
"""A retry limiter that performs exponential backoff with jitter.
Retry attempts are limited by a token bucket to prevent overwhelming the server during
When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
a prolonged outage or high load.
"""
@ -133,15 +131,18 @@ class _RetryPolicy:
attempts: int = _MAX_RETRIES,
backoff_initial: float = _BACKOFF_INITIAL,
backoff_max: float = _BACKOFF_MAX,
adaptive_retry: bool = False,
):
self.token_bucket = token_bucket
self.attempts = attempts
self.backoff_initial = backoff_initial
self.backoff_max = backoff_max
self.adaptive_retry = adaptive_retry
async def record_success(self, retry: bool) -> None:
"""Record a successful operation."""
await self.token_bucket.deposit(retry)
if self.adaptive_retry:
await self.token_bucket.deposit(retry)
def backoff(self, attempt: int) -> float:
"""Return the backoff duration for the given attempt."""
@ -158,7 +159,7 @@ class _RetryPolicy:
return False
# Check token bucket last since we only want to consume a token if we actually retry.
if not await self.token_bucket.consume():
if self.adaptive_retry and not await self.token_bucket.consume():
# DRIVERS-3246 Improve diagnostics when this case happens.
# We could add info to the exception and log.
return False

View File

@ -615,8 +615,18 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
client to use Stable API. See `versioned API <https://www.mongodb.com/docs/manual/reference/stable-api/#what-is-the-stable-api--and-should-you-use-it->`_ for
details.
| **Adaptive retry options:**
| (If not enabled explicitly, adaptive retries will not be enabled.)
- `adaptive_retries`: (boolean) Whether the adaptive retry mechanism is enabled for this client.
If enabled, server overload errors will use a token-bucket based system to mitigate further overload.
Defaults to ``False``.
.. seealso:: The MongoDB documentation on `connections <https://dochub.mongodb.org/core/connections>`_.
.. versionchanged:: 4.XX
Added the ``adaptiveRetries`` URI option and the ``adaptive_retries`` keyword argument.
.. versionchanged:: 4.5
Added the ``serverMonitoringMode`` keyword argument.
@ -778,7 +788,6 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
self._timeout: float | None = None
self._topology_settings: TopologySettings = None # type: ignore[assignment]
self._event_listeners: _EventListeners | None = None
self._retry_policy = _RetryPolicy(_TokenBucket())
# _pool_class, _monitor_class, and _condition_class are for deep
# customization of PyMongo, e.g. Motor.
@ -890,6 +899,10 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
self._opened = False
self._closed = False
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._retry_policy = _RetryPolicy(
_TokenBucket(), adaptive_retry=self._options.adaptive_retries
)
if not is_srv:
self._init_background()

View File

@ -235,6 +235,11 @@ class ClientOptions:
self.__server_monitoring_mode = options.get(
"servermonitoringmode", common.SERVER_MONITORING_MODE
)
self.__adaptive_retries = (
options.get("adaptive_retries", common.ADAPTIVE_RETRIES)
if "adaptive_retries" in options
else options.get("adaptiveretries", common.ADAPTIVE_RETRIES)
)
@property
def _options(self) -> Mapping[str, Any]:
@ -346,3 +351,11 @@ class ClientOptions:
.. versionadded:: 4.5
"""
return self.__server_monitoring_mode
@property
def adaptive_retries(self) -> bool:
"""The configured adaptiveRetries option.
.. versionadded:: 4.XX
"""
return self.__adaptive_retries

View File

@ -140,6 +140,9 @@ SRV_SERVICE_NAME = "mongodb"
# Default value for serverMonitoringMode
SERVER_MONITORING_MODE = "auto" # poll/stream/auto
# Default value for adaptiveRetries
ADAPTIVE_RETRIES = False
# Auth mechanism properties that must raise an error instead of warning if they invalidate.
_MECH_PROP_MUST_RAISE = ["CANONICALIZE_HOST_NAME"]
@ -738,6 +741,7 @@ URI_OPTIONS_VALIDATOR_MAP: dict[str, Callable[[Any, Any], Any]] = {
"srvmaxhosts": validate_non_negative_integer,
"timeoutms": validate_timeoutms,
"servermonitoringmode": validate_server_monitoring_mode,
"adaptiveretries": validate_boolean_or_string,
}
# Dictionary where keys are the names of URI options specific to pymongo,
@ -771,6 +775,7 @@ KW_VALIDATORS: dict[str, Callable[[Any, Any], Any]] = {
"server_selector": validate_is_callable_or_none,
"auto_encryption_opts": validate_auto_encryption_opts_or_none,
"authoidcallowedhosts": validate_list,
"adaptive_retries": validate_boolean_or_string,
}
# Dictionary where keys are any URI option name, and values are the

View File

@ -79,7 +79,6 @@ def _handle_reauth(func: F) -> F:
_MAX_RETRIES = 5
_BACKOFF_INITIAL = 0.1
_BACKOFF_MAX = 10
# DRIVERS-3240 will determine these defaults.
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
DEFAULT_RETRY_TOKEN_RETURN = 0.1
@ -101,7 +100,6 @@ class _TokenBucket:
):
self.lock = _create_lock()
self.capacity = capacity
# DRIVERS-3240 will determine how full the bucket should start.
self.tokens = capacity
self.return_rate = return_rate
@ -123,7 +121,7 @@ class _TokenBucket:
class _RetryPolicy:
"""A retry limiter that performs exponential backoff with jitter.
Retry attempts are limited by a token bucket to prevent overwhelming the server during
When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
a prolonged outage or high load.
"""
@ -133,15 +131,18 @@ class _RetryPolicy:
attempts: int = _MAX_RETRIES,
backoff_initial: float = _BACKOFF_INITIAL,
backoff_max: float = _BACKOFF_MAX,
adaptive_retry: bool = False,
):
self.token_bucket = token_bucket
self.attempts = attempts
self.backoff_initial = backoff_initial
self.backoff_max = backoff_max
self.adaptive_retry = adaptive_retry
def record_success(self, retry: bool) -> None:
"""Record a successful operation."""
self.token_bucket.deposit(retry)
if self.adaptive_retry:
self.token_bucket.deposit(retry)
def backoff(self, attempt: int) -> float:
"""Return the backoff duration for the given attempt."""
@ -158,7 +159,7 @@ class _RetryPolicy:
return False
# Check token bucket last since we only want to consume a token if we actually retry.
if not self.token_bucket.consume():
if self.adaptive_retry and not self.token_bucket.consume():
# DRIVERS-3246 Improve diagnostics when this case happens.
# We could add info to the exception and log.
return False

View File

@ -615,8 +615,18 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
client to use Stable API. See `versioned API <https://www.mongodb.com/docs/manual/reference/stable-api/#what-is-the-stable-api--and-should-you-use-it->`_ for
details.
| **Adaptive retry options:**
| (If not enabled explicitly, adaptive retries will not be enabled.)
- `adaptive_retries`: (boolean) Whether the adaptive retry mechanism is enabled for this client.
If enabled, server overload errors will use a token-bucket based system to mitigate further overload.
Defaults to ``False``.
.. seealso:: The MongoDB documentation on `connections <https://dochub.mongodb.org/core/connections>`_.
.. versionchanged:: 4.XX
Added the ``adaptiveRetries`` URI option and the ``adaptive_retries`` keyword argument.
.. versionchanged:: 4.5
Added the ``serverMonitoringMode`` keyword argument.
@ -778,7 +788,6 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
self._timeout: float | None = None
self._topology_settings: TopologySettings = None # type: ignore[assignment]
self._event_listeners: _EventListeners | None = None
self._retry_policy = _RetryPolicy(_TokenBucket())
# _pool_class, _monitor_class, and _condition_class are for deep
# customization of PyMongo, e.g. Motor.
@ -890,6 +899,10 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
self._opened = False
self._closed = False
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._retry_policy = _RetryPolicy(
_TokenBucket(), adaptive_retry=self._options.adaptive_retries
)
if not is_srv:
self._init_background()

View File

@ -652,6 +652,21 @@ class AsyncClientUnitTest(AsyncUnitTest):
with self.assertWarns(UserWarning):
self.simple_client(multi_host)
async def test_adaptive_retries(self):
# Assert that adaptive retries are disabled by default.
c = self.simple_client(connect=False)
self.assertFalse(c.options.adaptive_retries)
# Assert that adaptive retries can be enabled through connection or client options.
c = self.simple_client(connect=False, adaptive_retries=True)
self.assertTrue(c.options.adaptive_retries)
c = self.simple_client(connect=False, adaptiveRetries=True)
self.assertTrue(c.options.adaptive_retries)
c = self.simple_client(host="mongodb://localhost/?adaptiveretries=true", connect=False)
self.assertTrue(c.options.adaptive_retries)
class TestClient(AsyncIntegrationTest):
def test_multiple_uris(self):

View File

@ -168,34 +168,11 @@ class TestBackpressure(AsyncIntegrationTest):
self.assertIn("RetryableError", str(error.exception))
self.assertIn("SystemOverloadedError", str(error.exception))
@async_client_context.require_failCommand_appName
async def test_limit_retry_command(self):
client = await self.async_rs_or_single_client()
client._retry_policy.token_bucket.tokens = 1
db = client.pymongo_test
await db.t.insert_one({"x": 1})
# Ensure command is retried once on an overload error.
fail_many = mock_overload_error.copy()
fail_many["mode"] = {"times": 1}
async with self.fail_point(fail_many):
await db.command("find", "t")
# Ensure command stops retrying when there are no tokens left.
fail_too_many = mock_overload_error.copy()
fail_too_many["mode"] = {"times": 2}
async with self.fail_point(fail_too_many):
with self.assertRaises(PyMongoError) as error:
await db.command("find", "t")
self.assertIn("RetryableError", str(error.exception))
self.assertIn("SystemOverloadedError", str(error.exception))
class TestRetryPolicy(AsyncPyMongoTestCase):
async def test_retry_policy(self):
capacity = 10
retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity))
retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity), adaptive_retry=True)
self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES)
self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL)
self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX)
@ -300,6 +277,69 @@ class AsyncTestClientBackpressure(AsyncIntegrationTest):
# runs.
self.assertTrue(abs((end1 - start1) - (end0 - start0 + 3.1)) < 1)
@async_client_context.require_failCommand_appName
async def test_02_overload_retries_limited(self):
# Drivers should test that without adaptive retries enabled, overload errors are retried a maximum of five times.
# 1. Let `client` be a `MongoClient`.
client = self.client
# 3. Let `coll` be a collection.
coll = client.pymongo_test.coll
# 4. Configure the following failpoint:
failpoint = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": ["find"],
"errorCode": 462, # IngressRequestRateLimitExceeded
"errorLabels": ["RetryableError", "SystemOverloadedError"],
},
}
# 5. Perform a find operation with `coll` that fails.
async with self.fail_point(failpoint):
with self.assertRaises(PyMongoError) as error:
await coll.find_one({})
# 6. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
self.assertIn("RetryableError", str(error.exception))
self.assertIn("SystemOverloadedError", str(error.exception))
# 7. Assert that the total number of started commands is MAX_RETRIES + 1.
self.assertEqual(len(self.listener.started_events), _MAX_RETRIES + 1)
@async_client_context.require_failCommand_appName
async def test_03_adaptive_retries_limited_by_tokens(self):
# Drivers should test that when enabled, adaptive retries are limited by the number of tokens in the bucket.
# 1. Let `client` be a `MongoClient` with adaptiveRetries=True.
client = await self.async_rs_or_single_client(adaptive_retries=True)
# 2. Set `client`'s retry token bucket to have 1 token.
client._retry_policy.token_bucket.tokens = 1
# 3. Let `coll` be a collection.
coll = client.pymongo_test.coll
# 4. Configure the following failpoint:
failpoint = {
"configureFailPoint": "failCommand",
"mode": {"times": 2},
"data": {
"failCommands": ["find", "insert", "update"],
"errorCode": 462, # IngressRequestRateLimitExceeded
"errorLabels": ["RetryableError", "SystemOverloadedError"],
},
}
# 5. Perform a find operation with `coll` that fails.
async with self.fail_point(failpoint):
with self.assertRaises(PyMongoError) as error:
await coll.find_one({})
# 6. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
self.assertIn("RetryableError", str(error.exception))
self.assertIn("SystemOverloadedError", str(error.exception))
# Location of JSON test specifications.
if _IS_SYNC:

View File

@ -645,6 +645,21 @@ class ClientUnitTest(UnitTest):
with self.assertWarns(UserWarning):
self.simple_client(multi_host)
def test_adaptive_retries(self):
# Assert that adaptive retries are disabled by default.
c = self.simple_client(connect=False)
self.assertFalse(c.options.adaptive_retries)
# Assert that adaptive retries can be enabled through connection or client options.
c = self.simple_client(connect=False, adaptive_retries=True)
self.assertTrue(c.options.adaptive_retries)
c = self.simple_client(connect=False, adaptiveRetries=True)
self.assertTrue(c.options.adaptive_retries)
c = self.simple_client(host="mongodb://localhost/?adaptiveretries=true", connect=False)
self.assertTrue(c.options.adaptive_retries)
class TestClient(IntegrationTest):
def test_multiple_uris(self):

View File

@ -168,34 +168,11 @@ class TestBackpressure(IntegrationTest):
self.assertIn("RetryableError", str(error.exception))
self.assertIn("SystemOverloadedError", str(error.exception))
@client_context.require_failCommand_appName
def test_limit_retry_command(self):
client = self.rs_or_single_client()
client._retry_policy.token_bucket.tokens = 1
db = client.pymongo_test
db.t.insert_one({"x": 1})
# Ensure command is retried once on an overload error.
fail_many = mock_overload_error.copy()
fail_many["mode"] = {"times": 1}
with self.fail_point(fail_many):
db.command("find", "t")
# Ensure command stops retrying when there are no tokens left.
fail_too_many = mock_overload_error.copy()
fail_too_many["mode"] = {"times": 2}
with self.fail_point(fail_too_many):
with self.assertRaises(PyMongoError) as error:
db.command("find", "t")
self.assertIn("RetryableError", str(error.exception))
self.assertIn("SystemOverloadedError", str(error.exception))
class TestRetryPolicy(PyMongoTestCase):
def test_retry_policy(self):
capacity = 10
retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity))
retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity), adaptive_retry=True)
self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES)
self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL)
self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX)
@ -300,6 +277,69 @@ class TestClientBackpressure(IntegrationTest):
# runs.
self.assertTrue(abs((end1 - start1) - (end0 - start0 + 3.1)) < 1)
@client_context.require_failCommand_appName
def test_02_overload_retries_limited(self):
# Drivers should test that without adaptive retries enabled, overload errors are retried a maximum of five times.
# 1. Let `client` be a `MongoClient`.
client = self.client
# 3. Let `coll` be a collection.
coll = client.pymongo_test.coll
# 4. Configure the following failpoint:
failpoint = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": ["find"],
"errorCode": 462, # IngressRequestRateLimitExceeded
"errorLabels": ["RetryableError", "SystemOverloadedError"],
},
}
# 5. Perform a find operation with `coll` that fails.
with self.fail_point(failpoint):
with self.assertRaises(PyMongoError) as error:
coll.find_one({})
# 6. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
self.assertIn("RetryableError", str(error.exception))
self.assertIn("SystemOverloadedError", str(error.exception))
# 7. Assert that the total number of started commands is MAX_RETRIES + 1.
self.assertEqual(len(self.listener.started_events), _MAX_RETRIES + 1)
@client_context.require_failCommand_appName
def test_03_adaptive_retries_limited_by_tokens(self):
# Drivers should test that when enabled, adaptive retries are limited by the number of tokens in the bucket.
# 1. Let `client` be a `MongoClient` with adaptiveRetries=True.
client = self.rs_or_single_client(adaptive_retries=True)
# 2. Set `client`'s retry token bucket to have 1 token.
client._retry_policy.token_bucket.tokens = 1
# 3. Let `coll` be a collection.
coll = client.pymongo_test.coll
# 4. Configure the following failpoint:
failpoint = {
"configureFailPoint": "failCommand",
"mode": {"times": 2},
"data": {
"failCommands": ["find", "insert", "update"],
"errorCode": 462, # IngressRequestRateLimitExceeded
"errorLabels": ["RetryableError", "SystemOverloadedError"],
},
}
# 5. Perform a find operation with `coll` that fails.
with self.fail_point(failpoint):
with self.assertRaises(PyMongoError) as error:
coll.find_one({})
# 6. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
self.assertIn("RetryableError", str(error.exception))
self.assertIn("SystemOverloadedError", str(error.exception))
# Location of JSON test specifications.
if _IS_SYNC: