311 lines
12 KiB
Python
311 lines
12 KiB
Python
# Copyright 2025-present MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Test Client Backpressure spec."""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import pathlib
|
|
import sys
|
|
from time import perf_counter
|
|
from unittest.mock import patch
|
|
|
|
from pymongo.common import MAX_ADAPTIVE_RETRIES
|
|
|
|
sys.path[0:0] = [""]
|
|
|
|
from test import (
|
|
IntegrationTest,
|
|
client_context,
|
|
unittest,
|
|
)
|
|
from test.unified_format import generate_test_classes
|
|
from test.utils_shared import EventListener, OvertCommandListener
|
|
|
|
from pymongo.errors import OperationFailure, PyMongoError
|
|
|
|
_IS_SYNC = True
|
|
|
|
# Mock a system overload error.
|
|
mock_overload_error = {
|
|
"configureFailPoint": "failCommand",
|
|
"mode": {"times": 1},
|
|
"data": {
|
|
"failCommands": ["find", "insert", "update"],
|
|
"errorCode": 462, # IngressRequestRateLimitExceeded
|
|
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
|
},
|
|
}
|
|
|
|
|
|
def get_mock_overload_error(times: int):
|
|
error = mock_overload_error.copy()
|
|
error["mode"] = {"times": times}
|
|
return error
|
|
|
|
|
|
class TestBackpressure(IntegrationTest):
|
|
RUN_ON_LOAD_BALANCER = True
|
|
|
|
@client_context.require_failCommand_appName
|
|
def test_retry_overload_error_command(self):
|
|
self.db.t.insert_one({"x": 1})
|
|
|
|
# Ensure command is retried on overload error.
|
|
fail_many = get_mock_overload_error(MAX_ADAPTIVE_RETRIES)
|
|
with self.fail_point(fail_many):
|
|
self.db.command("find", "t")
|
|
|
|
# Ensure command stops retrying after MAX_ADAPTIVE_RETRIES.
|
|
fail_too_many = get_mock_overload_error(MAX_ADAPTIVE_RETRIES + 1)
|
|
with self.fail_point(fail_too_many):
|
|
with self.assertRaises(PyMongoError) as error:
|
|
self.db.command("find", "t")
|
|
|
|
self.assertIn("RetryableError", str(error.exception))
|
|
self.assertIn("SystemOverloadedError", str(error.exception))
|
|
|
|
@client_context.require_failCommand_appName
|
|
def test_retry_overload_error_find(self):
|
|
self.db.t.insert_one({"x": 1})
|
|
|
|
# Ensure command is retried on overload error.
|
|
fail_many = get_mock_overload_error(MAX_ADAPTIVE_RETRIES)
|
|
with self.fail_point(fail_many):
|
|
self.db.t.find_one()
|
|
|
|
# Ensure command stops retrying after MAX_ADAPTIVE_RETRIES.
|
|
fail_too_many = get_mock_overload_error(MAX_ADAPTIVE_RETRIES + 1)
|
|
with self.fail_point(fail_too_many):
|
|
with self.assertRaises(PyMongoError) as error:
|
|
self.db.t.find_one()
|
|
|
|
self.assertIn("RetryableError", str(error.exception))
|
|
self.assertIn("SystemOverloadedError", str(error.exception))
|
|
|
|
@client_context.require_failCommand_appName
|
|
def test_retry_overload_error_insert_one(self):
|
|
# Ensure command is retried on overload error.
|
|
fail_many = get_mock_overload_error(MAX_ADAPTIVE_RETRIES)
|
|
with self.fail_point(fail_many):
|
|
self.db.t.insert_one({"x": 1})
|
|
|
|
# Ensure command stops retrying after MAX_ADAPTIVE_RETRIES.
|
|
fail_too_many = get_mock_overload_error(MAX_ADAPTIVE_RETRIES + 1)
|
|
with self.fail_point(fail_too_many):
|
|
with self.assertRaises(PyMongoError) as error:
|
|
self.db.t.insert_one({"x": 1})
|
|
|
|
self.assertIn("RetryableError", str(error.exception))
|
|
self.assertIn("SystemOverloadedError", str(error.exception))
|
|
|
|
@client_context.require_failCommand_appName
|
|
def test_retry_overload_error_update_many(self):
|
|
# Even though update_many is not a retryable write operation, it will
|
|
# still be retried via the "RetryableError" error label.
|
|
self.db.t.insert_one({"x": 1})
|
|
|
|
# Ensure command is retried on overload error.
|
|
fail_many = get_mock_overload_error(MAX_ADAPTIVE_RETRIES)
|
|
with self.fail_point(fail_many):
|
|
self.db.t.update_many({}, {"$set": {"x": 2}})
|
|
|
|
# Ensure command stops retrying after MAX_ADAPTIVE_RETRIES.
|
|
fail_too_many = get_mock_overload_error(MAX_ADAPTIVE_RETRIES + 1)
|
|
with self.fail_point(fail_too_many):
|
|
with self.assertRaises(PyMongoError) as error:
|
|
self.db.t.update_many({}, {"$set": {"x": 2}})
|
|
|
|
self.assertIn("RetryableError", str(error.exception))
|
|
self.assertIn("SystemOverloadedError", str(error.exception))
|
|
|
|
@client_context.require_failCommand_appName
|
|
def test_retry_overload_error_getMore(self):
|
|
coll = self.db.t
|
|
coll.insert_many([{"x": 1} for _ in range(10)])
|
|
|
|
# Ensure command is retried on overload error.
|
|
fail_many = {
|
|
"configureFailPoint": "failCommand",
|
|
"mode": {"times": MAX_ADAPTIVE_RETRIES},
|
|
"data": {
|
|
"failCommands": ["getMore"],
|
|
"errorCode": 462, # IngressRequestRateLimitExceeded
|
|
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
|
},
|
|
}
|
|
cursor = coll.find(batch_size=2)
|
|
cursor.next()
|
|
with self.fail_point(fail_many):
|
|
cursor.to_list()
|
|
|
|
# Ensure command stops retrying after MAX_ADAPTIVE_RETRIES.
|
|
fail_too_many = fail_many.copy()
|
|
fail_too_many["mode"] = {"times": MAX_ADAPTIVE_RETRIES + 1}
|
|
cursor = coll.find(batch_size=2)
|
|
cursor.next()
|
|
with self.fail_point(fail_too_many):
|
|
with self.assertRaises(PyMongoError) as error:
|
|
cursor.to_list()
|
|
|
|
self.assertIn("RetryableError", str(error.exception))
|
|
self.assertIn("SystemOverloadedError", str(error.exception))
|
|
|
|
|
|
# Prose tests.
|
|
class TestClientBackpressure(IntegrationTest):
|
|
listener: EventListener
|
|
|
|
@classmethod
|
|
def setUpClass(cls) -> None:
|
|
cls.listener = OvertCommandListener()
|
|
|
|
@client_context.require_connection
|
|
def setUp(self) -> None:
|
|
super().setUp()
|
|
self.listener.reset()
|
|
self.app_name = self.__class__.__name__.lower()
|
|
self.client = self.rs_or_single_client(
|
|
event_listeners=[self.listener], appName=self.app_name
|
|
)
|
|
|
|
@patch("random.random")
|
|
@client_context.require_failCommand_appName
|
|
def test_01_operation_retry_uses_exponential_backoff(self, random_func):
|
|
# Drivers should test that retries do not occur immediately when a SystemOverloadedError is encountered.
|
|
|
|
# 1. let `client` be a `MongoClient`
|
|
client = self.client
|
|
|
|
# 2. let `collection` be a collection
|
|
collection = client.test.test
|
|
|
|
# 3. Now, run transactions without backoff:
|
|
|
|
# a. Configure the random number generator used for jitter to always return `0` -- this effectively disables backoff.
|
|
random_func.return_value = 0
|
|
|
|
# b. Configure the following failPoint:
|
|
fail_point = dict(
|
|
mode="alwaysOn",
|
|
data=dict(
|
|
failCommands=["insert"],
|
|
errorCode=2,
|
|
errorLabels=["SystemOverloadedError", "RetryableError"],
|
|
appName=self.app_name,
|
|
),
|
|
)
|
|
with self.fail_point(fail_point):
|
|
# c. Execute the following command. Expect that the command errors. Measure the duration of the command execution.
|
|
start0 = perf_counter()
|
|
with self.assertRaises(OperationFailure):
|
|
collection.insert_one({"a": 1})
|
|
end0 = perf_counter()
|
|
|
|
# d. Configure the random number generator used for jitter to always return `1`.
|
|
random_func.return_value = 1
|
|
|
|
# e. Execute step c again.
|
|
start1 = perf_counter()
|
|
with self.assertRaises(OperationFailure):
|
|
collection.insert_one({"a": 1})
|
|
end1 = perf_counter()
|
|
|
|
# f. Compare the times between the two runs.
|
|
# The sum of 2 backoffs is 0.3 seconds. There is a 0.3-second window to account for potential variance between the two
|
|
# runs.
|
|
self.assertTrue(abs((end1 - start1) - (end0 - start0 + 0.3)) < 0.3)
|
|
|
|
@client_context.require_failCommand_appName
|
|
def test_03_overload_retries_limited(self):
|
|
# Drivers should test that overload errors are retried a maximum of two times.
|
|
|
|
# 1. Let `client` be a `MongoClient`.
|
|
client = self.client
|
|
# 2. Let `coll` be a collection.
|
|
coll = client.pymongo_test.coll
|
|
|
|
# 3. Configure the following failpoint:
|
|
failpoint = {
|
|
"configureFailPoint": "failCommand",
|
|
"mode": "alwaysOn",
|
|
"data": {
|
|
"failCommands": ["find"],
|
|
"errorCode": 462, # IngressRequestRateLimitExceeded
|
|
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
|
},
|
|
}
|
|
|
|
# 4. Perform a find operation with `coll` that fails.
|
|
with self.fail_point(failpoint):
|
|
with self.assertRaises(PyMongoError) as error:
|
|
coll.find_one({})
|
|
|
|
# 5. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
|
|
self.assertIn("RetryableError", str(error.exception))
|
|
self.assertIn("SystemOverloadedError", str(error.exception))
|
|
|
|
# 6. Assert that the total number of started commands is MAX_ADAPTIVE_RETRIES + 1.
|
|
self.assertEqual(len(self.listener.started_events), MAX_ADAPTIVE_RETRIES + 1)
|
|
|
|
@client_context.require_failCommand_appName
|
|
def test_04_overload_retries_limited_configured(self):
|
|
# Drivers should test that overload errors are retried a maximum of maxAdaptiveRetries times.
|
|
max_retries = 1
|
|
|
|
# 1. Let `client` be a `MongoClient` with `maxAdaptiveRetries=1` and command event monitoring enabled.
|
|
client = self.single_client(maxAdaptiveRetries=max_retries, event_listeners=[self.listener])
|
|
# 2. Let `coll` be a collection.
|
|
coll = client.pymongo_test.coll
|
|
|
|
# 3. Configure the following failpoint:
|
|
failpoint = {
|
|
"configureFailPoint": "failCommand",
|
|
"mode": "alwaysOn",
|
|
"data": {
|
|
"failCommands": ["find"],
|
|
"errorCode": 462, # IngressRequestRateLimitExceeded
|
|
"errorLabels": ["RetryableError", "SystemOverloadedError"],
|
|
},
|
|
}
|
|
|
|
# 4. Perform a find operation with `coll` that fails.
|
|
with self.fail_point(failpoint):
|
|
with self.assertRaises(PyMongoError) as error:
|
|
coll.find_one({})
|
|
|
|
# 5. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
|
|
self.assertIn("RetryableError", str(error.exception))
|
|
self.assertIn("SystemOverloadedError", str(error.exception))
|
|
|
|
# 6. Assert that the total number of started commands is max_retries + 1.
|
|
self.assertEqual(len(self.listener.started_events), max_retries + 1)
|
|
|
|
|
|
# Location of JSON test specifications.
|
|
if _IS_SYNC:
|
|
_TEST_PATH = os.path.join(pathlib.Path(__file__).resolve().parent, "client-backpressure")
|
|
else:
|
|
_TEST_PATH = os.path.join(pathlib.Path(__file__).resolve().parent.parent, "client-backpressure")
|
|
|
|
globals().update(
|
|
generate_test_classes(
|
|
_TEST_PATH,
|
|
module=__name__,
|
|
)
|
|
)
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|