mongo-python-driver/pymongo/monitor.py
Bernie Hackett dee74f220c PYTHON-2707 Limit the use of 'master'
This commit limits the use of the word 'master'
as much as possible without breaking API or
breaking documentation links. PyMongo 4.0 will
include backward breaking API changes to do more.
2021-07-09 12:20:43 -07:00

441 lines
15 KiB
Python

# Copyright 2014-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Class to monitor a MongoDB server on a background thread."""
import atexit
import threading
import weakref
from bson.py3compat import PY3
from pymongo import common, periodic_executor
from pymongo.errors import (NotPrimaryError,
OperationFailure,
_OperationCancelled)
from pymongo.ismaster import IsMaster
from pymongo.monotonic import time as _time
from pymongo.periodic_executor import _shutdown_executors
from pymongo.read_preferences import MovingAverage
from pymongo.server_description import ServerDescription
from pymongo.srv_resolver import _SrvResolver
def _sanitize(error):
    """Clear the traceback and chained-exception links on ``error``.

    See PYTHON-2433. The exception is modified in place; nothing is done
    unless ``PY3`` is true.
    """
    if not PY3:
        return
    # Same three attributes, cleared in the same order.
    for attr in ('__traceback__', '__context__', '__cause__'):
        setattr(error, attr, None)
class MonitorBase(object):
    def __init__(self, topology, name, interval, min_interval):
        """Base class for periodic work done on a background thread.

        A PeriodicExecutor is built from ``name``, ``interval`` and
        ``min_interval``; its target calls this object's ``_run()``.
        The background thread is signaled to stop when the Topology or
        this instance is freed.
        """
        def target():
            # The executor only reaches the monitor through a weak
            # reference, so it never keeps the monitor alive.
            owner = self_ref()
            if owner is not None:
                owner._run()
                return True
            # The monitor has been freed: stop the executor.
            return False

        worker = periodic_executor.PeriodicExecutor(
            interval=interval,
            min_interval=min_interval,
            target=target,
            name=name)
        # Strong reference monitor -> executor; the reverse link is weak.
        self._executor = worker

        # Avoid cycles. When self is freed, the executor is closed soon.
        self_ref = weakref.ref(self, worker.close)

        def _topology_freed(dummy=None):
            # Close at once so GC does not wait 10 seconds for 'hello' to
            # complete. See test_cleanup_executors_on_client_del.
            owner = self_ref()
            if owner:
                owner.gc_safe_close()

        # The Topology is held through a proxy whose callback closes us.
        self._topology = weakref.proxy(topology, _topology_freed)
        _register(self)

    def open(self):
        """Begin monitoring; also used to restart after a fork.

        Calling this more than once has no additional effect.
        """
        self._executor.open()

    def gc_safe_close(self):
        """Close the executor in a GC-safe way."""
        self._executor.close()

    def close(self):
        """Stop monitoring.

        A later open() restarts the monitor.
        """
        self.gc_safe_close()

    def join(self, timeout=None):
        """Block until the monitor has stopped, up to ``timeout``."""
        self._executor.join(timeout)

    def request_check(self):
        """Wake the monitor soon if it is currently sleeping."""
        self._executor.wake()
