diff --git a/justfile b/justfile index 92bdee5be..7ac5bd33f 100644 --- a/justfile +++ b/justfile @@ -1,5 +1,7 @@ # See https://just.systems/man/en/ for instructions set shell := ["bash", "-c"] +# Do not modify the lock file when running justfile commands. +export UV_FROZEN := "1" # Commonly used command segments. typing_run := "uv run --group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd" @@ -14,7 +16,7 @@ default: [private] resync: - @uv sync --quiet + @uv sync --quiet --frozen install: bash .evergreen/scripts/setup-dev-env.sh @@ -48,12 +50,12 @@ typing-pyright: && resync {{typing_run}} pyright -p strict_pyrightconfig.json test/test_typing_strict.py [group('lint')] -lint *args="": && resync - uvx pre-commit run --all-files {{args}} +lint: && resync + uv run pre-commit run --all-files [group('lint')] -lint-manual *args="": && resync - uvx pre-commit run --all-files --hook-stage manual {{args}} +lint-manual: && resync + uv run pre-commit run --all-files --hook-stage manual [group('test')] test *args="-v --durations=5 --maxfail=10": && resync @@ -71,10 +73,6 @@ setup-tests *args="": teardown-tests: bash .evergreen/scripts/teardown-tests.sh -[group('test')] -integration-tests: - bash integration_tests/run.sh - [group('server')] run-server *args="": bash .evergreen/scripts/run-server.sh {{args}} diff --git a/pymongo/asynchronous/helpers.py b/pymongo/asynchronous/helpers.py index 4ff3caa10..9dc87ad28 100644 --- a/pymongo/asynchronous/helpers.py +++ b/pymongo/asynchronous/helpers.py @@ -16,11 +16,9 @@ from __future__ import annotations import asyncio -import builtins import functools import random import socket -import time import time as time # noqa: PLC0414 # needed in sync version from typing import ( Any, @@ -77,8 +75,8 @@ def _handle_reauth(func: F) -> F: return cast(F, inner) -_MAX_RETRIES = 3 -_BACKOFF_INITIAL = 0.05 +_MAX_RETRIES = 5 +_BACKOFF_INITIAL = 0.1 _BACKOFF_MAX = 10 # DRIVERS-3240 will determine these defaults. DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index f521091e3..2c9c70cd4 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -19,6 +19,8 @@ import collections import contextlib import logging import os +import socket +import ssl import sys import time import weakref @@ -52,10 +54,12 @@ from pymongo.errors import ( # type:ignore[attr-defined] DocumentTooLarge, ExecutionTimeout, InvalidOperation, + NetworkTimeout, NotPrimaryError, OperationFailure, PyMongoError, WaitQueueTimeoutError, + _CertificateError, ) from pymongo.hello import Hello, HelloCompat from pymongo.helpers_shared import _get_timeout_details, format_timeout_details @@ -769,8 +773,8 @@ class Pool: # Enforces: maxConnecting # Also used for: clearing the wait queue self._max_connecting_cond = _async_create_condition(self.lock) - self._max_connecting = self.opts.max_connecting self._pending = 0 + self._max_connecting = self.opts.max_connecting self._client_id = client_id if self.enabled_for_cmap: assert self.opts._event_listeners is not None @@ -1003,6 +1007,21 @@ class Pool: self.requests -= 1 self.size_cond.notify() + def _handle_connection_error(self, error: BaseException) -> None: + # Handle system overload condition for non-sdam pools. + # Look for errors of type AutoReconnect and add error labels if appropriate. + if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): + return + assert isinstance(error, AutoReconnect) # Appease type checker. + # If the original error was a DNS, certificate, or SSL error, ignore it. + if isinstance(error.__cause__, (_CertificateError, SSLErrors, socket.gaierror)): + # End of file errors are excluded, because the server may have disconnected + # during the handshake. + if not isinstance(error.__cause__, (ssl.SSLEOFError, ssl.SSLZeroReturnError)): + return + error._add_error_label("SystemOverloadedError") + error._add_error_label("RetryableError") + async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection: """Connect to Mongo and return a new AsyncConnection. @@ -1054,10 +1073,10 @@ class Pool: reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) + self._handle_connection_error(error) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) - raise conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] @@ -1066,18 +1085,22 @@ class Pool: self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() + completed_hello = False try: if not self.is_sdam: await conn.hello() + completed_hello = True self.is_writable = conn.is_writable if handler: handler.contribute_socket(conn, completed_handshake=False) await conn.authenticate() # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException: + except BaseException as e: async with self.lock: self.active_contexts.discard(conn.cancel_context) + if not completed_hello: + self._handle_connection_error(e) await conn.close_conn(ConnectionClosedReason.ERROR) raise @@ -1406,8 +1429,8 @@ class Pool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, to keep performance reasonable - we can't avoid AutoReconnects - completely anyway. + pool to keep performance reasonable - + we can't avoid AutoReconnects completely anyway. """ idle_time_seconds = conn.idle_time_seconds() # If socket is idle, open a new one. @@ -1418,8 +1441,9 @@ class Pool: await conn.close_conn(ConnectionClosedReason.IDLE) return True - if self._check_interval_seconds is not None and ( - self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds + check_interval_seconds = self._check_interval_seconds + if check_interval_seconds is not None and ( + check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): if conn.conn_closed(): await conn.close_conn(ConnectionClosedReason.ERROR) diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 283aabc69..a2b354f7c 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -890,7 +890,9 @@ class Topology: # Clear the pool. await server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError): + if isinstance(error, WaitQueueTimeoutError) or ( + error.has_error_label("SystemOverloadedError") + ): return # "Client MUST replace the server's description with type Unknown # ... MUST NOT request an immediate check of the server." diff --git a/pymongo/logger.py b/pymongo/logger.py index 1b3fe43b8..ccfc45ed8 100644 --- a/pymongo/logger.py +++ b/pymongo/logger.py @@ -42,6 +42,7 @@ class _ConnectionStatusMessage(str, enum.Enum): POOL_READY = "Connection pool ready" POOL_CLOSED = "Connection pool closed" POOL_CLEARED = "Connection pool cleared" + POOL_BACKOFF = "Connection pool backoff" CONN_CREATED = "Connection created" CONN_READY = "Connection ready" @@ -88,6 +89,7 @@ _SDAM_LOGGER = logging.getLogger("pymongo.topology") _VERBOSE_CONNECTION_ERROR_REASONS = { ConnectionClosedReason.POOL_CLOSED: "Connection pool was closed", ConnectionCheckOutFailedReason.POOL_CLOSED: "Connection pool was closed", + ConnectionClosedReason.POOL_BACKOFF: "Connection pool is in backoff", ConnectionClosedReason.STALE: "Connection pool was stale", ConnectionClosedReason.ERROR: "An error occurred while using the connection", ConnectionCheckOutFailedReason.CONN_ERROR: "An error occurred while trying to establish a new connection", diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 46a78aea0..0dfbbb915 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -934,6 +934,9 @@ class ConnectionClosedReason: POOL_CLOSED = "poolClosed" """The pool was closed, making the connection no longer valid.""" + POOL_BACKOFF = "poolBackoff" + """The pool is in backoff mode.""" + class ConnectionCheckOutFailedReason: """An enum that defines values for `reason` on a diff --git a/pymongo/synchronous/helpers.py b/pymongo/synchronous/helpers.py index c1b75a3c9..2c57b7a59 100644 --- a/pymongo/synchronous/helpers.py +++ b/pymongo/synchronous/helpers.py @@ -16,7 +16,10 @@ from __future__ import annotations import asyncio +import functools +import random import socket +import time as time # noqa: PLC0414 # needed in sync version from typing import ( Any, Callable, @@ -24,10 +27,13 @@ from typing import ( cast, ) +from pymongo import _csot from pymongo.errors import ( OperationFailure, + PyMongoError, ) from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE +from pymongo.lock import _create_lock _IS_SYNC = True @@ -36,6 +42,7 @@ F = TypeVar("F", bound=Callable[..., Any]) def _handle_reauth(func: F) -> F: + @functools.wraps(func) def inner(*args: Any, **kwargs: Any) -> Any: no_reauth = kwargs.pop("no_reauth", False) from pymongo.message import _BulkWriteContext @@ -68,6 +75,123 @@ def _handle_reauth(func: F) -> F: return cast(F, inner) +_MAX_RETRIES = 5 +_BACKOFF_INITIAL = 0.1 +_BACKOFF_MAX = 10 +# DRIVERS-3240 will determine these defaults. +DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 +DEFAULT_RETRY_TOKEN_RETURN = 0.1 + + +def _backoff( + attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX +) -> float: + jitter = random.random() # noqa: S311 + return jitter * min(initial_delay * (2**attempt), max_delay) + + +class _TokenBucket: + """A token bucket implementation for rate limiting.""" + + def __init__( + self, + capacity: float = DEFAULT_RETRY_TOKEN_CAPACITY, + return_rate: float = DEFAULT_RETRY_TOKEN_RETURN, + ): + self.lock = _create_lock() + self.capacity = capacity + # DRIVERS-3240 will determine how full the bucket should start. + self.tokens = capacity + self.return_rate = return_rate + + def consume(self) -> bool: + """Consume a token from the bucket if available.""" + with self.lock: + if self.tokens >= 1: + self.tokens -= 1 + return True + return False + + def deposit(self, retry: bool = False) -> None: + """Deposit a token back into the bucket.""" + retry_token = 1 if retry else 0 + with self.lock: + self.tokens = min(self.capacity, self.tokens + retry_token + self.return_rate) + + +class _RetryPolicy: + """A retry limiter that performs exponential backoff with jitter. + + Retry attempts are limited by a token bucket to prevent overwhelming the server during + a prolonged outage or high load. + """ + + def __init__( + self, + token_bucket: _TokenBucket, + attempts: int = _MAX_RETRIES, + backoff_initial: float = _BACKOFF_INITIAL, + backoff_max: float = _BACKOFF_MAX, + ): + self.token_bucket = token_bucket + self.attempts = attempts + self.backoff_initial = backoff_initial + self.backoff_max = backoff_max + + def record_success(self, retry: bool) -> None: + """Record a successful operation.""" + self.token_bucket.deposit(retry) + + def backoff(self, attempt: int) -> float: + """Return the backoff duration for the given .""" + return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max) + + def should_retry(self, attempt: int, delay: float) -> bool: + """Return if we have budget to retry and how long to backoff.""" + if attempt > self.attempts: + return False + + # If the delay would exceed the deadline, bail early before consuming a token. + if _csot.get_timeout(): + if time.monotonic() + delay > _csot.get_deadline(): + return False + + # Check token bucket last since we only want to consume a token if we actually retry. + if not self.token_bucket.consume(): + # DRIVERS-3246 Improve diagnostics when this case happens. + # We could add info to the exception and log. + return False + return True + + +def _retry_overload(func: F) -> F: + @functools.wraps(func) + def inner(self: Any, *args: Any, **kwargs: Any) -> Any: + retry_policy = self._retry_policy + attempt = 0 + while True: + try: + res = func(self, *args, **kwargs) + retry_policy.record_success(retry=attempt > 0) + return res + except PyMongoError as exc: + if not exc.has_error_label("RetryableError"): + raise + attempt += 1 + delay = 0 + if exc.has_error_label("SystemOverloadedError"): + delay = retry_policy.backoff(attempt) + if not retry_policy.should_retry(attempt, delay): + raise + + # Implement exponential backoff on retry. + if delay: + time.sleep(delay) + continue + + return cast(F, inner) + + def _getaddrinfo( host: Any, port: Any, **kwargs: Any ) -> list[ diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 66258fda1..9e50fa590 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -19,6 +19,8 @@ import collections import contextlib import logging import os +import socket +import ssl import sys import time import weakref @@ -49,10 +51,12 @@ from pymongo.errors import ( # type:ignore[attr-defined] DocumentTooLarge, ExecutionTimeout, InvalidOperation, + NetworkTimeout, NotPrimaryError, OperationFailure, PyMongoError, WaitQueueTimeoutError, + _CertificateError, ) from pymongo.hello import Hello, HelloCompat from pymongo.helpers_shared import _get_timeout_details, format_timeout_details @@ -767,8 +771,8 @@ class Pool: # Enforces: maxConnecting # Also used for: clearing the wait queue self._max_connecting_cond = _create_condition(self.lock) - self._max_connecting = self.opts.max_connecting self._pending = 0 + self._max_connecting = self.opts.max_connecting self._client_id = client_id if self.enabled_for_cmap: assert self.opts._event_listeners is not None @@ -999,6 +1003,21 @@ class Pool: self.requests -= 1 self.size_cond.notify() + def _handle_connection_error(self, error: BaseException) -> None: + # Handle system overload condition for non-sdam pools. + # Look for errors of type AutoReconnect and add error labels if appropriate. + if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): + return + assert isinstance(error, AutoReconnect) # Appease type checker. + # If the original error was a DNS, certificate, or SSL error, ignore it. + if isinstance(error.__cause__, (_CertificateError, SSLErrors, socket.gaierror)): + # End of file errors are excluded, because the server may have disconnected + # during the handshake. + if not isinstance(error.__cause__, (ssl.SSLEOFError, ssl.SSLZeroReturnError)): + return + error._add_error_label("SystemOverloadedError") + error._add_error_label("RetryableError") + def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection: """Connect to Mongo and return a new Connection. @@ -1050,10 +1069,10 @@ class Pool: reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) + self._handle_connection_error(error) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) - raise conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] @@ -1062,18 +1081,22 @@ class Pool: self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() + completed_hello = False try: if not self.is_sdam: conn.hello() + completed_hello = True self.is_writable = conn.is_writable if handler: handler.contribute_socket(conn, completed_handshake=False) conn.authenticate() # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException: + except BaseException as e: with self.lock: self.active_contexts.discard(conn.cancel_context) + if not completed_hello: + self._handle_connection_error(e) conn.close_conn(ConnectionClosedReason.ERROR) raise @@ -1402,8 +1425,8 @@ class Pool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, to keep performance reasonable - we can't avoid AutoReconnects - completely anyway. + pool to keep performance reasonable - + we can't avoid AutoReconnects completely anyway. """ idle_time_seconds = conn.idle_time_seconds() # If socket is idle, open a new one. @@ -1414,8 +1437,9 @@ class Pool: conn.close_conn(ConnectionClosedReason.IDLE) return True - if self._check_interval_seconds is not None and ( - self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds + check_interval_seconds = self._check_interval_seconds + if check_interval_seconds is not None and ( + check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): if conn.conn_closed(): conn.close_conn(ConnectionClosedReason.ERROR) diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index a4ca0e6e0..e967c2089 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -888,7 +888,9 @@ class Topology: # Clear the pool. server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError): + if isinstance(error, WaitQueueTimeoutError) or ( + error.has_error_label("SystemOverloadedError") + ): return # "Client MUST replace the server's description with type Unknown # ... MUST NOT request an immediate check of the server." diff --git a/test/asynchronous/test_discovery_and_monitoring.py b/test/asynchronous/test_discovery_and_monitoring.py index 5820d00c4..67df478b9 100644 --- a/test/asynchronous/test_discovery_and_monitoring.py +++ b/test/asynchronous/test_discovery_and_monitoring.py @@ -25,8 +25,10 @@ from asyncio import StreamReader, StreamWriter from pathlib import Path from test.asynchronous.helpers import ConcurrentRunner from test.asynchronous.utils import flaky +from test.utils_shared import delay from pymongo.asynchronous.pool import AsyncConnection +from pymongo.errors import ConnectionFailure from pymongo.operations import _Op from pymongo.server_selectors import writable_server_selector @@ -70,7 +72,12 @@ from pymongo.errors import ( ) from pymongo.hello import Hello, HelloCompat from pymongo.helpers_shared import _check_command_response, _check_write_command_response -from pymongo.monitoring import ServerHeartbeatFailedEvent, ServerHeartbeatStartedEvent +from pymongo.monitoring import ( + ConnectionCheckOutFailedEvent, + PoolClearedEvent, + ServerHeartbeatFailedEvent, + ServerHeartbeatStartedEvent, +) from pymongo.server_description import SERVER_TYPE, ServerDescription from pymongo.topology_description import TOPOLOGY_TYPE @@ -446,6 +453,59 @@ class TestPoolManagement(AsyncIntegrationTest): AsyncConnection.close_conn = original_close +class TestPoolBackpressure(AsyncIntegrationTest): + @async_client_context.require_version_min(7, 0, 0) + async def test_connection_pool_is_not_cleared(self): + listener = CMAPListener() + + # Create a client that listens to CMAP events, with maxConnecting=100. + client = await self.async_rs_or_single_client(maxConnecting=100, event_listeners=[listener]) + + # Enable the ingress rate limiter. + await client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=True + ) + await client.admin.command("setParameter", 1, ingressConnectionEstablishmentRatePerSec=20) + await client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentBurstCapacitySecs=1 + ) + await client.admin.command("setParameter", 1, ingressConnectionEstablishmentMaxQueueDepth=1) + + # Disable the ingress rate limiter on teardown. + # Sleep for 1 second before disabling to avoid the rate limiter. + async def teardown(): + await asyncio.sleep(1) + await client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=False + ) + + self.addAsyncCleanup(teardown) + + # Make sure the collection has at least one document. + await client.test.test.delete_many({}) + await client.test.test.insert_one({}) + + # Run a slow operation to tie up the connection. + async def target(): + try: + await client.test.test.find_one({"$where": delay(0.1)}) + except ConnectionFailure: + pass + + # Run 100 parallel operations that contend for connections. + tasks = [] + for _ in range(100): + tasks.append(ConcurrentRunner(target=target)) + for t in tasks: + await t.start() + for t in tasks: + await t.join() + + # Verify there were at least 10 connection checkout failed event but no pool cleared events. + self.assertGreater(len(listener.events_by_type(ConnectionCheckOutFailedEvent)), 10) + self.assertEqual(len(listener.events_by_type(PoolClearedEvent)), 0) + + class TestServerMonitoringMode(AsyncIntegrationTest): @async_client_context.require_no_load_balancer async def asyncSetUp(self): diff --git a/test/asynchronous/test_pooling.py b/test/asynchronous/test_pooling.py index 3193d9e3d..2f0d5fc96 100644 --- a/test/asynchronous/test_pooling.py +++ b/test/asynchronous/test_pooling.py @@ -29,6 +29,7 @@ from pymongo import AsyncMongoClient, message, timeout from pymongo.errors import AutoReconnect, ConnectionFailure, DuplicateKeyError from pymongo.hello import HelloCompat from pymongo.lock import _async_create_lock +from pymongo.read_preferences import ReadPreference sys.path[0:0] = [""] @@ -513,6 +514,39 @@ class TestPooling(_TestPoolingBase): str(error.exception), ) + @async_client_context.require_failCommand_appName + async def test_pool_backpressure_preserves_existing_connections(self): + client = await self.async_rs_or_single_client() + coll = client.pymongo_test.t + pool = await async_get_pool(client) + await coll.insert_many([{"x": 1} for _ in range(10)]) + t = SocketGetter(self.c, pool) + await t.start() + while t.state != "connection": + await asyncio.sleep(0.1) + + assert not t.sock.conn_closed() + + # Mock a session establishment overload. + mock_connection_fail = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "closeConnection": True, + }, + } + + async with self.fail_point(mock_connection_fail): + await coll.find_one({}) + + # Make sure the existing socket was not affected. + assert not t.sock.conn_closed() + + # Cleanup + await t.release_conn() + await t.join() + await pool.close() + class TestPoolMaxSize(_TestPoolingBase): async def test_max_pool_size(self): diff --git a/test/connection_monitoring/pool-create-min-size-error.json b/test/connection_monitoring/pool-create-min-size-error.json index 1c744b850..4334ce257 100644 --- a/test/connection_monitoring/pool-create-min-size-error.json +++ b/test/connection_monitoring/pool-create-min-size-error.json @@ -9,15 +9,13 @@ ], "failPoint": { "configureFailPoint": "failCommand", - "mode": { - "times": 50 - }, + "mode": "alwaysOn", "data": { "failCommands": [ "isMaster", "hello" ], - "closeConnection": true, + "errorCode": 91, "appName": "poolCreateMinSizeErrorTest" } }, diff --git a/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json new file mode 100644 index 000000000..f41b76459 --- /dev/null +++ b/test/discovery_and_monitoring/unified/backpressure-network-error-fail.json @@ -0,0 +1,140 @@ +{ + "description": "backpressure-network-error-fail", + "schemaVersion": "1.17", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "backpressure-network-error-fail", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "apply backpressure on network connection errors during connection establishment", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatSucceededEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 1000000, + "serverMonitoringMode": "poll", + "appname": "backpressureNetworkErrorFailTest" + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backpressure-network-error-fail" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "appName": "backpressureNetworkErrorFailTest", + "closeConnection": true + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true, + "errorLabelsContain": [ + "SystemOverloadedError", + "RetryableError" + ] + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "cmap", + "events": [] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json new file mode 100644 index 000000000..a97c7a329 --- /dev/null +++ b/test/discovery_and_monitoring/unified/backpressure-network-timeout-fail.json @@ -0,0 +1,143 @@ +{ + "description": "backpressure-network-timeout-error", + "schemaVersion": "1.17", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "backpressure-network-timeout-error", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "apply backpressure on network timeout error during connection establishment", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverDescriptionChangedEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 1000000, + "appname": "backpressureNetworkTimeoutErrorTest", + "serverMonitoringMode": "poll", + "connectTimeoutMS": 250, + "socketTimeoutMS": 250 + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backpressure-network-timeout-error" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "blockConnection": true, + "blockTimeMS": 500, + "appName": "backpressureNetworkTimeoutErrorTest" + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true, + "errorLabelsContain": [ + "SystemOverloadedError", + "RetryableError" + ] + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "cmap", + "events": [] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/backpressure-server-description-unchanged-on-min-pool-size-population-error.json b/test/discovery_and_monitoring/unified/backpressure-server-description-unchanged-on-min-pool-size-population-error.json new file mode 100644 index 000000000..35a49c132 --- /dev/null +++ b/test/discovery_and_monitoring/unified/backpressure-server-description-unchanged-on-min-pool-size-population-error.json @@ -0,0 +1,106 @@ +{ + "description": "backpressure-server-description-unchanged-on-min-pool-size-population-error", + "schemaVersion": "1.17", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "tests": [ + { + "description": "the server description is not changed on handshake error during minPoolSize population", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "serverDescriptionChangedEvent", + "connectionClosedEvent" + ], + "uriOptions": { + "appname": "authErrorTest", + "minPoolSize": 5, + "maxConnecting": 1, + "serverMonitoringMode": "poll", + "heartbeatFrequencyMS": 1000000 + } + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "skip": 1 + }, + "data": { + "failCommands": [ + "hello", + "isMaster" + ], + "appName": "authErrorTest", + "closeConnection": true + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": {} + }, + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "connectionClosedEvent": {} + }, + "count": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "events": [ + { + "serverDescriptionChangedEvent": {} + } + ] + } + ] + } + ] +} diff --git a/test/load_balancer/sdam-error-handling.json b/test/load_balancer/sdam-error-handling.json index 5892dcacd..41bc90be7 100644 --- a/test/load_balancer/sdam-error-handling.json +++ b/test/load_balancer/sdam-error-handling.json @@ -282,7 +282,7 @@ "isMaster", "hello" ], - "closeConnection": true, + "errorCode": 11600, "appName": "lbSDAMErrorTestClient" } } @@ -297,7 +297,7 @@ } }, "expectError": { - "isClientError": true + "isError": true } } ], diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index 67a82996b..2318f259e 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -25,7 +25,9 @@ from asyncio import StreamReader, StreamWriter from pathlib import Path from test.helpers import ConcurrentRunner from test.utils import flaky +from test.utils_shared import delay +from pymongo.errors import ConnectionFailure from pymongo.operations import _Op from pymongo.server_selectors import writable_server_selector from pymongo.synchronous.pool import Connection @@ -67,7 +69,12 @@ from pymongo.errors import ( ) from pymongo.hello import Hello, HelloCompat from pymongo.helpers_shared import _check_command_response, _check_write_command_response -from pymongo.monitoring import ServerHeartbeatFailedEvent, ServerHeartbeatStartedEvent +from pymongo.monitoring import ( + ConnectionCheckOutFailedEvent, + PoolClearedEvent, + ServerHeartbeatFailedEvent, + ServerHeartbeatStartedEvent, +) from pymongo.server_description import SERVER_TYPE, ServerDescription from pymongo.synchronous.settings import TopologySettings from pymongo.synchronous.topology import Topology, _ErrorContext @@ -444,6 +451,57 @@ class TestPoolManagement(IntegrationTest): Connection.close_conn = original_close +class TestPoolBackpressure(IntegrationTest): + @client_context.require_version_min(7, 0, 0) + def test_connection_pool_is_not_cleared(self): + listener = CMAPListener() + + # Create a client that listens to CMAP events, with maxConnecting=100. + client = self.rs_or_single_client(maxConnecting=100, event_listeners=[listener]) + + # Enable the ingress rate limiter. + client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=True + ) + client.admin.command("setParameter", 1, ingressConnectionEstablishmentRatePerSec=20) + client.admin.command("setParameter", 1, ingressConnectionEstablishmentBurstCapacitySecs=1) + client.admin.command("setParameter", 1, ingressConnectionEstablishmentMaxQueueDepth=1) + + # Disable the ingress rate limiter on teardown. + # Sleep for 1 second before disabling to avoid the rate limiter. + def teardown(): + time.sleep(1) + client.admin.command( + "setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=False + ) + + self.addCleanup(teardown) + + # Make sure the collection has at least one document. + client.test.test.delete_many({}) + client.test.test.insert_one({}) + + # Run a slow operation to tie up the connection. + def target(): + try: + client.test.test.find_one({"$where": delay(0.1)}) + except ConnectionFailure: + pass + + # Run 100 parallel operations that contend for connections. + tasks = [] + for _ in range(100): + tasks.append(ConcurrentRunner(target=target)) + for t in tasks: + t.start() + for t in tasks: + t.join() + + # Verify there were at least 10 connection checkout failed event but no pool cleared events. + self.assertGreater(len(listener.events_by_type(ConnectionCheckOutFailedEvent)), 10) + self.assertEqual(len(listener.events_by_type(PoolClearedEvent)), 0) + + class TestServerMonitoringMode(IntegrationTest): @client_context.require_no_load_balancer def setUp(self): diff --git a/test/test_pooling.py b/test/test_pooling.py index cb5b20699..0f7ef144f 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -29,6 +29,7 @@ from pymongo import MongoClient, message, timeout from pymongo.errors import AutoReconnect, ConnectionFailure, DuplicateKeyError from pymongo.hello import HelloCompat from pymongo.lock import _create_lock +from pymongo.read_preferences import ReadPreference sys.path[0:0] = [""] @@ -511,6 +512,39 @@ class TestPooling(_TestPoolingBase): str(error.exception), ) + @client_context.require_failCommand_appName + def test_pool_backpressure_preserves_existing_connections(self): + client = self.rs_or_single_client() + coll = client.pymongo_test.t + pool = get_pool(client) + coll.insert_many([{"x": 1} for _ in range(10)]) + t = SocketGetter(self.c, pool) + t.start() + while t.state != "connection": + time.sleep(0.1) + + assert not t.sock.conn_closed() + + # Mock a session establishment overload. + mock_connection_fail = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "closeConnection": True, + }, + } + + with self.fail_point(mock_connection_fail): + coll.find_one({}) + + # Make sure the existing socket was not affected. + assert not t.sock.conn_closed() + + # Cleanup + t.release_conn() + t.join() + pool.close() + class TestPoolMaxSize(_TestPoolingBase): def test_max_pool_size(self): diff --git a/tools/synchro.py b/tools/synchro.py index 492fbf428..661d8988c 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -349,7 +349,7 @@ def translate_async_sleeps(lines: list[str]) -> list[str]: sleeps = [line for line in lines if "asyncio.sleep" in line] for line in sleeps: - res = re.search(r"asyncio.sleep\(([^()]*)\)", line) + res = re.search(r"asyncio\.sleep\(\s*(.*?)\)", line) if res: old = res[0] index = lines.index(line)