PYTHON-799 Create a PeriodicExecutor class for background monitoring.

The executor will also be used to solve a deadlock in Cursor.__del__.
This commit is contained in:
A. Jesse Jiryu Davis 2014-12-15 15:40:47 -05:00
parent 34398d5bf7
commit f0daebf4bb
4 changed files with 165 additions and 93 deletions

View File

@ -14,12 +14,9 @@
"""Class to monitor a MongoDB server on a background thread."""
import atexit
import threading
import time
import weakref
from pymongo import common, helpers, message, thread_util
from pymongo import common, helpers, message, periodic_executor
from pymongo.server_type import SERVER_TYPE
from pymongo.ismaster import IsMaster
from pymongo.monotonic import time as _time
@ -42,7 +39,6 @@ class Monitor(object):
The Topology is weakly referenced. The Pool must be exclusive to this
Monitor.
"""
super(Monitor, self).__init__()
self._server_description = server_description
# A weakref callback, takes ref to the dead topology as its parameter.
@ -52,70 +48,52 @@ class Monitor(object):
self._topology = weakref.proxy(topology, close)
self._pool = pool
self._settings = topology_settings
self._stopped = False
self._event = thread_util.Event(self._settings.condition_class)
self._thread = None
self._avg_round_trip_time = MovingAverage()
# We strongly reference the executor and it weakly references us via
# this closure. When the monitor is freed, a call to target() raises
# ReferenceError and stops the executor.
def target():
Monitor._run(weakref.proxy(self))
self._executor = periodic_executor.PeriodicExecutor(
condition_class=self._settings.condition_class,
interval=common.HEARTBEAT_FREQUENCY,
min_interval=common.MIN_HEARTBEAT_INTERVAL,
target=target)
def open(self):
"""Start monitoring, or restart after a fork.
Multiple calls have no effect.
"""
self._stopped = False
started = False
try:
started = self._thread and self._thread.is_alive()
except ReferenceError:
# Thread terminated.
pass
if not started:
thread = threading.Thread(target=self.run)
thread.daemon = True
self._thread = weakref.proxy(thread)
register_monitor(self)
thread.start()
self._executor.open()
def close(self):
"""Disconnect and stop monitoring.
open() restarts the monitor after closing.
"""
self._stopped = True
self._executor.close()
# Increment the pool_id and maybe close the socket. If the executor
# thread has the socket checked out, it will be closed when checked in.
self._pool.reset()
# Wake the thread so it notices that _stopped is True.
self.request_check()
def join(self, timeout=None):
if self._thread is not None:
try:
self._thread.join(timeout)
except ReferenceError:
# Thread already terminated.
pass
self._executor.join(timeout)
def request_check(self):
"""If the monitor is sleeping, wake and check the server soon."""
self._event.set()
self._executor.wake()
def run(self):
while not self._stopped:
try:
self._server_description = self._check_with_retry()
self._topology.on_change(self._server_description)
except ReferenceError:
# Topology was garbage-collected.
self.close()
else:
start = _time()
self._event.wait(common.HEARTBEAT_FREQUENCY)
self._event.clear()
wait_time = _time() - start
if wait_time < common.MIN_HEARTBEAT_INTERVAL:
# request_check() was called before min interval passed.
time.sleep(common.MIN_HEARTBEAT_INTERVAL - wait_time)
def _run(self):
try:
self._server_description = self._check_with_retry()
self._topology.on_change(self._server_description)
except ReferenceError:
# Topology was garbage-collected.
self.close()
def _check_with_retry(self):
"""Call ismaster once or twice. Reset server's pool on error.
@ -175,34 +153,3 @@ class Monitor(object):
raw_response = sock_info.receive_message(1, request_id)
result = helpers._unpack_response(raw_response)
return IsMaster(result['data'][0]), _time() - start
# MONITORS has a weakref to each running Monitor. A Monitor is kept alive by
# a strong reference from its Server and its Thread. Once both are destroyed
# the Monitor is garbage-collected and removed from MONITORS. If, however,
# any threads are still running when the interpreter begins to shut down,
# we attempt to halt and join them to avoid spurious errors.
MONITORS = set()
def register_monitor(monitor):
ref = weakref.ref(monitor, _on_monitor_deleted)
MONITORS.add(ref)
def _on_monitor_deleted(ref):
MONITORS.remove(ref)
def shutdown_monitors():
# Keep a local copy of MONITORS as
# shutting down threads has a side effect
# of removing them from the MONITORS set()
monitors = list(MONITORS)
for ref in monitors:
monitor = ref()
if monitor:
monitor.close()
monitor.join(10)
atexit.register(shutdown_monitors)

