PYTHON-5676 Phase 2: Extract shared APM/logging helpers

Eliminate the ~13-line _COMMAND_LOGGER/_debug_log boilerplate that was
copy-pasted identically across five command execution paths. Add three
helpers to pymongo/command_helpers.py and call them from all five sites.

- New pymongo/command_helpers.py: _log_command_started, _log_command_succeeded,
  _log_command_failed — pure functions, no I/O, no async/sync split needed
- network.py, server.py, bulk.py, client_bulk.py: replace inline _debug_log
  blocks with helper calls; remove _COMMAND_LOGGER/_CommandStatusMessage
  imports where no longer needed
This commit is contained in:
Jeffrey 'Alex' Clark 2026-05-01 19:22:31 -04:00
parent d6e7a82bdb
commit 7e8c01f874
9 changed files with 342 additions and 630 deletions

View File

@ -20,7 +20,6 @@ from __future__ import annotations
import copy
import datetime
import logging
from collections.abc import MutableMapping
from itertools import islice
from typing import (
@ -45,6 +44,11 @@ from pymongo.bulk_shared import (
_raise_bulk_write_error,
_Run,
)
from pymongo.command_helpers import (
_log_command_failed,
_log_command_started,
_log_command_succeeded,
)
from pymongo.common import (
validate_is_document_type,
validate_ok_for_replace,
@ -57,7 +61,6 @@ from pymongo.errors import (
OperationFailure,
)
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import (
_DELETE,
_INSERT,
@ -252,44 +255,15 @@ class _AsyncBulk:
) -> dict[str, Any]:
"""A proxy for SocketInfo.write_command that handles event publishing."""
cmd[bwc.field] = docs
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
bwc._start(cmd, request_id, docs)
try:
reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc]
duration = datetime.datetime.now() - bwc.start_time
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
await client._process_response(reply, bwc.session) # type: ignore[arg-type]
@ -299,24 +273,17 @@ class _AsyncBulk:
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if bwc.publish:
bwc._fail(request_id, failure, duration)
@ -337,22 +304,7 @@ class _AsyncBulk:
client: AsyncMongoClient[Any],
) -> Optional[Mapping[str, Any]]:
"""A proxy for AsyncConnection.unack_write that handles event publishing."""
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
cmd = bwc._start(cmd, request_id, docs)
try:
@ -363,23 +315,9 @@ class _AsyncBulk:
else:
# Comply with APM spec.
reply = {"ok": 1}
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration)
except Exception as exc:
@ -390,24 +328,17 @@ class _AsyncBulk:
failure = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if bwc.publish:
assert bwc.start_time is not None
bwc._fail(request_id, failure, duration)

View File

@ -20,7 +20,6 @@ from __future__ import annotations
import copy
import datetime
import logging
from collections.abc import MutableMapping
from itertools import islice
from typing import (
@ -48,6 +47,11 @@ from pymongo._client_bulk_shared import (
_merge_command,
_throw_client_bulk_write_exception,
)
from pymongo.command_helpers import (
_log_command_failed,
_log_command_started,
_log_command_succeeded,
)
from pymongo.common import (
validate_is_document_type,
validate_ok_for_replace,
@ -63,7 +67,6 @@ from pymongo.errors import (
WaitQueueTimeoutError,
)
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import (
_ClientBulkWriteContext,
_convert_client_bulk_exception,
@ -239,44 +242,15 @@ class _AsyncClientBulk:
"""A proxy for AsyncConnection.write_command that handles event publishing."""
cmd["ops"] = op_docs
cmd["nsInfo"] = ns_docs
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
bwc._start(cmd, request_id, op_docs, ns_docs)
try:
reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type]
duration = datetime.datetime.now() - bwc.start_time
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
# Process the response from the server.
@ -287,24 +261,17 @@ class _AsyncClientBulk:
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if bwc.publish:
bwc._fail(request_id, failure, duration)
@ -328,22 +295,7 @@ class _AsyncClientBulk:
client: AsyncMongoClient[Any],
) -> Optional[Mapping[str, Any]]:
"""A proxy for AsyncConnection.unack_write that handles event publishing."""
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
cmd = bwc._start(cmd, request_id, op_docs, ns_docs)
try:
@ -354,23 +306,9 @@ class _AsyncClientBulk:
else:
# Comply with APM spec.
reply = {"ok": 1}
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration)
except Exception as exc:
@ -381,24 +319,17 @@ class _AsyncClientBulk:
failure = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if bwc.publish:
assert bwc.start_time is not None
bwc._fail(request_id, failure, duration)

