203 lines
5.9 KiB
Python
203 lines
5.9 KiB
Python
# Copyright 2012-2015 MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Utilities for multi-threading support."""
|
|
|
|
import threading
|
|
try:
|
|
from time import monotonic as _time
|
|
except ImportError:
|
|
from time import time as _time
|
|
|
|
from pymongo.monotonic import time as _time
|
|
from pymongo.errors import ExceededMaxWaiters
|
|
|
|
|
|
### Begin backport from CPython 3.2 for timeout support for Semaphore.acquire
|
|
class Semaphore(object):
    """Counting semaphore whose ``acquire`` supports a timeout.

    After Tim Peters' semaphore class, but not quite the same (no maximum).
    """

    def __init__(self, value=1):
        """Create a semaphore holding ``value`` permits.

        Raises ValueError if ``value`` is negative.
        """
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = threading.Condition(threading.Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        """Take one permit, optionally waiting for one to become available.

        With ``blocking`` false, return immediately. With a ``timeout`` (in
        seconds), wait at most that long in total. Returns True if a permit
        was taken and False otherwise.

        Raises ValueError if a timeout is given for a non-blocking acquire.
        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        self._cond.acquire()
        # try/finally guarantees the lock is released even if wait() raises;
        # otherwise every later acquire()/release() would deadlock.
        try:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        # First pass: fix the absolute deadline.
                        endtime = _time() + timeout
                    else:
                        # Woken without a permit: wait only the remainder.
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                # Loop ended without ``break``: a permit is available.
                self._value = self._value - 1
                rc = True
        finally:
            self._cond.release()
        return rc

    __enter__ = acquire

    def release(self):
        """Return one permit and wake a single waiter, if any."""
        self._cond.acquire()
        try:
            self._value = self._value + 1
            self._cond.notify()
        finally:
            self._cond.release()

    def __exit__(self, t, v, tb):
        """Release the permit taken by ``__enter__``."""
        self.release()

    @property
    def counter(self):
        """Current number of available permits."""
        return self._value
|
|
|
|
|
|
class BoundedSemaphore(Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1):
        """Create a semaphore with ``value`` permits, which is also its cap."""
        Semaphore.__init__(self, value)
        self._initial_value = value

    def release(self):
        """Return one permit and wake a single waiter, if any.

        Raises ValueError if the semaphore already holds its initial number
        of permits.
        """
        # The bound check and the increment happen under the same lock hold,
        # so two threads releasing at once cannot both pass the check and
        # push the counter above its initial value.
        self._cond.acquire()
        try:
            if self._value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._value = self._value + 1
            self._cond.notify()
        finally:
            self._cond.release()
|
|
### End backport from CPython 3.2
|
|
|
|
|
|
class DummySemaphore(object):
    """Semaphore stand-in that never blocks and tracks no state."""

    def __init__(self, value=None):
        """Accept and ignore ``value``."""

    def acquire(self, blocking=True, timeout=None):
        """Always succeed immediately; both arguments are ignored."""
        return True

    def release(self):
        """Do nothing."""
|
|
|
|
|
|
class MaxWaitersBoundedSemaphore(object):
    """Semaphore wrapper that caps how many callers may be inside acquire().

    Two semaphores are built from ``semaphore_class``: one with ``value``
    permits that callers actually acquire, and one with ``max_waiters``
    permits that is held only for the duration of each acquire() call.
    """

    def __init__(self, semaphore_class, value=1, max_waiters=1):
        self.waiter_semaphore = semaphore_class(max_waiters)
        self.semaphore = semaphore_class(value)

    def acquire(self, blocking=True, timeout=None):
        """Acquire the wrapped semaphore.

        Raises ExceededMaxWaiters when no waiter slot can be taken without
        blocking. Otherwise returns whatever the wrapped semaphore's
        acquire() returns.
        """
        got_slot = self.waiter_semaphore.acquire(False)
        if not got_slot:
            raise ExceededMaxWaiters()
        try:
            acquired = self.semaphore.acquire(blocking, timeout)
        finally:
            # Give the waiter slot back whether or not the acquire succeeded.
            self.waiter_semaphore.release()
        return acquired

    def __getattr__(self, name):
        # Attributes not found on the wrapper (e.g. release) are looked up
        # on the wrapped semaphore.
        wrapped = self.semaphore
        return getattr(wrapped, name)
|
|
|
|
|
|
class MaxWaitersBoundedSemaphoreThread(MaxWaitersBoundedSemaphore):
    """MaxWaitersBoundedSemaphore built on this module's BoundedSemaphore."""

    def __init__(self, value=1, max_waiters=1):
        MaxWaitersBoundedSemaphore.__init__(
            self,
            semaphore_class=BoundedSemaphore,
            value=value,
            max_waiters=max_waiters)
|
|
|
|
|
|
def create_semaphore(max_size, max_waiters):
    """Pick the semaphore implementation matching the given limits.

    - ``max_size`` is None: a DummySemaphore.
    - ``max_waiters`` is None: a BoundedSemaphore with ``max_size`` permits.
    - otherwise: a MaxWaitersBoundedSemaphoreThread with both limits.
    """
    if max_size is None:
        return DummySemaphore()
    if max_waiters is None:
        return BoundedSemaphore(max_size)
    return MaxWaitersBoundedSemaphoreThread(max_size, max_waiters)
|
|
|
|
|
|
class Event(object):
    """threading.Event work-alike built on a caller-supplied condition class.

    Lets async frameworks substitute their own synchronization for monitors
    through TopologySettings.condition_class.

    Copied from CPython's threading.py at hash c7960cc9.
    """

    def __init__(self, condition_class):
        self._flag = False
        self._cond = condition_class(threading.Lock())

    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._flag

    isSet = is_set

    def set(self):
        """Turn the internal flag on and wake every waiting thread.

        Once the flag is on, wait() returns without blocking.
        """
        cond = self._cond
        cond.acquire()
        try:
            self._flag = True
            cond.notify_all()
        finally:
            cond.release()

    def clear(self):
        """Turn the internal flag off.

        Later wait() calls block until set() turns the flag on again.
        """
        cond = self._cond
        cond.acquire()
        try:
            self._flag = False
        finally:
            cond.release()

    def wait(self, timeout=None):
        """Block until the internal flag is true, or until ``timeout`` expires.

        If the flag is already true, return it at once. Otherwise wait on
        the condition for up to ``timeout`` seconds (indefinitely when None)
        and return the result of that wait.
        """
        cond = self._cond
        cond.acquire()
        try:
            flag = self._flag
            if flag:
                return flag
            return cond.wait(timeout)
        finally:
            cond.release()
|