From f0daebf4bbc1038aedbcc155201f27d256940a59 Mon Sep 17 00:00:00 2001 From: "A. Jesse Jiryu Davis" Date: Mon, 15 Dec 2014 15:40:47 -0500 Subject: [PATCH] PYTHON-799 Create a PeriodicExecutor class for background monitoring. The executor will also be used to solve a deadlock in Cursor.__del__. --- pymongo/monitor.py | 107 ++++++++--------------------- pymongo/periodic_executor.py | 126 +++++++++++++++++++++++++++++++++++ test/test_client.py | 4 +- test/test_monitor.py | 21 +++--- 4 files changed, 165 insertions(+), 93 deletions(-) create mode 100644 pymongo/periodic_executor.py diff --git a/pymongo/monitor.py b/pymongo/monitor.py index 2b6174697..89faaca03 100644 --- a/pymongo/monitor.py +++ b/pymongo/monitor.py @@ -14,12 +14,9 @@ """Class to monitor a MongoDB server on a background thread.""" -import atexit -import threading -import time import weakref -from pymongo import common, helpers, message, thread_util +from pymongo import common, helpers, message, periodic_executor from pymongo.server_type import SERVER_TYPE from pymongo.ismaster import IsMaster from pymongo.monotonic import time as _time @@ -42,7 +39,6 @@ class Monitor(object): The Topology is weakly referenced. The Pool must be exclusive to this Monitor. """ - super(Monitor, self).__init__() self._server_description = server_description # A weakref callback, takes ref to the dead topology as its parameter. @@ -52,70 +48,52 @@ class Monitor(object): self._topology = weakref.proxy(topology, close) self._pool = pool self._settings = topology_settings - self._stopped = False - self._event = thread_util.Event(self._settings.condition_class) - self._thread = None self._avg_round_trip_time = MovingAverage() + # We strongly reference the executor and it weakly references us via + # this closure. When the monitor is freed, a call to target() raises + # ReferenceError and stops the executor. + def target(): + Monitor._run(weakref.proxy(self)) + + self._executor = periodic_executor.PeriodicExecutor( + condition_class=self._settings.condition_class, + interval=common.HEARTBEAT_FREQUENCY, + min_interval=common.MIN_HEARTBEAT_INTERVAL, + target=target) + def open(self): """Start monitoring, or restart after a fork. Multiple calls have no effect. """ - self._stopped = False - started = False - try: - started = self._thread and self._thread.is_alive() - except ReferenceError: - # Thread terminated. - pass - - if not started: - thread = threading.Thread(target=self.run) - thread.daemon = True - self._thread = weakref.proxy(thread) - register_monitor(self) - thread.start() + self._executor.open() def close(self): """Disconnect and stop monitoring. open() restarts the monitor after closing. """ - self._stopped = True + self._executor.close() + + # Increment the pool_id and maybe close the socket. If the executor + # thread has the socket checked out, it will be closed when checked in. self._pool.reset() - # Wake the thread so it notices that _stopped is True. - self.request_check() - def join(self, timeout=None): - if self._thread is not None: - try: - self._thread.join(timeout) - except ReferenceError: - # Thread already terminated. - pass + self._executor.join(timeout) def request_check(self): """If the monitor is sleeping, wake and check the server soon.""" - self._event.set() + self._executor.wake() - def run(self): - while not self._stopped: - try: - self._server_description = self._check_with_retry() - self._topology.on_change(self._server_description) - except ReferenceError: - # Topology was garbage-collected. - self.close() - else: - start = _time() - self._event.wait(common.HEARTBEAT_FREQUENCY) - self._event.clear() - wait_time = _time() - start - if wait_time < common.MIN_HEARTBEAT_INTERVAL: - # request_check() was called before min interval passed. - time.sleep(common.MIN_HEARTBEAT_INTERVAL - wait_time) + def _run(self): + try: + self._server_description = self._check_with_retry() + self._topology.on_change(self._server_description) + except ReferenceError: + # Topology was garbage-collected. + self.close() def _check_with_retry(self): """Call ismaster once or twice. Reset server's pool on error. @@ -175,34 +153,3 @@ class Monitor(object): raw_response = sock_info.receive_message(1, request_id) result = helpers._unpack_response(raw_response) return IsMaster(result['data'][0]), _time() - start - - -# MONITORS has a weakref to each running Monitor. A Monitor is kept alive by -# a strong reference from its Server and its Thread. Once both are destroyed -# the Monitor is garbage-collected and removed from MONITORS. If, however, -# any threads are still running when the interpreter begins to shut down, -# we attempt to halt and join them to avoid spurious errors. -MONITORS = set() - - -def register_monitor(monitor): - ref = weakref.ref(monitor, _on_monitor_deleted) - MONITORS.add(ref) - - -def _on_monitor_deleted(ref): - MONITORS.remove(ref) - - -def shutdown_monitors(): - # Keep a local copy of MONITORS as - # shutting down threads has a side effect - # of removing them from the MONITORS set() - monitors = list(MONITORS) - for ref in monitors: - monitor = ref() - if monitor: - monitor.close() - monitor.join(10) - -atexit.register(shutdown_monitors) diff --git a/pymongo/periodic_executor.py b/pymongo/periodic_executor.py new file mode 100644 index 000000000..4f80a58ff --- /dev/null +++ b/pymongo/periodic_executor.py @@ -0,0 +1,126 @@ +# Copyright 2014 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Run a target function on a background thread.""" + +import atexit +import threading +import time +import weakref + +from pymongo import thread_util + + +class PeriodicExecutor(object): + def __init__(self, condition_class, interval, min_interval, target): + """"Run a target function periodically on a background thread. + + If the function raises an exception, the executor stops. + + :Parameters: + - `condition_class`: A class like threading.Condition. + - `interval`: Seconds between calls to `target`. + - `min_interval`: Minimum seconds between calls if `wake` is + called very often. + - `target`: A function. + """ + self._event = thread_util.Event(condition_class) + self._interval = interval + self._min_interval = min_interval + self._target = target + self._stopped = False + self._thread = None + + def open(self): + """Start. Multiple calls have no effect. + + Not safe to call from multiple threads at once. + """ + self._stopped = False + started = False + try: + started = self._thread and self._thread.is_alive() + except ReferenceError: + # Thread terminated. + pass + + if not started: + thread = threading.Thread(target=self._run) + thread.daemon = True + self._thread = weakref.proxy(thread) + _register_executor(self) + thread.start() + + def close(self): + """Stop. To restart, call open().""" + self._stopped = True + + # Wake the thread so it notices that _stopped is True. + self.wake() + + def join(self, timeout=None): + if self._thread is not None: + try: + self._thread.join(timeout) + except ReferenceError: + # Thread already terminated. + pass + + def wake(self): + """Execute the target function soon.""" + self._event.set() + + def _run(self): + while not self._stopped: + try: + self._target() + except Exception: + # It's the target's responsibility to report errors. + self._stopped = True + break + + # Avoid running too frequently if wake() is called very often. + time.sleep(self._min_interval) + self._event.wait(self._interval - self._min_interval) + self._event.clear() + + +# _EXECUTORS has a weakref to each running PeriodicExecutor. Once started, +# an executor is kept alive by a strong reference from its thread and perhaps +# from other objects. When the thread dies and all other referrers are freed, +# the executor is freed and removed from _EXECUTORS. If any threads are +# running when the interpreter begins to shut down, we try to halt and join +# them to avoid spurious errors. +_EXECUTORS = set() + + +def _register_executor(executor): + ref = weakref.ref(executor, _on_executor_deleted) + _EXECUTORS.add(ref) + + +def _on_executor_deleted(ref): + _EXECUTORS.remove(ref) + + +def _shutdown_executors(): + # Copy the set. Stopping threads has the side effect of removing executors. + executors = list(_EXECUTORS) + for ref in executors: + executor = ref() + if executor: + executor.close() + executor.join(10) + +atexit.register(_shutdown_executors) diff --git a/test/test_client.py b/test/test_client.py index 500c962c8..f5e0ee421 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -431,13 +431,13 @@ class TestClient(IntegrationTest): any_server_selector) # In child, only the thread that called fork() is alive. - assert not any(s._monitor._thread.is_alive() + assert not any(s._monitor._executor._thread.is_alive() for s in servers) db.test.find_one() wait_until( - lambda: all(s._monitor._thread.is_alive() + lambda: all(s._monitor._executor._thread.is_alive() for s in servers), "restart monitor threads") except: diff --git a/test/test_monitor.py b/test/test_monitor.py index 13b0e62bd..c669b9164 100644 --- a/test/test_monitor.py +++ b/test/test_monitor.py @@ -20,14 +20,14 @@ from functools import partial sys.path[0:0] = [""] -from pymongo.monitor import MONITORS +from pymongo.periodic_executor import _EXECUTORS from test import unittest, port, host, IntegrationTest from test.utils import single_client, one, connected, wait_until -def find_monitor_ref(monitor): - for ref in MONITORS.copy(): - if ref() is monitor: +def registered(executor): + for ref in _EXECUTORS.copy(): + if ref() is executor: return ref return None @@ -35,23 +35,22 @@ def find_monitor_ref(monitor): def unregistered(ref): gc.collect() - return ref not in MONITORS + return ref not in _EXECUTORS class TestMonitor(IntegrationTest): def test_atexit_hook(self): client = single_client(host, port) - monitor = one(client._topology._servers.values())._monitor + executor = one(client._topology._servers.values())._monitor._executor connected(client) - # The client registers a weakref to the monitor. - ref = wait_until(partial(find_monitor_ref, monitor), - 'register monitor') + # The executor stores a weakref to itself in _EXECUTORS. + ref = wait_until(partial(registered, executor), 'register executor') - del monitor + del executor del client - wait_until(partial(unregistered, ref), 'unregister monitor') + wait_until(partial(unregistered, ref), 'unregister executor') if __name__ == "__main__":