View File

@ -16,7 +16,6 @@
from __future__ import annotations
import datetime
import logging
from typing import (
TYPE_CHECKING,
Any,
@ -30,12 +29,16 @@ from typing import (
from bson import _decode_all_selective
from pymongo import _csot, helpers_shared, message
from pymongo.command_helpers import (
_log_command_failed,
_log_command_started,
_log_command_succeeded,
)
from pymongo.compression_support import _NO_COMPRESSION
from pymongo.errors import (
NotPrimaryError,
OperationFailure,
)
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import _OpMsg
from pymongo.monitoring import _is_speculative_authenticate
from pymongo.network_layer import (
@ -160,22 +163,7 @@ async def command(
if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD:
message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD)
if client is not None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=spec,
commandName=next(iter(spec)),
databaseName=dbname,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
)
_log_command_started(client, conn, spec, dbname, request_id, request_id)
if publish:
assert listeners is not None
assert address is not None
@ -222,24 +210,17 @@ async def command(
else:
failure = message._convert_exception(exc)
if client is not None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(spec)),
databaseName=dbname,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
conn,
spec,
dbname,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if publish:
assert listeners is not None
assert address is not None
@ -256,24 +237,17 @@ async def command(
raise
duration = datetime.datetime.now() - start
if client is not None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=response_doc,
commandName=next(iter(spec)),
databaseName=dbname,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
speculative_authenticate="speculativeAuthenticate" in orig,
)
_log_command_succeeded(
client,
conn,
spec,
dbname,
request_id,
request_id,
response_doc,
duration,
"speculativeAuthenticate" in orig,
)
if publish:
assert listeners is not None
assert address is not None

View File

@ -28,12 +28,15 @@ from typing import (
from bson import _decode_all_selective
from pymongo.asynchronous.helpers import _handle_reauth
from pymongo.command_helpers import (
_log_command_failed,
_log_command_started,
_log_command_succeeded,
)
from pymongo.errors import NotPrimaryError, OperationFailure
from pymongo.helpers_shared import _check_command_response
from pymongo.logger import (
_COMMAND_LOGGER,
_SDAM_LOGGER,
_CommandStatusMessage,
_debug_log,
_SDAMStatusMessage,
)
@ -170,22 +173,7 @@ class Server:
message = operation.get_message(read_preference, conn, True)
request_id, data, max_doc_size = self._split_message(message)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=dbn,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
)
_log_command_started(client, conn, cmd, dbn, request_id, request_id)
if publish:
if "$db" not in cmd:
@ -224,24 +212,17 @@ class Server:
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=dbn,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
conn,
cmd,
dbn,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if publish:
assert listeners is not None
listeners.publish_command_failure(
@ -258,23 +239,7 @@ class Server:
duration = datetime.now() - start
# Must publish in find / getMore / explain command response format.
res = docs[0]
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=res,
commandName=next(iter(cmd)),
databaseName=dbn,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
)
_log_command_succeeded(client, conn, cmd, dbn, request_id, request_id, res, duration)
if publish:
assert listeners is not None
listeners.publish_command_success(

110
pymongo/command_helpers.py Normal file
View File

@ -0,0 +1,110 @@
# Copyright 2025-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared helpers for command monitoring and logging."""
from __future__ import annotations
import datetime
import logging
from typing import Any, Mapping
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
def _log_command_started(
client: Any,
conn: Any,
cmd: Mapping[str, Any],
dbname: str,
request_id: int,
operation_id: int,
) -> None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=dbname,
requestId=request_id,
operationId=operation_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
)
def _log_command_succeeded(
client: Any,
conn: Any,
cmd: Mapping[str, Any],
dbname: str,
request_id: int,
operation_id: int,
reply: Any,
duration: datetime.timedelta,
speculative_authenticate: bool = False,
) -> None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=dbname,
requestId=request_id,
operationId=operation_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
speculative_authenticate=speculative_authenticate,
)
def _log_command_failed(
client: Any,
conn: Any,
cmd: Mapping[str, Any],
dbname: str,
request_id: int,
operation_id: int,
failure: Any,
duration: datetime.timedelta,
is_server_side_error: bool,
) -> None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=dbname,
requestId=request_id,
operationId=operation_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
isServerSideError=is_server_side_error,
)