class Monitor(MonitorBase):
    def __init__(
            self,
            server_description,
            topology,
            pool,
            topology_settings):
        """Monitor a single MongoDB server on a background thread.

        Takes the server's initial ServerDescription, the Topology (only
        weakly referenced), a Pool that must be exclusive to this Monitor,
        and the TopologySettings.
        """
        super(Monitor, self).__init__(
            topology,
            "pymongo_server_monitor_thread",
            topology_settings.heartbeat_frequency,
            common.MIN_HEARTBEAT_INTERVAL)
        self._server_description = server_description
        self._pool = pool
        self._settings = topology_settings
        listeners = topology_settings._pool_options.event_listeners
        self._listeners = listeners
        # Heartbeat events are published only when listeners exist and
        # are enabled for server heartbeats.
        if listeners is None:
            self._publish = False
        else:
            self._publish = listeners.enabled_for_server_heartbeat
        self._cancel_context = None
        # The RTT monitor gets its own pool for the same server address.
        rtt_pool = topology._create_pool_for_monitor(
            server_description.address)
        self._rtt_monitor = _RttMonitor(
            topology, topology_settings, rtt_pool)
        self.heartbeater = None

    def cancel_check(self):
        """Cancel a hello check that may be running concurrently.

        Note: this runs from a weakref.proxy callback and MUST NOT take
        any locks.
        """
        ctx = self._cancel_context
        if not ctx:
            return
        # Note: the socket is not closed here because doing so may cause
        # concurrent reads/writes to hang until a timeout occurs
        # (depending on the platform).
        ctx.cancel()

    def _start_rtt_monitor(self):
        """Open the _RttMonitor, which periodically runs ping."""
        self._rtt_monitor.open()
        # If this monitor was closed just before (or during) the open()
        # call above, the _RttMonitor would be left running. Re-checking
        # the stopped flag right afterwards resolves that race.
        if self._executor._stopped:
            self._rtt_monitor.close()

    def gc_safe_close(self):
        """Close the executor and RTT monitor, then cancel any check."""
        self._executor.close()
        self._rtt_monitor.gc_safe_close()
        self.cancel_check()

    def close(self):
        """Stop monitoring and reset this monitor's pooled connection."""
        self.gc_safe_close()
        self._rtt_monitor.close()
        # Resetting increments the generation and maybe closes the socket.
        # If the executor thread has the socket checked out, it will be
        # closed when checked in.
        self._reset_connection()

    def _reset_connection(self):
        """Clear our pooled connection."""
        self._pool.reset()

    def _run(self):
        """Run one check and hand the resulting description to the Topology.

        Closes this monitor if the Topology has been garbage-collected.
        """
        try:
            previous = self._server_description
            try:
                self._server_description = self._check_server()
            except _OperationCancelled as cancelled:
                _sanitize(cancelled)
                # The connection is already closed; wait for the next
                # check.
                self._server_description = ServerDescription(
                    self._server_description.address, error=cancelled)
                if previous.is_server_type_known:
                    # Retry immediately: 500ms have already been spent
                    # discovering that the check was cancelled.
                    self._executor.skip_sleep()
                return

            # Update the Topology and clear the server pool on error.
            self._topology.on_change(
                self._server_description,
                reset_pool=self._server_description.error)

            streaming = (self._server_description.is_server_type_known
                         and self._server_description.topology_version)
            if streaming:
                self._start_rtt_monitor()
                # Immediately check for the next streaming response.
                self._executor.skip_sleep()

            failed = self._server_description.error
            if failed and previous.is_server_type_known:
                # Immediately retry on network errors.
                self._executor.skip_sleep()
        except ReferenceError:
            # Topology was garbage-collected.
            self.close()

    def _check_server(self):
        """Call hello or read the next streaming response.

        Returns a ServerDescription. ReferenceError and
        _OperationCancelled are re-raised; any other exception is turned
        into a ServerDescription that carries the error.
        """
        began = _time()
        try:
            try:
                return self._check_once()
            except (OperationFailure, NotPrimaryError) as failure:
                # Update max cluster time even when hello fails.
                self._topology.receive_cluster_time(
                    failure.details.get('$clusterTime'))
                raise
        except ReferenceError:
            raise
        except Exception as error:
            _sanitize(error)
            current = self._server_description
            address = current.address
            elapsed = _time() - began
            if self._publish:
                awaited = (current.is_server_type_known
                           and current.topology_version)
                self._listeners.publish_server_heartbeat_failed(
                    address, elapsed, error, awaited)
            self._reset_connection()
            if isinstance(error, _OperationCancelled):
                raise
            self._rtt_monitor.reset()
            # Server type defaults to Unknown.
            return ServerDescription(address, error=error)

    def _check_once(self):
        """Make a single attempt to call hello.

        Returns a ServerDescription, or raises an exception.
        """
        address = self._server_description.address
        if self._publish:
            self._listeners.publish_server_heartbeat_started(address)

        # A previously cancelled context means the connection is reset
        # before a socket is checked out again.
        if self._cancel_context and self._cancel_context.cancelled:
            self._reset_connection()
        with self._pool.get_socket({}) as sock_info:
            self._cancel_context = sock_info.cancel_context
            reply, rtt = self._check_with_socket(sock_info)
            # Awaitable replies do not contribute an RTT sample.
            if not reply.awaitable:
                self._rtt_monitor.add_sample(rtt)

            description = ServerDescription(
                address, reply, self._rtt_monitor.average())
            if self._publish:
                self._listeners.publish_server_heartbeat_succeeded(
                    address, rtt, reply, reply.awaitable)
            return description

    def _check_with_socket(self, conn):
        """Return (Hello, round_trip_time).

        Can raise ConnectionFailure or OperationFailure.
        """
        cluster_time = self._topology.max_cluster_time()
        began = _time()
        if conn.more_to_come:
            # Read the next streaming hello (MongoDB 4.4+).
            reply = IsMaster(conn._next_reply(), awaitable=True)
            return reply, _time() - began

        if (conn.performed_handshake and
                self._server_description.topology_version):
            # Initiate streaming hello (MongoDB 4.4+).
            reply = conn._hello(
                cluster_time,
                self._server_description.topology_version,
                self._settings.heartbeat_frequency,
                None)
        else:
            # New connection handshake or polling hello (MongoDB <4.4).
            reply = conn._hello(cluster_time, None, None, None)
        return reply, _time() - began
