PYTHON-724 Remove Gevent-specific code.

PyMongo 2.x has Gevent-specific code to support Gevent with or without
patch_thread. However, patch_socket is always required with Gevent.

In PyMongo 3, we remove all Gevent-specific code and rely on Gevent's
patch_all.

Remove the "use_greenlets" option and attribute for MongoClient and
MongoReplicaSetClient.
This commit is contained in:
A. Jesse Jiryu Davis 2014-07-15 19:05:14 -04:00
parent 2177256da0
commit 1bfac99f08
7 changed files with 81 additions and 392 deletions

View File

@ -279,7 +279,6 @@ VALIDATORS = {
'latencythresholdms': validate_positive_float,
'secondaryacceptablelatencyms': validate_positive_float,
'auto_start_request': validate_boolean,
'use_greenlets': validate_boolean,
'authmechanism': validate_auth_mechanism,
'authsource': validate_string,
'gssapiservicename': validate_string,

View File

@ -292,7 +292,6 @@ class MongoClient(common.BaseObject):
socket_timeout = options.get('sockettimeoutms')
wait_queue_timeout = options.get('waitqueuetimeoutms')
wait_queue_multiple = options.get('waitqueuemultiple')
use_greenlets = options.get('use_greenlets', False)
socket_keepalive = options.get('socketkeepalive', False)
ssl_context = None
@ -330,26 +329,17 @@ class MongoClient(common.BaseObject):
wait_queue_timeout=wait_queue_timeout,
wait_queue_multiple=wait_queue_multiple,
ssl_context=ssl_context,
use_greenlets=use_greenlets,
socket_keepalive=socket_keepalive)
self.__pool_class = pool_class
self.__connecting = False
if use_greenlets:
# Greenlets don't need to lock around access to the Member;
# they're only interrupted when they do I/O.
self.__connecting_lock = thread_util.DummyLock()
else:
self.__connecting_lock = threading.Lock()
self.__connecting_lock = threading.Lock()
if event_class:
self.__event_class = event_class
else:
# Prevent a cycle; this lambda shouldn't refer to self.
g = use_greenlets
event_class = lambda: thread_util.create_event(g)
self.__event_class = event_class
self.__event_class = threading.Event
self.__future_member = None
self.__document_class = document_class
@ -556,15 +546,6 @@ class MongoClient(common.BaseObject):
"""
return self.__pool_opts.max_pool_size
@property
def use_greenlets(self):
"""Whether calling :meth:`start_request` assigns greenlet-local,
rather than thread-local, sockets.
.. versionadded:: 2.4.2
"""
return self.__pool_opts.use_greenlets
@property
def nodes(self):
"""List of all known nodes.

View File

