diff --git a/pymongo/mongo_replica_set_client.py b/pymongo/mongo_replica_set_client.py index 5a164e635..bd084dd3e 100644 --- a/pymongo/mongo_replica_set_client.py +++ b/pymongo/mongo_replica_set_client.py @@ -58,6 +58,7 @@ from pymongo.errors import (AutoReconnect, InvalidDocument, OperationFailure, InvalidOperation) +from pymongo.thread_util import DummyLock EMPTY = b("") MAX_RETRY = 3 @@ -277,6 +278,10 @@ class Monitor(object): def start_sync(self): """Start the Monitor and block until it's really started. """ + # start() can return before the thread is fully bootstrapped, + # so a fork can leave the thread thinking it's alive in a child + # process when it's really dead: + # http://bugs.python.org/issue18418. self.start() # Implemented in subclasses. self.started_event.wait(5) @@ -337,13 +342,6 @@ class MonitorThread(threading.Thread, Monitor): self.setName("ReplicaSetMonitorThread") self.setDaemon(True) - # Track whether the thread has started. (Greenlets track this already.) - self.started = False - - def start(self): - self.started = True - super(MonitorThread, self).start() - def run(self): """Override Thread's run method. """ @@ -363,9 +361,16 @@ try: """Greenlet based replica set monitor. """ def __init__(self, rsc): + self.monitor_greenlet_alive = False Monitor.__init__(self, rsc, Event) Greenlet.__init__(self) + def start_sync(self): + self.monitor_greenlet_alive = True + + # Call superclass. + Monitor.start_sync(self) + # Don't override `run` in a Greenlet. Add _run instead. # Refer to gevent's Greenlet docs and source for more # information. @@ -375,8 +380,11 @@ try: self.monitor() def isAlive(self): - # Gevent defines bool(Greenlet) as True if it's alive. - return bool(self) + # bool(self) isn't immediately True after someone calls start(), + # but isAlive() is. Thus it's safe for greenlets to do: + # "if not monitor.isAlive(): monitor.start()" + # ... and be guaranteed only one greenlet starts the monitor. + return self.monitor_greenlet_alive except ImportError: pass @@ -641,6 +649,7 @@ class MongoReplicaSetClient(common.BaseObject): self.__tz_aware = common.validate_boolean('tz_aware', tz_aware) self.__document_class = document_class self.__monitor = None + self.__closed = False # Compatibility with mongo_client.MongoClient host = kwargs.pop('host', hosts_or_uri) @@ -765,17 +774,18 @@ class MongoReplicaSetClient(common.BaseObject): # Common case: monitor RS with a background thread. self.__monitor_class = MonitorThread - self.__monitor = self.__monitor_class(self) - register_monitor(self.__monitor) - - self.__monitor_lock = threading.Lock() + if self.__use_greenlets: + # Greenlets don't need to lock around access to the monitor. + # A Greenlet can safely do: + # "if not self.__monitor: self.__monitor = monitor_class()" + # because it won't be interrupted between the check and the + # assignment. + self.__monitor_lock = DummyLock() + else: + self.__monitor_lock = threading.Lock() if _connect: - # Wait for the monitor to really start. Otherwise if we return to - # caller and caller forks immediately, the monitor could think it's - # still alive in the child process when it really isn't. - # See http://bugs.python.org/issue18418. - self.__monitor.start_sync() + self.__ensure_monitor() def _cached(self, dbname, coll, index): """Test if `index` is cached. @@ -1074,28 +1084,26 @@ class MongoReplicaSetClient(common.BaseObject): is in progress, the work of refreshing the state is only performed once. """ - if not self.__monitor: + if self.__closed: raise InvalidOperation('MongoReplicaSetClient has been closed') - if not self.__monitor.isAlive(): - # We've forked since monitor was created. - self.__restart_monitor() - - self.__monitor.schedule_refresh() + monitor = self.__ensure_monitor() + monitor.schedule_refresh() if sync: - self.__monitor.wait_for_refresh(timeout_seconds=5) + monitor.wait_for_refresh(timeout_seconds=5) - def __restart_monitor(self): + def __ensure_monitor(self): + """Ensure the monitor is started, and return it.""" self.__monitor_lock.acquire() try: - # Another thread may have restarted the monitor while we were - # waiting for the lock. - if self.__monitor.isAlive(): - return + # Another thread can start the monitor while we wait for the lock. + if self.__monitor is not None and self.__monitor.isAlive(): + return self.__monitor - self.__monitor = self.__monitor_class(self) - register_monitor(self.__monitor) - self.__monitor.start_sync() + monitor = self.__monitor = self.__monitor_class(self) + register_monitor(monitor) + monitor.start_sync() + return monitor finally: self.__monitor_lock.release() @@ -1261,13 +1269,8 @@ class MongoReplicaSetClient(common.BaseObject): """Ensure this client instance is connected to a primary. """ # This may be the first time we're connecting to the set. - if self.__monitor and not self.__monitor.started: - try: - self.__monitor.start() - # Minor race condition. It's possible that two (or more) - # threads could call monitor.start() consecutively. Just pass. - except RuntimeError: - pass + self.__ensure_monitor() + if sync: rs_state = self.__rs_state if not rs_state.primary_member: @@ -1301,6 +1304,7 @@ class MongoReplicaSetClient(common.BaseObject): .. versionchanged:: 2.2.1 The :meth:`close` method now terminates the replica set monitor. """ + self.__closed = True self.__rs_state = RSState(self.__make_threadlocal()) monitor, self.__monitor = self.__monitor, None diff --git a/pymongo/thread_util.py b/pymongo/thread_util.py index 0d01d46fb..a74682095 100644 --- a/pymongo/thread_util.py +++ b/pymongo/thread_util.py @@ -44,6 +44,14 @@ from pymongo.errors import ExceededMaxWaiters issue1868 = (sys.version_info[:3] <= (2, 7, 0)) +class DummyLock(object): + def acquire(self): + pass + + def release(self): + pass + + class Ident(object): def __init__(self): self._refs = {} @@ -67,20 +75,13 @@ class Ident(object): class ThreadIdent(Ident): - class _DummyLock(object): - def acquire(self): - pass - - def release(self): - pass - def __init__(self): super(ThreadIdent, self).__init__() self._local = threading.local() if issue1868: self._lock = threading.Lock() else: - self._lock = ThreadIdent._DummyLock() + self._lock = DummyLock() # We watch for thread-death using a weakref callback to a thread local. # Weakrefs are permitted on subclasses of object but not object() itself. diff --git a/test/test_replica_set_client.py b/test/test_replica_set_client.py index 34fa9f49c..7ac50e186 100644 --- a/test/test_replica_set_client.py +++ b/test/test_replica_set_client.py @@ -1179,5 +1179,12 @@ class TestReplicaSetClientLazyConnect( pass +# Test concurrent access to a lazily-connecting RS client, with Gevent. +class TestReplicaSetClientLazyConnectGevent( + TestReplicaSetClientBase, + _TestLazyConnectMixin): + use_greenlets = True + + if __name__ == "__main__": unittest.main() diff --git a/test/utils.py b/test/utils.py index 96cfd87d8..df1095641 100644 --- a/test/utils.py +++ b/test/utils.py @@ -15,13 +15,23 @@ """Utilities for testing pymongo """ +import sys import threading +from nose.plugins.skip import SkipTest from pymongo import MongoClient, MongoReplicaSetClient from pymongo.errors import AutoReconnect from pymongo.pool import NO_REQUEST, NO_SOCKET_YET, SocketInfo from test import host, port, version +PY3 = sys.version_info[0] == 3 + +try: + import gevent + has_gevent = True +except ImportError: + has_gevent = False + # No functools in Python 2.4 def my_partial(f, *args, **kwargs): @@ -352,37 +362,82 @@ class TestRequestMixin(object): class _TestLazyConnectMixin(object): - """Inherit from this class and from unittest.TestCase, and override - _get_client(self, **kwargs), for testing clients with _connect=False. + """Test concurrent operations on a lazily-connecting client. + + Inherit from this class and from unittest.TestCase, and override + _get_client(self, **kwargs), for testing a lazily-connecting + client, i.e. a client initialized with _connect=False. + + Set use_greenlets = True to test with Gevent. """ + use_greenlets = False ntrials = 10 nthreads = 10 + interval = None def run_threads(self, collection, target): """Run a target function in many threads. target is a function taking a Collection and an integer. """ - threads = [ - threading.Thread(target=my_partial(target, collection, i)) - for i in range(self.nthreads)] + threads = [] + for i in range(self.nthreads): + bound_target = my_partial(target, collection, i) + if self.use_greenlets: + threads.append(gevent.Greenlet(run=bound_target)) + else: + threads.append(threading.Thread(target=bound_target)) for t in threads: t.start() for t in threads: t.join(30) - assert not t.isAlive() + if self.use_greenlets: + # bool(Greenlet) is True if it's alive. + assert not t + else: + assert not t.isAlive() def trial(self, reset, target, test): + """Test concurrent operations on a lazily-connecting client. + + `reset` takes a collection and resets it for the next trial. + + `target` takes a lazily-connecting collection and an index from + 0 to nthreads, and performs some operation, e.g. an insert. + + `test` takes a collection and asserts a post-condition to prove + `target` succeeded. + """ + if self.use_greenlets and not has_gevent: + raise SkipTest('Gevent not installed') + collection = self._get_client().pymongo_test.test - for i in range(self.ntrials): - reset(collection) - lazy_client = self._get_client(_connect=False) - lazy_collection = lazy_client.pymongo_test.test - self.run_threads(lazy_collection, target) - test(collection) + # Make concurrency bugs more likely to manifest. + if PY3: + self.interval = sys.getswitchinterval() + sys.setswitchinterval(1e-6) + else: + self.interval = sys.getcheckinterval() + sys.setcheckinterval(1) + + try: + for i in range(self.ntrials): + reset(collection) + lazy_client = self._get_client( + _connect=False, use_greenlets=self.use_greenlets) + + lazy_collection = lazy_client.pymongo_test.test + self.run_threads(lazy_collection, target) + test(collection) + + finally: + if PY3: + sys.setswitchinterval(self.interval) + else: + sys.setcheckinterval(self.interval) def test_insert(self): def reset(collection):