diff --git a/pymongo/_csot.py b/pymongo/_csot.py index 5170c0d8c..8a4617eca 100644 --- a/pymongo/_csot.py +++ b/pymongo/_csot.py @@ -16,8 +16,9 @@ import functools import time +from collections import deque from contextvars import ContextVar, Token -from typing import Any, Callable, MutableMapping, Optional, Tuple, TypeVar, cast +from typing import Any, Callable, Deque, MutableMapping, Optional, Tuple, TypeVar, cast from pymongo.write_concern import WriteConcern @@ -116,3 +117,33 @@ def apply_write_concern(cmd: MutableMapping, write_concern: Optional[WriteConcer wc.pop("wtimeout", None) if wc: cmd["writeConcern"] = wc + + +_MAX_RTT_SAMPLES: int = 10 +_MIN_RTT_SAMPLES: int = 2 + + +class MovingMinimum: + """Tracks a minimum RTT within the last 10 RTT samples.""" + + samples: Deque[float] + + def __init__(self) -> None: + self.samples = deque(maxlen=_MAX_RTT_SAMPLES) + + def add_sample(self, sample: float) -> None: + if sample < 0: + # Likely system time change while waiting for hello response + # and not using time.monotonic. Ignore it, the next one will + # probably be valid. + return + self.samples.append(sample) + + def get(self) -> float: + """Get the min, or 0.0 if there aren't enough samples yet.""" + if len(self.samples) >= _MIN_RTT_SAMPLES: + return min(self.samples) + return 0.0 + + def reset(self) -> None: + self.samples.clear() diff --git a/pymongo/monitor.py b/pymongo/monitor.py index 44390e918..9031d4b78 100644 --- a/pymongo/monitor.py +++ b/pymongo/monitor.py @@ -20,6 +20,7 @@ import weakref from typing import Any, Mapping, cast from pymongo import common, periodic_executor +from pymongo._csot import MovingMinimum from pymongo.errors import NotPrimaryError, OperationFailure, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _create_lock @@ -40,7 +41,7 @@ class MonitorBase(object): def __init__(self, topology, name, interval, min_interval): """Base class to do periodic work on a background thread. - The the background thread is signaled to stop when the Topology or + The background thread is signaled to stop when the Topology or this instance is freed. """ # We strongly reference the executor and it weakly references us via @@ -250,7 +251,8 @@ class Monitor(MonitorBase): if not response.awaitable: self._rtt_monitor.add_sample(round_trip_time) - sd = ServerDescription(address, response, self._rtt_monitor.average()) + avg_rtt, min_rtt = self._rtt_monitor.get() + sd = ServerDescription(address, response, avg_rtt, min_round_trip_time=min_rtt) if self._publish: self._listeners.publish_server_heartbeat_succeeded( address, round_trip_time, response, response.awaitable @@ -350,6 +352,7 @@ class _RttMonitor(MonitorBase): self._pool = pool self._moving_average = MovingAverage() + self._moving_min = MovingMinimum() self._lock = _create_lock() def close(self): @@ -362,20 +365,22 @@ class _RttMonitor(MonitorBase): """Add a RTT sample.""" with self._lock: self._moving_average.add_sample(sample) + self._moving_min.add_sample(sample) - def average(self): - """Get the calculated average, or None if no samples yet.""" + def get(self): + """Get the calculated average, or None if no samples yet and the min.""" with self._lock: - return self._moving_average.get() + return self._moving_average.get(), self._moving_min.get() def reset(self): """Reset the average RTT.""" with self._lock: - return self._moving_average.reset() + self._moving_average.reset() + self._moving_min.reset() def _run(self): try: - # NOTE: This thread is only run when when using the streaming + # NOTE: This thread is only run when using the streaming # heartbeat protocol (MongoDB 4.4+). # XXX: Skip check if the server is unknown? rtt = self._ping() diff --git a/pymongo/server_description.py b/pymongo/server_description.py index 47e27c531..53f90cea2 100644 --- a/pymongo/server_description.py +++ b/pymongo/server_description.py @@ -32,6 +32,7 @@ class ServerDescription(object): - `hello`: Optional Hello instance - `round_trip_time`: Optional float - `error`: Optional, the last error attempting to connect to the server + - `round_trip_time`: Optional float, the min latency from the most recent samples """ __slots__ = ( @@ -47,6 +48,7 @@ class ServerDescription(object): "_min_wire_version", "_max_wire_version", "_round_trip_time", + "_min_round_trip_time", "_me", "_is_writable", "_is_readable", @@ -66,6 +68,7 @@ class ServerDescription(object): hello: Optional[Hello] = None, round_trip_time: Optional[float] = None, error: Optional[Exception] = None, + min_round_trip_time: float = 0.0, ) -> None: self._address = address if not hello: @@ -88,6 +91,7 @@ class ServerDescription(object): self._is_readable = hello.is_readable self._ls_timeout_minutes = hello.logical_session_timeout_minutes self._round_trip_time = round_trip_time + self._min_round_trip_time = min_round_trip_time self._me = hello.me self._last_update_time = time.monotonic() self._error = error @@ -203,6 +207,11 @@ class ServerDescription(object): return self._round_trip_time + @property + def min_round_trip_time(self) -> float: + """The min latency from the most recent samples.""" + return self._min_round_trip_time + @property def error(self) -> Optional[Exception]: """The last error attempting to connect to the server, or None.""" diff --git a/pymongo/topology.py b/pymongo/topology.py index 87a566fa6..904f6b183 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -271,7 +271,7 @@ class Topology(object): """Like select_servers, but choose a random server if several match.""" server = self._select_server(selector, server_selection_timeout, address) if _csot.get_timeout(): - _csot.set_rtt(server.description.round_trip_time) + _csot.set_rtt(server.description.min_round_trip_time) return server def select_server_by_address(self, address, server_selection_timeout=None): diff --git a/test/csot/command-execution.json b/test/csot/command-execution.json index 92358f218..10f87d43a 100644 --- a/test/csot/command-execution.json +++ b/test/csot/command-execution.json @@ -3,7 +3,14 @@ "schemaVersion": "1.9", "runOnRequirements": [ { - "minServerVersion": "4.9" + "minServerVersion": "4.9", + "topologies": [ + "single", + "replicaset", + "sharded-replicaset", + "sharded" + ], + "serverless": "forbid" } ], "createEntities": [ @@ -45,7 +52,7 @@ ], "appName": "reduceMaxTimeMSTest", "blockConnection": true, - "blockTimeMS": 20 + "blockTimeMS": 50 } } } @@ -62,7 +69,8 @@ "uriOptions": { "appName": "reduceMaxTimeMSTest", "w": 1, - "timeoutMS": 500 + "timeoutMS": 500, + "heartbeatFrequencyMS": 500 }, "observeEvents": [ "commandStartedEvent" @@ -86,6 +94,23 @@ ] } }, + { + "name": "insertOne", + "object": "timeoutCollection", + "arguments": { + "document": { + "_id": 1 + }, + "timeoutMS": 100000 + } + }, + { + "name": "wait", + "object": "testRunner", + "arguments": { + "ms": 1000 + } + }, { "name": "insertOne", "object": "timeoutCollection", @@ -100,6 +125,15 @@ { "client": "client", "events": [ + { + "commandStartedEvent": { + "commandName": "insert", + "databaseName": "test", + "command": { + "insert": "timeoutColl" + } + } + }, { "commandStartedEvent": { "commandName": "insert", @@ -107,7 +141,7 @@ "command": { "insert": "timeoutColl", "maxTimeMS": { - "$$lte": 500 + "$$lte": 450 } } } @@ -134,7 +168,7 @@ ], "appName": "rttTooHighTest", "blockConnection": true, - "blockTimeMS": 20 + "blockTimeMS": 50 } } } @@ -151,7 +185,8 @@ "uriOptions": { "appName": "rttTooHighTest", "w": 1, - "timeoutMS": 10 + "timeoutMS": 10, + "heartbeatFrequencyMS": 500 }, "observeEvents": [ "commandStartedEvent" @@ -180,11 +215,16 @@ "object": "timeoutCollection", "arguments": { "document": { - "_id": 2 - } - }, - "expectError": { - "isTimeoutError": true + "_id": 1 + }, + "timeoutMS": 100000 + } + }, + { + "name": "wait", + "object": "testRunner", + "arguments": { + "ms": 1000 } }, { @@ -204,7 +244,7 @@ "object": "timeoutCollection", "arguments": { "document": { - "_id": 2 + "_id": 3 } }, "expectError": { @@ -216,7 +256,7 @@ "object": "timeoutCollection", "arguments": { "document": { - "_id": 2 + "_id": 4 } }, "expectError": { @@ -227,7 +267,126 @@ "expectEvents": [ { "client": "client", - "events": [] + "events": [ + { + "commandStartedEvent": { + "commandName": "insert", + "databaseName": "test", + "command": { + "insert": "timeoutColl" + } + } + } + ] + } + ] + }, + { + "description": "short-circuit is not enabled with only 1 RTT measurement", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "isMaster" + ], + "appName": "reduceMaxTimeMSTest", + "blockConnection": true, + "blockTimeMS": 100 + } + } + } + }, + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "uriOptions": { + "appName": "reduceMaxTimeMSTest", + "w": 1, + "timeoutMS": 90, + "heartbeatFrequencyMS": 100000 + }, + "observeEvents": [ + "commandStartedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "timeoutCollection", + "database": "database", + "collectionName": "timeoutColl" + } + } + ] + } + }, + { + "name": "insertOne", + "object": "timeoutCollection", + "arguments": { + "document": { + "_id": 1 + }, + "timeoutMS": 100000 + } + }, + { + "name": "insertOne", + "object": "timeoutCollection", + "arguments": { + "document": { + "_id": 2 + } + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "commandName": "insert", + "databaseName": "test", + "command": { + "insert": "timeoutColl" + } + } + }, + { + "commandStartedEvent": { + "commandName": "insert", + "databaseName": "test", + "command": { + "insert": "timeoutColl", + "maxTimeMS": { + "$$lte": 450 + } + } + } + } + ] } ] }