PYTHON-3616 Use minimum RTT for CSOT maxTimeMS calculation (#1163)
Require at least 2 RTT samples, otherwise use 0 as RTT. Only keep last 10 samples.
This commit is contained in:
parent
715dd34810
commit
32faa261b6
@ -16,8 +16,9 @@
|
||||
|
||||
import functools
|
||||
import time
|
||||
from collections import deque
|
||||
from contextvars import ContextVar, Token
|
||||
from typing import Any, Callable, MutableMapping, Optional, Tuple, TypeVar, cast
|
||||
from typing import Any, Callable, Deque, MutableMapping, Optional, Tuple, TypeVar, cast
|
||||
|
||||
from pymongo.write_concern import WriteConcern
|
||||
|
||||
@ -116,3 +117,33 @@ def apply_write_concern(cmd: MutableMapping, write_concern: Optional[WriteConcer
|
||||
wc.pop("wtimeout", None)
|
||||
if wc:
|
||||
cmd["writeConcern"] = wc
|
||||
|
||||
|
||||
_MAX_RTT_SAMPLES: int = 10
|
||||
_MIN_RTT_SAMPLES: int = 2
|
||||
|
||||
|
||||
class MovingMinimum:
|
||||
"""Tracks a minimum RTT within the last 10 RTT samples."""
|
||||
|
||||
samples: Deque[float]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.samples = deque(maxlen=_MAX_RTT_SAMPLES)
|
||||
|
||||
def add_sample(self, sample: float) -> None:
|
||||
if sample < 0:
|
||||
# Likely system time change while waiting for hello response
|
||||
# and not using time.monotonic. Ignore it, the next one will
|
||||
# probably be valid.
|
||||
return
|
||||
self.samples.append(sample)
|
||||
|
||||
def get(self) -> float:
|
||||
"""Get the min, or 0.0 if there aren't enough samples yet."""
|
||||
if len(self.samples) >= _MIN_RTT_SAMPLES:
|
||||
return min(self.samples)
|
||||
return 0.0
|
||||
|
||||
def reset(self) -> None:
|
||||
self.samples.clear()
|
||||
|
||||
@ -20,6 +20,7 @@ import weakref
|
||||
from typing import Any, Mapping, cast
|
||||
|
||||
from pymongo import common, periodic_executor
|
||||
from pymongo._csot import MovingMinimum
|
||||
from pymongo.errors import NotPrimaryError, OperationFailure, _OperationCancelled
|
||||
from pymongo.hello import Hello
|
||||
from pymongo.lock import _create_lock
|
||||
@ -40,7 +41,7 @@ class MonitorBase(object):
|
||||
def __init__(self, topology, name, interval, min_interval):
|
||||
"""Base class to do periodic work on a background thread.
|
||||
|
||||
The the background thread is signaled to stop when the Topology or
|
||||
The background thread is signaled to stop when the Topology or
|
||||
this instance is freed.
|
||||
"""
|
||||
# We strongly reference the executor and it weakly references us via
|
||||
@ -250,7 +251,8 @@ class Monitor(MonitorBase):
|
||||
if not response.awaitable:
|
||||
self._rtt_monitor.add_sample(round_trip_time)
|
||||
|
||||
sd = ServerDescription(address, response, self._rtt_monitor.average())
|
||||
avg_rtt, min_rtt = self._rtt_monitor.get()
|
||||
sd = ServerDescription(address, response, avg_rtt, min_round_trip_time=min_rtt)
|
||||
if self._publish:
|
||||
self._listeners.publish_server_heartbeat_succeeded(
|
||||
address, round_trip_time, response, response.awaitable
|
||||
@ -350,6 +352,7 @@ class _RttMonitor(MonitorBase):
|
||||
|
||||
self._pool = pool
|
||||
self._moving_average = MovingAverage()
|
||||
self._moving_min = MovingMinimum()
|
||||
self._lock = _create_lock()
|
||||
|
||||
def close(self):
|
||||
@ -362,20 +365,22 @@ class _RttMonitor(MonitorBase):
|
||||
"""Add a RTT sample."""
|
||||
with self._lock:
|
||||
self._moving_average.add_sample(sample)
|
||||
self._moving_min.add_sample(sample)
|
||||
|
||||
def average(self):
|
||||
"""Get the calculated average, or None if no samples yet."""
|
||||
def get(self):
|
||||
"""Get the calculated average, or None if no samples yet and the min."""
|
||||
with self._lock:
|
||||
return self._moving_average.get()
|
||||
return self._moving_average.get(), self._moving_min.get()
|
||||
|
||||
def reset(self):
|
||||
"""Reset the average RTT."""
|
||||
with self._lock:
|
||||
return self._moving_average.reset()
|
||||
self._moving_average.reset()
|
||||
self._moving_min.reset()
|
||||
|
||||
def _run(self):
|
||||
try:
|
||||
# NOTE: This thread is only run when when using the streaming
|
||||
# NOTE: This thread is only run when using the streaming
|
||||
# heartbeat protocol (MongoDB 4.4+).
|
||||
# XXX: Skip check if the server is unknown?
|
||||
rtt = self._ping()
|
||||
|
||||
@ -32,6 +32,7 @@ class ServerDescription(object):
|
||||
- `hello`: Optional Hello instance
|
||||
- `round_trip_time`: Optional float
|
||||
- `error`: Optional, the last error attempting to connect to the server
|
||||
- `round_trip_time`: Optional float, the min latency from the most recent samples
|
||||
"""
|
||||
|
||||
__slots__ = (
|
||||
@ -47,6 +48,7 @@ class ServerDescription(object):
|
||||
"_min_wire_version",
|
||||
"_max_wire_version",
|
||||
"_round_trip_time",
|
||||
"_min_round_trip_time",
|
||||
"_me",
|
||||
"_is_writable",
|
||||
"_is_readable",
|
||||
@ -66,6 +68,7 @@ class ServerDescription(object):
|
||||
hello: Optional[Hello] = None,
|
||||
round_trip_time: Optional[float] = None,
|
||||
error: Optional[Exception] = None,
|
||||
min_round_trip_time: float = 0.0,
|
||||
) -> None:
|
||||
self._address = address
|
||||
if not hello:
|
||||
@ -88,6 +91,7 @@ class ServerDescription(object):
|
||||
self._is_readable = hello.is_readable
|
||||
self._ls_timeout_minutes = hello.logical_session_timeout_minutes
|
||||
self._round_trip_time = round_trip_time
|
||||
self._min_round_trip_time = min_round_trip_time
|
||||
self._me = hello.me
|
||||
self._last_update_time = time.monotonic()
|
||||
self._error = error
|
||||
@ -203,6 +207,11 @@ class ServerDescription(object):
|
||||
|
||||
return self._round_trip_time
|
||||
|
||||
@property
|
||||
def min_round_trip_time(self) -> float:
|
||||
"""The min latency from the most recent samples."""
|
||||
return self._min_round_trip_time
|
||||
|
||||
@property
|
||||
def error(self) -> Optional[Exception]:
|
||||
"""The last error attempting to connect to the server, or None."""
|
||||
|
||||
@ -271,7 +271,7 @@ class Topology(object):
|
||||
"""Like select_servers, but choose a random server if several match."""
|
||||
server = self._select_server(selector, server_selection_timeout, address)
|
||||
if _csot.get_timeout():
|
||||
_csot.set_rtt(server.description.round_trip_time)
|
||||
_csot.set_rtt(server.description.min_round_trip_time)
|
||||
return server
|
||||
|
||||
def select_server_by_address(self, address, server_selection_timeout=None):
|
||||
|
||||
@ -3,7 +3,14 @@
|
||||
"schemaVersion": "1.9",
|
||||
"runOnRequirements": [
|
||||
{
|
||||
"minServerVersion": "4.9"
|
||||
"minServerVersion": "4.9",
|
||||
"topologies": [
|
||||
"single",
|
||||
"replicaset",
|
||||
"sharded-replicaset",
|
||||
"sharded"
|
||||
],
|
||||
"serverless": "forbid"
|
||||
}
|
||||
],
|
||||
"createEntities": [
|
||||
@ -45,7 +52,7 @@
|
||||
],
|
||||
"appName": "reduceMaxTimeMSTest",
|
||||
"blockConnection": true,
|
||||
"blockTimeMS": 20
|
||||
"blockTimeMS": 50
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -62,7 +69,8 @@
|
||||
"uriOptions": {
|
||||
"appName": "reduceMaxTimeMSTest",
|
||||
"w": 1,
|
||||
"timeoutMS": 500
|
||||
"timeoutMS": 500,
|
||||
"heartbeatFrequencyMS": 500
|
||||
},
|
||||
"observeEvents": [
|
||||
"commandStartedEvent"
|
||||
@ -86,6 +94,23 @@
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "insertOne",
|
||||
"object": "timeoutCollection",
|
||||
"arguments": {
|
||||
"document": {
|
||||
"_id": 1
|
||||
},
|
||||
"timeoutMS": 100000
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "wait",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"ms": 1000
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "insertOne",
|
||||
"object": "timeoutCollection",
|
||||
@ -100,6 +125,15 @@
|
||||
{
|
||||
"client": "client",
|
||||
"events": [
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "insert",
|
||||
"databaseName": "test",
|
||||
"command": {
|
||||
"insert": "timeoutColl"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "insert",
|
||||
@ -107,7 +141,7 @@
|
||||
"command": {
|
||||
"insert": "timeoutColl",
|
||||
"maxTimeMS": {
|
||||
"$$lte": 500
|
||||
"$$lte": 450
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -134,7 +168,7 @@
|
||||
],
|
||||
"appName": "rttTooHighTest",
|
||||
"blockConnection": true,
|
||||
"blockTimeMS": 20
|
||||
"blockTimeMS": 50
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -151,7 +185,8 @@
|
||||
"uriOptions": {
|
||||
"appName": "rttTooHighTest",
|
||||
"w": 1,
|
||||
"timeoutMS": 10
|
||||
"timeoutMS": 10,
|
||||
"heartbeatFrequencyMS": 500
|
||||
},
|
||||
"observeEvents": [
|
||||
"commandStartedEvent"
|
||||
@ -180,11 +215,16 @@
|
||||
"object": "timeoutCollection",
|
||||
"arguments": {
|
||||
"document": {
|
||||
"_id": 2
|
||||
}
|
||||
},
|
||||
"expectError": {
|
||||
"isTimeoutError": true
|
||||
"_id": 1
|
||||
},
|
||||
"timeoutMS": 100000
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "wait",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"ms": 1000
|
||||
}
|
||||
},
|
||||
{
|
||||
@ -204,7 +244,7 @@
|
||||
"object": "timeoutCollection",
|
||||
"arguments": {
|
||||
"document": {
|
||||
"_id": 2
|
||||
"_id": 3
|
||||
}
|
||||
},
|
||||
"expectError": {
|
||||
@ -216,7 +256,7 @@
|
||||
"object": "timeoutCollection",
|
||||
"arguments": {
|
||||
"document": {
|
||||
"_id": 2
|
||||
"_id": 4
|
||||
}
|
||||
},
|
||||
"expectError": {
|
||||
@ -227,7 +267,126 @@
|
||||
"expectEvents": [
|
||||
{
|
||||
"client": "client",
|
||||
"events": []
|
||||
"events": [
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "insert",
|
||||
"databaseName": "test",
|
||||
"command": {
|
||||
"insert": "timeoutColl"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"description": "short-circuit is not enabled with only 1 RTT measurement",
|
||||
"operations": [
|
||||
{
|
||||
"name": "failPoint",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"client": "failPointClient",
|
||||
"failPoint": {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": "alwaysOn",
|
||||
"data": {
|
||||
"failCommands": [
|
||||
"hello",
|
||||
"isMaster"
|
||||
],
|
||||
"appName": "reduceMaxTimeMSTest",
|
||||
"blockConnection": true,
|
||||
"blockTimeMS": 100
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "createEntities",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"entities": [
|
||||
{
|
||||
"client": {
|
||||
"id": "client",
|
||||
"useMultipleMongoses": false,
|
||||
"uriOptions": {
|
||||
"appName": "reduceMaxTimeMSTest",
|
||||
"w": 1,
|
||||
"timeoutMS": 90,
|
||||
"heartbeatFrequencyMS": 100000
|
||||
},
|
||||
"observeEvents": [
|
||||
"commandStartedEvent"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"database": {
|
||||
"id": "database",
|
||||
"client": "client",
|
||||
"databaseName": "test"
|
||||
}
|
||||
},
|
||||
{
|
||||
"collection": {
|
||||
"id": "timeoutCollection",
|
||||
"database": "database",
|
||||
"collectionName": "timeoutColl"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "insertOne",
|
||||
"object": "timeoutCollection",
|
||||
"arguments": {
|
||||
"document": {
|
||||
"_id": 1
|
||||
},
|
||||
"timeoutMS": 100000
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "insertOne",
|
||||
"object": "timeoutCollection",
|
||||
"arguments": {
|
||||
"document": {
|
||||
"_id": 2
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"expectEvents": [
|
||||
{
|
||||
"client": "client",
|
||||
"events": [
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "insert",
|
||||
"databaseName": "test",
|
||||
"command": {
|
||||
"insert": "timeoutColl"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "insert",
|
||||
"databaseName": "test",
|
||||
"command": {
|
||||
"insert": "timeoutColl",
|
||||
"maxTimeMS": {
|
||||
"$$lte": 450
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user