PYTHON-3808 add types to monitor.py (#1328)

This commit is contained in:
Iris 2023-08-01 09:41:07 -07:00 committed by GitHub
parent c88ae79e58
commit 883d57f7ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 36 deletions

View File

@ -14,10 +14,12 @@
"""Class to monitor a MongoDB server on a background thread."""
from __future__ import annotations
import atexit
import time
import weakref
from typing import Any, Mapping, cast
from typing import TYPE_CHECKING, Any, List, Mapping, Optional, Tuple, cast
from pymongo import common, periodic_executor
from pymongo._csot import MovingMinimum
@ -29,8 +31,13 @@ from pymongo.read_preferences import MovingAverage
from pymongo.server_description import ServerDescription
from pymongo.srv_resolver import _SrvResolver
if TYPE_CHECKING:
from pymongo.pool import Connection, Pool
from pymongo.settings import TopologySettings
from pymongo.topology import Topology
def _sanitize(error):
def _sanitize(error: Exception) -> None:
"""PYTHON-2433 Clear error traceback info."""
error.__traceback__ = None
error.__context__ = None
@ -38,7 +45,7 @@ def _sanitize(error):
class MonitorBase:
def __init__(self, topology, name, interval, min_interval):
def __init__(self, topology: Topology, name: str, interval: int, min_interval: float):
"""Base class to do periodic work on a background thread.
The background thread is signaled to stop when the Topology or
@ -46,7 +53,7 @@ class MonitorBase:
"""
# We strongly reference the executor and it weakly references us via
# this closure. When the monitor is freed, stop the executor soon.
def target():
def target() -> bool:
monitor = self_ref()
if monitor is None:
return False # Stop the executor.
@ -59,7 +66,7 @@ class MonitorBase:
self._executor = executor
def _on_topology_gc(dummy=None):
def _on_topology_gc(dummy: Optional[Topology] = None) -> None:
# This prevents GC from waiting 10 seconds for hello to complete
# See test_cleanup_executors_on_client_del.
monitor = self_ref()
@ -71,35 +78,41 @@ class MonitorBase:
self._topology = weakref.proxy(topology, _on_topology_gc)
_register(self)
def open(self):
def open(self) -> None:
"""Start monitoring, or restart after a fork.
Multiple calls have no effect.
"""
self._executor.open()
def gc_safe_close(self):
def gc_safe_close(self) -> None:
"""GC safe close."""
self._executor.close()
def close(self):
def close(self) -> None:
"""Close and stop monitoring.
open() restarts the monitor after closing.
"""
self.gc_safe_close()
def join(self, timeout=None):
def join(self, timeout: Optional[int] = None) -> None:
"""Wait for the monitor to stop."""
self._executor.join(timeout)
def request_check(self):
def request_check(self) -> None:
"""If the monitor is sleeping, wake it soon."""
self._executor.wake()
class Monitor(MonitorBase):
def __init__(self, server_description, topology, pool, topology_settings):
def __init__(
self,
server_description: ServerDescription,
topology: Topology,
pool: Pool,
topology_settings: TopologySettings,
):
"""Class to monitor a MongoDB server on a background thread.
Pass an initial ServerDescription, a Topology, a Pool, and
@ -128,7 +141,7 @@ class Monitor(MonitorBase):
)
self.heartbeater = None
def cancel_check(self):
def cancel_check(self) -> None:
"""Cancel any concurrent hello check.
Note: this is called from a weakref.proxy callback and MUST NOT take
@ -141,7 +154,7 @@ class Monitor(MonitorBase):
# (depending on the platform).
context.cancel()
def _start_rtt_monitor(self):
def _start_rtt_monitor(self) -> None:
"""Start an _RttMonitor that periodically runs ping."""
# If this monitor is closed directly before (or during) this open()
# call, the _RttMonitor will not be closed. Checking if this monitor
@ -150,23 +163,23 @@ class Monitor(MonitorBase):
if self._executor._stopped:
self._rtt_monitor.close()
def gc_safe_close(self):
def gc_safe_close(self) -> None:
self._executor.close()
self._rtt_monitor.gc_safe_close()
self.cancel_check()
def close(self):
def close(self) -> None:
self.gc_safe_close()
self._rtt_monitor.close()
# Increment the generation and maybe close the socket. If the executor
# thread has the socket checked out, it will be closed when checked in.
self._reset_connection()
def _reset_connection(self):
def _reset_connection(self) -> None:
# Clear our pooled connection.
self._pool.reset()
def _run(self):
def _run(self) -> None:
try:
prev_sd = self._server_description
try:
@ -203,7 +216,7 @@ class Monitor(MonitorBase):
# Topology was garbage-collected.
self.close()
def _check_server(self):
def _check_server(self) -> ServerDescription:
"""Call hello or read the next streaming response.
Returns a ServerDescription.
@ -234,7 +247,7 @@ class Monitor(MonitorBase):
# Server type defaults to Unknown.
return ServerDescription(address, error=error)
def _check_once(self):
def _check_once(self) -> ServerDescription:
"""A single attempt to call hello.
Returns a ServerDescription, or raises an exception.
@ -259,7 +272,7 @@ class Monitor(MonitorBase):
)
return sd
def _check_with_socket(self, conn):
def _check_with_socket(self, conn: Connection) -> Tuple[Hello, float]:
"""Return (Hello, round_trip_time).
Can raise ConnectionFailure or OperationFailure.
@ -283,7 +296,7 @@ class Monitor(MonitorBase):
class SrvMonitor(MonitorBase):
def __init__(self, topology, topology_settings):
def __init__(self, topology: Topology, topology_settings: TopologySettings):
"""Class to poll SRV records on a background thread.
Pass a Topology and a TopologySettings.
@ -298,9 +311,10 @@ class SrvMonitor(MonitorBase):
)
self._settings = topology_settings
self._seedlist = self._settings._seeds
self._fqdn = self._settings.fqdn
assert isinstance(self._settings.fqdn, str)
self._fqdn: str = self._settings.fqdn
def _run(self):
def _run(self) -> None:
seedlist = self._get_seedlist()
if seedlist:
self._seedlist = seedlist
@ -310,7 +324,7 @@ class SrvMonitor(MonitorBase):
# Topology was garbage-collected.
self.close()
def _get_seedlist(self):
def _get_seedlist(self) -> Optional[List[Tuple[str, Any]]]:
"""Poll SRV records for a seedlist.
Returns a list of ServerDescriptions.
@ -338,7 +352,7 @@ class SrvMonitor(MonitorBase):
class _RttMonitor(MonitorBase):
def __init__(self, topology, topology_settings, pool):
def __init__(self, topology: Topology, topology_settings: TopologySettings, pool: Pool):
"""Maintain round trip times for a server.
The Topology is weakly referenced.
@ -355,30 +369,30 @@ class _RttMonitor(MonitorBase):
self._moving_min = MovingMinimum()
self._lock = _create_lock()
def close(self):
def close(self) -> None:
self.gc_safe_close()
# Increment the generation and maybe close the socket. If the executor
# thread has the socket checked out, it will be closed when checked in.
self._pool.reset()
def add_sample(self, sample):
def add_sample(self, sample: float) -> None:
"""Add a RTT sample."""
with self._lock:
self._moving_average.add_sample(sample)
self._moving_min.add_sample(sample)
def get(self):
def get(self) -> Tuple[Optional[float], float]:
"""Get the calculated average, or None if no samples yet and the min."""
with self._lock:
return self._moving_average.get(), self._moving_min.get()
def reset(self):
def reset(self) -> None:
"""Reset the average RTT."""
with self._lock:
self._moving_average.reset()
self._moving_min.reset()
def _run(self):
def _run(self) -> None:
try:
# NOTE: This thread is only run when using the streaming
# heartbeat protocol (MongoDB 4.4+).
@ -391,7 +405,7 @@ class _RttMonitor(MonitorBase):
except Exception:
self._pool.reset()
def _ping(self):
def _ping(self) -> float:
"""Run a "hello" command and return the RTT."""
with self._pool.checkout() as conn:
if self._executor._stopped:
@ -407,16 +421,16 @@ class _RttMonitor(MonitorBase):
_MONITORS = set()
def _register(monitor):
def _register(monitor: MonitorBase) -> None:
ref = weakref.ref(monitor, _unregister)
_MONITORS.add(ref)
def _unregister(monitor_ref):
def _unregister(monitor_ref: weakref.ReferenceType[MonitorBase]) -> None:
_MONITORS.remove(monitor_ref)
def _shutdown_monitors():
def _shutdown_monitors() -> None:
if _MONITORS is None:
return
@ -432,7 +446,7 @@ def _shutdown_monitors():
monitor = None
def _shutdown_resources():
def _shutdown_resources() -> None:
# _shutdown_monitors/_shutdown_executors may already be GC'd at shutdown.
shutdown = _shutdown_monitors
if shutdown: # type:ignore[truthy-function]

View File

@ -85,7 +85,7 @@ class MockMonitor(Monitor):
def _check_once(self):
client = self.client
address = self._server_description.address
response, rtt = client.mock_hello("%s:%d" % address)
response, rtt = client.mock_hello("%s:%d" % address) # type: ignore[str-format]
return ServerDescription(address, Hello(response), rtt)