View File

@ -20,7 +20,6 @@ from __future__ import annotations
import copy
import datetime
import logging
from collections.abc import MutableMapping
from itertools import islice
from typing import (
@ -43,6 +42,11 @@ from pymongo.bulk_shared import (
_raise_bulk_write_error,
_Run,
)
from pymongo.command_helpers import (
_log_command_failed,
_log_command_started,
_log_command_succeeded,
)
from pymongo.common import (
validate_is_document_type,
validate_ok_for_replace,
@ -55,7 +59,6 @@ from pymongo.errors import (
OperationFailure,
)
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import (
_DELETE,
_INSERT,
@ -252,44 +255,15 @@ class _Bulk:
) -> dict[str, Any]:
"""A proxy for SocketInfo.write_command that handles event publishing."""
cmd[bwc.field] = docs
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
bwc._start(cmd, request_id, docs)
try:
reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc]
duration = datetime.datetime.now() - bwc.start_time
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
client._process_response(reply, bwc.session) # type: ignore[arg-type]
@ -299,24 +273,17 @@ class _Bulk:
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if bwc.publish:
bwc._fail(request_id, failure, duration)
@ -337,22 +304,7 @@ class _Bulk:
client: MongoClient[Any],
) -> Optional[Mapping[str, Any]]:
"""A proxy for Connection.unack_write that handles event publishing."""
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
cmd = bwc._start(cmd, request_id, docs)
try:
@ -363,23 +315,9 @@ class _Bulk:
else:
# Comply with APM spec.
reply = {"ok": 1}
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration)
except Exception as exc:
@ -390,24 +328,17 @@ class _Bulk:
failure = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if bwc.publish:
assert bwc.start_time is not None
bwc._fail(request_id, failure, duration)

View File

@ -20,7 +20,6 @@ from __future__ import annotations
import copy
import datetime
import logging
from collections.abc import MutableMapping
from itertools import islice
from typing import (
@ -48,6 +47,11 @@ from pymongo._client_bulk_shared import (
_merge_command,
_throw_client_bulk_write_exception,
)
from pymongo.command_helpers import (
_log_command_failed,
_log_command_started,
_log_command_succeeded,
)
from pymongo.common import (
validate_is_document_type,
validate_ok_for_replace,
@ -63,7 +67,6 @@ from pymongo.errors import (
WaitQueueTimeoutError,
)
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import (
_ClientBulkWriteContext,
_convert_client_bulk_exception,
@ -239,44 +242,15 @@ class _ClientBulk:
"""A proxy for Connection.write_command that handles event publishing."""
cmd["ops"] = op_docs
cmd["nsInfo"] = ns_docs
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
bwc._start(cmd, request_id, op_docs, ns_docs)
try:
reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type]
duration = datetime.datetime.now() - bwc.start_time
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
# Process the response from the server.
@ -287,24 +261,17 @@ class _ClientBulk:
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if bwc.publish:
bwc._fail(request_id, failure, duration)
@ -328,22 +295,7 @@ class _ClientBulk:
client: MongoClient[Any],
) -> Optional[Mapping[str, Any]]:
"""A proxy for Connection.unack_write that handles event publishing."""
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id)
if bwc.publish:
cmd = bwc._start(cmd, request_id, op_docs, ns_docs)
try:
@ -354,23 +306,9 @@ class _ClientBulk:
else:
# Comply with APM spec.
reply = {"ok": 1}
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
)
_log_command_succeeded(
client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration
)
if bwc.publish:
bwc._succeed(request_id, reply, duration)
except Exception as exc:
@ -381,24 +319,17 @@ class _ClientBulk:
failure = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=bwc.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=bwc.conn.id,
serverConnectionId=bwc.conn.server_connection_id,
serverHost=bwc.conn.address[0],
serverPort=bwc.conn.address[1],
serviceId=bwc.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
bwc.conn,
cmd,
bwc.db_name,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if bwc.publish:
assert bwc.start_time is not None
bwc._fail(request_id, failure, duration)

