PYTHON-4499 Log pymongo.connection at DEBUG without EventListeners (#1703)

This commit is contained in:
Noah Stapp 2024-06-25 13:20:17 -07:00 committed by GitHub
parent 1d9adfa3b9
commit ff1d903bf2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 314 additions and 288 deletions

View File

@ -757,6 +757,7 @@ class Connection:
self.op_msg_enabled = False
self.listeners = pool.opts._event_listeners
self.enabled_for_cmap = pool.enabled_for_cmap
self.enabled_for_logging = pool.enabled_for_logging
self.compression_settings = pool.opts._compression_settings
self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None
self.socket_checker: SocketChecker = SocketChecker()
@ -1120,20 +1121,20 @@ class Connection:
await auth.authenticate(creds, self, reauthenticate=reauthenticate)
self.ready = True
duration = time.monotonic() - self.creation_time
if self.enabled_for_cmap:
assert self.listeners is not None
duration = time.monotonic() - self.creation_time
self.listeners.publish_connection_ready(self.address, self.id, duration)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_READY,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=self.id,
durationMS=duration,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_READY,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=self.id,
durationMS=duration,
)
def validate_session(
self, client: Optional[AsyncMongoClient], session: Optional[ClientSession]
@ -1153,10 +1154,11 @@ class Connection:
if self.closed:
return
self._close_conn()
if reason and self.enabled_for_cmap:
assert self.listeners is not None
self.listeners.publish_connection_closed(self.address, self.id, reason)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
if reason:
if self.enabled_for_cmap:
assert self.listeners is not None
self.listeners.publish_connection_closed(self.address, self.id, reason)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
@ -1468,12 +1470,13 @@ class Pool:
self.address = address
self.opts = options
self.handshake = handshake
# Don't publish events in Monitor pools.
# Don't publish events or logs in Monitor pools.
self.enabled_for_cmap = (
self.handshake
and self.opts._event_listeners is not None
and self.opts._event_listeners.enabled_for_cmap
)
self.enabled_for_logging = self.handshake
# The first portion of the wait queue.
# Enforces: maxPoolSize
@ -1495,15 +1498,15 @@ class Pool:
self.opts._event_listeners.publish_pool_created(
self.address, self.opts.non_default_options
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_CREATED,
serverHost=self.address[0],
serverPort=self.address[1],
**self.opts.non_default_options,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_CREATED,
serverHost=self.address[0],
serverPort=self.address[1],
**self.opts.non_default_options,
)
# Similar to active_sockets but includes threads in the wait queue.
self.operation_count: int = 0
# Retain references to pinned connections to prevent the CPython GC
@ -1521,14 +1524,14 @@ class Pool:
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_ready(self.address)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_READY,
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_READY,
serverHost=self.address[0],
serverPort=self.address[1],
)
@property
def closed(self) -> bool:
@ -1586,23 +1589,24 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_closed(self.address)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
)
else:
if old_state != PoolState.PAUSED and self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
else:
if old_state != PoolState.PAUSED:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
@ -1712,15 +1716,15 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_created(self.address, conn_id)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CREATED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CREATED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
)
try:
sock = await _configured_socket(self.address, self.opts)
@ -1730,17 +1734,17 @@ class Pool:
listeners.publish_connection_closed(
self.address, conn_id, ConnectionClosedReason.ERROR
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
if isinstance(error, (IOError, OSError, SSLError)):
details = _get_timeout_details(self.opts)
_raise_connection_failure(self.address, error, timeout_details=details)
@ -1788,31 +1792,31 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_check_out_started(self.address)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_STARTED,
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_STARTED,
serverHost=self.address[0],
serverPort=self.address[1],
)
conn = await self._get_conn(checkout_started_time, handler=handler)
duration = time.monotonic() - checkout_started_time
if self.enabled_for_cmap:
assert listeners is not None
duration = time.monotonic() - checkout_started_time
listeners.publish_connection_checked_out(self.address, conn.id, duration)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
durationMS=duration,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
durationMS=duration,
)
try:
async with self.lock:
self.active_contexts.add(conn.cancel_context)
@ -1844,13 +1848,14 @@ class Pool:
def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None:
if self.state != PoolState.READY:
if self.enabled_for_cmap and emit_event:
assert self.opts._event_listeners is not None
if emit_event:
duration = time.monotonic() - checkout_started_time
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
@ -1878,23 +1883,23 @@ class Pool:
await self.reset_without_pause()
if self.closed:
duration = time.monotonic() - checkout_started_time
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
duration = time.monotonic() - checkout_started_time
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.POOL_CLOSED, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="Connection pool was closed",
error=ConnectionCheckOutFailedReason.POOL_CLOSED,
durationMS=duration,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="Connection pool was closed",
error=ConnectionCheckOutFailedReason.POOL_CLOSED,
durationMS=duration,
)
raise _PoolClosedError(
"Attempted to check out a connection from closed connection pool"
)
@ -1970,13 +1975,14 @@ class Pool:
self.active_sockets -= 1
self.size_cond.notify()
if self.enabled_for_cmap and not emitted_event:
assert self.opts._event_listeners is not None
if not emitted_event:
duration = time.monotonic() - checkout_started_time
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
@ -2009,15 +2015,15 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_checked_in(self.address, conn.id)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKEDIN,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKEDIN,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
)
if self.pid != os.getpid():
await self.reset_without_pause()
else:
@ -2030,17 +2036,17 @@ class Pool:
listeners.publish_connection_closed(
self.address, conn.id, ConnectionClosedReason.ERROR
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
else:
async with self.lock:
# Hold the lock to ensure this section does not race with
@ -2102,23 +2108,23 @@ class Pool:
def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn:
listeners = self.opts._event_listeners
duration = time.monotonic() - checkout_started_time
if self.enabled_for_cmap:
assert listeners is not None
duration = time.monotonic() - checkout_started_time
listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.TIMEOUT, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="Wait queue timeout elapsed without a connection becoming available",
error=ConnectionCheckOutFailedReason.TIMEOUT,
durationMS=duration,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="Wait queue timeout elapsed without a connection becoming available",
error=ConnectionCheckOutFailedReason.TIMEOUT,
durationMS=duration,
)
timeout = _csot.get_timeout() or self.opts.wait_queue_timeout
if self.opts.load_balanced:
other_ops = self.active_sockets - self.ncursors - self.ntxns