class SrvMonitor(MonitorBase):
    def __init__(self, topology, topology_settings):
        """Poll SRV records on a background thread.

        Takes a Topology (only weakly referenced) and a TopologySettings.
        The executor interval is common.MIN_SRV_RESCAN_INTERVAL and its
        minimum interval is the settings' heartbeat frequency.
        """
        super(SrvMonitor, self).__init__(
            topology,
            "pymongo_srv_polling_thread",
            common.MIN_SRV_RESCAN_INTERVAL,
            topology_settings.heartbeat_frequency)
        self._settings = topology_settings
        self._seedlist = self._settings._seeds
        self._fqdn = self._settings.fqdn

    def _run(self):
        """Poll once and pass a non-empty seedlist on to the Topology."""
        polled = self._get_seedlist()
        if not polled:
            # Polling failed: the stored seedlist and Topology stay as is.
            return
        self._seedlist = polled
        try:
            self._topology.on_srv_update(self._seedlist)
        except ReferenceError:
            # Topology was garbage-collected.
            self.close()

    def _get_seedlist(self):
        """Poll SRV records for a seedlist.

        Returns the resolved seedlist, or None if resolution failed or
        produced no hosts.
        """
        try:
            hosts, ttl = _SrvResolver(self._fqdn).get_hosts_and_min_ttl()
            if not len(hosts):
                # As per the spec: this should be treated as a failure.
                raise Exception
        except Exception:
            # As per the spec, upon encountering an error:
            # - An error must not be raised
            # - SRV records must be rescanned every heartbeatFrequencyMS
            # - Topology must be left unchanged
            self.request_check()
            return None
        # Success: rescan after the TTL, but never sooner than the
        # minimum rescan interval.
        self._executor.update_interval(
            max(ttl, common.MIN_SRV_RESCAN_INTERVAL))
        return hosts
class _RttMonitor(MonitorBase):
    def __init__(self, topology, topology_settings, pool):
        """Track round trip times for a server.

        The Topology is weakly referenced. ``pool`` supplies the sockets
        used for pinging.
        """
        super(_RttMonitor, self).__init__(
            topology,
            "pymongo_server_rtt_thread",
            topology_settings.heartbeat_frequency,
            common.MIN_HEARTBEAT_INTERVAL)
        self._pool = pool
        self._moving_average = MovingAverage()
        # Guards every access to the moving average.
        self._lock = threading.Lock()

    def close(self):
        """Stop the monitor and reset its pool."""
        self.gc_safe_close()
        # Resetting increments the generation and maybe closes the socket.
        # If the executor thread has the socket checked out, it will be
        # closed when checked in.
        self._pool.reset()

    def add_sample(self, sample):
        """Record one RTT sample."""
        with self._lock:
            self._moving_average.add_sample(sample)

    def average(self):
        """Return the calculated average, or None if no samples yet."""
        with self._lock:
            return self._moving_average.get()

    def reset(self):
        """Reset the average RTT."""
        with self._lock:
            return self._moving_average.reset()

    def _run(self):
        """Ping once and record the measured RTT."""
        try:
            # NOTE: This thread is only run when using the streaming
            # heartbeat protocol (MongoDB 4.4+).
            # XXX: Skip check if the server is unknown?
            self.add_sample(self._ping())
        except ReferenceError:
            # Topology was garbage-collected.
            self.close()
        except Exception:
            # Any other failure resets the pool.
            self._pool.reset()

    def _ping(self):
        """Run a "hello" command and return the RTT."""
        with self._pool.get_socket({}) as sock_info:
            # Do not ping once the executor has been stopped.
            if self._executor._stopped:
                raise Exception('_RttMonitor closed')
            began = _time()
            sock_info.hello()
            return _time() - began
# Close monitors to cancel any in progress streaming checks before joining
# executor threads. For an explanation of how this works see the comment
# about _EXECUTORS in periodic_executor.py.
# The set holds weakref.ref objects pointing at monitors, not the monitors
# themselves.
_MONITORS = set()
def _register(monitor):
    """Add a weak reference to ``monitor`` to the module-level registry.

    The reference's callback is ``_unregister``, so the entry is removed
    again once the monitor has been freed.
    """
    _MONITORS.add(weakref.ref(monitor, _unregister))
def _unregister(monitor_ref):
    """Drop ``monitor_ref`` from the module-level registry.

    This runs as a weakref callback. An exception raised here cannot
    propagate to any caller and would only be reported on stderr, so the
    removal is made tolerant: an absent entry is ignored, and nothing is
    done if the registry global is already None (the same condition
    ``_shutdown_monitors`` checks for).
    """
    registry = _MONITORS
    if registry is None:
        return
    # discard() rather than remove(): a missing entry is not an error.
    registry.discard(monitor_ref)
def _shutdown_monitors():
    """Call ``gc_safe_close()`` on every monitor that is still alive."""
    if _MONITORS is None:
        return

    # Walk a snapshot of the set: closing monitors removes them.
    for ref in list(_MONITORS):
        monitor = ref()
        if monitor:
            monitor.gc_safe_close()
        # Release the strong reference obtained from the weakref.
        monitor = None
def _shutdown_resources():
    """Shut down monitors first, then executors.

    Each hook is looked up and checked right before it is called because
    _shutdown_monitors/_shutdown_executors may already be GC'd at
    shutdown.
    """
    close_monitors = _shutdown_monitors
    if close_monitors:
        close_monitors()

    stop_executors = _shutdown_executors
    if stop_executors:
        stop_executors()
# Run _shutdown_resources when the interpreter exits.
atexit.register(_shutdown_resources)