PYTHON-5821 - Fix ordering issue between event publish and logging for Pool monitoring tests (#2796)
This commit is contained in:
parent
f4219bdca2
commit
8dc7efade2
@ -760,11 +760,7 @@ class Pool:
|
||||
self._pending = 0
|
||||
self._max_connecting = self.opts.max_connecting
|
||||
self._client_id = client_id
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_created(
|
||||
self.address, self.opts.non_default_options
|
||||
)
|
||||
# Log before publishing event to prevent potential listener preemption in tests
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
@ -774,6 +770,11 @@ class Pool:
|
||||
serverPort=self.address[1],
|
||||
**self.opts.non_default_options,
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_created(
|
||||
self.address, self.opts.non_default_options
|
||||
)
|
||||
# Similar to active_sockets but includes threads in the wait queue.
|
||||
self.operation_count: int = 0
|
||||
# Retain references to pinned connections to prevent the CPython GC
|
||||
@ -788,9 +789,6 @@ class Pool:
|
||||
async with self.lock:
|
||||
if self.state != PoolState.READY:
|
||||
self.state = PoolState.READY
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_ready(self.address)
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
@ -799,6 +797,9 @@ class Pool:
|
||||
serverHost=self.address[0],
|
||||
serverPort=self.address[1],
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_ready(self.address)
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
@ -859,9 +860,6 @@ class Pool:
|
||||
else:
|
||||
for conn in sockets:
|
||||
await conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
listeners.publish_pool_closed(self.address)
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
@ -870,15 +868,11 @@ class Pool:
|
||||
serverHost=self.address[0],
|
||||
serverPort=self.address[1],
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
listeners.publish_pool_closed(self.address)
|
||||
else:
|
||||
if old_state != PoolState.PAUSED:
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
listeners.publish_pool_cleared(
|
||||
self.address,
|
||||
service_id=service_id,
|
||||
interrupt_connections=interrupt_connections,
|
||||
)
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
@ -888,6 +882,13 @@ class Pool:
|
||||
serverPort=self.address[1],
|
||||
serviceId=service_id,
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
listeners.publish_pool_cleared(
|
||||
self.address,
|
||||
service_id=service_id,
|
||||
interrupt_connections=interrupt_connections,
|
||||
)
|
||||
if not _IS_SYNC:
|
||||
await asyncio.gather(
|
||||
*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value]
|
||||
|
||||
@ -758,11 +758,7 @@ class Pool:
|
||||
self._pending = 0
|
||||
self._max_connecting = self.opts.max_connecting
|
||||
self._client_id = client_id
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_created(
|
||||
self.address, self.opts.non_default_options
|
||||
)
|
||||
# Log before publishing event to prevent potential listener preemption in tests
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
@ -772,6 +768,11 @@ class Pool:
|
||||
serverPort=self.address[1],
|
||||
**self.opts.non_default_options,
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_created(
|
||||
self.address, self.opts.non_default_options
|
||||
)
|
||||
# Similar to active_sockets but includes threads in the wait queue.
|
||||
self.operation_count: int = 0
|
||||
# Retain references to pinned connections to prevent the CPython GC
|
||||
@ -786,9 +787,6 @@ class Pool:
|
||||
with self.lock:
|
||||
if self.state != PoolState.READY:
|
||||
self.state = PoolState.READY
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_ready(self.address)
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
@ -797,6 +795,9 @@ class Pool:
|
||||
serverHost=self.address[0],
|
||||
serverPort=self.address[1],
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert self.opts._event_listeners is not None
|
||||
self.opts._event_listeners.publish_pool_ready(self.address)
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
@ -857,9 +858,6 @@ class Pool:
|
||||
else:
|
||||
for conn in sockets:
|
||||
conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
listeners.publish_pool_closed(self.address)
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
@ -868,15 +866,11 @@ class Pool:
|
||||
serverHost=self.address[0],
|
||||
serverPort=self.address[1],
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
listeners.publish_pool_closed(self.address)
|
||||
else:
|
||||
if old_state != PoolState.PAUSED:
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
listeners.publish_pool_cleared(
|
||||
self.address,
|
||||
service_id=service_id,
|
||||
interrupt_connections=interrupt_connections,
|
||||
)
|
||||
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
|
||||
_debug_log(
|
||||
_CONNECTION_LOGGER,
|
||||
@ -886,6 +880,13 @@ class Pool:
|
||||
serverPort=self.address[1],
|
||||
serviceId=service_id,
|
||||
)
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
listeners.publish_pool_cleared(
|
||||
self.address,
|
||||
service_id=service_id,
|
||||
interrupt_connections=interrupt_connections,
|
||||
)
|
||||
if not _IS_SYNC:
|
||||
asyncio.gather(
|
||||
*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value]
|
||||
|
||||
Loading…
Reference in New Issue
Block a user