PYTHON-3457 & PYTHON-3458 - Improve performance when logging is disabled (#1480)
This commit is contained in:
parent
765b424a08
commit
6022cd005c
@ -14,6 +14,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import logging
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
@ -21,7 +22,7 @@ from bson import UuidRepresentation, json_util
|
||||
from bson.json_util import JSONOptions, _truncate_documents
|
||||
|
||||
|
||||
class _LogMessageStatus(str, enum.Enum):
|
||||
class _CommandStatusMessage(str, enum.Enum):
|
||||
STARTED = "Command started"
|
||||
SUCCEEDED = "Command succeeded"
|
||||
FAILED = "Command failed"
|
||||
@ -43,6 +44,12 @@ _HELLO_COMMANDS = ["hello", "ismaster", "isMaster"]
|
||||
_REDACTED_FAILURE_FIELDS = ["code", "codeName", "errorLabels"]
|
||||
_DOCUMENT_NAMES = ["command", "reply", "failure"]
|
||||
_JSON_OPTIONS = JSONOptions(uuid_representation=UuidRepresentation.STANDARD)
|
||||
_COMMAND_LOGGER = logging.getLogger("pymongo.command")
|
||||
|
||||
|
||||
def _debug_log(logger: logging.Logger, **fields: Any) -> None:
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug(LogMessage(**fields))
|
||||
|
||||
|
||||
class LogMessage:
|
||||
|
||||
@ -22,7 +22,6 @@ MongoDB.
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import random
|
||||
import struct
|
||||
from io import BytesIO as _BytesIO
|
||||
@ -66,7 +65,7 @@ from pymongo.errors import (
|
||||
)
|
||||
from pymongo.hello import HelloCompat
|
||||
from pymongo.helpers import _handle_reauth
|
||||
from pymongo.logger import LogMessage, _LogMessageStatus
|
||||
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
from pymongo.write_concern import WriteConcern
|
||||
|
||||
@ -1006,23 +1005,20 @@ class _BulkWriteContext:
|
||||
client: MongoClient,
|
||||
) -> Optional[Mapping[str, Any]]:
|
||||
"""A proxy for Connection.unack_write that handles event publishing."""
|
||||
command_logger = logging.getLogger("pymongo.command")
|
||||
# TODO: add serverConnectionId
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.STARTED,
|
||||
command=cmd,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=self.db_name,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=self.conn.id,
|
||||
serverConnectionId=self.conn.server_connection_id,
|
||||
serverHost=self.conn.address[0],
|
||||
serverPort=self.conn.address[1],
|
||||
serviceId=self.conn.service_id,
|
||||
)
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_CommandStatusMessage.STARTED,
|
||||
command=cmd,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=self.db_name,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=self.conn.id,
|
||||
serverConnectionId=self.conn.server_connection_id,
|
||||
serverHost=self.conn.address[0],
|
||||
serverPort=self.conn.address[1],
|
||||
serviceId=self.conn.service_id,
|
||||
)
|
||||
if self.publish:
|
||||
cmd = self._start(cmd, request_id, docs)
|
||||
@ -1034,10 +1030,10 @@ class _BulkWriteContext:
|
||||
else:
|
||||
# Comply with APM spec.
|
||||
reply = {"ok": 1}
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.SUCCEEDED,
|
||||
message=_CommandStatusMessage.SUCCEEDED,
|
||||
durationMS=duration,
|
||||
reply=reply,
|
||||
commandName=next(iter(cmd)),
|
||||
@ -1050,7 +1046,6 @@ class _BulkWriteContext:
|
||||
serverPort=self.conn.address[1],
|
||||
serviceId=self.conn.service_id,
|
||||
)
|
||||
)
|
||||
if self.publish:
|
||||
self._succeed(request_id, reply, duration)
|
||||
except Exception as exc:
|
||||
@ -1061,23 +1056,22 @@ class _BulkWriteContext:
|
||||
failure = exc.details # type: ignore[assignment]
|
||||
else:
|
||||
failure = _convert_exception(exc)
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.FAILED,
|
||||
durationMS=duration,
|
||||
failure=failure,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=self.db_name,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=self.conn.id,
|
||||
serverConnectionId=self.conn.server_connection_id,
|
||||
serverHost=self.conn.address[0],
|
||||
serverPort=self.conn.address[1],
|
||||
serviceId=self.conn.service_id,
|
||||
isServerSideError=isinstance(exc, OperationFailure),
|
||||
)
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_CommandStatusMessage.FAILED,
|
||||
durationMS=duration,
|
||||
failure=failure,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=self.db_name,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=self.conn.id,
|
||||
serverConnectionId=self.conn.server_connection_id,
|
||||
serverHost=self.conn.address[0],
|
||||
serverPort=self.conn.address[1],
|
||||
serviceId=self.conn.service_id,
|
||||
isServerSideError=isinstance(exc, OperationFailure),
|
||||
)
|
||||
if self.publish:
|
||||
assert self.start_time is not None
|
||||
@ -1097,14 +1091,33 @@ class _BulkWriteContext:
|
||||
client: MongoClient,
|
||||
) -> dict[str, Any]:
|
||||
"""A proxy for SocketInfo.write_command that handles event publishing."""
|
||||
command_logger = logging.getLogger("pymongo.command")
|
||||
# TODO: add serverConnectionId
|
||||
cmd[self.field] = docs
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_CommandStatusMessage.STARTED,
|
||||
command=cmd,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=self.db_name,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=self.conn.id,
|
||||
serverConnectionId=self.conn.server_connection_id,
|
||||
serverHost=self.conn.address[0],
|
||||
serverPort=self.conn.address[1],
|
||||
serviceId=self.conn.service_id,
|
||||
)
|
||||
if self.publish:
|
||||
self._start(cmd, request_id, docs)
|
||||
try:
|
||||
reply = self.conn.write_command(request_id, msg, self.codec)
|
||||
duration = datetime.datetime.now() - self.start_time
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.STARTED,
|
||||
command=cmd,
|
||||
message=_CommandStatusMessage.SUCCEEDED,
|
||||
durationMS=duration,
|
||||
reply=reply,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=self.db_name,
|
||||
requestId=request_id,
|
||||
@ -1115,29 +1128,6 @@ class _BulkWriteContext:
|
||||
serverPort=self.conn.address[1],
|
||||
serviceId=self.conn.service_id,
|
||||
)
|
||||
)
|
||||
if self.publish:
|
||||
self._start(cmd, request_id, docs)
|
||||
try:
|
||||
reply = self.conn.write_command(request_id, msg, self.codec)
|
||||
duration = datetime.datetime.now() - self.start_time
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.SUCCEEDED,
|
||||
durationMS=duration,
|
||||
reply=reply,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=self.db_name,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=self.conn.id,
|
||||
serverConnectionId=self.conn.server_connection_id,
|
||||
serverHost=self.conn.address[0],
|
||||
serverPort=self.conn.address[1],
|
||||
serviceId=self.conn.service_id,
|
||||
)
|
||||
)
|
||||
if self.publish:
|
||||
self._succeed(request_id, reply, duration)
|
||||
except Exception as exc:
|
||||
@ -1146,24 +1136,24 @@ class _BulkWriteContext:
|
||||
failure: _DocumentOut = exc.details # type: ignore[assignment]
|
||||
else:
|
||||
failure = _convert_exception(exc)
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.FAILED,
|
||||
durationMS=duration,
|
||||
failure=failure,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=self.db_name,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=self.conn.id,
|
||||
serverConnectionId=self.conn.server_connection_id,
|
||||
serverHost=self.conn.address[0],
|
||||
serverPort=self.conn.address[1],
|
||||
serviceId=self.conn.service_id,
|
||||
isServerSideError=isinstance(exc, OperationFailure),
|
||||
)
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_CommandStatusMessage.FAILED,
|
||||
durationMS=duration,
|
||||
failure=failure,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=self.db_name,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=self.conn.id,
|
||||
serverConnectionId=self.conn.server_connection_id,
|
||||
serverHost=self.conn.address[0],
|
||||
serverPort=self.conn.address[1],
|
||||
serviceId=self.conn.service_id,
|
||||
isServerSideError=isinstance(exc, OperationFailure),
|
||||
)
|
||||
|
||||
if self.publish:
|
||||
self._fail(request_id, failure, duration)
|
||||
raise
|
||||
|
||||
@ -17,7 +17,6 @@ from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import errno
|
||||
import logging
|
||||
import socket
|
||||
import struct
|
||||
import time
|
||||
@ -42,7 +41,7 @@ from pymongo.errors import (
|
||||
ProtocolError,
|
||||
_OperationCancelled,
|
||||
)
|
||||
from pymongo.logger import LogMessage, _LogMessageStatus
|
||||
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
|
||||
from pymongo.message import _UNPACK_REPLY, _OpMsg, _OpReply
|
||||
from pymongo.monitoring import _is_speculative_authenticate
|
||||
from pymongo.socket_checker import _errno_from_exception
|
||||
@ -163,24 +162,21 @@ def command(
|
||||
|
||||
if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD:
|
||||
message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD)
|
||||
command_logger = logging.getLogger("pymongo.command")
|
||||
# TODO: add serverConnectionId
|
||||
if client is not None:
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.STARTED,
|
||||
command=spec,
|
||||
commandName=next(iter(spec)),
|
||||
databaseName=dbname,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
)
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_CommandStatusMessage.STARTED,
|
||||
command=spec,
|
||||
commandName=next(iter(spec)),
|
||||
databaseName=dbname,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
)
|
||||
if publish:
|
||||
assert listeners is not None
|
||||
@ -224,23 +220,22 @@ def command(
|
||||
else:
|
||||
failure = message._convert_exception(exc)
|
||||
if client is not None:
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.FAILED,
|
||||
durationMS=duration,
|
||||
failure=failure,
|
||||
commandName=next(iter(spec)),
|
||||
databaseName=dbname,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
isServerSideError=isinstance(exc, OperationFailure),
|
||||
)
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_CommandStatusMessage.FAILED,
|
||||
durationMS=duration,
|
||||
failure=failure,
|
||||
commandName=next(iter(spec)),
|
||||
databaseName=dbname,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
isServerSideError=isinstance(exc, OperationFailure),
|
||||
)
|
||||
if publish:
|
||||
assert listeners is not None
|
||||
@ -258,23 +253,22 @@ def command(
|
||||
raise
|
||||
duration = datetime.datetime.now() - start
|
||||
if client is not None:
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.SUCCEEDED,
|
||||
durationMS=duration,
|
||||
reply=response_doc,
|
||||
commandName=next(iter(spec)),
|
||||
databaseName=dbname,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
speculative_authenticate="speculativeAuthenticate" in orig,
|
||||
)
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_CommandStatusMessage.SUCCEEDED,
|
||||
durationMS=duration,
|
||||
reply=response_doc,
|
||||
commandName=next(iter(spec)),
|
||||
databaseName=dbname,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
speculative_authenticate="speculativeAuthenticate" in orig,
|
||||
)
|
||||
if publish:
|
||||
assert listeners is not None
|
||||
|
||||
@ -15,14 +15,13 @@
|
||||
"""Communicate with one MongoDB server in a topology."""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Any, Callable, ContextManager, Optional, Union
|
||||
|
||||
from bson import _decode_all_selective
|
||||
from pymongo.errors import NotPrimaryError, OperationFailure
|
||||
from pymongo.helpers import _check_command_response, _handle_reauth
|
||||
from pymongo.logger import LogMessage, _LogMessageStatus
|
||||
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
|
||||
from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query
|
||||
from pymongo.response import PinnedResponse, Response
|
||||
|
||||
@ -132,23 +131,20 @@ class Server:
|
||||
request_id, data, max_doc_size = self._split_message(message)
|
||||
|
||||
cmd, dbn = operation.as_command(conn)
|
||||
command_logger = logging.getLogger("pymongo.command")
|
||||
# TODO: add serverConnection
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.STARTED,
|
||||
command=cmd,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=dbn,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
)
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_CommandStatusMessage.STARTED,
|
||||
command=cmd,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=dbn,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
)
|
||||
|
||||
if publish:
|
||||
@ -196,23 +192,22 @@ class Server:
|
||||
failure: _DocumentOut = exc.details # type: ignore[assignment]
|
||||
else:
|
||||
failure = _convert_exception(exc)
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.FAILED,
|
||||
durationMS=duration,
|
||||
failure=failure,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=dbn,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
isServerSideError=isinstance(exc, OperationFailure),
|
||||
)
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_CommandStatusMessage.FAILED,
|
||||
durationMS=duration,
|
||||
failure=failure,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=dbn,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
isServerSideError=isinstance(exc, OperationFailure),
|
||||
)
|
||||
if publish:
|
||||
assert listeners is not None
|
||||
@ -240,22 +235,21 @@ class Server:
|
||||
res["cursor"]["firstBatch"] = docs
|
||||
else:
|
||||
res["cursor"]["nextBatch"] = docs
|
||||
command_logger.debug(
|
||||
LogMessage(
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_LogMessageStatus.SUCCEEDED,
|
||||
durationMS=duration,
|
||||
reply=res,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=dbn,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
)
|
||||
_debug_log(
|
||||
_COMMAND_LOGGER,
|
||||
clientId=client._topology_settings._topology_id,
|
||||
message=_CommandStatusMessage.SUCCEEDED,
|
||||
durationMS=duration,
|
||||
reply=res,
|
||||
commandName=next(iter(cmd)),
|
||||
databaseName=dbn,
|
||||
requestId=request_id,
|
||||
operationId=request_id,
|
||||
driverConnectionId=conn.id,
|
||||
serverConnectionId=conn.server_connection_id,
|
||||
serverHost=conn.address[0],
|
||||
serverPort=conn.address[1],
|
||||
serviceId=conn.service_id,
|
||||
)
|
||||
if publish:
|
||||
assert listeners is not None
|
||||
|
||||
@ -92,9 +92,6 @@ class UndecipherableInt64Type:
|
||||
# Does not compare equal to integers.
|
||||
return False
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self.value)
|
||||
|
||||
|
||||
class UndecipherableIntDecoder(TypeDecoder):
|
||||
bson_type = Int64
|
||||
|
||||
Loading…
Reference in New Issue
Block a user