View File

@ -757,6 +757,7 @@ class Connection:
self.op_msg_enabled = False
self.listeners = pool.opts._event_listeners
self.enabled_for_cmap = pool.enabled_for_cmap
self.enabled_for_logging = pool.enabled_for_logging
self.compression_settings = pool.opts._compression_settings
self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None
self.socket_checker: SocketChecker = SocketChecker()
@ -1120,20 +1121,20 @@ class Connection:
auth.authenticate(creds, self, reauthenticate=reauthenticate)
self.ready = True
duration = time.monotonic() - self.creation_time
if self.enabled_for_cmap:
assert self.listeners is not None
duration = time.monotonic() - self.creation_time
self.listeners.publish_connection_ready(self.address, self.id, duration)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_READY,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=self.id,
durationMS=duration,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_READY,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=self.id,
durationMS=duration,
)
def validate_session(
self, client: Optional[MongoClient], session: Optional[ClientSession]
@ -1151,10 +1152,11 @@ class Connection:
if self.closed:
return
self._close_conn()
if reason and self.enabled_for_cmap:
assert self.listeners is not None
self.listeners.publish_connection_closed(self.address, self.id, reason)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
if reason:
if self.enabled_for_cmap:
assert self.listeners is not None
self.listeners.publish_connection_closed(self.address, self.id, reason)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
@ -1464,12 +1466,13 @@ class Pool:
self.address = address
self.opts = options
self.handshake = handshake
# Don't publish events in Monitor pools.
# Don't publish events or logs in Monitor pools.
self.enabled_for_cmap = (
self.handshake
and self.opts._event_listeners is not None
and self.opts._event_listeners.enabled_for_cmap
)
self.enabled_for_logging = self.handshake
# The first portion of the wait queue.
# Enforces: maxPoolSize
@ -1491,15 +1494,15 @@ class Pool:
self.opts._event_listeners.publish_pool_created(
self.address, self.opts.non_default_options
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_CREATED,
serverHost=self.address[0],
serverPort=self.address[1],
**self.opts.non_default_options,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_CREATED,
serverHost=self.address[0],
serverPort=self.address[1],
**self.opts.non_default_options,
)
# Similar to active_sockets but includes threads in the wait queue.
self.operation_count: int = 0
# Retain references to pinned connections to prevent the CPython GC
@ -1517,14 +1520,14 @@ class Pool:
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_ready(self.address)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_READY,
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_READY,
serverHost=self.address[0],
serverPort=self.address[1],
)
@property
def closed(self) -> bool:
@ -1582,23 +1585,24 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_closed(self.address)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
)
else:
if old_state != PoolState.PAUSED and self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
else:
if old_state != PoolState.PAUSED:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
@ -1706,15 +1710,15 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_created(self.address, conn_id)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CREATED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CREATED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
)
try:
sock = _configured_socket(self.address, self.opts)
@ -1724,17 +1728,17 @@ class Pool:
listeners.publish_connection_closed(
self.address, conn_id, ConnectionClosedReason.ERROR
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
if isinstance(error, (IOError, OSError, SSLError)):
details = _get_timeout_details(self.opts)
_raise_connection_failure(self.address, error, timeout_details=details)
@ -1782,31 +1786,31 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_check_out_started(self.address)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_STARTED,
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_STARTED,
serverHost=self.address[0],
serverPort=self.address[1],
)
conn = self._get_conn(checkout_started_time, handler=handler)
duration = time.monotonic() - checkout_started_time
if self.enabled_for_cmap:
assert listeners is not None
duration = time.monotonic() - checkout_started_time
listeners.publish_connection_checked_out(self.address, conn.id, duration)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
durationMS=duration,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
durationMS=duration,
)
try:
with self.lock:
self.active_contexts.add(conn.cancel_context)
@ -1838,13 +1842,14 @@ class Pool:
def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None:
if self.state != PoolState.READY:
if self.enabled_for_cmap and emit_event:
assert self.opts._event_listeners is not None
if emit_event:
duration = time.monotonic() - checkout_started_time
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
@ -1872,23 +1877,23 @@ class Pool:
self.reset_without_pause()
if self.closed:
duration = time.monotonic() - checkout_started_time
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
duration = time.monotonic() - checkout_started_time
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.POOL_CLOSED, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="Connection pool was closed",
error=ConnectionCheckOutFailedReason.POOL_CLOSED,
durationMS=duration,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="Connection pool was closed",
error=ConnectionCheckOutFailedReason.POOL_CLOSED,
durationMS=duration,
)
raise _PoolClosedError(
"Attempted to check out a connection from closed connection pool"
)
@ -1964,13 +1969,14 @@ class Pool:
self.active_sockets -= 1
self.size_cond.notify()
if self.enabled_for_cmap and not emitted_event:
assert self.opts._event_listeners is not None
if not emitted_event:
duration = time.monotonic() - checkout_started_time
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
@ -2003,15 +2009,15 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_checked_in(self.address, conn.id)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKEDIN,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKEDIN,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
)
if self.pid != os.getpid():
self.reset_without_pause()
else:
@ -2024,17 +2030,17 @@ class Pool:
listeners.publish_connection_closed(
self.address, conn.id, ConnectionClosedReason.ERROR
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
else:
with self.lock:
# Hold the lock to ensure this section does not race with
@ -2096,23 +2102,23 @@ class Pool:
def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn:
listeners = self.opts._event_listeners
duration = time.monotonic() - checkout_started_time
if self.enabled_for_cmap:
assert listeners is not None
duration = time.monotonic() - checkout_started_time
listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.TIMEOUT, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="Wait queue timeout elapsed without a connection becoming available",
error=ConnectionCheckOutFailedReason.TIMEOUT,
durationMS=duration,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="Wait queue timeout elapsed without a connection becoming available",
error=ConnectionCheckOutFailedReason.TIMEOUT,
durationMS=duration,
)
timeout = _csot.get_timeout() or self.opts.wait_queue_timeout
if self.opts.load_balanced:
other_ops = self.active_sockets - self.ncursors - self.ntxns

View File

@ -16,6 +16,7 @@ from __future__ import annotations
import os
from test import unittest
from test.test_client import IntegrationTest
from test.utils import single_client
from unittest.mock import patch
from bson import json_util
@ -82,6 +83,19 @@ class TestLogger(IntegrationTest):
self.assertEqual(last_3_bytes, str_to_repeat)
def test_logging_without_listeners(self):
c = single_client()
self.assertEqual(len(c._event_listeners.event_listeners()), 0)
with self.assertLogs("pymongo.connection", level="DEBUG") as cm:
c.db.test.insert_one({"x": "1"})
self.assertGreater(len(cm.records), 0)
with self.assertLogs("pymongo.command", level="DEBUG") as cm:
c.db.test.insert_one({"x": "1"})
self.assertGreater(len(cm.records), 0)
with self.assertLogs("pymongo.serverSelection", level="DEBUG") as cm:
c.db.test.insert_one({"x": "1"})
self.assertGreater(len(cm.records), 0)
if __name__ == "__main__":
unittest.main()