@ -109,7 +109,7 @@ def _partition_node(node):
# Concurrency notes: A MongoReplicaSetClient keeps its view of the replica-set
# state in an RSState instance. RSStates are immutable, except for
# host-pinning. Pools, which are internally thread / greenlet safe, can be
# host-pinning. Pools, which are internally thread-safe, can be
# copied from old to new RSStates safely. The client updates its view of the
# set's state not by modifying its RSState but by replacing it with an updated
# copy.
@ -133,7 +133,7 @@ def _partition_node(node):
class RSState(object):
def __init__(
self, threadlocal, hosts=None, host_to_member=None, arbiters=None,
self, threadlocal=None, hosts=None, host_to_member=None, arbiters=None,
writer=None, error_message='No primary available', exc=None,
initial=False):
"""An immutable snapshot of the client's view of the replica set state.
@ -143,7 +143,7 @@ class RSState(object):
in the most recent ismaster response.
:Parameters:
- `threadlocal`: Thread-local storage
- `threadlocal`: Optional thread-local storage
- `hosts`: Sequence of (host, port) pairs
- `host_to_member`: Optional dict: (host, port) -> Member instance
- `arbiters`: Optional sequence of arbiters as (host, port)
@ -152,7 +152,7 @@ class RSState(object):
- `exc`: Optional error if state is unusable
- `initial`: Whether this is the initial client state
"""
self._threadlocal = threadlocal # threading.local
self._threadlocal = threadlocal or threading.local()
self._arbiters = frozenset(arbiters or []) # set of (host, port)
self._writer = writer # (host, port) of the primary, or None
self._error_message = error_message
@ -191,17 +191,12 @@ class RSState(object):
self._error_message,
self._exc)
def clone_without_writer(self, threadlocal):
"""Get a clone without a primary. Unpins all threads.
:Parameters:
- `threadlocal`: Thread-local storage
"""
def clone_without_writer(self):
"""Get a clone without a primary. Unpins all threads."""
return RSState(
threadlocal,
self._hosts,
self._host_to_member,
self._arbiters)
hosts=self._hosts,
host_to_member=self._host_to_member,
arbiters=self._arbiters)
def clone_with_error(self, exc):
return RSState(
@ -387,48 +382,6 @@ class MonitorThread(threading.Thread, Monitor):
self.monitor()
have_gevent = False
try:
from gevent import Greenlet
from gevent.event import Event
# Used by ReplicaSetConnection
from gevent.local import local as gevent_local
have_gevent = True
class MonitorGreenlet(Monitor, Greenlet):
"""Greenlet based replica set monitor.
"""
def __init__(self, rsc):
self.monitor_greenlet_alive = False
Monitor.__init__(self, rsc, Event)
Greenlet.__init__(self)
def start_sync(self):
self.monitor_greenlet_alive = True
# Call superclass.
Monitor.start_sync(self)
# Don't override `run` in a Greenlet. Add _run instead.
# Refer to gevent's Greenlet docs and source for more
# information.
def _run(self):
"""Define Greenlet's _run method.
"""
self.monitor()
def isAlive(self):
# bool(self) isn't immediately True after someone calls start(),
# but isAlive() is. Thus it's safe for greenlets to do:
# "if not monitor.isAlive(): monitor.start()"
# ... and be guaranteed only one greenlet starts the monitor.
return self.monitor_greenlet_alive
except ImportError:
pass
class MongoReplicaSetClient(common.BaseObject):
"""Connection to a MongoDB replica set.
"""
@ -619,15 +572,10 @@ class MongoReplicaSetClient(common.BaseObject):
socket_timeout = self.__opts.get('sockettimeoutms')
wait_queue_timeout = self.__opts.get('waitqueuetimeoutms')
wait_queue_multiple = self.__opts.get('waitqueuemultiple')
self.__use_greenlets = self.__opts.get('use_greenlets', False)
if self.__use_greenlets and not have_gevent:
raise ConfigurationError(
"The gevent module is not available. "
"Install the gevent package from PyPI.")
self.__rs_state = RSState(self.__make_threadlocal(), initial=True)
self.__rs_state = RSState(initial=True)
self.__request_counter = thread_util.Counter(self.__use_greenlets)
self.__request_counter = thread_util.Counter()
self.__auto_start_request = self.__opts.get('auto_start_request', False)
if self.__auto_start_request:
@ -674,9 +622,7 @@ class MongoReplicaSetClient(common.BaseObject):
wait_queue_timeout=wait_queue_timeout,
wait_queue_multiple=wait_queue_multiple,
ssl_context=ssl_context,
use_greenlets=self.__use_greenlets,
socket_keepalive=socket_keepalive
)
socket_keepalive=socket_keepalive)
super(MongoReplicaSetClient, self).__init__(**self.__opts)
@ -706,21 +652,9 @@ class MongoReplicaSetClient(common.BaseObject):
# Start the monitor after we know the configuration is correct.
if not self.__monitor_class:
if self.__use_greenlets:
self.__monitor_class = MonitorGreenlet
else:
# Common case: monitor RS with a background thread.
self.__monitor_class = MonitorThread
self.__monitor_class = MonitorThread
if self.__use_greenlets:
# Greenlets don't need to lock around access to the monitor.
# A Greenlet can safely do:
# "if not self.__monitor: self.__monitor = monitor_class()"
# because it won't be interrupted between the check and the
# assignment.
self.__monitor_lock = DummyLock()
else:
self.__monitor_lock = threading.Lock()
self.__monitor_lock = threading.Lock()
if _connect:
self.__ensure_monitor()
@ -902,15 +836,6 @@ class MongoReplicaSetClient(common.BaseObject):
"""
return self.__pool_opts.max_pool_size
@property
def use_greenlets(self):
"""Whether calling :meth:`start_request` assigns greenlet-local,
rather than thread-local, sockets.
.. versionadded:: 2.4.2
"""
return self.__pool_opts.use_greenlets
def get_document_class(self):
"""document_class getter"""
return self.__document_class
@ -1059,12 +984,6 @@ class MongoReplicaSetClient(common.BaseObject):
finally:
self.__monitor_lock.release()
def __make_threadlocal(self):
if self.__use_greenlets:
return gevent_local()
else:
return threading.local()
def refresh(self, initial=False):
"""Iterate through the existing host list, or possibly the
seed list, to update the list of hosts and arbiters in this
@ -1201,7 +1120,7 @@ class MongoReplicaSetClient(common.BaseObject):
else:
# We unpin threads from members if the primary has changed, since
# no monotonic consistency can be promised now anyway.
threadlocal = self.__make_threadlocal()
threadlocal = None
# Get list of hosts in the RS config, including unreachable ones.
# Prefer the primary's list, otherwise any member's list.
@ -1286,8 +1205,7 @@ class MongoReplicaSetClient(common.BaseObject):
if rs_state.primary_member:
rs_state.primary_member.pool.reset()
threadlocal = self.__make_threadlocal()
self.__rs_state = rs_state.clone_without_writer(threadlocal)
self.__rs_state = rs_state.clone_without_writer()
self.__schedule_refresh()
def close(self):
@ -1307,7 +1225,7 @@ class MongoReplicaSetClient(common.BaseObject):
The :meth:`close` method now terminates the replica set monitor.
"""
self.__closed = True
self.__rs_state = RSState(self.__make_threadlocal())
self.__rs_state = RSState()
monitor, self.__monitor = self.__monitor, None
if monitor:

View File

@ -50,12 +50,12 @@ class PoolOptions(object):
__slots__ = ('__max_pool_size', '__connect_timeout', '__socket_timeout',
'__wait_queue_timeout', '__wait_queue_multiple',
'__ssl_context', '__use_greenlets', '__socket_keepalive')
'__ssl_context', '__socket_keepalive')
def __init__(self, max_pool_size=100, connect_timeout=None,
socket_timeout=None, wait_queue_timeout=None,
wait_queue_multiple=None, ssl_context=None,
use_greenlets=False, socket_keepalive=False):
socket_keepalive=False):
self.__max_pool_size = max_pool_size
self.__connect_timeout = connect_timeout
@ -63,7 +63,6 @@ class PoolOptions(object):
self.__wait_queue_timeout = wait_queue_timeout
self.__wait_queue_multiple = wait_queue_multiple
self.__ssl_context = ssl_context
self.__use_greenlets = use_greenlets
self.__socket_keepalive = socket_keepalive
@property
@ -106,12 +105,6 @@ class PoolOptions(object):
"""
return self.__ssl_context
@property
def use_greenlets(self):
"""Use greenlet ids for "thread affinity" in requests.
"""
return self.__use_greenlets
@property
def socket_keepalive(self):
"""Whether to send periodic messages to determine if a connection
@ -251,17 +244,10 @@ class Pool:
# Map self._ident.get() -> request socket
self._tid_to_sock = {}
if self.opts.use_greenlets and not thread_util.have_gevent:
raise ConfigurationError(
"The Gevent module is not available. "
"Install the gevent package from PyPI."
)
self._ident = thread_util.create_ident(self.opts.use_greenlets)
self._ident = thread_util.ThreadIdent()
# Count the number of calls to start_request() per thread.
self._request_counter = thread_util.Counter(self.opts.use_greenlets)
self._request_counter = thread_util.Counter()
if (self.opts.wait_queue_multiple is None or
self.opts.max_pool_size is None):
@ -271,7 +257,7 @@ class Pool:
self.opts.max_pool_size * self.opts.wait_queue_multiple)
self._socket_semaphore = thread_util.create_semaphore(
self.opts.max_pool_size, max_waiters, self.opts.use_greenlets)
self.opts.max_pool_size, max_waiters)
def reset(self):
# Ignore this race condition -- if many threads are resetting at once,

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities to abstract the differences between threads and greenlets."""
"""Utilities for multi-threading support."""
import threading
import sys
@ -22,21 +22,6 @@ try:
except ImportError:
from time import time as _time
have_gevent = True
try:
import greenlet
try:
# gevent-1.0rc2 and later.
from gevent.lock import BoundedSemaphore as GeventBoundedSemaphore
except ImportError:
from gevent.coros import BoundedSemaphore as GeventBoundedSemaphore
from gevent.greenlet import SpawnedLink
from gevent.event import Event as GeventEvent
except ImportError:
have_gevent = False
from pymongo.errors import ExceededMaxWaiters
@ -53,36 +38,28 @@ class DummyLock(object):
pass
class Ident(object):
class ThreadIdent(object):
def __init__(self):
self._refs = {}
self._local = threading.local()
if issue1868:
self._lock = threading.Lock()
else:
self._lock = DummyLock()
def watching(self):
"""Is the current thread or greenlet being watched for death?"""
"""Is the current thread being watched for death?"""
return self.get() in self._refs
def unwatch(self, tid):
self._refs.pop(tid, None)
def get(self):
"""An id for this thread or greenlet"""
raise NotImplementedError
return id(self._make_vigil())
def watch(self, callback):
"""Run callback when this thread or greenlet dies. callback takes
one meaningless argument.
"""
raise NotImplementedError
class ThreadIdent(Ident):
def __init__(self):
super(ThreadIdent, self).__init__()
self._local = threading.local()
if issue1868:
self._lock = threading.Lock()
else:
self._lock = DummyLock()
vigil = self._make_vigil()
self._refs[id(vigil)] = weakref.ref(vigil, callback)
# We watch for thread-death using a weakref callback to a thread local.
# Weakrefs are permitted on subclasses of object but not object() itself.
@ -103,56 +80,11 @@ class ThreadIdent(Ident):
return vigil
def get(self):
return id(self._make_vigil())
def watch(self, callback):
vigil = self._make_vigil()
self._refs[id(vigil)] = weakref.ref(vigil, callback)
class GreenletIdent(Ident):
def get(self):
return id(greenlet.getcurrent())
def watch(self, callback):
current = greenlet.getcurrent()
tid = self.get()
if hasattr(current, 'link'):
# This is a Gevent Greenlet (capital G), which inherits from
# greenlet and provides a 'link' method to detect when the
# Greenlet exits.
link = SpawnedLink(callback)
current.rawlink(link)
self._refs[tid] = link
else:
# This is a non-Gevent greenlet (small g), or it's the main
# greenlet.
self._refs[tid] = weakref.ref(current, callback)
def unwatch(self, tid):
""" call unlink if link before """
link = self._refs.pop(tid, None)
current = greenlet.getcurrent()
if hasattr(current, 'unlink'):
# This is a Gevent enhanced Greenlet. Remove the SpawnedLink we
# linked to it.
current.unlink(link)
def create_ident(use_greenlets):
if use_greenlets:
return GreenletIdent()
else:
return ThreadIdent()
class Counter(object):
"""A thread- or greenlet-local counter.
"""
def __init__(self, use_greenlets):
self.ident = create_ident(use_greenlets)
"""A thread-local counter."""
def __init__(self):
self.ident = ThreadIdent()
self._counters = {}
def inc(self):
@ -189,7 +121,7 @@ class Counter(object):
class Future(object):
"""Minimal backport of concurrent.futures.Future.
event_class makes this Future adaptable for Gevent and other frameworks.
event_class makes this Future adaptable for async clients.
"""
def __init__(self, event_class):
self._event = event_class()
@ -313,30 +245,11 @@ class MaxWaitersBoundedSemaphoreThread(MaxWaitersBoundedSemaphore):
self, BoundedSemaphore, value, max_waiters)
if have_gevent:
class MaxWaitersBoundedSemaphoreGevent(MaxWaitersBoundedSemaphore):
def __init__(self, value=1, max_waiters=1):
MaxWaitersBoundedSemaphore.__init__(
self, GeventBoundedSemaphore, value, max_waiters)
def create_semaphore(max_size, max_waiters, use_greenlets):
def create_semaphore(max_size, max_waiters):
if max_size is None:
return DummySemaphore()
elif use_greenlets:
if max_waiters is None:
return GeventBoundedSemaphore(max_size)
else:
return MaxWaitersBoundedSemaphoreGevent(max_size, max_waiters)
else:
if max_waiters is None:
return BoundedSemaphore(max_size)
else:
return MaxWaitersBoundedSemaphoreThread(max_size, max_waiters)
def create_event(use_greenlets):
if use_greenlets:
return GeventEvent()
else:
return threading.Event()