View File

@ -16,7 +16,6 @@
from __future__ import annotations
import datetime
import logging
from typing import (
TYPE_CHECKING,
Any,
@ -30,12 +29,16 @@ from typing import (
from bson import _decode_all_selective
from pymongo import _csot, helpers_shared, message
from pymongo.command_helpers import (
_log_command_failed,
_log_command_started,
_log_command_succeeded,
)
from pymongo.compression_support import _NO_COMPRESSION
from pymongo.errors import (
NotPrimaryError,
OperationFailure,
)
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import _OpMsg
from pymongo.monitoring import _is_speculative_authenticate
from pymongo.network_layer import (
@ -160,22 +163,7 @@ def command(
if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD:
message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD)
if client is not None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=spec,
commandName=next(iter(spec)),
databaseName=dbname,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
)
_log_command_started(client, conn, spec, dbname, request_id, request_id)
if publish:
assert listeners is not None
assert address is not None
@ -222,24 +210,17 @@ def command(
else:
failure = message._convert_exception(exc)
if client is not None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(spec)),
databaseName=dbname,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
conn,
spec,
dbname,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if publish:
assert listeners is not None
assert address is not None
@ -256,24 +237,17 @@ def command(
raise
duration = datetime.datetime.now() - start
if client is not None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=response_doc,
commandName=next(iter(spec)),
databaseName=dbname,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
speculative_authenticate="speculativeAuthenticate" in orig,
)
_log_command_succeeded(
client,
conn,
spec,
dbname,
request_id,
request_id,
response_doc,
duration,
"speculativeAuthenticate" in orig,
)
if publish:
assert listeners is not None
assert address is not None

View File

@ -27,12 +27,15 @@ from typing import (
)
from bson import _decode_all_selective
from pymongo.command_helpers import (
_log_command_failed,
_log_command_started,
_log_command_succeeded,
)
from pymongo.errors import NotPrimaryError, OperationFailure
from pymongo.helpers_shared import _check_command_response
from pymongo.logger import (
_COMMAND_LOGGER,
_SDAM_LOGGER,
_CommandStatusMessage,
_debug_log,
_SDAMStatusMessage,
)
@ -170,22 +173,7 @@ class Server:
message = operation.get_message(read_preference, conn, True)
request_id, data, max_doc_size = self._split_message(message)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.STARTED,
clientId=client._topology_settings._topology_id,
command=cmd,
commandName=next(iter(cmd)),
databaseName=dbn,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
)
_log_command_started(client, conn, cmd, dbn, request_id, request_id)
if publish:
if "$db" not in cmd:
@ -224,24 +212,17 @@ class Server:
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.FAILED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=dbn,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
_log_command_failed(
client,
conn,
cmd,
dbn,
request_id,
request_id,
failure,
duration,
isinstance(exc, OperationFailure),
)
if publish:
assert listeners is not None
listeners.publish_command_failure(
@ -258,23 +239,7 @@ class Server:
duration = datetime.now() - start
# Must publish in find / getMore / explain command response format.
res = docs[0]
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
message=_CommandStatusMessage.SUCCEEDED,
clientId=client._topology_settings._topology_id,
durationMS=duration,
reply=res,
commandName=next(iter(cmd)),
databaseName=dbn,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
)
_log_command_succeeded(client, conn, cmd, dbn, request_id, request_id, res, duration)
if publish:
assert listeners is not None
listeners.publish_command_success(