Use Pool.generation and topologyVersion to reduce race conditions in SDAM error handling. Implement SDAM error handling spec tests.
194 lines
6.9 KiB
Python
194 lines
6.9 KiB
Python
# Backport of the threading.Barrier class from Python 3.8, with small
# changes to support Python 2.7.
# https://github.com/python/cpython/blob/v3.8.2/Lib/threading.py#L562-L728
|
|
|
|
from threading import (Condition,
|
|
Lock)
|
|
|
|
from pymongo.monotonic import time as _time
|
|
|
|
|
|
# Backport of Condition.wait_for from Python 3.8.2.
# https://github.com/python/cpython/blob/v3.8.2/Lib/threading.py#L318-L339
|
|
def wait_for(condition, predicate, timeout=None):
    """Block on 'condition' until 'predicate' returns a true value.

    'predicate' is a callable taking no arguments; its return value is
    interpreted as a boolean.  It is evaluated once up front and again
    after every return from 'condition.wait()'.

    'timeout', when not None, is the maximum total time to wait, measured
    with the monotonic clock '_time'.  With no timeout the call waits
    until the predicate holds.

    Returns the last value produced by 'predicate', which is falsy only
    when the timeout elapsed first.
    """
    outcome = predicate()

    if timeout is None:
        # Unbounded wait: keep sleeping on the condition until satisfied.
        while not outcome:
            condition.wait(None)
            outcome = predicate()
        return outcome

    # Bounded wait: the deadline is fixed lazily on the first pass so the
    # clock is never read when the predicate already holds.
    deadline = None
    remaining = timeout
    while not outcome:
        if deadline is None:
            deadline = _time() + remaining
        else:
            remaining = deadline - _time()
            if remaining <= 0:
                # Out of time; report the last (falsy) predicate value.
                break
        condition.wait(remaining)
        outcome = predicate()
    return outcome
|
|
|
|
|
|
# A barrier class.  Inspired in part by the pthread_barrier_* API and
# the CyclicBarrier class from Java.  See
# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
#        CyclicBarrier.html
# for information.
# We maintain two main states, 'filling' and 'draining', enabling the barrier
# to be cyclic.  Threads are not allowed into it until it has fully drained
# since the previous cycle.  In addition, a 'resetting' state exists which is
# similar to 'draining' except that threads leave with a BrokenBarrierError,
# and a 'broken' state in which all threads get the exception.
|
|
class Barrier(object):
    """Implements a Barrier.

    Useful for synchronizing a fixed number of threads at known synchronization
    points.  Threads block on 'wait()' and are simultaneously awoken once they
    have all made that call.
    """

    def __init__(self, parties, action=None, timeout=None):
        """Create a barrier, initialised to 'parties' threads.

        'action' is a callable which, when supplied, will be called by one of
        the threads after they have all entered the barrier and just prior to
        releasing them all.  If a 'timeout' is provided, it is used as the
        default for all subsequent 'wait()' calls.

        Raises ValueError if 'parties' is less than 1, since such a barrier
        could never be tripped and every 'wait()' would block until timeout.
        """
        if parties < 1:
            raise ValueError("parties must be >= 1")
        self._cond = Condition(Lock())
        self._action = action
        self._timeout = timeout
        self._parties = parties
        self._state = 0  # 0 filling, 1 draining, -1 resetting, -2 broken
        self._count = 0  # number of threads currently inside wait()

    def wait(self, timeout=None):
        """Wait for the barrier.

        When the specified number of threads have started waiting, they are all
        simultaneously awoken.  If an 'action' was provided for the barrier, one
        of the threads will have executed that callback prior to returning.
        Returns an individual index number from 0 to 'parties-1'.

        'timeout' falls back to the default given to the constructor when it
        is None.  Raises BrokenBarrierError if the barrier is broken or reset
        while waiting, or if the wait times out.
        """
        if timeout is None:
            timeout = self._timeout
        with self._cond:
            self._enter()  # Block while the barrier drains.
            index = self._count
            self._count += 1
            try:
                if index + 1 == self._parties:
                    # We release the barrier
                    self._release()
                else:
                    # We wait until someone releases us
                    self._wait(timeout)
                return index
            finally:
                self._count -= 1
                # Wake up any threads waiting for barrier to drain.
                self._exit()

    # Block until the barrier is ready for us, or raise an exception
    # if it is broken.
    def _enter(self):
        while self._state in (-1, 1):
            # It is draining or resetting, wait until done
            self._cond.wait()
        # see if the barrier is in a broken state
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 0

    # Optionally run the 'action' and release the threads waiting
    # in the barrier.
    def _release(self):
        try:
            if self._action:
                self._action()
            # enter draining state
            self._state = 1
            self._cond.notify_all()
        except:
            # An exception during the _action handler.  Break and reraise.
            # The bare except is deliberate: whatever the action raised,
            # the barrier is broken first and the error is then re-raised.
            self._break()
            raise

    # Wait in the barrier until we are released.  Raise an exception
    # if the barrier is reset or broken.
    def _wait(self, timeout):
        if not wait_for(self._cond, lambda: self._state != 0, timeout):
            # timed out.  Break the barrier
            self._break()
            raise BrokenBarrierError
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 1

    # If we are the last thread to exit the barrier, signal any threads
    # waiting for the barrier to drain.
    def _exit(self):
        if self._count == 0:
            if self._state in (-1, 1):
                # resetting or draining
                self._state = 0
                self._cond.notify_all()

    def reset(self):
        """Reset the barrier to the initial state.

        Any threads currently waiting will get the BrokenBarrierError
        exception raised.
        """
        with self._cond:
            if self._count > 0:
                if self._state == 0:
                    # reset the barrier, waking up threads
                    self._state = -1
                elif self._state == -2:
                    # was broken, set it to reset state
                    # which clears when the last thread exits
                    self._state = -1
            else:
                # Nobody is inside the barrier, so go straight to filling.
                self._state = 0
            self._cond.notify_all()

    def abort(self):
        """Place the barrier into a 'broken' state.

        Useful in case of error.  Any currently waiting threads and threads
        attempting to 'wait()' will have BrokenBarrierError raised.
        """
        with self._cond:
            self._break()

    def _break(self):
        # An internal error was detected.  The barrier is set to
        # a broken state and all parties are awakened.
        self._state = -2
        self._cond.notify_all()

    @property
    def parties(self):
        """Return the number of threads required to trip the barrier."""
        return self._parties

    @property
    def n_waiting(self):
        """Return the number of threads currently waiting at the barrier."""
        # We don't need synchronization here since this is an ephemeral result
        # anyway.  It returns the correct value in the steady state.
        if self._state == 0:
            return self._count
        return 0

    @property
    def broken(self):
        """Return True if the barrier is in a broken state."""
        return self._state == -2
|
|
|
|
# exception raised by the Barrier class
|
|
class BrokenBarrierError(RuntimeError):
    """Raised by Barrier when it is broken, reset, or a wait times out."""
|