This commit is contained in:
Sophia Yang 2026-05-18 10:41:38 -04:00 committed by GitHub
commit 2bbc276caa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 226 additions and 154 deletions

View File

@ -1,6 +1,13 @@
Changelog
=========
Changes in Version 4.18.0 (2026/XX/XX)
--------------------------------------
PyMongo 4.18 brings a number of changes including:
- Improved connection pool throughput under concurrent load by replacing the
single pool lock with fine-grained locks to reduce lock contention.
Changes in Version 4.17.0 (2026/04/20)
--------------------------------------

View File

@ -108,3 +108,4 @@ The following is a list of people who have contributed to
- Steven Silvester (blink1073)
- Noah Stapp (NoahStapp)
- Cal Jacobson (cj81499)
- Sophia Yang (sophiayangDB)

View File

@ -709,6 +709,11 @@ class Pool:
:param options: a PoolOptions instance
:param is_sdam: whether to call hello for each new AsyncConnection
"""
# Main lock only used to protect updating attributes.
# Avoid any additional work while holding the lock.
# If looping over an attribute, copy the container and do not take the lock.
self.lock = _async_create_lock()
if options.pause_enabled:
self.state = PoolState.PAUSED
else:
@ -720,10 +725,9 @@ class Pool:
# and returned to pool from the left side. Stale sockets removed
# from the right side.
self.conns: collections.deque[AsyncConnection] = collections.deque()
self.active_contexts: set[_CancellationContext] = set()
self.lock = _async_create_lock()
self._max_connecting_cond = _async_create_condition(self.lock)
self.active_sockets = 0
# This lock should only be contended by threads adding/removing connections.
self._conns_lock = _async_create_lock()
# Monotonically increasing connection ID required for CMAP Events.
self.next_connection_id = 1
# Track whether the sockets in this pool are writeable or not.
@ -748,16 +752,19 @@ class Pool:
# The first portion of the wait queue.
# Enforces: maxPoolSize
# Also used for: clearing the wait queue
self.size_cond = _async_create_condition(self.lock)
self.requests = 0
# This lock should only be contended by threads adding/removing self.requests.
self.size_cond = _async_create_condition(_async_create_lock())
self.max_pool_size = self.opts.max_pool_size
if not self.max_pool_size:
self.max_pool_size = float("inf")
# The second portion of the wait queue.
# Enforces: maxConnecting
# Also used for: clearing the wait queue
self._max_connecting_cond = _async_create_condition(self.lock)
self._pending = 0
# This lock should only be contended by threads adding/removing self._pending.
self._max_connecting_cond = _async_create_condition(_async_create_lock())
self._max_connecting = self.opts.max_connecting
self._client_id = client_id
# Log before publishing event to prevent potential listener preemption in tests
@ -777,29 +784,41 @@ class Pool:
)
# Similar to active_sockets but includes threads in the wait queue.
self.operation_count: int = 0
# This lock should be contended on every operation.
self._operation_count_lock = _async_create_lock()
self.active_contexts: set[_CancellationContext] = set()
self.active_sockets = 0
# Retain references to pinned connections to prevent the CPython GC
# from thinking that a cursor's pinned connection can be GC'd when the
# cursor is GC'd (see PYTHON-2751).
self.__pinned_sockets: set[AsyncConnection] = set()
self.ncursors = 0
self.ntxns = 0
# This lock protects self.active_contexts, self.active_sockets,
# self.__pinned_sockets, self.ncursors, self.next_connection_id, and self.ntxns.
self._active_contexts_lock = _async_create_lock()
async def ready(self) -> None:
# Take the lock to avoid the race condition described in PYTHON-2699.
async with self.lock:
if self.state != PoolState.READY:
state_changed = False
if self.state != PoolState.READY:
async with self.lock:
self.state = PoolState.READY
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
message=_ConnectionStatusMessage.POOL_READY,
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_ready(self.address)
state_changed = True
if not state_changed:
return
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
message=_ConnectionStatusMessage.POOL_READY,
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_ready(self.address)
@property
def closed(self) -> bool:
@ -813,39 +832,49 @@ class Pool:
interrupt_connections: bool = False,
) -> None:
old_state = self.state
async with self.size_cond:
if self.closed:
return
if self.closed:
return
is_fork = False
async with self.lock:
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
old_state, self.state = self.state, PoolState.PAUSED
self.gen.inc(service_id)
newpid = os.getpid()
if self.pid != newpid:
self.pid = newpid
is_fork = True
if is_fork:
async with self._active_contexts_lock:
self.active_sockets = 0
async with self._operation_count_lock:
self.operation_count = 0
async with self._conns_lock:
if service_id is None:
sockets, self.conns = self.conns, collections.deque()
else:
discard: collections.deque = collections.deque() # type: ignore[type-arg]
keep: collections.deque = collections.deque() # type: ignore[type-arg]
for conn in self.conns:
if conn.service_id == service_id:
discard.append(conn)
else:
keep.append(conn)
sockets = discard
if service_id is not None:
discard: collections.deque = collections.deque() # type: ignore[type-arg]
keep: collections.deque = collections.deque() # type: ignore[type-arg]
for conn in self.conns:
if conn.service_id == service_id:
discard.append(conn)
else:
keep.append(conn)
sockets = discard
async with self._conns_lock:
self.conns = keep
if close:
if close:
async with self.lock:
self.state = PoolState.CLOSED
# Clear the wait queue
# Clear the wait queue
async with self._max_connecting_cond:
self._max_connecting_cond.notify_all()
async with self.size_cond:
self.size_cond.notify_all()
if interrupt_connections:
for context in self.active_contexts:
context.cancel()
if interrupt_connections:
for context in self.active_contexts.copy():
context.cancel()
listeners = self.opts._event_listeners
# CMAP spec says that close() MUST close sockets before publishing the
@ -903,9 +932,8 @@ class Pool:
Pool.
"""
self.is_writable = is_writable
async with self.lock:
for _socket in self.conns:
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
for _socket in self.conns:
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
async def reset(
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
@ -952,12 +980,12 @@ class Pool:
await conn.close_conn(ConnectionClosedReason.IDLE)
while True:
# There are enough sockets in the pool.
if len(self.conns) + self.active_sockets >= self.opts.min_pool_size:
return
if self.requests >= self.opts.min_pool_size:
return
async with self.size_cond:
# There are enough sockets in the pool.
if len(self.conns) + self.active_sockets >= self.opts.min_pool_size:
return
if self.requests >= self.opts.min_pool_size:
return
self.requests += 1
incremented = False
try:
@ -970,13 +998,14 @@ class Pool:
incremented = True
conn = await self.connect()
close_conn = False
async with self.lock:
# Close connection and return if the pool was reset during
# socket creation or while acquiring the pool lock.
if self.gen.get_overall() != reference_generation:
close_conn = True
if not close_conn:
# Close connection and return if the pool was reset during
# socket creation or while acquiring the pool lock.
if self.gen.get_overall() != reference_generation:
close_conn = True
if not close_conn:
async with self._conns_lock:
self.conns.appendleft(conn)
async with self._active_contexts_lock:
self.active_contexts.discard(conn.cancel_context)
if close_conn:
await conn.close_conn(ConnectionClosedReason.STALE)
@ -1015,11 +1044,11 @@ class Pool:
Note that the pool does not keep a reference to the socket -- you
must call checkin() when you're done with it.
"""
async with self.lock:
# Use a temporary context so that interrupt_connections can cancel creating the socket.
tmp_context = _CancellationContext()
async with self._active_contexts_lock:
conn_id = self.next_connection_id
self.next_connection_id += 1
# Use a temporary context so that interrupt_connections can cancel creating the socket.
tmp_context = _CancellationContext()
self.active_contexts.add(tmp_context)
listeners = self.opts._event_listeners
@ -1040,7 +1069,7 @@ class Pool:
networking_interface = await _configured_protocol_interface(self.address, self.opts)
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException as error:
async with self.lock:
async with self._active_contexts_lock:
self.active_contexts.discard(tmp_context)
if self.enabled_for_cmap:
assert listeners is not None
@ -1065,7 +1094,7 @@ class Pool:
raise
conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
async with self.lock:
async with self._active_contexts_lock:
self.active_contexts.add(conn.cancel_context)
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
@ -1082,7 +1111,7 @@ class Pool:
await conn.authenticate()
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException as e:
async with self.lock:
async with self._active_contexts_lock:
self.active_contexts.discard(conn.cancel_context)
if not completed_hello:
self._handle_connection_error(e)
@ -1144,7 +1173,7 @@ class Pool:
durationMS=duration,
)
try:
async with self.lock:
async with self._active_contexts_lock:
self.active_contexts.add(conn.cancel_context)
yield conn
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
@ -1163,11 +1192,11 @@ class Pool:
await self.checkin(conn)
raise
if conn.pinned_txn:
async with self.lock:
async with self._active_contexts_lock:
self.__pinned_sockets.add(conn)
self.ntxns += 1
elif conn.pinned_cursor:
async with self.lock:
async with self._active_contexts_lock:
self.__pinned_sockets.add(conn)
self.ncursors += 1
elif conn.active:
@ -1231,7 +1260,7 @@ class Pool:
"Attempted to check out a connection from closed connection pool"
)
async with self.lock:
async with self._operation_count_lock:
self.operation_count += 1
# Get a free socket or create one.
@ -1260,9 +1289,9 @@ class Pool:
incremented = False
emitted_event = False
try:
async with self.lock:
async with self._active_contexts_lock:
self.active_sockets += 1
incremented = True
incremented = True
while conn is None:
# CMAP: we MUST wait for either maxConnecting OR for a socket
# to be checked back into the pool.
@ -1280,7 +1309,8 @@ class Pool:
self._raise_if_not_ready(checkout_started_time, emit_event=False)
try:
conn = self.conns.popleft()
async with self._conns_lock:
conn = self.conns.popleft()
except IndexError:
self._pending += 1
if conn: # We got a socket from the pool
@ -1301,9 +1331,10 @@ class Pool:
await conn.close_conn(ConnectionClosedReason.ERROR)
async with self.size_cond:
self.requests -= 1
if incremented:
self.active_sockets -= 1
self.size_cond.notify()
if incremented:
async with self._active_contexts_lock:
self.active_sockets -= 1
if not emitted_event:
duration = time.monotonic() - checkout_started_time
@ -1338,9 +1369,9 @@ class Pool:
conn.active = False
conn.pinned_txn = False
conn.pinned_cursor = False
self.__pinned_sockets.discard(conn)
listeners = self.opts._event_listeners
async with self.lock:
async with self._active_contexts_lock:
self.__pinned_sockets.discard(conn)
self.active_contexts.discard(conn.cancel_context)
if self.enabled_for_cmap:
assert listeners is not None
@ -1379,28 +1410,29 @@ class Pool:
)
else:
close_conn = False
async with self.lock:
# Hold the lock to ensure this section does not race with
# Pool.reset().
if self.stale_generation(conn.generation, conn.service_id):
close_conn = True
else:
conn.update_last_checkin_time()
conn.update_is_writable(bool(self.is_writable))
if self.stale_generation(conn.generation, conn.service_id):
close_conn = True
else:
conn.update_last_checkin_time()
conn.update_is_writable(bool(self.is_writable))
async with self._conns_lock:
self.conns.appendleft(conn)
async with self._max_connecting_cond:
# Notify any threads waiting to create a connection.
self._max_connecting_cond.notify()
if close_conn:
await conn.close_conn(ConnectionClosedReason.STALE)
async with self.size_cond:
async with self._active_contexts_lock:
if txn:
self.ntxns -= 1
elif cursor:
self.ncursors -= 1
self.requests -= 1
self.active_sockets -= 1
async with self._operation_count_lock:
self.operation_count -= 1
async with self.size_cond:
self.requests -= 1
self.size_cond.notify()
async def _perished(self, conn: AsyncConnection) -> bool:

View File

@ -707,6 +707,11 @@ class Pool:
:param options: a PoolOptions instance
:param is_sdam: whether to call hello for each new Connection
"""
# Main lock only used to protect updating attributes.
# Avoid any additional work while holding the lock.
# If looping over an attribute, copy the container and do not take the lock.
self.lock = _create_lock()
if options.pause_enabled:
self.state = PoolState.PAUSED
else:
@ -718,10 +723,9 @@ class Pool:
# and returned to pool from the left side. Stale sockets removed
# from the right side.
self.conns: collections.deque[Connection] = collections.deque()
self.active_contexts: set[_CancellationContext] = set()
self.lock = _create_lock()
self._max_connecting_cond = _create_condition(self.lock)
self.active_sockets = 0
# This lock should only be contended by threads adding/removing connections.
self._conns_lock = _create_lock()
# Monotonically increasing connection ID required for CMAP Events.
self.next_connection_id = 1
# Track whether the sockets in this pool are writeable or not.
@ -746,16 +750,19 @@ class Pool:
# The first portion of the wait queue.
# Enforces: maxPoolSize
# Also used for: clearing the wait queue
self.size_cond = _create_condition(self.lock)
self.requests = 0
# This lock should only be contended by threads adding/removing self.requests.
self.size_cond = _create_condition(_create_lock())
self.max_pool_size = self.opts.max_pool_size
if not self.max_pool_size:
self.max_pool_size = float("inf")
# The second portion of the wait queue.
# Enforces: maxConnecting
# Also used for: clearing the wait queue
self._max_connecting_cond = _create_condition(self.lock)
self._pending = 0
# This lock should only be contended by threads adding/removing self._pending.
self._max_connecting_cond = _create_condition(_create_lock())
self._max_connecting = self.opts.max_connecting
self._client_id = client_id
# Log before publishing event to prevent potential listener preemption in tests
@ -775,29 +782,41 @@ class Pool:
)
# Similar to active_sockets but includes threads in the wait queue.
self.operation_count: int = 0
# This lock should be contended on every operation.
self._operation_count_lock = _create_lock()
self.active_contexts: set[_CancellationContext] = set()
self.active_sockets = 0
# Retain references to pinned connections to prevent the CPython GC
# from thinking that a cursor's pinned connection can be GC'd when the
# cursor is GC'd (see PYTHON-2751).
self.__pinned_sockets: set[Connection] = set()
self.ncursors = 0
self.ntxns = 0
# This lock protects self.active_contexts, self.active_sockets,
# self.__pinned_sockets, self.ncursors, self.next_connection_id, and self.ntxns.
self._active_contexts_lock = _create_lock()
def ready(self) -> None:
# Take the lock to avoid the race condition described in PYTHON-2699.
with self.lock:
if self.state != PoolState.READY:
state_changed = False
if self.state != PoolState.READY:
with self.lock:
self.state = PoolState.READY
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
message=_ConnectionStatusMessage.POOL_READY,
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_ready(self.address)
state_changed = True
if not state_changed:
return
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
message=_ConnectionStatusMessage.POOL_READY,
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_ready(self.address)
@property
def closed(self) -> bool:
@ -811,39 +830,49 @@ class Pool:
interrupt_connections: bool = False,
) -> None:
old_state = self.state
with self.size_cond:
if self.closed:
return
if self.closed:
return
is_fork = False
with self.lock:
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
old_state, self.state = self.state, PoolState.PAUSED
self.gen.inc(service_id)
newpid = os.getpid()
if self.pid != newpid:
self.pid = newpid
is_fork = True
if is_fork:
with self._active_contexts_lock:
self.active_sockets = 0
with self._operation_count_lock:
self.operation_count = 0
with self._conns_lock:
if service_id is None:
sockets, self.conns = self.conns, collections.deque()
else:
discard: collections.deque = collections.deque() # type: ignore[type-arg]
keep: collections.deque = collections.deque() # type: ignore[type-arg]
for conn in self.conns:
if conn.service_id == service_id:
discard.append(conn)
else:
keep.append(conn)
sockets = discard
if service_id is not None:
discard: collections.deque = collections.deque() # type: ignore[type-arg]
keep: collections.deque = collections.deque() # type: ignore[type-arg]
for conn in self.conns:
if conn.service_id == service_id:
discard.append(conn)
else:
keep.append(conn)
sockets = discard
with self._conns_lock:
self.conns = keep
if close:
if close:
with self.lock:
self.state = PoolState.CLOSED
# Clear the wait queue
# Clear the wait queue
with self._max_connecting_cond:
self._max_connecting_cond.notify_all()
with self.size_cond:
self.size_cond.notify_all()
if interrupt_connections:
for context in self.active_contexts:
context.cancel()
if interrupt_connections:
for context in self.active_contexts.copy():
context.cancel()
listeners = self.opts._event_listeners
# CMAP spec says that close() MUST close sockets before publishing the
@ -901,9 +930,8 @@ class Pool:
Pool.
"""
self.is_writable = is_writable
with self.lock:
for _socket in self.conns:
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
for _socket in self.conns:
_socket.update_is_writable(self.is_writable) # type: ignore[arg-type]
def reset(
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
@ -948,12 +976,12 @@ class Pool:
conn.close_conn(ConnectionClosedReason.IDLE)
while True:
# There are enough sockets in the pool.
if len(self.conns) + self.active_sockets >= self.opts.min_pool_size:
return
if self.requests >= self.opts.min_pool_size:
return
with self.size_cond:
# There are enough sockets in the pool.
if len(self.conns) + self.active_sockets >= self.opts.min_pool_size:
return
if self.requests >= self.opts.min_pool_size:
return
self.requests += 1
incremented = False
try:
@ -966,13 +994,14 @@ class Pool:
incremented = True
conn = self.connect()
close_conn = False
with self.lock:
# Close connection and return if the pool was reset during
# socket creation or while acquiring the pool lock.
if self.gen.get_overall() != reference_generation:
close_conn = True
if not close_conn:
# Close connection and return if the pool was reset during
# socket creation or while acquiring the pool lock.
if self.gen.get_overall() != reference_generation:
close_conn = True
if not close_conn:
with self._conns_lock:
self.conns.appendleft(conn)
with self._active_contexts_lock:
self.active_contexts.discard(conn.cancel_context)
if close_conn:
conn.close_conn(ConnectionClosedReason.STALE)
@ -1011,11 +1040,11 @@ class Pool:
Note that the pool does not keep a reference to the socket -- you
must call checkin() when you're done with it.
"""
with self.lock:
# Use a temporary context so that interrupt_connections can cancel creating the socket.
tmp_context = _CancellationContext()
with self._active_contexts_lock:
conn_id = self.next_connection_id
self.next_connection_id += 1
# Use a temporary context so that interrupt_connections can cancel creating the socket.
tmp_context = _CancellationContext()
self.active_contexts.add(tmp_context)
listeners = self.opts._event_listeners
@ -1036,7 +1065,7 @@ class Pool:
networking_interface = _configured_socket_interface(self.address, self.opts)
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException as error:
with self.lock:
with self._active_contexts_lock:
self.active_contexts.discard(tmp_context)
if self.enabled_for_cmap:
assert listeners is not None
@ -1061,7 +1090,7 @@ class Pool:
raise
conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
with self.lock:
with self._active_contexts_lock:
self.active_contexts.add(conn.cancel_context)
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
@ -1078,7 +1107,7 @@ class Pool:
conn.authenticate()
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException as e:
with self.lock:
with self._active_contexts_lock:
self.active_contexts.discard(conn.cancel_context)
if not completed_hello:
self._handle_connection_error(e)
@ -1140,7 +1169,7 @@ class Pool:
durationMS=duration,
)
try:
with self.lock:
with self._active_contexts_lock:
self.active_contexts.add(conn.cancel_context)
yield conn
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
@ -1159,11 +1188,11 @@ class Pool:
self.checkin(conn)
raise
if conn.pinned_txn:
with self.lock:
with self._active_contexts_lock:
self.__pinned_sockets.add(conn)
self.ntxns += 1
elif conn.pinned_cursor:
with self.lock:
with self._active_contexts_lock:
self.__pinned_sockets.add(conn)
self.ncursors += 1
elif conn.active:
@ -1227,7 +1256,7 @@ class Pool:
"Attempted to check out a connection from closed connection pool"
)
with self.lock:
with self._operation_count_lock:
self.operation_count += 1
# Get a free socket or create one.
@ -1256,9 +1285,9 @@ class Pool:
incremented = False
emitted_event = False
try:
with self.lock:
with self._active_contexts_lock:
self.active_sockets += 1
incremented = True
incremented = True
while conn is None:
# CMAP: we MUST wait for either maxConnecting OR for a socket
# to be checked back into the pool.
@ -1276,7 +1305,8 @@ class Pool:
self._raise_if_not_ready(checkout_started_time, emit_event=False)
try:
conn = self.conns.popleft()
with self._conns_lock:
conn = self.conns.popleft()
except IndexError:
self._pending += 1
if conn: # We got a socket from the pool
@ -1297,9 +1327,10 @@ class Pool:
conn.close_conn(ConnectionClosedReason.ERROR)
with self.size_cond:
self.requests -= 1
if incremented:
self.active_sockets -= 1
self.size_cond.notify()
if incremented:
with self._active_contexts_lock:
self.active_sockets -= 1
if not emitted_event:
duration = time.monotonic() - checkout_started_time
@ -1334,9 +1365,9 @@ class Pool:
conn.active = False
conn.pinned_txn = False
conn.pinned_cursor = False
self.__pinned_sockets.discard(conn)
listeners = self.opts._event_listeners
with self.lock:
with self._active_contexts_lock:
self.__pinned_sockets.discard(conn)
self.active_contexts.discard(conn.cancel_context)
if self.enabled_for_cmap:
assert listeners is not None
@ -1375,28 +1406,29 @@ class Pool:
)
else:
close_conn = False
with self.lock:
# Hold the lock to ensure this section does not race with
# Pool.reset().
if self.stale_generation(conn.generation, conn.service_id):
close_conn = True
else:
conn.update_last_checkin_time()
conn.update_is_writable(bool(self.is_writable))
if self.stale_generation(conn.generation, conn.service_id):
close_conn = True
else:
conn.update_last_checkin_time()
conn.update_is_writable(bool(self.is_writable))
with self._conns_lock:
self.conns.appendleft(conn)
with self._max_connecting_cond:
# Notify any threads waiting to create a connection.
self._max_connecting_cond.notify()
if close_conn:
conn.close_conn(ConnectionClosedReason.STALE)
with self.size_cond:
with self._active_contexts_lock:
if txn:
self.ntxns -= 1
elif cursor:
self.ncursors -= 1
self.requests -= 1
self.active_sockets -= 1
with self._operation_count_lock:
self.operation_count -= 1
with self.size_cond:
self.requests -= 1
self.size_cond.notify()
def _perished(self, conn: Connection) -> bool: