Remove MongoClient's methods start_request(), in_request(), and end_request(). The purpose of requests was to provide read-your-writes consistency when using w=0 write concern. Starting a request pins a socket to a thread so any operations on that thread end up in the same queue on the server side. Justification for removing: mongos 2.6+ doesn't support socket pinning by default, and mongos 2.8+ doesn't support it at all (SERVER-12273), so whatever weak consistency guarantees a request was supposed to provide are not provided with sharding. It's unnecessary with MongoDB 2.6+ since write commands always block and send a response. It's a confusing feature that should rarely be used, if ever, yet people incorrectly use it all the time. It makes our connection pool unmaintainable to any but the most expert developers.
203 lines
5.9 KiB
Python
203 lines
5.9 KiB
Python
# Copyright 2012-2014 MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Utilities for multi-threading support."""
|
|
|
|
import threading
|
|
try:
|
|
from time import monotonic as _time
|
|
except ImportError:
|
|
from time import time as _time
|
|
|
|
from pymongo.monotonic import time as _time
|
|
from pymongo.errors import ExceededMaxWaiters
|
|
|
|
|
|
### Begin backport from CPython 3.2 for timeout support for Semaphore.acquire
|
|
class Semaphore(object):
    """Counting semaphore whose ``acquire`` accepts a timeout.

    Backport of CPython 3.2's ``threading.Semaphore``. The internal counter
    is decremented by each successful ``acquire()`` and incremented by each
    ``release()``; ``acquire()`` waits while the counter is zero.
    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        """Create a semaphore whose counter starts at ``value``.

        Raises ValueError if ``value`` is negative.
        """
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = threading.Condition(threading.Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        """Try to decrement the counter; return True on success.

        With ``blocking`` false, return False immediately if the counter is
        zero. With a ``timeout`` (in seconds), give up and return False once
        that much time has elapsed. Raises ValueError if a timeout is given
        for a non-blocking acquire.
        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        # The ``with`` block guarantees the condition's lock is released even
        # if wait() raises, so a failed wait cannot wedge every later caller.
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        # First pass: fix the deadline once.
                        endtime = _time() + timeout
                    else:
                        # Woken up but the counter is still zero: wait only
                        # for whatever remains until the deadline.
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                # Loop ended without ``break``: the counter is non-zero.
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        """Increment the counter and wake one waiting thread, if any."""
        with self._cond:
            self._value += 1
            self._cond.notify()

    def __exit__(self, t, v, tb):
        """Release the semaphore when leaving a ``with`` block."""
        self.release()

    @property
    def counter(self):
        """Current value of the internal counter."""
        return self._value
|
|
|
|
|
|
class BoundedSemaphore(Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1):
        """Create the semaphore and remember ``value`` as the upper bound."""
        Semaphore.__init__(self, value)
        self._initial_value = value

    def release(self):
        """Increment the counter and wake one waiting thread, if any.

        Raises ValueError if the counter is already at its initial value,
        i.e. the semaphore has been released more often than acquired.
        """
        # Check and increment under a single hold of the lock. Checking
        # before taking the lock would let two concurrent releases both pass
        # the check and push the counter above its initial value.
        with self._cond:
            if self._value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._value += 1
            self._cond.notify()
|
|
### End backport from CPython 3.2
|
|
|
|
|
|
class DummySemaphore(object):
    """Stand-in semaphore that never blocks and keeps no count."""

    def __init__(self, value=None):
        """Accept ``value`` for signature compatibility and ignore it."""

    def acquire(self, blocking=True, timeout=None):
        """Report success immediately, whatever the arguments."""
        return True

    def release(self):
        """Do nothing."""
        return None