View File

@ -22,48 +22,36 @@ import time
sys.path[0:0] = [""]
from pymongo import thread_util
if thread_util.have_gevent:
import greenlet # Plain greenlets.
import gevent.greenlet # Gevent's enhanced Greenlets.
import gevent.hub
from test import SkipTest, unittest
from test.utils import looplet, my_partial, RendezvousThread
from test.utils import my_partial, RendezvousThread
class TestIdent(unittest.TestCase):
"""Ensure thread_util.Ident works for threads and greenlets. This has
gotten intricate from refactoring: we have classes, Watched and Unwatched,
that implement the logic for the two child threads / greenlets. For the
greenlet case it's easy to ensure the two children are alive at once, so
we run the Watched and Unwatched logic directly. For the thread case we
mix in the RendezvousThread class so we're sure both children are alive
when they call Ident.get().
def test_thread_ident(self):
# 1. Store main thread's id.
# 2. Start 2 child threads.
# 3. Store their values for Ident.get().
# 4. Children reach rendezvous point.
# 5. Children call Ident.watch().
# 6. One of the children calls Ident.unwatch().
# 7. Children terminate.
# 8. Assert that children got different ids from each other and from
# main, and assert watched child's callback was executed, and that
# unwatched child's callback was not.
1. Store main thread's / greenlet's id
2. Start 2 child threads / greenlets
3. Store their values for Ident.get()
4. Children reach rendezvous point
5. Children call Ident.watch()
6. One of the children calls Ident.unwatch()
7. Children terminate
8. Assert that children got different ids from each other and from main,
and assert watched child's callback was executed, and that unwatched
child's callback was not
"""
def _test_ident(self, use_greenlets):
if 'java' in sys.platform:
raise SkipTest("Can't rely on weakref callbacks in Jython")
ident = thread_util.create_ident(use_greenlets)
ident = thread_util.ThreadIdent()
ids = set([ident.get()])
unwatched_id = []
done = set([ident.get()]) # Start with main thread's / greenlet's id.
done = set([ident.get()]) # Start with main thread's id.
died = set()
class Watched(object):
def __init__(self, ident):
class WatchedThread(RendezvousThread):
def __init__(self, ident, state):
super(WatchedThread, self).__init__(state)
self._my_ident = ident
def before_rendezvous(self):
@ -76,56 +64,31 @@ class TestIdent(unittest.TestCase):
assert self._my_ident.watching()
done.add(self.my_id)
class Unwatched(Watched):
class UnwatchedThread(WatchedThread):
def before_rendezvous(self):
Watched.before_rendezvous(self)
super(UnwatchedThread, self).before_rendezvous()
unwatched_id.append(self.my_id)
def after_rendezvous(self):
Watched.after_rendezvous(self)
super(UnwatchedThread, self).after_rendezvous()
self._my_ident.unwatch(self.my_id)
assert not self._my_ident.watching()
if use_greenlets:
class WatchedGreenlet(Watched):
def run(self):
self.before_rendezvous()
self.after_rendezvous()
state = RendezvousThread.create_shared_state(2)
t_watched = WatchedThread(ident, state)
t_watched.start()
class UnwatchedGreenlet(Unwatched):
def run(self):
self.before_rendezvous()
self.after_rendezvous()
t_unwatched = UnwatchedThread(ident, state)
t_unwatched.start()
t_watched = greenlet.greenlet(WatchedGreenlet(ident).run)
t_unwatched = greenlet.greenlet(UnwatchedGreenlet(ident).run)
looplet([t_watched, t_unwatched])
else:
class WatchedThread(Watched, RendezvousThread):
def __init__(self, ident, state):
Watched.__init__(self, ident)
RendezvousThread.__init__(self, state)
RendezvousThread.wait_for_rendezvous(state)
RendezvousThread.resume_after_rendezvous(state)
class UnwatchedThread(Unwatched, RendezvousThread):
def __init__(self, ident, state):
Unwatched.__init__(self, ident)
RendezvousThread.__init__(self, state)
t_watched.join()
t_unwatched.join()
state = RendezvousThread.create_shared_state(2)
t_watched = WatchedThread(ident, state)
t_watched.start()
t_unwatched = UnwatchedThread(ident, state)
t_unwatched.start()
RendezvousThread.wait_for_rendezvous(state)
RendezvousThread.resume_after_rendezvous(state)
t_watched.join()
t_unwatched.join()
self.assertTrue(t_watched.passed)
self.assertTrue(t_unwatched.passed)
self.assertTrue(t_watched.passed)
self.assertTrue(t_unwatched.passed)
# Remove references, let weakref callbacks run
del t_watched
@ -147,57 +110,10 @@ class TestIdent(unittest.TestCase):
self.assertEqual(1, len(died))
self.assertFalse(unwatched_id[0] in died)
def test_thread_ident(self):
self._test_ident(False)
def test_greenlet_ident(self):
if not thread_util.have_gevent:
raise SkipTest('greenlet not installed')
self._test_ident(True)
class TestGreenletIdent(unittest.TestCase):
@classmethod
def setUpClass(cls):
if not thread_util.have_gevent:
raise SkipTest("need Gevent")
def test_unwatch_cleans_up(self):
# GreenletIdent.unwatch() should remove the on_thread_died callback
# from an enhanced Gevent Greenlet's list of links.
callback_ran = [False]
def on_greenlet_died(_):
callback_ran[0] = True
ident = thread_util.create_ident(use_greenlets=True)
def watch_and_unwatch():
ident.watch(on_greenlet_died)
ident.unwatch(ident.get())
g = gevent.greenlet.Greenlet(run=watch_and_unwatch)
g.start()
g.join(10)
the_hub = gevent.hub.get_hub()
if hasattr(the_hub, 'join'):
# Gevent 1.0
the_hub.join()
else:
# Gevent 0.13 and less
the_hub.shutdown()
self.assertTrue(g.successful())
# unwatch() canceled the callback.
self.assertFalse(callback_ran[0])
class TestCounter(unittest.TestCase):
def _test_counter(self, use_greenlets):
counter = thread_util.Counter(use_greenlets)
def test_counter(self):
counter = thread_util.Counter()
self.assertEqual(0, counter.dec())
self.assertEqual(0, counter.get())
@ -225,28 +141,17 @@ class TestCounter(unittest.TestCase):
done.add(n)
if use_greenlets:
greenlets = [
greenlet.greenlet(my_partial(f, i)) for i in range(10)]
looplet(greenlets)
else:
threads = [
threading.Thread(target=my_partial(f, i)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
threads = [threading.Thread(target=my_partial(f, i))
for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
self.assertEqual(10, len(done))
def test_thread_counter(self):
self._test_counter(False)
def test_greenlet_counter(self):
if not thread_util.have_gevent:
raise SkipTest('greenlet not installed')
self._test_counter(True)
if __name__ == "__main__":
unittest.main()

View File

@ -176,19 +176,6 @@ def assertRaisesExactly(cls, fn, *args, **kwargs):
else:
raise AssertionError("%s not raised" % cls)
def looplet(greenlets):
"""World's smallest event loop; run until all greenlets are done
"""
while True:
done = True
for g in greenlets:
if not g.dead:
done = False
g.switch()
if done:
return
class RendezvousThread(threading.Thread):
"""A thread that starts and pauses at a rendezvous point before resuming.