initial lock refactor and race fixes
This commit is contained in:
parent
0adf6df131
commit
093652866a
@ -709,6 +709,11 @@ class Pool:
|
||||
:param options: a PoolOptions instance
|
||||
:param is_sdam: whether to call hello for each new AsyncConnection
|
||||
"""
|
||||
# Main lock only used to protect updating attributes.
|
||||
# Avoid any additional work while holding the lock.
|
||||
# If looping over an attribute, copy the container and do not take the lock.
|
||||
self.lock = _async_create_lock()
|
||||
|
||||
if options.pause_enabled:
|
||||
self.state = PoolState.PAUSED
|
||||
else:
|
||||
@ -720,10 +725,9 @@ class Pool:
|
||||
# and returned to pool from the left side. Stale sockets removed
|
||||
# from the right side.
|
||||
self.conns: collections.deque[AsyncConnection] = collections.deque()
|
||||
self.active_contexts: set[_CancellationContext] = set()
|
||||
self.lock = _async_create_lock()
|
||||
self._max_connecting_cond = _async_create_condition(self.lock)
|
||||
self.active_sockets = 0
|
||||
# This lock should only be contended by threads adding/removing connections.
|
||||
self._conns_lock = _async_create_lock()
|
||||
|
||||
# Monotonically increasing connection ID required for CMAP Events.
|
||||
self.next_connection_id = 1
|
||||
# Track whether the sockets in this pool are writeable or not.
|
||||
@ -748,16 +752,19 @@ class Pool:
|
||||
# The first portion of the wait queue.
|
||||
# Enforces: maxPoolSize
|
||||
# Also used for: clearing the wait queue
|
||||
self.size_cond = _async_create_condition(self.lock)
|
||||
self.requests = 0
|
||||
# This lock should only be contended by threads adding/removing self.requests.
|
||||
self.size_cond = _async_create_condition(_async_create_lock())
|
||||
self.max_pool_size = self.opts.max_pool_size
|
||||
if not self.max_pool_size:
|
||||
self.max_pool_size = float("inf")
|
||||
|
||||
# The second portion of the wait queue.
|
||||
# Enforces: maxConnecting
|
||||
# Also used for: clearing the wait queue
|
||||
self._max_connecting_cond = _async_create_condition(self.lock)
|
||||
self._pending = 0
|
||||
# This lock should only be contended by threads adding/removing self._pending.
|
||||
self._max_connecting_cond = _async_create_condition(_async_create_lock())
|
||||
self._max_connecting = self.opts.max_connecting
|
||||
self._client_id = client_id
|
||||
# Log before publishing event to prevent potential listener preemption in tests
|
||||
@ -777,29 +784,41 @@ class Pool:
|
||||
)
|
||||
# Similar to active_sockets but includes threads in the wait queue.
|
||||
self.operation_count: int = 0
|
||||
# This lock should be contended on every operation.
|
||||
self._operation_count_lock = _async_create_lock()
|
||||
|
||||
self.active_contexts: set[_CancellationContext] = set()
|
||||
self.active_sockets = 0
|
||||
# Retain references to pinned connections to prevent the CPython GC
|
||||
# from thinking that a cursor's pinned connection can be GC'd when the
|
||||
# cursor is GC'd (see PYTHON-2751).
|
||||
self.__pinned_sockets: set[AsyncConnection] = set()
|
||||
self.ncursors = 0
|
||||
self.ntxns = 0
|
||||
# This lock protects self.active_contexts, self.active_sockets,
|
||||
# self.__pinned_sockets, self.ncursors, and self.ntxns.
|
||||
self._active_contexts_lock = _async_create_lock()
|
||||
|
||||
async def ready(self) -> None:
|
||||
# Take the lock to avoid the race condition described in PYTHON-2699.
|
||||
async with self.lock:
|
||||
if self.state != PoolState.READY:
|
||||
state_changed = False
|
||||
if self.state != PoolState.READY:
|
||||
async with self.lock:
|
||||
self.state = PoolState.READY
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
message=_ConnectionStatusMessage.POOL_READY,
|
||||
clientId=self._client_id,
|
||||
serverHost=self.address[0],
|
||||
serverPort=self.address[1],
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_ready(self.address)
|
||||
state_changed = True
|
||||
if not state_changed:
|
||||
return
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
message=_ConnectionStatusMessage.POOL_READY,
|
||||
clientId=self._client_id,
|
||||
serverHost=self.address[0],
|
||||
serverPort=self.address[1],
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_ready(self.address)
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
@ -813,38 +832,46 @@ class Pool:
|
||||
interrupt_connections: bool = False,
|
||||
) -> None:
|
||||
old_state = self.state
|
||||
async with self.size_cond:
|
||||
if self.closed:
|
||||
return
|
||||
if self.closed:
|
||||
return
|
||||
is_fork = False
|
||||
async with self.lock:
|
||||
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
|
||||
old_state, self.state = self.state, PoolState.PAUSED
|
||||
self.gen.inc(service_id)
|
||||
newpid = os.getpid()
|
||||
if self.pid != newpid:
|
||||
self.pid = newpid
|
||||
is_fork = True
|
||||
if is_fork:
|
||||
async with self._active_contexts_lock:
|
||||
self.active_sockets = 0
|
||||
async with self._operation_count_lock:
|
||||
self.operation_count = 0
|
||||
async with self._conns_lock:
|
||||
if service_id is None:
|
||||
sockets, self.conns = self.conns, collections.deque()
|
||||
else:
|
||||
discard: collections.deque = collections.deque() # type: ignore[type-arg]
|
||||
keep: collections.deque = collections.deque() # type: ignore[type-arg]
|
||||
for conn in self.conns:
|
||||
if conn.service_id == service_id:
|
||||
discard.append(conn)
|
||||
else:
|
||||
keep.append(conn)
|
||||
sockets = discard
|
||||
if service_id is not None:
|
||||
discard: collections.deque = collections.deque() # type: ignore[type-arg]
|
||||
keep: collections.deque = collections.deque() # type: ignore[type-arg]
|
||||
for conn in self.conns:
|
||||
if conn.service_id == service_id:
|
||||
discard.append(conn)
|
||||
else:
|
||||
keep.append(conn)
|
||||
sockets = discard
|
||||
async with self._conns_lock:
|
||||
self.conns = keep
|
||||
|
||||
if close:
|
||||
self.state = PoolState.CLOSED
|
||||
async with self.lock:
|
||||
self.state = PoolState.CLOSED
|
||||
# Clear the wait queue
|
||||
self._max_connecting_cond.notify_all()
|
||||
self.size_cond.notify_all()
|
||||
|
||||
if interrupt_connections:
|
||||
for context in self.active_contexts:
|
||||
for context in self.active_contexts.copy():
|
||||
context.cancel()
|
||||
|
||||
listeners = self.opts._event_listeners
|
||||
@ -903,9 +930,8 @@ class Pool:
|
||||
Pool.
|
||||
"""
|
||||
self.is_writable = is_writable
|
||||
async with self.lock:
|
||||
for _socket in self.conns:
|
||||
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
|
||||
for _socket in self.conns:
|
||||
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
|
||||
|
||||
async def reset(
|
||||
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
|
||||
@ -936,12 +962,9 @@ class Pool:
|
||||
|
||||
if self.opts.max_idle_time_seconds is not None:
|
||||
close_conns = []
|
||||
async with self.lock:
|
||||
while (
|
||||
self.conns
|
||||
and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds
|
||||
):
|
||||
close_conns.append(self.conns.pop())
|
||||
conns = self.conns.copy()
|
||||
while conns and conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds:
|
||||
close_conns.append(conns.pop())
|
||||
if not _IS_SYNC:
|
||||
await asyncio.gather(
|
||||
*[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns], # type: ignore[func-returns-value]
|
||||
@ -952,12 +975,12 @@ class Pool:
|
||||
await conn.close_conn(ConnectionClosedReason.IDLE)
|
||||
|
||||
while True:
|
||||
# There are enough sockets in the pool.
|
||||
if len(self.conns) + self.active_sockets >= self.opts.min_pool_size:
|
||||
return
|
||||
if self.requests >= self.opts.min_pool_size:
|
||||
return
|
||||
async with self.size_cond:
|
||||
# There are enough sockets in the pool.
|
||||
if len(self.conns) + self.active_sockets >= self.opts.min_pool_size:
|
||||
return
|
||||
if self.requests >= self.opts.min_pool_size:
|
||||
return
|
||||
self.requests += 1
|
||||
incremented = False
|
||||
try:
|
||||
@ -970,13 +993,14 @@ class Pool:
|
||||
incremented = True
|
||||
conn = await self.connect()
|
||||
close_conn = False
|
||||
async with self.lock:
|
||||
# Close connection and return if the pool was reset during
|
||||
# socket creation or while acquiring the pool lock.
|
||||
if self.gen.get_overall() != reference_generation:
|
||||
close_conn = True
|
||||
if not close_conn:
|
||||
# Close connection and return if the pool was reset during
|
||||
# socket creation or while acquiring the pool lock.
|
||||
if self.gen.get_overall() != reference_generation:
|
||||
close_conn = True
|
||||
if not close_conn:
|
||||
async with self._conns_lock:
|
||||
self.conns.appendleft(conn)
|
||||
async with self._active_contexts_lock:
|
||||
self.active_contexts.discard(conn.cancel_context)
|
||||
if close_conn:
|
||||
await conn.close_conn(ConnectionClosedReason.STALE)
|
||||
@ -1015,11 +1039,11 @@ class Pool:
|
||||
Note that the pool does not keep a reference to the socket -- you
|
||||
must call checkin() when you're done with it.
|
||||
"""
|
||||
async with self.lock:
|
||||
# Use a temporary context so that interrupt_connections can cancel creating the socket.
|
||||
tmp_context = _CancellationContext()
|
||||
async with self._active_contexts_lock:
|
||||
conn_id = self.next_connection_id
|
||||
self.next_connection_id += 1
|
||||
# Use a temporary context so that interrupt_connections can cancel creating the socket.
|
||||
tmp_context = _CancellationContext()
|
||||
self.active_contexts.add(tmp_context)
|
||||
|
||||
listeners = self.opts._event_listeners
|
||||
@ -1040,7 +1064,7 @@ class Pool:
|
||||
networking_interface = await _configured_protocol_interface(self.address, self.opts)
|
||||
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
|
||||
except BaseException as error:
|
||||
async with self.lock:
|
||||
async with self._active_contexts_lock:
|
||||
self.active_contexts.discard(tmp_context)
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
@ -1065,7 +1089,7 @@ class Pool:
|
||||
raise
|
||||
|
||||
conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
|
||||
async with self.lock:
|
||||
async with self._active_contexts_lock:
|
||||
self.active_contexts.add(conn.cancel_context)
|
||||
self.active_contexts.discard(tmp_context)
|
||||
if tmp_context.cancelled:
|
||||
@ -1082,7 +1106,7 @@ class Pool:
|
||||
await conn.authenticate()
|
||||
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
|
||||
except BaseException as e:
|
||||
async with self.lock:
|
||||
async with self._active_contexts_lock:
|
||||
self.active_contexts.discard(conn.cancel_context)
|
||||
if not completed_hello:
|
||||
self._handle_connection_error(e)
|
||||
@ -1144,7 +1168,7 @@ class Pool:
|
||||
durationMS=duration,
|
||||
)
|
||||
try:
|
||||
async with self.lock:
|
||||
async with self._active_contexts_lock:
|
||||
self.active_contexts.add(conn.cancel_context)
|
||||
yield conn
|
||||
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
|
||||
@ -1163,11 +1187,11 @@ class Pool:
|
||||
await self.checkin(conn)
|
||||
raise
|
||||
if conn.pinned_txn:
|
||||
async with self.lock:
|
||||
async with self._active_contexts_lock:
|
||||
self.__pinned_sockets.add(conn)
|
||||
self.ntxns += 1
|
||||
elif conn.pinned_cursor:
|
||||
async with self.lock:
|
||||
async with self._active_contexts_lock:
|
||||
self.__pinned_sockets.add(conn)
|
||||
self.ncursors += 1
|
||||
elif conn.active:
|
||||
@ -1231,7 +1255,7 @@ class Pool:
|
||||
"Attempted to check out a connection from closed connection pool"
|
||||
)
|
||||
|
||||
async with self.lock:
|
||||
async with self._operation_count_lock:
|
||||
self.operation_count += 1
|
||||
|
||||
# Get a free socket or create one.
|
||||
@ -1260,9 +1284,9 @@ class Pool:
|
||||
incremented = False
|
||||
emitted_event = False
|
||||
try:
|
||||
async with self.lock:
|
||||
async with self._active_contexts_lock:
|
||||
self.active_sockets += 1
|
||||
incremented = True
|
||||
incremented = True
|
||||
while conn is None:
|
||||
# CMAP: we MUST wait for either maxConnecting OR for a socket
|
||||
# to be checked back into the pool.
|
||||
@ -1280,7 +1304,8 @@ class Pool:
|
||||
self._raise_if_not_ready(checkout_started_time, emit_event=False)
|
||||
|
||||
try:
|
||||
conn = self.conns.popleft()
|
||||
async with self._conns_lock:
|
||||
conn = self.conns.popleft()
|
||||
except IndexError:
|
||||
self._pending += 1
|
||||
if conn: # We got a socket from the pool
|
||||
@ -1301,9 +1326,10 @@ class Pool:
|
||||
await conn.close_conn(ConnectionClosedReason.ERROR)
|
||||
async with self.size_cond:
|
||||
self.requests -= 1
|
||||
if incremented:
|
||||
self.active_sockets -= 1
|
||||
self.size_cond.notify()
|
||||
if incremented:
|
||||
async with self._active_contexts_lock:
|
||||
self.active_sockets -= 1
|
||||
|
||||
if not emitted_event:
|
||||
duration = time.monotonic() - checkout_started_time
|
||||
@ -1338,9 +1364,9 @@ class Pool:
|
||||
conn.active = False
|
||||
conn.pinned_txn = False
|
||||
conn.pinned_cursor = False
|
||||
self.__pinned_sockets.discard(conn)
|
||||
listeners = self.opts._event_listeners
|
||||
async with self.lock:
|
||||
async with self._active_contexts_lock:
|
||||
self.__pinned_sockets.discard(conn)
|
||||
self.active_contexts.discard(conn.cancel_context)
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
@ -1379,28 +1405,29 @@ class Pool:
|
||||
)
|
||||
else:
|
||||
close_conn = False
|
||||
async with self.lock:
|
||||
# Hold the lock to ensure this section does not race with
|
||||
# Pool.reset().
|
||||
if self.stale_generation(conn.generation, conn.service_id):
|
||||
close_conn = True
|
||||
else:
|
||||
conn.update_last_checkin_time()
|
||||
conn.update_is_writable(bool(self.is_writable))
|
||||
if self.stale_generation(conn.generation, conn.service_id):
|
||||
close_conn = True
|
||||
else:
|
||||
conn.update_last_checkin_time()
|
||||
conn.update_is_writable(bool(self.is_writable))
|
||||
async with self._conns_lock:
|
||||
self.conns.appendleft(conn)
|
||||
async with self._max_connecting_cond:
|
||||
# Notify any threads waiting to create a connection.
|
||||
self._max_connecting_cond.notify()
|
||||
if close_conn:
|
||||
await conn.close_conn(ConnectionClosedReason.STALE)
|
||||
|
||||
async with self.size_cond:
|
||||
async with self._active_contexts_lock:
|
||||
if txn:
|
||||
self.ntxns -= 1
|
||||
elif cursor:
|
||||
self.ncursors -= 1
|
||||
self.requests -= 1
|
||||
self.active_sockets -= 1
|
||||
async with self._operation_count_lock:
|
||||
self.operation_count -= 1
|
||||
async with self.size_cond:
|
||||
self.requests -= 1
|
||||
self.size_cond.notify()
|
||||
|
||||
async def _perished(self, conn: AsyncConnection) -> bool:
|
||||
|
||||
@ -707,6 +707,11 @@ class Pool:
|
||||
:param options: a PoolOptions instance
|
||||
:param is_sdam: whether to call hello for each new Connection
|
||||
"""
|
||||
# Main lock only used to protect updating attributes.
|
||||
# Avoid any additional work while holding the lock.
|
||||
# If looping over an attribute, copy the container and do not take the lock.
|
||||
self.lock = _create_lock()
|
||||
|
||||
if options.pause_enabled:
|
||||
self.state = PoolState.PAUSED
|
||||
else:
|
||||
@ -718,10 +723,9 @@ class Pool:
|
||||
# and returned to pool from the left side. Stale sockets removed
|
||||
# from the right side.
|
||||
self.conns: collections.deque[Connection] = collections.deque()
|
||||
self.active_contexts: set[_CancellationContext] = set()
|
||||
self.lock = _create_lock()
|
||||
self._max_connecting_cond = _create_condition(self.lock)
|
||||
self.active_sockets = 0
|
||||
# This lock should only be contended by threads adding/removing connections.
|
||||
self._conns_lock = _create_lock()
|
||||
|
||||
# Monotonically increasing connection ID required for CMAP Events.
|
||||
self.next_connection_id = 1
|
||||
# Track whether the sockets in this pool are writeable or not.
|
||||
@ -746,16 +750,19 @@ class Pool:
|
||||
# The first portion of the wait queue.
|
||||
# Enforces: maxPoolSize
|
||||
# Also used for: clearing the wait queue
|
||||
self.size_cond = _create_condition(self.lock)
|
||||
self.requests = 0
|
||||
# This lock should only be contended by threads adding/removing self.requests.
|
||||
self.size_cond = _create_condition(_create_lock())
|
||||
self.max_pool_size = self.opts.max_pool_size
|
||||
if not self.max_pool_size:
|
||||
self.max_pool_size = float("inf")
|
||||
|
||||
# The second portion of the wait queue.
|
||||
# Enforces: maxConnecting
|
||||
# Also used for: clearing the wait queue
|
||||
self._max_connecting_cond = _create_condition(self.lock)
|
||||
self._pending = 0
|
||||
# This lock should only be contended by threads adding/removing self._pending.
|
||||
self._max_connecting_cond = _create_condition(_create_lock())
|
||||
self._max_connecting = self.opts.max_connecting
|
||||
self._client_id = client_id
|
||||
# Log before publishing event to prevent potential listener preemption in tests
|
||||
@ -775,29 +782,41 @@ class Pool:
|
||||
)
|
||||
# Similar to active_sockets but includes threads in the wait queue.
|
||||
self.operation_count: int = 0
|
||||
# This lock should be contended on every operation.
|
||||
self._operation_count_lock = _create_lock()
|
||||
|
||||
self.active_contexts: set[_CancellationContext] = set()
|
||||
self.active_sockets = 0
|
||||
# Retain references to pinned connections to prevent the CPython GC
|
||||
# from thinking that a cursor's pinned connection can be GC'd when the
|
||||
# cursor is GC'd (see PYTHON-2751).
|
||||
self.__pinned_sockets: set[Connection] = set()
|
||||
self.ncursors = 0
|
||||
self.ntxns = 0
|
||||
# This lock protects self.active_contexts, self.active_sockets,
|
||||
# self.__pinned_sockets, self.ncursors, and self.ntxns.
|
||||
self._active_contexts_lock = _create_lock()
|
||||
|
||||
def ready(self) -> None:
|
||||
# Take the lock to avoid the race condition described in PYTHON-2699.
|
||||
with self.lock:
|
||||
if self.state != PoolState.READY:
|
||||
state_changed = False
|
||||
if self.state != PoolState.READY:
|
||||
with self.lock:
|
||||
self.state = PoolState.READY
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
message=_ConnectionStatusMessage.POOL_READY,
|
||||
clientId=self._client_id,
|
||||
serverHost=self.address[0],
|
||||
serverPort=self.address[1],
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_ready(self.address)
|
||||
state_changed = True
|
||||
if not state_changed:
|
||||
return
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
message=_ConnectionStatusMessage.POOL_READY,
|
||||
clientId=self._client_id,
|
||||
serverHost=self.address[0],
|
||||
serverPort=self.address[1],
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_ready(self.address)
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
@ -811,38 +830,46 @@ class Pool:
|
||||
interrupt_connections: bool = False,
|
||||
) -> None:
|
||||
old_state = self.state
|
||||
with self.size_cond:
|
||||
if self.closed:
|
||||
return
|
||||
if self.closed:
|
||||
return
|
||||
is_fork = False
|
||||
with self.lock:
|
||||
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
|
||||
old_state, self.state = self.state, PoolState.PAUSED
|
||||
self.gen.inc(service_id)
|
||||
newpid = os.getpid()
|
||||
if self.pid != newpid:
|
||||
self.pid = newpid
|
||||
is_fork = True
|
||||
if is_fork:
|
||||
with self._active_contexts_lock:
|
||||
self.active_sockets = 0
|
||||
with self._operation_count_lock:
|
||||
self.operation_count = 0
|
||||
with self._conns_lock:
|
||||
if service_id is None:
|
||||
sockets, self.conns = self.conns, collections.deque()
|
||||
else:
|
||||
discard: collections.deque = collections.deque() # type: ignore[type-arg]
|
||||
keep: collections.deque = collections.deque() # type: ignore[type-arg]
|
||||
for conn in self.conns:
|
||||
if conn.service_id == service_id:
|
||||
discard.append(conn)
|
||||
else:
|
||||
keep.append(conn)
|
||||
sockets = discard
|
||||
if service_id is not None:
|
||||
discard: collections.deque = collections.deque() # type: ignore[type-arg]
|
||||
keep: collections.deque = collections.deque() # type: ignore[type-arg]
|
||||
for conn in self.conns:
|
||||
if conn.service_id == service_id:
|
||||
discard.append(conn)
|
||||
else:
|
||||
keep.append(conn)
|
||||
sockets = discard
|
||||
with self._conns_lock:
|
||||
self.conns = keep
|
||||
|
||||
if close:
|
||||
self.state = PoolState.CLOSED
|
||||
with self.lock:
|
||||
self.state = PoolState.CLOSED
|
||||
# Clear the wait queue
|
||||
self._max_connecting_cond.notify_all()
|
||||
self.size_cond.notify_all()
|
||||
|
||||
if interrupt_connections:
|
||||
for context in self.active_contexts:
|
||||
for context in self.active_contexts.copy():
|
||||
context.cancel()
|
||||
|
||||
listeners = self.opts._event_listeners
|
||||
@ -901,9 +928,8 @@ class Pool:
|
||||
Pool.
|
||||
"""
|
||||
self.is_writable = is_writable
|
||||
with self.lock:
|
||||
for _socket in self.conns:
|
||||
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
|
||||
for _socket in self.conns:
|
||||
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
|
||||
|
||||
def reset(
|
||||
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
|
||||
@ -932,12 +958,9 @@ class Pool:
|
||||
|
||||
if self.opts.max_idle_time_seconds is not None:
|
||||
close_conns = []
|
||||
with self.lock:
|
||||
while (
|
||||
self.conns
|
||||
and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds
|
||||
):
|
||||
close_conns.append(self.conns.pop())
|
||||
conns = self.conns.copy()
|
||||
while conns and conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds:
|
||||
close_conns.append(conns.pop())
|
||||
if not _IS_SYNC:
|
||||
asyncio.gather(
|
||||
*[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns], # type: ignore[func-returns-value]
|
||||
@ -948,12 +971,12 @@ class Pool:
|
||||
conn.close_conn(ConnectionClosedReason.IDLE)
|
||||
|
||||
while True:
|
||||
# There are enough sockets in the pool.
|
||||
if len(self.conns) + self.active_sockets >= self.opts.min_pool_size:
|
||||
return
|
||||
if self.requests >= self.opts.min_pool_size:
|
||||
return
|
||||
with self.size_cond:
|
||||
# There are enough sockets in the pool.
|
||||
if len(self.conns) + self.active_sockets >= self.opts.min_pool_size:
|
||||
return
|
||||
if self.requests >= self.opts.min_pool_size:
|
||||
return
|
||||
self.requests += 1
|
||||
incremented = False
|
||||
try:
|
||||
@ -966,13 +989,14 @@ class Pool:
|
||||
incremented = True
|
||||
conn = self.connect()
|
||||
close_conn = False
|
||||
with self.lock:
|
||||
# Close connection and return if the pool was reset during
|
||||
# socket creation or while acquiring the pool lock.
|
||||
if self.gen.get_overall() != reference_generation:
|
||||
close_conn = True
|
||||
if not close_conn:
|
||||
# Close connection and return if the pool was reset during
|
||||
# socket creation or while acquiring the pool lock.
|
||||
if self.gen.get_overall() != reference_generation:
|
||||
close_conn = True
|
||||
if not close_conn:
|
||||
with self._conns_lock:
|
||||
self.conns.appendleft(conn)
|
||||
with self._active_contexts_lock:
|
||||
self.active_contexts.discard(conn.cancel_context)
|
||||
if close_conn:
|
||||
conn.close_conn(ConnectionClosedReason.STALE)
|
||||
@ -1011,11 +1035,11 @@ class Pool:
|
||||
Note that the pool does not keep a reference to the socket -- you
|
||||
must call checkin() when you're done with it.
|
||||
"""
|
||||
with self.lock:
|
||||
# Use a temporary context so that interrupt_connections can cancel creating the socket.
|
||||
tmp_context = _CancellationContext()
|
||||
with self._active_contexts_lock:
|
||||
conn_id = self.next_connection_id
|
||||
self.next_connection_id += 1
|
||||
# Use a temporary context so that interrupt_connections can cancel creating the socket.
|
||||
tmp_context = _CancellationContext()
|
||||
self.active_contexts.add(tmp_context)
|
||||
|
||||
listeners = self.opts._event_listeners
|
||||
@ -1036,7 +1060,7 @@ class Pool:
|
||||
networking_interface = _configured_socket_interface(self.address, self.opts)
|
||||
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
|
||||
except BaseException as error:
|
||||
with self.lock:
|
||||
with self._active_contexts_lock:
|
||||
self.active_contexts.discard(tmp_context)
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
@ -1061,7 +1085,7 @@ class Pool:
|
||||
raise
|
||||
|
||||
conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
|
||||
with self.lock:
|
||||
with self._active_contexts_lock:
|
||||
self.active_contexts.add(conn.cancel_context)
|
||||
self.active_contexts.discard(tmp_context)
|
||||
if tmp_context.cancelled:
|
||||
@ -1078,7 +1102,7 @@ class Pool:
|
||||
conn.authenticate()
|
||||
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
|
||||
except BaseException as e:
|
||||
with self.lock:
|
||||
with self._active_contexts_lock:
|
||||
self.active_contexts.discard(conn.cancel_context)
|
||||
if not completed_hello:
|
||||
self._handle_connection_error(e)
|
||||
@ -1140,7 +1164,7 @@ class Pool:
|
||||
durationMS=duration,
|
||||
)
|
||||
try:
|
||||
with self.lock:
|
||||
with self._active_contexts_lock:
|
||||
self.active_contexts.add(conn.cancel_context)
|
||||
yield conn
|
||||
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
|
||||
@ -1159,11 +1183,11 @@ class Pool:
|
||||
self.checkin(conn)
|
||||
raise
|
||||
if conn.pinned_txn:
|
||||
with self.lock:
|
||||
with self._active_contexts_lock:
|
||||
self.__pinned_sockets.add(conn)
|
||||
self.ntxns += 1
|
||||
elif conn.pinned_cursor:
|
||||
with self.lock:
|
||||
with self._active_contexts_lock:
|
||||
self.__pinned_sockets.add(conn)
|
||||
self.ncursors += 1
|
||||
elif conn.active:
|
||||
@ -1227,7 +1251,7 @@ class Pool:
|
||||
"Attempted to check out a connection from closed connection pool"
|
||||
)
|
||||
|
||||
with self.lock:
|
||||
with self._operation_count_lock:
|
||||
self.operation_count += 1
|
||||
|
||||
# Get a free socket or create one.
|
||||
@ -1256,9 +1280,9 @@ class Pool:
|
||||
incremented = False
|
||||
emitted_event = False
|
||||
try:
|
||||
with self.lock:
|
||||
with self._active_contexts_lock:
|
||||
self.active_sockets += 1
|
||||
incremented = True
|
||||
incremented = True
|
||||
while conn is None:
|
||||
# CMAP: we MUST wait for either maxConnecting OR for a socket
|
||||
# to be checked back into the pool.
|
||||
@ -1276,7 +1300,8 @@ class Pool:
|
||||
self._raise_if_not_ready(checkout_started_time, emit_event=False)
|
||||
|
||||
try:
|
||||
conn = self.conns.popleft()
|
||||
with self._conns_lock:
|
||||
conn = self.conns.popleft()
|
||||
except IndexError:
|
||||
self._pending += 1
|
||||
if conn: # We got a socket from the pool
|
||||
@ -1297,9 +1322,10 @@ class Pool:
|
||||
conn.close_conn(ConnectionClosedReason.ERROR)
|
||||
with self.size_cond:
|
||||
self.requests -= 1
|
||||
if incremented:
|
||||
self.active_sockets -= 1
|
||||
self.size_cond.notify()
|
||||
if incremented:
|
||||
with self._active_contexts_lock:
|
||||
self.active_sockets -= 1
|
||||
|
||||
if not emitted_event:
|
||||
duration = time.monotonic() - checkout_started_time
|
||||
@ -1334,9 +1360,9 @@ class Pool:
|
||||
conn.active = False
|
||||
conn.pinned_txn = False
|
||||
conn.pinned_cursor = False
|
||||
self.__pinned_sockets.discard(conn)
|
||||
listeners = self.opts._event_listeners
|
||||
with self.lock:
|
||||
with self._active_contexts_lock:
|
||||
self.__pinned_sockets.discard(conn)
|
||||
self.active_contexts.discard(conn.cancel_context)
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
@ -1375,28 +1401,29 @@ class Pool:
|
||||
)
|
||||
else:
|
||||
close_conn = False
|
||||
with self.lock:
|
||||
# Hold the lock to ensure this section does not race with
|
||||
# Pool.reset().
|
||||
if self.stale_generation(conn.generation, conn.service_id):
|
||||
close_conn = True
|
||||
else:
|
||||
conn.update_last_checkin_time()
|
||||
conn.update_is_writable(bool(self.is_writable))
|
||||
if self.stale_generation(conn.generation, conn.service_id):
|
||||
close_conn = True
|
||||
else:
|
||||
conn.update_last_checkin_time()
|
||||
conn.update_is_writable(bool(self.is_writable))
|
||||
with self._conns_lock:
|
||||
self.conns.appendleft(conn)
|
||||
with self._max_connecting_cond:
|
||||
# Notify any threads waiting to create a connection.
|
||||
self._max_connecting_cond.notify()
|
||||
if close_conn:
|
||||
conn.close_conn(ConnectionClosedReason.STALE)
|
||||
|
||||
with self.size_cond:
|
||||
with self._active_contexts_lock:
|
||||
if txn:
|
||||
self.ntxns -= 1
|
||||
elif cursor:
|
||||
self.ncursors -= 1
|
||||
self.requests -= 1
|
||||
self.active_sockets -= 1
|
||||
with self._operation_count_lock:
|
||||
self.operation_count -= 1
|
||||
with self.size_cond:
|
||||
self.requests -= 1
|
||||
self.size_cond.notify()
|
||||
|
||||
def _perished(self, conn: Connection) -> bool:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user