Prevent concurrent calls to Thread.start() and refactor RS client, PYTHON-560.

This commit is contained in:
A. Jesse Jiryu Davis 2013-11-01 09:07:34 -04:00
parent db1ec12f68
commit 263ac8f8b7
4 changed files with 127 additions and 60 deletions

View File

@ -58,6 +58,7 @@ from pymongo.errors import (AutoReconnect,
InvalidDocument,
OperationFailure,
InvalidOperation)
from pymongo.thread_util import DummyLock
EMPTY = b("")
MAX_RETRY = 3
@ -277,6 +278,10 @@ class Monitor(object):
def start_sync(self):
"""Start the Monitor and block until it's really started.
"""
# start() can return before the thread is fully bootstrapped,
# so a fork can leave the thread thinking it's alive in a child
# process when it's really dead:
# http://bugs.python.org/issue18418.
self.start() # Implemented in subclasses.
self.started_event.wait(5)
@ -337,13 +342,6 @@ class MonitorThread(threading.Thread, Monitor):
self.setName("ReplicaSetMonitorThread")
self.setDaemon(True)
# Track whether the thread has started. (Greenlets track this already.)
self.started = False
def start(self):
self.started = True
super(MonitorThread, self).start()
def run(self):
"""Override Thread's run method.
"""
@ -363,9 +361,16 @@ try:
"""Greenlet based replica set monitor.
"""
def __init__(self, rsc):
self.monitor_greenlet_alive = False
Monitor.__init__(self, rsc, Event)
Greenlet.__init__(self)
def start_sync(self):
self.monitor_greenlet_alive = True
# Call superclass.
Monitor.start_sync(self)
# Don't override `run` in a Greenlet. Add _run instead.
# Refer to gevent's Greenlet docs and source for more
# information.
@ -375,8 +380,11 @@ try:
self.monitor()
def isAlive(self):
# Gevent defines bool(Greenlet) as True if it's alive.
return bool(self)
# bool(self) isn't immediately True after someone calls start(),
# but isAlive() is. Thus it's safe for greenlets to do:
# "if not monitor.isAlive(): monitor.start()"
# ... and be guaranteed only one greenlet starts the monitor.
return self.monitor_greenlet_alive
except ImportError:
pass
@ -641,6 +649,7 @@ class MongoReplicaSetClient(common.BaseObject):
self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
self.__document_class = document_class
self.__monitor = None
self.__closed = False
# Compatibility with mongo_client.MongoClient
host = kwargs.pop('host', hosts_or_uri)
@ -765,17 +774,18 @@ class MongoReplicaSetClient(common.BaseObject):
# Common case: monitor RS with a background thread.
self.__monitor_class = MonitorThread
self.__monitor = self.__monitor_class(self)
register_monitor(self.__monitor)
self.__monitor_lock = threading.Lock()
if self.__use_greenlets:
# Greenlets don't need to lock around access to the monitor.
# A Greenlet can safely do:
# "if not self.__monitor: self.__monitor = monitor_class()"
# because it won't be interrupted between the check and the
# assignment.
self.__monitor_lock = DummyLock()
else:
self.__monitor_lock = threading.Lock()
if _connect:
# Wait for the monitor to really start. Otherwise if we return to
# caller and caller forks immediately, the monitor could think it's
# still alive in the child process when it really isn't.
# See http://bugs.python.org/issue18418.
self.__monitor.start_sync()
self.__ensure_monitor()
def _cached(self, dbname, coll, index):
"""Test if `index` is cached.
@ -1074,28 +1084,26 @@ class MongoReplicaSetClient(common.BaseObject):
is in progress, the work of refreshing the state is only performed
once.
"""
if not self.__monitor:
if self.__closed:
raise InvalidOperation('MongoReplicaSetClient has been closed')
if not self.__monitor.isAlive():
# We've forked since monitor was created.
self.__restart_monitor()
self.__monitor.schedule_refresh()
monitor = self.__ensure_monitor()
monitor.schedule_refresh()
if sync:
self.__monitor.wait_for_refresh(timeout_seconds=5)
monitor.wait_for_refresh(timeout_seconds=5)
def __restart_monitor(self):
def __ensure_monitor(self):
"""Ensure the monitor is started, and return it."""
self.__monitor_lock.acquire()
try:
# Another thread may have restarted the monitor while we were
# waiting for the lock.
if self.__monitor.isAlive():
return
# Another thread can start the monitor while we wait for the lock.
if self.__monitor is not None and self.__monitor.isAlive():
return self.__monitor
self.__monitor = self.__monitor_class(self)
register_monitor(self.__monitor)
self.__monitor.start_sync()
monitor = self.__monitor = self.__monitor_class(self)
register_monitor(monitor)
monitor.start_sync()
return monitor
finally:
self.__monitor_lock.release()
@ -1261,13 +1269,8 @@ class MongoReplicaSetClient(common.BaseObject):
"""Ensure this client instance is connected to a primary.
"""
# This may be the first time we're connecting to the set.
if self.__monitor and not self.__monitor.started:
try:
self.__monitor.start()
# Minor race condition. It's possible that two (or more)
# threads could call monitor.start() consecutively. Just pass.
except RuntimeError:
pass
self.__ensure_monitor()
if sync:
rs_state = self.__rs_state
if not rs_state.primary_member:
@ -1301,6 +1304,7 @@ class MongoReplicaSetClient(common.BaseObject):
.. versionchanged:: 2.2.1
The :meth:`close` method now terminates the replica set monitor.
"""
self.__closed = True
self.__rs_state = RSState(self.__make_threadlocal())
monitor, self.__monitor = self.__monitor, None

View File

@ -44,6 +44,14 @@ from pymongo.errors import ExceededMaxWaiters
issue1868 = (sys.version_info[:3] <= (2, 7, 0))
class DummyLock(object):
def acquire(self):
pass
def release(self):
pass
class Ident(object):
def __init__(self):
self._refs = {}
@ -67,20 +75,13 @@ class Ident(object):
class ThreadIdent(Ident):
class _DummyLock(object):
def acquire(self):
pass
def release(self):
pass
def __init__(self):
super(ThreadIdent, self).__init__()
self._local = threading.local()
if issue1868:
self._lock = threading.Lock()
else:
self._lock = ThreadIdent._DummyLock()
self._lock = DummyLock()
# We watch for thread-death using a weakref callback to a thread local.
# Weakrefs are permitted on subclasses of object but not object() itself.

View File

@ -1179,5 +1179,12 @@ class TestReplicaSetClientLazyConnect(
pass
# Test concurrent access to a lazily-connecting RS client, with Gevent.
class TestReplicaSetClientLazyConnectGevent(
TestReplicaSetClientBase,
_TestLazyConnectMixin):
use_greenlets = True
if __name__ == "__main__":
unittest.main()

View File

@ -15,13 +15,23 @@
"""Utilities for testing pymongo
"""
import sys
import threading
from nose.plugins.skip import SkipTest
from pymongo import MongoClient, MongoReplicaSetClient
from pymongo.errors import AutoReconnect
from pymongo.pool import NO_REQUEST, NO_SOCKET_YET, SocketInfo
from test import host, port, version
PY3 = sys.version_info[0] == 3
try:
import gevent
has_gevent = True
except ImportError:
has_gevent = False
# No functools in Python 2.4
def my_partial(f, *args, **kwargs):
@ -352,37 +362,82 @@ class TestRequestMixin(object):
class _TestLazyConnectMixin(object):
"""Inherit from this class and from unittest.TestCase, and override
_get_client(self, **kwargs), for testing clients with _connect=False.
"""Test concurrent operations on a lazily-connecting client.
Inherit from this class and from unittest.TestCase, and override
_get_client(self, **kwargs), for testing a lazily-connecting
client, i.e. a client initialized with _connect=False.
Set use_greenlets = True to test with Gevent.
"""
use_greenlets = False
ntrials = 10
nthreads = 10
interval = None
def run_threads(self, collection, target):
"""Run a target function in many threads.
target is a function taking a Collection and an integer.
"""
threads = [
threading.Thread(target=my_partial(target, collection, i))
for i in range(self.nthreads)]
threads = []
for i in range(self.nthreads):
bound_target = my_partial(target, collection, i)
if self.use_greenlets:
threads.append(gevent.Greenlet(run=bound_target))
else:
threads.append(threading.Thread(target=bound_target))
for t in threads:
t.start()
for t in threads:
t.join(30)
assert not t.isAlive()
if self.use_greenlets:
# bool(Greenlet) is True if it's alive.
assert not t
else:
assert not t.isAlive()
def trial(self, reset, target, test):
"""Test concurrent operations on a lazily-connecting client.
`reset` takes a collection and resets it for the next trial.
`target` takes a lazily-connecting collection and an index from
0 to nthreads, and performs some operation, e.g. an insert.
`test` takes a collection and asserts a post-condition to prove
`target` succeeded.
"""
if self.use_greenlets and not has_gevent:
raise SkipTest('Gevent not installed')
collection = self._get_client().pymongo_test.test
for i in range(self.ntrials):
reset(collection)
lazy_client = self._get_client(_connect=False)
lazy_collection = lazy_client.pymongo_test.test
self.run_threads(lazy_collection, target)
test(collection)
# Make concurrency bugs more likely to manifest.
if PY3:
self.interval = sys.getswitchinterval()
sys.setswitchinterval(1e-6)
else:
self.interval = sys.getcheckinterval()
sys.setcheckinterval(1)
try:
for i in range(self.ntrials):
reset(collection)
lazy_client = self._get_client(
_connect=False, use_greenlets=self.use_greenlets)
lazy_collection = lazy_client.pymongo_test.test
self.run_threads(lazy_collection, target)
test(collection)
finally:
if PY3:
sys.setswitchinterval(self.interval)
else:
sys.setcheckinterval(self.interval)
def test_insert(self):
def reset(collection):