View File

@ -0,0 +1,126 @@
# Copyright 2014 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Run a target function on a background thread."""
import atexit
import threading
import time
import weakref
from pymongo import thread_util
class PeriodicExecutor(object):
def __init__(self, condition_class, interval, min_interval, target):
""""Run a target function periodically on a background thread.
If the function raises an exception, the executor stops.
:Parameters:
- `condition_class`: A class like threading.Condition.
- `interval`: Seconds between calls to `target`.
- `min_interval`: Minimum seconds between calls if `wake` is
called very often.
- `target`: A function.
"""
self._event = thread_util.Event(condition_class)
self._interval = interval
self._min_interval = min_interval
self._target = target
self._stopped = False
self._thread = None
def open(self):
"""Start. Multiple calls have no effect.
Not safe to call from multiple threads at once.
"""
self._stopped = False
started = False
try:
started = self._thread and self._thread.is_alive()
except ReferenceError:
# Thread terminated.
pass
if not started:
thread = threading.Thread(target=self._run)
thread.daemon = True
self._thread = weakref.proxy(thread)
_register_executor(self)
thread.start()
def close(self):
"""Stop. To restart, call open()."""
self._stopped = True
# Wake the thread so it notices that _stopped is True.
self.wake()
def join(self, timeout=None):
if self._thread is not None:
try:
self._thread.join(timeout)
except ReferenceError:
# Thread already terminated.
pass
def wake(self):
"""Execute the target function soon."""
self._event.set()
def _run(self):
while not self._stopped:
try:
self._target()
except Exception:
# It's the target's responsibility to report errors.
self._stopped = True
break
# Avoid running too frequently if wake() is called very often.
time.sleep(self._min_interval)
self._event.wait(self._interval - self._min_interval)
self._event.clear()
# _EXECUTORS has a weakref to each running PeriodicExecutor. Once started,
# an executor is kept alive by a strong reference from its thread and perhaps
# from other objects. When the thread dies and all other referrers are freed,
# the executor is freed and removed from _EXECUTORS. If any threads are
# running when the interpreter begins to shut down, we try to halt and join
# them to avoid spurious errors.
_EXECUTORS = set()
def _register_executor(executor):
ref = weakref.ref(executor, _on_executor_deleted)
_EXECUTORS.add(ref)
def _on_executor_deleted(ref):
_EXECUTORS.remove(ref)
def _shutdown_executors():
# Copy the set. Stopping threads has the side effect of removing executors.
executors = list(_EXECUTORS)
for ref in executors:
executor = ref()
if executor:
executor.close()
executor.join(10)
atexit.register(_shutdown_executors)

View File

@ -431,13 +431,13 @@ class TestClient(IntegrationTest):
any_server_selector)
# In child, only the thread that called fork() is alive.
assert not any(s._monitor._thread.is_alive()
assert not any(s._monitor._executor._thread.is_alive()
for s in servers)
db.test.find_one()
wait_until(
lambda: all(s._monitor._thread.is_alive()
lambda: all(s._monitor._executor._thread.is_alive()
for s in servers),
"restart monitor threads")
except:

View File

@ -20,14 +20,14 @@ from functools import partial
sys.path[0:0] = [""]
from pymongo.monitor import MONITORS
from pymongo.periodic_executor import _EXECUTORS
from test import unittest, port, host, IntegrationTest
from test.utils import single_client, one, connected, wait_until
def find_monitor_ref(monitor):
for ref in MONITORS.copy():
if ref() is monitor:
def registered(executor):
for ref in _EXECUTORS.copy():
if ref() is executor:
return ref
return None
@ -35,23 +35,22 @@ def find_monitor_ref(monitor):
def unregistered(ref):
gc.collect()
return ref not in MONITORS
return ref not in _EXECUTORS
class TestMonitor(IntegrationTest):
def test_atexit_hook(self):
client = single_client(host, port)
monitor = one(client._topology._servers.values())._monitor
executor = one(client._topology._servers.values())._monitor._executor
connected(client)
# The client registers a weakref to the monitor.
ref = wait_until(partial(find_monitor_ref, monitor),
'register monitor')
# The executor stores a weakref to itself in _EXECUTORS.
ref = wait_until(partial(registered, executor), 'register executor')
del monitor
del executor
del client
wait_until(partial(unregistered, ref), 'unregister monitor')
wait_until(partial(unregistered, ref), 'unregister executor')
if __name__ == "__main__":