|
|
|
|
|
|
class MaxWaitersBoundedSemaphore(object):
    """Semaphore wrapper that limits how many callers may be acquiring at once.

    Two semaphores of ``semaphore_class`` are created: ``semaphore`` (the
    real one, initialised with ``value``) and ``waiter_semaphore``
    (initialised with ``max_waiters``), which is held for the duration of
    each ``acquire()`` call. Attributes not defined here are looked up on
    the wrapped ``semaphore``.
    """

    def __init__(self, semaphore_class, value=1, max_waiters=1):
        self.waiter_semaphore = semaphore_class(max_waiters)
        self.semaphore = semaphore_class(value)

    def acquire(self, blocking=True, timeout=None):
        """Acquire the wrapped semaphore and return its result.

        Raises ExceededMaxWaiters if the waiter semaphore cannot be taken
        without blocking.
        """
        if not self.waiter_semaphore.acquire(False):
            raise ExceededMaxWaiters()
        try:
            return self.semaphore.acquire(blocking, timeout)
        finally:
            # Give the waiter slot back whether or not the acquire succeeded.
            self.waiter_semaphore.release()

    def __getattr__(self, name):
        """Delegate unknown attributes to the wrapped semaphore."""
        # __getattr__ only runs when normal lookup fails. If ``semaphore``
        # itself is missing (e.g. an instance created without __init__),
        # reading self.semaphore below would re-enter this method forever,
        # so fail with a plain AttributeError instead.
        if name == "semaphore":
            raise AttributeError(name)
        return getattr(self.semaphore, name)
|
|
|
|
|
|
class MaxWaitersBoundedSemaphoreThread(MaxWaitersBoundedSemaphore):
    """MaxWaitersBoundedSemaphore built on this module's BoundedSemaphore."""

    def __init__(self, value=1, max_waiters=1):
        MaxWaitersBoundedSemaphore.__init__(
            self,
            semaphore_class=BoundedSemaphore,
            value=value,
            max_waiters=max_waiters,
        )
|
|
|
|
|
|
def create_semaphore(max_size, max_waiters):
    """Pick the semaphore implementation matching the given limits.

    - ``max_size`` is None: a DummySemaphore.
    - ``max_size`` set, ``max_waiters`` is None: a BoundedSemaphore of
      ``max_size``.
    - both set: a MaxWaitersBoundedSemaphoreThread built from both.
    """
    if max_size is None:
        return DummySemaphore()
    if max_waiters is None:
        return BoundedSemaphore(max_size)
    return MaxWaitersBoundedSemaphoreThread(max_size, max_waiters)
|
|
|
|
|
|
class Event(object):
    """Copy of standard threading.Event, but uses a custom condition class.

    Allows async frameworks to override monitors' synchronization behavior
    with TopologySettings.condition_class.

    Copied from CPython's threading.py at hash c7960cc9.
    """

    def __init__(self, condition_class):
        """Create an unset event.

        ``condition_class`` is called with a new ``threading.Lock`` and must
        return an object providing acquire(), release(), wait(timeout) and
        notify_all().
        """
        self._cond = condition_class(threading.Lock())
        self._flag = False

    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._flag

    isSet = is_set

    def set(self):
        """Set the internal flag to true.

        All threads waiting for it to become true are awakened. Threads
        that call wait() once the flag is true will not block at all.

        """
        self._cond.acquire()
        try:
            self._flag = True
            self._cond.notify_all()
        finally:
            self._cond.release()

    def clear(self):
        """Reset the internal flag to false.

        Subsequently, threads calling wait() will block until set() is called to
        set the internal flag to true again.

        """
        self._cond.acquire()
        try:
            self._flag = False
        finally:
            self._cond.release()

    def wait(self, timeout=None):
        """Block until the internal flag is true.

        If the internal flag is true on entry, return immediately. Otherwise,
        block until another thread calls set() to set the flag to true, or until
        the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        This method returns the internal flag on exit, so it will always return
        True except if a timeout is given and the operation times out.

        """
        self._cond.acquire()
        try:
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
                if signaled is None:
                    # Some condition implementations (Python 2's
                    # threading.Condition, custom condition classes) return
                    # None from wait(); report the flag itself in that case
                    # so the documented return value still holds.
                    signaled = self._flag
            return signaled
        finally:
            self._cond.release()
|