From 7e8c01f8746b3f1d0c5a0d2c8703f93ab62f3467 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 1 May 2026 19:22:31 -0400 Subject: [PATCH] PYTHON-5676 Phase 2: Extract shared APM/logging helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eliminate the ~13-line _COMMAND_LOGGER/_debug_log boilerplate that was copy-pasted identically across five command execution paths. Add three helpers to pymongo/command_helpers.py and call them from all five sites. - New pymongo/command_helpers.py: _log_command_started, _log_command_succeeded, _log_command_failed — pure functions, no I/O, no async/sync split needed - network.py, server.py, bulk.py, client_bulk.py: replace inline _debug_log blocks with helper calls; remove _COMMAND_LOGGER/_CommandStatusMessage imports where no longer needed --- pymongo/asynchronous/bulk.py | 139 +++++++--------------------- pymongo/asynchronous/client_bulk.py | 139 +++++++--------------------- pymongo/asynchronous/network.py | 82 ++++++---------- pymongo/asynchronous/server.py | 71 ++++---------- pymongo/command_helpers.py | 110 ++++++++++++++++++++++ pymongo/synchronous/bulk.py | 139 +++++++--------------------- pymongo/synchronous/client_bulk.py | 139 +++++++--------------------- pymongo/synchronous/network.py | 82 ++++++---------- pymongo/synchronous/server.py | 71 ++++---------- 9 files changed, 342 insertions(+), 630 deletions(-) create mode 100644 pymongo/command_helpers.py diff --git a/pymongo/asynchronous/bulk.py b/pymongo/asynchronous/bulk.py index 4a54f9eb3..f429daf8b 100644 --- a/pymongo/asynchronous/bulk.py +++ b/pymongo/asynchronous/bulk.py @@ -20,7 +20,6 @@ from __future__ import annotations import copy import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -45,6 +44,11 @@ from pymongo.bulk_shared import ( _raise_bulk_write_error, _Run, ) +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.common import ( validate_is_document_type, validate_ok_for_replace, @@ -57,7 +61,6 @@ from pymongo.errors import ( OperationFailure, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _DELETE, _INSERT, @@ -252,44 +255,15 @@ class _AsyncBulk: ) -> dict[str, Any]: """A proxy for SocketInfo.write_command that handles event publishing.""" cmd[bwc.field] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: bwc._start(cmd, request_id, docs) try: reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] await client._process_response(reply, bwc.session) # type: ignore[arg-type] @@ -299,24 +273,17 @@ class _AsyncBulk: failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: bwc._fail(request_id, failure, duration) @@ -337,22 +304,7 @@ class _AsyncBulk: client: AsyncMongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for AsyncConnection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: cmd = bwc._start(cmd, request_id, docs) try: @@ -363,23 +315,9 @@ class _AsyncBulk: else: # Comply with APM spec. reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) except Exception as exc: @@ -390,24 +328,17 @@ class _AsyncBulk: failure = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: assert bwc.start_time is not None bwc._fail(request_id, failure, duration) diff --git a/pymongo/asynchronous/client_bulk.py b/pymongo/asynchronous/client_bulk.py index 015947d7e..c29bb64c6 100644 --- a/pymongo/asynchronous/client_bulk.py +++ b/pymongo/asynchronous/client_bulk.py @@ -20,7 +20,6 @@ from __future__ import annotations import copy import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -48,6 +47,11 @@ from pymongo._client_bulk_shared import ( _merge_command, _throw_client_bulk_write_exception, ) +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.common import ( validate_is_document_type, validate_ok_for_replace, @@ -63,7 +67,6 @@ from pymongo.errors import ( WaitQueueTimeoutError, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _ClientBulkWriteContext, _convert_client_bulk_exception, @@ -239,44 +242,15 @@ class _AsyncClientBulk: """A proxy for AsyncConnection.write_command that handles event publishing.""" cmd["ops"] = op_docs cmd["nsInfo"] = ns_docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: bwc._start(cmd, request_id, op_docs, ns_docs) try: reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] # Process the response from the server. @@ -287,24 +261,17 @@ class _AsyncClientBulk: failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: bwc._fail(request_id, failure, duration) @@ -328,22 +295,7 @@ class _AsyncClientBulk: client: AsyncMongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for AsyncConnection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: cmd = bwc._start(cmd, request_id, op_docs, ns_docs) try: @@ -354,23 +306,9 @@ class _AsyncClientBulk: else: # Comply with APM spec. reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) except Exception as exc: @@ -381,24 +319,17 @@ class _AsyncClientBulk: failure = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: assert bwc.start_time is not None bwc._fail(request_id, failure, duration) diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index 5a5dc7fa2..e1d2e3e6a 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -16,7 +16,6 @@ from __future__ import annotations import datetime -import logging from typing import ( TYPE_CHECKING, Any, @@ -30,12 +29,16 @@ from typing import ( from bson import _decode_all_selective from pymongo import _csot, helpers_shared, message +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.compression_support import _NO_COMPRESSION from pymongo.errors import ( NotPrimaryError, OperationFailure, ) -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate from pymongo.network_layer import ( @@ -160,22 +163,7 @@ async def command( if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=spec, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_started(client, conn, spec, dbname, request_id, request_id) if publish: assert listeners is not None assert address is not None @@ -222,24 +210,17 @@ async def command( else: failure = message._convert_exception(exc) if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + conn, + spec, + dbname, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if publish: assert listeners is not None assert address is not None @@ -256,24 +237,17 @@ async def command( raise duration = datetime.datetime.now() - start if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=response_doc, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - speculative_authenticate="speculativeAuthenticate" in orig, - ) + _log_command_succeeded( + client, + conn, + spec, + dbname, + request_id, + request_id, + response_doc, + duration, + "speculativeAuthenticate" in orig, + ) if publish: assert listeners is not None assert address is not None diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index aaf7ef82b..3e37dae24 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -28,12 +28,15 @@ from typing import ( from bson import _decode_all_selective from pymongo.asynchronous.helpers import _handle_reauth +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.errors import NotPrimaryError, OperationFailure from pymongo.helpers_shared import _check_command_response from pymongo.logger import ( - _COMMAND_LOGGER, _SDAM_LOGGER, - _CommandStatusMessage, _debug_log, _SDAMStatusMessage, ) @@ -170,22 +173,7 @@ class Server: message = operation.get_message(read_preference, conn, True) request_id, data, max_doc_size = self._split_message(message) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_started(client, conn, cmd, dbn, request_id, request_id) if publish: if "$db" not in cmd: @@ -224,24 +212,17 @@ class Server: failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + conn, + cmd, + dbn, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if publish: assert listeners is not None listeners.publish_command_failure( @@ -258,23 +239,7 @@ class Server: duration = datetime.now() - start # Must publish in find / getMore / explain command response format. res = docs[0] - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=res, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_succeeded(client, conn, cmd, dbn, request_id, request_id, res, duration) if publish: assert listeners is not None listeners.publish_command_success( diff --git a/pymongo/command_helpers.py b/pymongo/command_helpers.py new file mode 100644 index 000000000..b09bae797 --- /dev/null +++ b/pymongo/command_helpers.py @@ -0,0 +1,110 @@ +# Copyright 2025-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared helpers for command monitoring and logging.""" +from __future__ import annotations + +import datetime +import logging +from typing import Any, Mapping + +from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log + + +def _log_command_started( + client: Any, + conn: Any, + cmd: Mapping[str, Any], + dbname: str, + request_id: int, + operation_id: int, +) -> None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.STARTED, + clientId=client._topology_settings._topology_id, + command=cmd, + commandName=next(iter(cmd)), + databaseName=dbname, + requestId=request_id, + operationId=operation_id, + driverConnectionId=conn.id, + serverConnectionId=conn.server_connection_id, + serverHost=conn.address[0], + serverPort=conn.address[1], + serviceId=conn.service_id, + ) + + +def _log_command_succeeded( + client: Any, + conn: Any, + cmd: Mapping[str, Any], + dbname: str, + request_id: int, + operation_id: int, + reply: Any, + duration: datetime.timedelta, + speculative_authenticate: bool = False, +) -> None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.SUCCEEDED, + clientId=client._topology_settings._topology_id, + durationMS=duration, + reply=reply, + commandName=next(iter(cmd)), + databaseName=dbname, + requestId=request_id, + operationId=operation_id, + driverConnectionId=conn.id, + serverConnectionId=conn.server_connection_id, + serverHost=conn.address[0], + serverPort=conn.address[1], + serviceId=conn.service_id, + speculative_authenticate=speculative_authenticate, + ) + + +def _log_command_failed( + client: Any, + conn: Any, + cmd: Mapping[str, Any], + dbname: str, + request_id: int, + operation_id: int, + failure: Any, + duration: datetime.timedelta, + is_server_side_error: bool, +) -> None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.FAILED, + clientId=client._topology_settings._topology_id, + durationMS=duration, + failure=failure, + commandName=next(iter(cmd)), + databaseName=dbname, + requestId=request_id, + operationId=operation_id, + driverConnectionId=conn.id, + serverConnectionId=conn.server_connection_id, + serverHost=conn.address[0], + serverPort=conn.address[1], + serviceId=conn.service_id, + isServerSideError=is_server_side_error, + ) diff --git a/pymongo/synchronous/bulk.py b/pymongo/synchronous/bulk.py index 22d6a7a76..73929e064 100644 --- a/pymongo/synchronous/bulk.py +++ b/pymongo/synchronous/bulk.py @@ -20,7 +20,6 @@ from __future__ import annotations import copy import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -43,6 +42,11 @@ from pymongo.bulk_shared import ( _raise_bulk_write_error, _Run, ) +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.common import ( validate_is_document_type, validate_ok_for_replace, @@ -55,7 +59,6 @@ from pymongo.errors import ( OperationFailure, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _DELETE, _INSERT, @@ -252,44 +255,15 @@ class _Bulk: ) -> dict[str, Any]: """A proxy for SocketInfo.write_command that handles event publishing.""" cmd[bwc.field] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: bwc._start(cmd, request_id, docs) try: reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] client._process_response(reply, bwc.session) # type: ignore[arg-type] @@ -299,24 +273,17 @@ class _Bulk: failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: bwc._fail(request_id, failure, duration) @@ -337,22 +304,7 @@ class _Bulk: client: MongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for Connection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: cmd = bwc._start(cmd, request_id, docs) try: @@ -363,23 +315,9 @@ class _Bulk: else: # Comply with APM spec. reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) except Exception as exc: @@ -390,24 +328,17 @@ class _Bulk: failure = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: assert bwc.start_time is not None bwc._fail(request_id, failure, duration) diff --git a/pymongo/synchronous/client_bulk.py b/pymongo/synchronous/client_bulk.py index 1134594ae..285701e8f 100644 --- a/pymongo/synchronous/client_bulk.py +++ b/pymongo/synchronous/client_bulk.py @@ -20,7 +20,6 @@ from __future__ import annotations import copy import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -48,6 +47,11 @@ from pymongo._client_bulk_shared import ( _merge_command, _throw_client_bulk_write_exception, ) +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.common import ( validate_is_document_type, validate_ok_for_replace, @@ -63,7 +67,6 @@ from pymongo.errors import ( WaitQueueTimeoutError, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _ClientBulkWriteContext, _convert_client_bulk_exception, @@ -239,44 +242,15 @@ class _ClientBulk: """A proxy for Connection.write_command that handles event publishing.""" cmd["ops"] = op_docs cmd["nsInfo"] = ns_docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: bwc._start(cmd, request_id, op_docs, ns_docs) try: reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] # Process the response from the server. @@ -287,24 +261,17 @@ class _ClientBulk: failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: bwc._fail(request_id, failure, duration) @@ -328,22 +295,7 @@ class _ClientBulk: client: MongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for Connection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: cmd = bwc._start(cmd, request_id, op_docs, ns_docs) try: @@ -354,23 +306,9 @@ class _ClientBulk: else: # Comply with APM spec. reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) except Exception as exc: @@ -381,24 +319,17 @@ class _ClientBulk: failure = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: assert bwc.start_time is not None bwc._fail(request_id, failure, duration) diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 7d9bca4d5..3ab3664e7 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -16,7 +16,6 @@ from __future__ import annotations import datetime -import logging from typing import ( TYPE_CHECKING, Any, @@ -30,12 +29,16 @@ from typing import ( from bson import _decode_all_selective from pymongo import _csot, helpers_shared, message +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.compression_support import _NO_COMPRESSION from pymongo.errors import ( NotPrimaryError, OperationFailure, ) -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate from pymongo.network_layer import ( @@ -160,22 +163,7 @@ def command( if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=spec, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_started(client, conn, spec, dbname, request_id, request_id) if publish: assert listeners is not None assert address is not None @@ -222,24 +210,17 @@ def command( else: failure = message._convert_exception(exc) if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + conn, + spec, + dbname, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if publish: assert listeners is not None assert address is not None @@ -256,24 +237,17 @@ def command( raise duration = datetime.datetime.now() - start if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=response_doc, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - speculative_authenticate="speculativeAuthenticate" in orig, - ) + _log_command_succeeded( + client, + conn, + spec, + dbname, + request_id, + request_id, + response_doc, + duration, + "speculativeAuthenticate" in orig, + ) if publish: assert listeners is not None assert address is not None diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index 6073117f5..9d033df67 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -27,12 +27,15 @@ from typing import ( ) from bson import _decode_all_selective +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.errors import NotPrimaryError, OperationFailure from pymongo.helpers_shared import _check_command_response from pymongo.logger import ( - _COMMAND_LOGGER, _SDAM_LOGGER, - _CommandStatusMessage, _debug_log, _SDAMStatusMessage, ) @@ -170,22 +173,7 @@ class Server: message = operation.get_message(read_preference, conn, True) request_id, data, max_doc_size = self._split_message(message) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_started(client, conn, cmd, dbn, request_id, request_id) if publish: if "$db" not in cmd: @@ -224,24 +212,17 @@ class Server: failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + conn, + cmd, + dbn, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if publish: assert listeners is not None listeners.publish_command_failure( @@ -258,23 +239,7 @@ class Server: duration = datetime.now() - start # Must publish in find / getMore / explain command response format. res = docs[0] - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=res, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_succeeded(client, conn, cmd, dbn, request_id, request_id, res, duration) if publish: assert listeners is not None listeners.publish_command_success(