Compare commits

...

11 Commits

84 changed files with 7109 additions and 184 deletions

View File

@ -98,13 +98,27 @@ do
cpjson client-side-encryption/limits/ client-side-encryption/limits
cpjson client-side-encryption/etc/data client-side-encryption/etc/data
;;
connection-monitoring|connection_monitoring)
cpjson connection-monitoring-and-pooling/tests/cmap-format connection_monitoring
;;
connection-logging|connection_logging)
cpjson connection-monitoring-and-pooling/tests/logging connection_logging
;;
cmap|CMAP|connection-monitoring-and-pooling)
cpjson connection-monitoring-and-pooling/tests cmap
rm $PYMONGO/test/cmap/wait-queue-fairness.json # PYTHON-1873
cpjson connection-monitoring-and-pooling/tests/logging connection_logging
cpjson connection-monitoring-and-pooling/tests/cmap-format connection_monitoring
rm $PYMONGO/test/connection_monitoring/wait-queue-fairness.json # PYTHON-1873
;;
apm|APM|command-monitoring|command_monitoring)
cpjson command-logging-and-monitoring/tests/monitoring command_monitoring
;;
command-logging|command_logging)
cpjson command-logging-and-monitoring/tests/logging command_logging
;;
clam|CLAM|command-logging-and-monitoring|command_logging_and_monitoring)
cpjson command-logging-and-monitoring/tests/logging command_logging
cpjson command-logging-and-monitoring/tests/monitoring command_monitoring
;;
crud|CRUD)
cpjson crud/tests/ crud
;;
@ -151,6 +165,11 @@ do
;;
server-selection|server_selection)
cpjson server-selection/tests/ server_selection
rm -rf $PYMONGO/test/server_selection/logging
cpjson server-selection/tests/logging server_selection_logging
;;
server-selection-logging|server_selection_logging)
cpjson server-selection/tests/logging server_selection_logging
;;
sessions)
cpjson sessions/tests/ sessions

View File

@ -236,6 +236,8 @@ if TYPE_CHECKING:
else:
_BASE_CLASS = CodecOptions
_INT32_MAX = 2**31
class JSONOptions(_BASE_CLASS):
json_mode: int
@ -893,7 +895,7 @@ def _encode_regex(obj: Any, json_options: JSONOptions) -> dict:
def _encode_int(obj: int, json_options: JSONOptions) -> Any:
if json_options.json_mode == JSONMode.CANONICAL:
if -(2**31) <= obj < 2**31:
if -_INT32_MAX <= obj < _INT32_MAX:
return {"$numberInt": str(obj)}
return {"$numberLong": str(obj)}
return obj
@ -1041,3 +1043,119 @@ def default(obj: Any, json_options: JSONOptions = DEFAULT_JSON_OPTIONS) -> Any:
return func(obj, json_options)
raise TypeError("%r is not JSON serializable" % obj)
def _get_str_size(obj: Any) -> int:
return len(obj)
def _get_datetime_size(obj: datetime.datetime) -> int:
return 5 + len(str(obj.time()))
def _get_regex_size(obj: Regex) -> int:
return 18 + len(obj.pattern)
def _get_dbref_size(obj: DBRef) -> int:
return 34 + len(obj.collection)
_CONSTANT_SIZE_TABLE: dict[Any, int] = {
ObjectId: 28,
int: 11,
Int64: 11,
Decimal128: 11,
Timestamp: 14,
MinKey: 8,
MaxKey: 8,
}
_VARIABLE_SIZE_TABLE: dict[Any, Callable[[Any], int]] = {
str: _get_str_size,
bytes: _get_str_size,
datetime.datetime: _get_datetime_size,
Regex: _get_regex_size,
DBRef: _get_dbref_size,
}
def get_size(obj: Any, max_size: int, current_size: int = 0) -> int:
"""Recursively finds size of objects"""
if current_size >= max_size:
return current_size
obj_type = type(obj)
# Check to see if the obj has a constant size estimate
try:
return _CONSTANT_SIZE_TABLE[obj_type]
except KeyError:
pass
# Check to see if the obj has a variable but simple size estimate
try:
return _VARIABLE_SIZE_TABLE[obj_type](obj)
except KeyError:
pass
# Special cases that require recursion
if obj_type == Code:
if obj.scope:
current_size += (
5 + get_size(obj.scope, max_size, current_size) + len(obj) - len(obj.scope)
)
else:
current_size += 5 + len(obj)
elif obj_type == dict:
for k, v in obj.items():
current_size += get_size(k, max_size, current_size)
current_size += get_size(v, max_size, current_size)
if current_size >= max_size:
return current_size
elif hasattr(obj, "__iter__"):
for i in obj:
current_size += get_size(i, max_size, current_size)
if current_size >= max_size:
return current_size
return current_size
def _truncate_documents(obj: Any, max_length: int) -> Tuple[Any, int]:
"""Recursively truncate documents as needed to fit inside max_length characters."""
if max_length <= 0:
return None, 0
remaining = max_length
if hasattr(obj, "items"):
truncated: Any = {}
for k, v in obj.items():
truncated_v, remaining = _truncate_documents(v, remaining)
if truncated_v:
truncated[k] = truncated_v
if remaining <= 0:
break
return truncated, remaining
elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes)):
truncated: Any = [] # type:ignore[no-redef]
for v in obj:
truncated_v, remaining = _truncate_documents(v, remaining)
if truncated_v:
truncated.append(truncated_v)
if remaining <= 0:
break
return truncated, remaining
else:
return _truncate(obj, remaining)
def _truncate(obj: Any, remaining: int) -> Tuple[Any, int]:
size = get_size(obj, remaining)
if size <= remaining:
return obj, remaining - size
else:
try:
truncated = obj[:remaining]
except TypeError:
truncated = obj
return truncated, remaining - size

View File

@ -27,6 +27,7 @@ MongoDB, you can start it like so:
gevent
gridfs
high_availability
logging
mod_wsgi
network_compression
server_selection

63
doc/examples/logging.rst Normal file
View File

@ -0,0 +1,63 @@
Logging
========
Starting in 4.8, **PyMongo** supports `Python's native logging library <https://docs.python.org/3/howto/logging.html>`_,
enabling developers to customize the verbosity of log messages for their applications.
Components
-------------
There are currently three different **PyMongo** components with logging support: ``pymongo.command``, ``pymongo.connection``, and ``pymongo.serverSelection``.
These components deal with command operations, connection management, and server selection, respectively.
Each can be configured separately or they can all be configured together.
Configuration
-------------
Currently, the above components each support ``DEBUG`` logging. To enable a single component, do the following::
import logging
logging.getLogger('pymongo.<componentName>').setLevel(logging.DEBUG)
For example, to enable command logging::
import logging
logging.getLogger('pymongo.command').setLevel(logging.DEBUG)
You can also enable all ``DEBUG`` logs at once::
import logging
logging.getLogger('pymongo').setLevel(logging.DEBUG)
Truncation
-------------
When ``pymongo.command`` debug logs are enabled, every command sent to the server and every response sent back will be included as part of the logs.
By default, these command and response documents are truncated after 1000 bytes.
You can configure a higher truncation limit by setting the ``MONGOB_LOG_MAX_DOCUMENT_LENGTH`` environment variable to your desired length.
Note that by default, only sensitive authentication command contents are redacted.
All commands containing user data will be logged, including the actual contents of your queries.
To prevent this behavior, set ``MONGOB_LOG_MAX_DOCUMENT_LENGTH`` to 0. This will omit the command and response bodies from the logs.
Example
-------------
Here's a simple example that enables ``pymongo.command`` debug logs and performs two database operations::
import logging
import pymongo
# Automatically writes all logs to stdout
logging.basicConfig()
logging.getLogger('pymongo.command').setLevel(logging.DEBUG)
client = pymongo.MongoClient()
client.db.test.insert_one({"x": 1})
client.db.test.find_one({"x": 1})
---------------------------------
DEBUG:pymongo.command:{"clientId": {"$oid": "65cbe82614be1fc2beb4e4a9"}, "message": "Command started", "command": "{\"insert\": \"test\", \"ordered\": true, \"lsid\": {\"id\": {\"$binary\": {\"base64\": \"GI7ubVhPSsWd7+OwHEFx6Q==\", \"subType\": \"04\"}}}, \"$db\": \"db\", \"documents\": [{\"x\": 1, \"_id\": {\"$oid\": \"65cbe82614be1fc2beb4e4aa\"}}]}", "commandName": "insert", "databaseName": "db", "requestId": 1144108930, "operationId": 1144108930, "driverConnectionId": 1, "serverConnectionId": 3554, "serverHost": "localhost", "serverPort": 27017}
DEBUG:pymongo.command:{"clientId": {"$oid": "65cbe82614be1fc2beb4e4a9"}, "message": "Command succeeded", "durationMS": 0.515, "reply": "{\"n\": 1, \"ok\": 1.0}", "commandName": "insert", "databaseName": "db", "requestId": 1144108930, "operationId": 1144108930, "driverConnectionId": 1, "serverConnectionId": 3554, "serverHost": "localhost", "serverPort": 27017}
DEBUG:pymongo.command:{"clientId": {"$oid": "65cbe82614be1fc2beb4e4a9"}, "message": "Command started", "command": "{\"find\": \"test\", \"filter\": {\"x\": 1}, \"limit\": 1, \"singleBatch\": true, \"lsid\": {\"id\": {\"$binary\": {\"base64\": \"GI7ubVhPSsWd7+OwHEFx6Q==\", \"subType\": \"04\"}}}, \"$db\": \"db\"}", "commandName": "find", "databaseName": "db", "requestId": 470211272, "operationId": 470211272, "driverConnectionId": 1, "serverConnectionId": 3554, "serverHost": "localhost", "serverPort": 27017}
DEBUG:pymongo.command:{"clientId": {"$oid": "65cbe82614be1fc2beb4e4a9"}, "message": "Command succeeded", "durationMS": 0.621, "reply": "{\"cursor\": {\"firstBatch\": [{\"_id\": {\"$oid\": \"65cbdf391a957ed280001417\"}, \"x\": 1}], \"ns\": \"db.test\"}, \"ok\": 1.0}", "commandName": "find", "databaseName": "db", "requestId": 470211272, "operationId": 470211272, "driverConnectionId": 1, "serverConnectionId": 3554, "serverHost": "localhost", "serverPort": 27017}

View File

@ -31,6 +31,9 @@ everything you need to know to use **PyMongo**.
:doc:`examples/type_hints`
Using PyMongo with type hints.
:doc:`examples/logging`
Using PyMongo's logging capabilities.
:doc:`faq`
Some questions that come up often.

View File

@ -408,6 +408,7 @@ class _Bulk:
generator: Iterator[Any],
write_concern: WriteConcern,
session: Optional[ClientSession],
operation: str,
) -> dict[str, Any]:
"""Execute using write commands."""
# nModified is only reported for write commands, not legacy ops.
@ -437,7 +438,14 @@ class _Bulk:
)
client = self.collection.database.client
client._retryable_write(self.is_retryable, retryable_bulk, session, bulk=self)
client._retryable_write(
self.is_retryable,
retryable_bulk,
session,
operation,
bulk=self,
operation_id=op_id,
)
if full_result["writeErrors"] or full_result["writeConcernErrors"]:
_raise_bulk_write_error(full_result)
@ -547,7 +555,12 @@ class _Bulk:
return self.execute_command_no_results(conn, generator, write_concern)
return self.execute_op_msg_no_results(conn, generator)
def execute(self, write_concern: WriteConcern, session: Optional[ClientSession]) -> Any:
def execute(
self,
write_concern: WriteConcern,
session: Optional[ClientSession],
operation: str,
) -> Any:
"""Execute operations."""
if not self.ops:
raise InvalidOperation("No operations to execute")
@ -564,8 +577,8 @@ class _Bulk:
client = self.collection.database.client
if not write_concern.acknowledged:
with client._conn_for_writes(session) as connection:
with client._conn_for_writes(session, operation) as connection:
self.execute_no_results(connection, generator, write_concern)
return None
else:
return self.execute_command(generator, write_concern, session)
return self.execute_command(generator, write_concern, session, operation)

View File

@ -36,6 +36,7 @@ from pymongo.errors import (
OperationFailure,
PyMongoError,
)
from pymongo.operations import _Op
from pymongo.typings import _CollationIn, _DocumentType, _Pipeline
# The change streams spec considers the following server errors from the
@ -244,7 +245,10 @@ class ChangeStream(Generic[_DocumentType]):
comment=self._comment,
)
return self._client._retryable_read(
cmd.get_cursor, self._target._read_preference_for(session), session
cmd.get_cursor,
self._target._read_preference_for(session),
session,
operation=_Op.AGGREGATE,
)
def _create_cursor(self) -> CommandCursor:

View File

@ -166,6 +166,7 @@ from pymongo.errors import (
WTimeoutError,
)
from pymongo.helpers import _RETRYABLE_ERROR_CODES
from pymongo.operations import _Op
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference, _ServerMode
from pymongo.server_type import SERVER_TYPE
@ -843,7 +844,7 @@ class ClientSession:
) -> dict[str, Any]:
return self._finish_transaction(conn, command_name)
return self._client._retry_internal(func, self, None, retryable=True)
return self._client._retry_internal(func, self, None, retryable=True, operation=_Op.ABORT)
def _finish_transaction(self, conn: Connection, command_name: str) -> dict[str, Any]:
self._transaction.attempt += 1

View File

@ -70,6 +70,7 @@ from pymongo.operations import (
UpdateOne,
_IndexKeyHint,
_IndexList,
_Op,
)
from pymongo.read_preferences import ReadPreference, _ServerMode
from pymongo.results import (
@ -252,13 +253,10 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
else:
self.__create(name, kwargs, collation, session)
def _conn_for_reads(
self, session: ClientSession
) -> ContextManager[tuple[Connection, _ServerMode]]:
return self.__database.client._conn_for_reads(self._read_preference_for(session), session)
def _conn_for_writes(self, session: Optional[ClientSession]) -> ContextManager[Connection]:
return self.__database.client._conn_for_writes(session)
def _conn_for_writes(
self, session: Optional[ClientSession], operation: str
) -> ContextManager[Connection]:
return self.__database.client._conn_for_writes(session, operation)
def _command(
self,
@ -336,7 +334,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
if "size" in options:
options["size"] = float(options["size"])
cmd.update(options)
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Op.CREATE) as conn:
if qev2_required and conn.max_wire_version < 21:
raise ConfigurationError(
"Driver support of Queryable Encryption is incompatible with server. "
@ -558,7 +556,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
raise TypeError(f"{request!r} is not a valid request") from None
write_concern = self._write_concern_for(session)
bulk_api_result = blk.execute(write_concern, session)
bulk_api_result = blk.execute(write_concern, session, _Op.INSERT)
if bulk_api_result is not None:
return BulkWriteResult(bulk_api_result, True)
return BulkWriteResult({}, False)
@ -598,7 +596,9 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
_check_write_command_response(result)
self.__database.client._retryable_write(acknowledged, _insert_command, session)
self.__database.client._retryable_write(
acknowledged, _insert_command, session, operation=_Op.INSERT
)
if not isinstance(doc, RawBSONDocument):
return doc.get("_id")
@ -740,7 +740,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
write_concern = self._write_concern_for(session)
blk = _Bulk(self, ordered, bypass_document_validation, comment=comment)
blk.ops = list(gen())
blk.execute(write_concern, session=session)
blk.execute(write_concern, session, _Op.INSERT)
return InsertManyResult(inserted_ids, write_concern.acknowledged)
def _update(
@ -832,6 +832,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
self,
criteria: Mapping[str, Any],
document: Union[Mapping[str, Any], _Pipeline],
operation: str,
upsert: bool = False,
multi: bool = False,
write_concern: Optional[WriteConcern] = None,
@ -870,7 +871,10 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
)
return self.__database.client._retryable_write(
(write_concern or self.write_concern).acknowledged and not multi, _update, session
(write_concern or self.write_concern).acknowledged and not multi,
_update,
session,
operation,
)
def replace_one(
@ -962,6 +966,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
self._update_retryable(
filter,
replacement,
_Op.UPDATE,
upsert,
write_concern=write_concern,
bypass_doc_val=bypass_document_validation,
@ -1073,6 +1078,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
self._update_retryable(
filter,
update,
_Op.UPDATE,
upsert,
write_concern=write_concern,
bypass_doc_val=bypass_document_validation,
@ -1172,6 +1178,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
self._update_retryable(
filter,
update,
_Op.UPDATE,
upsert,
multi=True,
write_concern=write_concern,
@ -1319,7 +1326,10 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
)
return self.__database.client._retryable_write(
(write_concern or self.write_concern).acknowledged and not multi, _delete, session
(write_concern or self.write_concern).acknowledged and not multi,
_delete,
session,
operation=_Op.DELETE,
)
def delete_one(
@ -1798,7 +1808,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
cmd.update(kwargs)
return self._count_cmd(session, conn, read_preference, cmd, collation=None)
return self._retryable_non_cursor_read(_cmd, None)
return self._retryable_non_cursor_read(_cmd, None, operation=_Op.COUNT)
def count_documents(
self,
@ -1887,17 +1897,18 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
return 0
return result["n"]
return self._retryable_non_cursor_read(_cmd, session)
return self._retryable_non_cursor_read(_cmd, session, _Op.COUNT)
def _retryable_non_cursor_read(
self,
func: Callable[[Optional[ClientSession], Server, Connection, Optional[_ServerMode]], T],
session: Optional[ClientSession],
operation: str,
) -> T:
"""Non-cursor read helper to handle implicit session creation."""
client = self.__database.client
with client._tmp_session(session) as s:
return client._retryable_read(func, self._read_preference_for(s), s)
return client._retryable_read(func, self._read_preference_for(s), s, operation)
def create_indexes(
self,
@ -1960,7 +1971,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
command (like maxTimeMS) can be passed as keyword arguments.
"""
names = []
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Op.CREATE_INDEXES) as conn:
supports_quorum = conn.max_wire_version >= 9
def gen_indexes() -> Iterator[Mapping[str, Any]]:
@ -2200,7 +2211,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
cmd.update(kwargs)
if comment is not None:
cmd["comment"] = comment
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Op.DROP_INDEXES) as conn:
self._command(
conn,
cmd,
@ -2277,7 +2288,9 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
return cmd_cursor
with self.__database.client._tmp_session(session, False) as s:
return self.__database.client._retryable_read(_cmd, read_pref, s)
return self.__database.client._retryable_read(
_cmd, read_pref, s, operation=_Op.LIST_INDEXES
)
def index_information(
self,
@ -2367,6 +2380,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
cmd.get_read_preference(session), # type: ignore[arg-type]
session,
retryable=not cmd._performs_write,
operation=_Op.LIST_SEARCH_INDEX,
)
def create_search_index(
@ -2435,7 +2449,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
cmd = {"createSearchIndexes": self.name, "indexes": list(gen_indexes())}
cmd.update(kwargs)
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Op.CREATE_SEARCH_INDEXES) as conn:
resp = self._command(
conn,
cmd,
@ -2469,7 +2483,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
cmd.update(kwargs)
if comment is not None:
cmd["comment"] = comment
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Op.DROP_SEARCH_INDEXES) as conn:
self._command(
conn,
cmd,
@ -2505,7 +2519,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
cmd.update(kwargs)
if comment is not None:
cmd["comment"] = comment
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Op.UPDATE_SEARCH_INDEX) as conn:
self._command(
conn,
cmd,
@ -2589,6 +2603,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
cmd.get_read_preference(session), # type: ignore[arg-type]
session,
retryable=not cmd._performs_write,
operation=_Op.AGGREGATE,
)
def aggregate(
@ -2925,7 +2940,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
cmd["comment"] = comment
write_concern = self._write_concern_for_cmd(cmd, session)
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Op.RENAME) as conn:
with self.__database.client._tmp_session(session) as s:
return conn.command(
"admin",
@ -3006,7 +3021,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
user_fields={"values": 1},
)["values"]
return self._retryable_non_cursor_read(_cmd, session)
return self._retryable_non_cursor_read(_cmd, session, operation=_Op.DISTINCT)
def _write_concern_for_cmd(
self, cmd: Mapping[str, Any], session: Optional[ClientSession]
@ -3090,7 +3105,10 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
return out.get("value")
return self.__database.client._retryable_write(
write_concern.acknowledged, _find_and_modify, session
write_concern.acknowledged,
_find_and_modify,
session,
operation=_Op.FIND_AND_MODIFY,
)
def find_one_and_delete(

View File

@ -41,6 +41,7 @@ from pymongo.collection import Collection
from pymongo.command_cursor import CommandCursor
from pymongo.common import _ecoc_coll_name, _esc_coll_name
from pymongo.errors import CollectionInvalid, InvalidName, InvalidOperation
from pymongo.operations import _Op
from pymongo.read_preferences import ReadPreference, _ServerMode
from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline
@ -546,6 +547,7 @@ class Database(common.BaseObject, Generic[_DocumentType]):
cmd.get_read_preference(s), # type: ignore[arg-type]
s,
retryable=not cmd._performs_write,
operation=_Op.AGGREGATE,
)
def watch(
@ -881,9 +883,14 @@ class Database(common.BaseObject, Generic[_DocumentType]):
if comment is not None:
kwargs["comment"] = comment
if isinstance(command, str):
command_name = command
else:
command_name = next(iter(command))
if read_preference is None:
read_preference = (session and session._txn_read_preference()) or ReadPreference.PRIMARY
with self.__client._conn_for_reads(read_preference, session) as (
with self.__client._conn_for_reads(read_preference, session, operation=command_name) as (
connection,
read_preference,
):
@ -959,6 +966,11 @@ class Database(common.BaseObject, Generic[_DocumentType]):
.. seealso:: The MongoDB documentation on `commands <https://dochub.mongodb.org/core/commands>`_.
"""
if isinstance(command, str):
command_name = command
else:
command_name = next(iter(command))
with self.__client._tmp_session(session, close=False) as tmp_session:
opts = codec_options or DEFAULT_CODEC_OPTIONS
@ -966,7 +978,7 @@ class Database(common.BaseObject, Generic[_DocumentType]):
read_preference = (
tmp_session and tmp_session._txn_read_preference()
) or ReadPreference.PRIMARY
with self.__client._conn_for_reads(read_preference, tmp_session) as (
with self.__client._conn_for_reads(read_preference, tmp_session, command_name) as (
conn,
read_preference,
):
@ -1000,6 +1012,7 @@ class Database(common.BaseObject, Generic[_DocumentType]):
def _retryable_read_command(
self,
command: Union[str, MutableMapping[str, Any]],
operation: str,
session: Optional[ClientSession] = None,
) -> dict[str, Any]:
"""Same as command but used for retryable read commands."""
@ -1018,7 +1031,7 @@ class Database(common.BaseObject, Generic[_DocumentType]):
session=session,
)
return self.__client._retryable_read(_cmd, read_preference, session)
return self.__client._retryable_read(_cmd, read_preference, session, operation)
def _list_collections(
self,
@ -1089,7 +1102,9 @@ class Database(common.BaseObject, Generic[_DocumentType]):
) -> CommandCursor[MutableMapping[str, Any]]:
return self._list_collections(conn, session, read_preference=read_preference, **kwargs)
return self.__client._retryable_read(_cmd, read_pref, session)
return self.__client._retryable_read(
_cmd, read_pref, session, operation=_Op.LIST_COLLECTIONS
)
def list_collection_names(
self,
@ -1145,7 +1160,7 @@ class Database(common.BaseObject, Generic[_DocumentType]):
if comment is not None:
command["comment"] = comment
with self.__client._conn_for_writes(session) as connection:
with self.__client._conn_for_writes(session, operation=_Op.DROP) as connection:
return self._command(
connection,
command,

155
pymongo/logger.py Normal file
View File

@ -0,0 +1,155 @@
# Copyright 2023-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import enum
import logging
import os
from typing import Any
from bson import UuidRepresentation, json_util
from bson.json_util import JSONOptions, _truncate_documents
from pymongo.monitoring import ConnectionCheckOutFailedReason, ConnectionClosedReason
class _CommandStatusMessage(str, enum.Enum):
STARTED = "Command started"
SUCCEEDED = "Command succeeded"
FAILED = "Command failed"
class _ServerSelectionStatusMessage(str, enum.Enum):
STARTED = "Server selection started"
SUCCEEDED = "Server selection succeeded"
FAILED = "Server selection failed"
WAITING = "Waiting for suitable server to become available"
class _ConnectionStatusMessage(str, enum.Enum):
POOL_CREATED = "Connection pool created"
POOL_READY = "Connection pool ready"
POOL_CLOSED = "Connection pool closed"
POOL_CLEARED = "Connection pool cleared"
CONN_CREATED = "Connection created"
CONN_READY = "Connection ready"
CONN_CLOSED = "Connection closed"
CHECKOUT_STARTED = "Connection checkout started"
CHECKOUT_SUCCEEDED = "Connection checked out"
CHECKOUT_FAILED = "Connection checkout failed"
CHECKEDIN = "Connection checked in"
_DEFAULT_DOCUMENT_LENGTH = 1000
_SENSITIVE_COMMANDS = [
"authenticate",
"saslStart",
"saslContinue",
"getnonce",
"createUser",
"updateUser",
"copydbgetnonce",
"copydbsaslstart",
"copydb",
]
_HELLO_COMMANDS = ["hello", "ismaster", "isMaster"]
_REDACTED_FAILURE_FIELDS = ["code", "codeName", "errorLabels"]
_DOCUMENT_NAMES = ["command", "reply", "failure"]
_JSON_OPTIONS = JSONOptions(uuid_representation=UuidRepresentation.STANDARD)
_COMMAND_LOGGER = logging.getLogger("pymongo.command")
_CONNECTION_LOGGER = logging.getLogger("pymongo.connection")
_SERVER_SELECTION_LOGGER = logging.getLogger("pymongo.serverSelection")
_VERBOSE_CONNECTION_ERROR_REASONS = {
ConnectionClosedReason.POOL_CLOSED: "Connection pool was closed",
ConnectionCheckOutFailedReason.POOL_CLOSED: "Connection pool was closed",
ConnectionClosedReason.STALE: "Connection pool was stale",
ConnectionClosedReason.ERROR: "An error occurred while using the connection",
ConnectionCheckOutFailedReason.CONN_ERROR: "An error occurred while trying to establish a new connection",
ConnectionClosedReason.IDLE: "Connection was idle too long",
ConnectionCheckOutFailedReason.TIMEOUT: "Connection exceeded the specified timeout",
}
def _debug_log(logger: logging.Logger, **fields: Any) -> None:
logger.debug(LogMessage(**fields))
def _verbose_connection_error_reason(reason: str) -> str:
return _VERBOSE_CONNECTION_ERROR_REASONS.get(reason, reason)
def _info_log(logger: logging.Logger, **fields: Any) -> None:
logger.info(LogMessage(**fields))
class LogMessage:
__slots__ = ["_kwargs"]
def __init__(self, **kwargs: Any):
self._kwargs = {k: v for k, v in kwargs.items() if v is not None}
if "durationMS" in self._kwargs:
self._kwargs["durationMS"] = self._kwargs["durationMS"].total_seconds() * 1000
if "serviceId" in self._kwargs and self._kwargs["serviceId"] is None:
del self._kwargs["serviceId"]
def __str__(self) -> str:
self._redact()
return "%s" % (
json_util.dumps(
self._kwargs, json_options=_JSON_OPTIONS, default=lambda o: o.__repr__()
)
)
def _is_sensitive(self, doc_name: str) -> bool:
is_speculative_authenticate = (
self._kwargs.pop("speculative_authenticate", False)
or "speculativeAuthenticate" in self._kwargs[doc_name]
)
is_sensitive_command = (
"commandName" in self._kwargs and self._kwargs["commandName"] in _SENSITIVE_COMMANDS
)
is_sensitive_hello = (
self._kwargs["commandName"] in _HELLO_COMMANDS and is_speculative_authenticate
)
return is_sensitive_command or is_sensitive_hello
def _redact(self) -> None:
document_length = int(os.getenv("MONGOB_LOG_MAX_DOCUMENT_LENGTH", _DEFAULT_DOCUMENT_LENGTH))
if document_length < 0:
document_length = _DEFAULT_DOCUMENT_LENGTH
is_server_side_error = self._kwargs.pop("isServerSideError", False)
for doc_name in _DOCUMENT_NAMES:
doc = self._kwargs.get(doc_name)
if doc:
if doc_name == "failure" and is_server_side_error:
doc = {k: v for k, v in doc.items() if k in _REDACTED_FAILURE_FIELDS}
if doc_name != "failure" and self._is_sensitive(doc_name):
doc = json_util.dumps({})
else:
truncated_doc = _truncate_documents(doc, document_length)[0]
doc = json_util.dumps(
truncated_doc,
json_options=_JSON_OPTIONS,
default=lambda o: o.__repr__(),
)
if len(doc) > document_length:
doc = (
doc.encode()[:document_length].decode("unicode-escape", "ignore")
) + "..."
self._kwargs[doc_name] = doc

View File

@ -22,6 +22,7 @@ MongoDB.
from __future__ import annotations
import datetime
import logging
import random
import struct
from io import BytesIO as _BytesIO
@ -65,6 +66,7 @@ from pymongo.errors import (
)
from pymongo.hello import HelloCompat
from pymongo.helpers import _handle_reauth
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.read_preferences import ReadPreference
from pymongo.write_concern import WriteConcern
@ -934,7 +936,7 @@ class _BulkWriteContext:
self.publish = listeners.enabled_for_commands
self.name = cmd_name
self.field = _FIELD_MAP[self.name]
self.start_time = datetime.datetime.now() if self.publish else None
self.start_time = datetime.datetime.now()
self.session = session
self.compress = bool(conn.compression_context)
self.op_type = op_type
@ -955,7 +957,7 @@ class _BulkWriteContext:
self, cmd: MutableMapping[str, Any], docs: list[Mapping[str, Any]], client: MongoClient
) -> tuple[Mapping[str, Any], list[Mapping[str, Any]]]:
request_id, msg, to_send = self.__batch_command(cmd, docs)
result = self.write_command(cmd, request_id, msg, to_send)
result = self.write_command(cmd, request_id, msg, to_send, client)
client._process_response(result, self.session)
return result, to_send
@ -968,7 +970,7 @@ class _BulkWriteContext:
# without receiving a result. Send 0 for max_doc_size
# to disable size checking. Size checking is handled while
# the documents are encoded to BSON.
self.unack_write(cmd, request_id, msg, 0, to_send)
self.unack_write(cmd, request_id, msg, 0, to_send, client)
return to_send
@property
@ -1001,33 +1003,82 @@ class _BulkWriteContext:
msg: bytes,
max_doc_size: int,
docs: list[Mapping[str, Any]],
client: MongoClient,
) -> Optional[Mapping[str, Any]]:
"""A proxy for Connection.unack_write that handles event publishing."""
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.STARTED,
command=cmd,
commandName=next(iter(cmd)),
databaseName=self.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=self.conn.id,
serverConnectionId=self.conn.server_connection_id,
serverHost=self.conn.address[0],
serverPort=self.conn.address[1],
serviceId=self.conn.service_id,
)
if self.publish:
assert self.start_time is not None
duration = datetime.datetime.now() - self.start_time
cmd = self._start(cmd, request_id, docs)
start = datetime.datetime.now()
try:
result = self.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value]
duration = datetime.datetime.now() - self.start_time
if result is not None:
reply = _convert_write_result(self.name, cmd, result)
else:
# Comply with APM spec.
reply = {"ok": 1}
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.SUCCEEDED,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=self.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=self.conn.id,
serverConnectionId=self.conn.server_connection_id,
serverHost=self.conn.address[0],
serverPort=self.conn.address[1],
serviceId=self.conn.service_id,
)
if self.publish:
duration = (datetime.datetime.now() - start) + duration
if result is not None:
reply = _convert_write_result(self.name, cmd, result)
else:
# Comply with APM spec.
reply = {"ok": 1}
self._succeed(request_id, reply, duration)
except Exception as exc:
duration = datetime.datetime.now() - self.start_time
if isinstance(exc, OperationFailure):
failure: _DocumentOut = _convert_write_result(self.name, cmd, exc.details) # type: ignore[arg-type]
elif isinstance(exc, NotPrimaryError):
failure = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.FAILED,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=self.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=self.conn.id,
serverConnectionId=self.conn.server_connection_id,
serverHost=self.conn.address[0],
serverPort=self.conn.address[1],
serviceId=self.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
if self.publish:
assert self.start_time is not None
duration = (datetime.datetime.now() - start) + duration
if isinstance(exc, OperationFailure):
failure: _DocumentOut = _convert_write_result(self.name, cmd, exc.details) # type: ignore[arg-type]
elif isinstance(exc, NotPrimaryError):
failure = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
self._fail(request_id, failure, duration)
raise
finally:
@ -1041,25 +1092,76 @@ class _BulkWriteContext:
request_id: int,
msg: bytes,
docs: list[Mapping[str, Any]],
client: MongoClient,
) -> dict[str, Any]:
"""A proxy for SocketInfo.write_command that handles event publishing."""
cmd[self.field] = docs
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.STARTED,
command=cmd,
commandName=next(iter(cmd)),
databaseName=self.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=self.conn.id,
serverConnectionId=self.conn.server_connection_id,
serverHost=self.conn.address[0],
serverPort=self.conn.address[1],
serviceId=self.conn.service_id,
)
if self.publish:
assert self.start_time is not None
duration = datetime.datetime.now() - self.start_time
self._start(cmd, request_id, docs)
start = datetime.datetime.now()
try:
reply = self.conn.write_command(request_id, msg, self.codec)
duration = datetime.datetime.now() - self.start_time
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.SUCCEEDED,
durationMS=duration,
reply=reply,
commandName=next(iter(cmd)),
databaseName=self.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=self.conn.id,
serverConnectionId=self.conn.server_connection_id,
serverHost=self.conn.address[0],
serverPort=self.conn.address[1],
serviceId=self.conn.service_id,
)
if self.publish:
duration = (datetime.datetime.now() - start) + duration
self._succeed(request_id, reply, duration)
except Exception as exc:
duration = datetime.datetime.now() - self.start_time
if isinstance(exc, (NotPrimaryError, OperationFailure)):
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.FAILED,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=self.db_name,
requestId=request_id,
operationId=request_id,
driverConnectionId=self.conn.id,
serverConnectionId=self.conn.server_connection_id,
serverHost=self.conn.address[0],
serverPort=self.conn.address[1],
serviceId=self.conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
if self.publish:
duration = (datetime.datetime.now() - start) + duration
if isinstance(exc, (NotPrimaryError, OperationFailure)):
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
self._fail(request_id, failure, duration)
raise
finally:

View File

@ -86,6 +86,7 @@ from pymongo.errors import (
)
from pymongo.lock import _HAS_REGISTER_AT_FORK, _create_lock, _release_locks
from pymongo.monitoring import ConnectionClosedReason
from pymongo.operations import _Op
from pymongo.read_preferences import ReadPreference, _ServerMode
from pymongo.server_selectors import writable_server_selector
from pymongo.server_type import SERVER_TYPE
@ -912,7 +913,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
the server may change. In such cases, store a local reference to a
ServerDescription first, then use its properties.
"""
server = self._topology.select_server(writable_server_selector)
server = self._topology.select_server(writable_server_selector, _Op.TEST)
return getattr(server.description, attr_name)
@ -1181,7 +1182,9 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
try:
# Use Connection.command directly to avoid implicitly creating
# another session.
with self._conn_for_reads(ReadPreference.PRIMARY_PREFERRED, None) as (
with self._conn_for_reads(
ReadPreference.PRIMARY_PREFERRED, None, operation=_Op.END_SESSIONS
) as (
conn,
read_pref,
):
@ -1270,8 +1273,10 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
self,
server_selector: Callable[[Selection], Selection],
session: Optional[ClientSession],
operation: str,
address: Optional[_Address] = None,
deprioritized_servers: Optional[list[Server]] = None,
operation_id: Optional[int] = None,
) -> Server:
"""Select a server to run an operation on this client.
@ -1279,6 +1284,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
not pinned and no address is given.
:param session: The ClientSession for the next operation, or None. May
be pinned to a mongos server address.
:param operation: The name of the operation that the server is being selected for.
:param address: Address when sending a message
to a specific server, used for getMore.
"""
@ -1290,12 +1296,17 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
address = session._pinned_address
if address:
# We're running a getMore or this session is pinned to a mongos.
server = topology.select_server_by_address(address)
server = topology.select_server_by_address(
address, operation, operation_id=operation_id
)
if not server:
raise AutoReconnect("server %s:%s no longer available" % address) # noqa: UP031
else:
server = topology.select_server(
server_selector, deprioritized_servers=deprioritized_servers
server_selector,
operation,
deprioritized_servers=deprioritized_servers,
operation_id=operation_id,
)
return server
except PyMongoError as exc:
@ -1305,8 +1316,10 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
session._unpin()
raise
def _conn_for_writes(self, session: Optional[ClientSession]) -> ContextManager[Connection]:
server = self._select_server(writable_server_selector, session)
def _conn_for_writes(
self, session: Optional[ClientSession], operation: str
) -> ContextManager[Connection]:
server = self._select_server(writable_server_selector, session, operation)
return self._checkout(server, session)
@contextlib.contextmanager
@ -1335,11 +1348,14 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
yield conn, read_preference
def _conn_for_reads(
self, read_preference: _ServerMode, session: Optional[ClientSession]
self,
read_preference: _ServerMode,
session: Optional[ClientSession],
operation: str,
) -> ContextManager[tuple[Connection, _ServerMode]]:
assert read_preference is not None, "read_preference must not be None"
_ = self._get_topology()
server = self._select_server(read_preference, session)
server = self._select_server(read_preference, session, operation)
return self._conn_from_server(read_preference, server, session)
def _should_pin_cursor(self, session: Optional[ClientSession]) -> Optional[bool]:
@ -1361,7 +1377,10 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
"""
if operation.conn_mgr:
server = self._select_server(
operation.read_preference, operation.session, address=address
operation.read_preference,
operation.session,
operation.name,
address=address,
)
with operation.conn_mgr.lock:
@ -1373,6 +1392,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
operation.read_preference,
self._event_listeners,
unpack_res,
self,
)
def _cmd(
@ -1383,7 +1403,12 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
) -> Response:
operation.reset() # Reset op in case of retry.
return server.run_operation(
conn, operation, read_preference, self._event_listeners, unpack_res
conn,
operation,
read_preference,
self._event_listeners,
unpack_res,
self,
)
return self._retryable_read(
@ -1392,6 +1417,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
operation.session,
address=address,
retryable=isinstance(operation, message._Query),
operation=operation.name,
)
def _retry_with_session(
@ -1400,6 +1426,8 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
func: _WriteCall[T],
session: Optional[ClientSession],
bulk: Optional[_Bulk],
operation: str,
operation_id: Optional[int] = None,
) -> T:
"""Execute an operation with at most one consecutive retries
@ -1417,7 +1445,9 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
func=func,
session=session,
bulk=bulk,
operation=operation,
retryable=retryable,
operation_id=operation_id,
)
@_csot.apply
@ -1426,16 +1456,19 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
func: _WriteCall[T] | _ReadCall[T],
session: Optional[ClientSession],
bulk: Optional[_Bulk],
operation: str,
is_read: bool = False,
address: Optional[_Address] = None,
read_pref: Optional[_ServerMode] = None,
retryable: bool = False,
operation_id: Optional[int] = None,
) -> T:
"""Internal retryable helper for all client transactions.
:param func: Callback function we want to retry
:param session: Client Session on which the transaction should occur
:param bulk: Abstraction to handle bulk write operations
:param operation: The name of the operation that the server is being selected for
:param is_read: If this is an exclusive read transaction, defaults to False
:param address: Server Address, defaults to None
:param read_pref: Topology of read operation, defaults to None
@ -1447,11 +1480,13 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
mongo_client=self,
func=func,
bulk=bulk,
operation=operation,
is_read=is_read,
session=session,
read_pref=read_pref,
address=address,
retryable=retryable,
operation_id=operation_id,
).run()
def _retryable_read(
@ -1459,8 +1494,10 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
func: _ReadCall[T],
read_pref: _ServerMode,
session: Optional[ClientSession],
operation: str,
address: Optional[_Address] = None,
retryable: bool = True,
operation_id: Optional[int] = None,
) -> T:
"""Execute an operation with consecutive retries if possible
@ -1472,6 +1509,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
:param func: Read call we want to execute
:param read_pref: Desired topology of read operation
:param session: Client session we should use to execute operation
:param operation: The name of the operation that the server is being selected for
:param address: Optional address when sending a message, defaults to None
:param retryable: if we should attempt retries
(may not always be supported even if supplied), defaults to False
@ -1486,10 +1524,12 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
func,
session,
None,
operation,
is_read=True,
address=address,
read_pref=read_pref,
retryable=retryable,
operation_id=operation_id,
)
def _retryable_write(
@ -1497,7 +1537,9 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
retryable: bool,
func: _WriteCall[T],
session: Optional[ClientSession],
operation: str,
bulk: Optional[_Bulk] = None,
operation_id: Optional[int] = None,
) -> T:
"""Execute an operation with consecutive retries if possible
@ -1509,10 +1551,11 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
:param retryable: if we should attempt retries (may not always be supported)
:param func: write call we want to execute during a session
:param session: Client session we will use to execute write operation
:param operation: The name of the operation that the server is being selected for
:param bulk: bulk abstraction to execute operations in bulk, defaults to None
"""
with self._tmp_session(session) as s:
return self._retry_with_session(retryable, func, s, bulk)
return self._retry_with_session(retryable, func, s, bulk, operation, operation_id)
def __eq__(self, other: Any) -> bool:
if isinstance(other, self.__class__):
@ -1674,10 +1717,10 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
if address:
# address could be a tuple or _CursorAddress, but
# select_server_by_address needs (host, port).
server = topology.select_server_by_address(tuple(address)) # type: ignore[arg-type]
server = topology.select_server_by_address(tuple(address), _Op.KILL_CURSORS) # type: ignore[arg-type]
else:
# Application called close_cursor() with no address.
server = topology.select_server(writable_server_selector)
server = topology.select_server(writable_server_selector, _Op.KILL_CURSORS)
with self._checkout(server, session) as conn:
assert address is not None
@ -1898,7 +1941,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
if comment is not None:
cmd["comment"] = comment
admin = self._database_default_options("admin")
res = admin._retryable_read_command(cmd, session=session)
res = admin._retryable_read_command(cmd, session=session, operation=_Op.LIST_DATABASES)
# listDatabases doesn't return a cursor (yet). Fake one.
cursor = {
"id": 0,
@ -1967,7 +2010,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
if not isinstance(name, str):
raise TypeError("name_or_database must be an instance of str or a Database")
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Op.DROP_DATABASE) as conn:
self[name]._command(
conn,
{"dropDatabase": 1, "comment": comment},
@ -2241,11 +2284,13 @@ class _ClientConnectionRetryable(Generic[T]):
mongo_client: MongoClient,
func: _WriteCall[T] | _ReadCall[T],
bulk: Optional[_Bulk],
operation: str,
is_read: bool = False,
session: Optional[ClientSession] = None,
read_pref: Optional[_ServerMode] = None,
address: Optional[_Address] = None,
retryable: bool = False,
operation_id: Optional[int] = None,
):
self._last_error: Optional[Exception] = None
self._retrying = False
@ -2264,6 +2309,8 @@ class _ClientConnectionRetryable(Generic[T]):
self._address = address
self._server: Server = None # type: ignore
self._deprioritized_servers: list[Server] = []
self._operation = operation
self._operation_id = operation_id
def run(self) -> T:
"""Runs the supplied func() and attempts a retry
@ -2372,8 +2419,10 @@ class _ClientConnectionRetryable(Generic[T]):
return self._client._select_server(
self._server_selector,
self._session,
self._operation,
address=self._address,
deprioritized_servers=self._deprioritized_servers,
operation_id=self._operation_id,
)
def _write(self) -> T:

View File

@ -17,6 +17,7 @@ from __future__ import annotations
import datetime
import errno
import logging
import socket
import struct
import time
@ -41,6 +42,7 @@ from pymongo.errors import (
ProtocolError,
_OperationCancelled,
)
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import _UNPACK_REPLY, _OpMsg, _OpReply
from pymongo.monitoring import _is_speculative_authenticate
from pymongo.socket_checker import _errno_from_exception
@ -129,8 +131,8 @@ def command(
spec["collation"] = collation
publish = listeners is not None and listeners.enabled_for_commands
start = datetime.datetime.now()
if publish:
start = datetime.datetime.now()
speculative_hello = _is_speculative_authenticate(name, spec)
if compression_ctx and name.lower() in _NO_COMPRESSION:
@ -161,9 +163,24 @@ def command(
if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD:
message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD)
if client is not None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.STARTED,
command=spec,
commandName=next(iter(spec)),
databaseName=dbname,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
)
if publish:
encoding_duration = datetime.datetime.now() - start
assert listeners is not None
assert address is not None
listeners.publish_command_start(
@ -174,7 +191,6 @@ def command(
conn.server_connection_id,
service_id=conn.service_id,
)
start = datetime.datetime.now()
try:
conn.conn.sendall(msg)
@ -200,12 +216,31 @@ def command(
parse_write_concern_error=parse_write_concern_error,
)
except Exception as exc:
duration = datetime.datetime.now() - start
if isinstance(exc, (NotPrimaryError, OperationFailure)):
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = message._convert_exception(exc)
if client is not None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.FAILED,
durationMS=duration,
failure=failure,
commandName=next(iter(spec)),
databaseName=dbname,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
if publish:
duration = (datetime.datetime.now() - start) + encoding_duration
if isinstance(exc, (NotPrimaryError, OperationFailure)):
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = message._convert_exception(exc)
assert listeners is not None
assert address is not None
listeners.publish_command_failure(
@ -219,8 +254,27 @@ def command(
database_name=dbname,
)
raise
duration = datetime.datetime.now() - start
if client is not None:
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.SUCCEEDED,
durationMS=duration,
reply=response_doc,
commandName=next(iter(spec)),
databaseName=dbname,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
speculative_authenticate="speculativeAuthenticate" in orig,
)
if publish:
duration = (datetime.datetime.now() - start) + encoding_duration
assert listeners is not None
assert address is not None
listeners.publish_command_success(

View File

@ -15,6 +15,7 @@
"""Operation class definitions."""
from __future__ import annotations
import enum
from typing import (
TYPE_CHECKING,
Any,
@ -44,6 +45,37 @@ _IndexList = Union[
_IndexKeyHint = Union[str, _IndexList]
class _Op(str, enum.Enum):
ABORT = "abortTransaction"
AGGREGATE = "aggregate"
COMMIT = "commitTransaction"
COUNT = "count"
CREATE = "create"
CREATE_INDEXES = "createIndexes"
CREATE_SEARCH_INDEXES = "createSearchIndexes"
DELETE = "delete"
DISTINCT = "distinct"
DROP = "drop"
DROP_DATABASE = "dropDatabase"
DROP_INDEXES = "dropIndexes"
DROP_SEARCH_INDEXES = "dropSearchIndexes"
END_SESSIONS = "endSessions"
FIND_AND_MODIFY = "findAndModify"
FIND = "find"
INSERT = "insert"
LIST_COLLECTIONS = "listCollections"
LIST_INDEXES = "listIndexes"
LIST_SEARCH_INDEX = "listSearchIndexes"
LIST_DATABASES = "listDatabases"
UPDATE = "update"
UPDATE_INDEX = "updateIndex"
UPDATE_SEARCH_INDEX = "updateSearchIndex"
RENAME = "rename"
GETMORE = "getMore"
KILL_CURSORS = "killCursors"
TEST = "testOperation"
class InsertOne(Generic[_DocumentType]):
"""Represents an insert_one operation."""

View File

@ -17,6 +17,7 @@ from __future__ import annotations
import collections
import contextlib
import copy
import logging
import os
import platform
import socket
@ -71,6 +72,12 @@ from pymongo.errors import ( # type:ignore[attr-defined]
from pymongo.hello import Hello, HelloCompat
from pymongo.helpers import _handle_reauth
from pymongo.lock import _create_lock
from pymongo.logger import (
_CONNECTION_LOGGER,
_ConnectionStatusMessage,
_debug_log,
_verbose_connection_error_reason,
)
from pymongo.monitoring import (
ConnectionCheckOutFailedReason,
ConnectionClosedReason,
@ -114,7 +121,7 @@ try:
except ImportError:
# Windows, various platforms we don't claim to support
# (Jython, IronPython, ...), systems that don't provide
# (Jython, IronPython, ..), systems that don't provide
# everything we need from fcntl, etc.
def _set_non_inheritable_non_atomic(fd: int) -> None: # noqa: ARG001
"""Dummy function for platforms that don't provide fcntl."""
@ -752,6 +759,7 @@ class Connection:
self.active = False
self.last_timeout = self.opts.socket_timeout
self.connect_rtt = 0.0
self._client_id = pool._client_id
def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
@ -1086,6 +1094,15 @@ class Connection:
if self.enabled_for_cmap:
assert self.listeners is not None
self.listeners.publish_connection_ready(self.address, self.id)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_READY,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=self.id,
)
def validate_session(
self, client: Optional[MongoClient], session: Optional[ClientSession]
@ -1106,6 +1123,17 @@ class Connection:
if reason and self.enabled_for_cmap:
assert self.listeners is not None
self.listeners.publish_connection_closed(self.address, self.id, reason)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=self.id,
reason=_verbose_connection_error_reason(reason),
error=reason,
)
def _close_conn(self) -> None:
"""Close this connection."""
@ -1160,7 +1188,7 @@ class Connection:
# main thread.
#
# But in Gevent and Eventlet, the polling mechanism (epoll, kqueue,
# ...) is called in Python code, which experiences the signal as a
# ..) is called in Python code, which experiences the signal as a
# KeyboardInterrupt from the start, rather than as an initial
# socket.error, so we catch that, close the socket, and reraise it.
#
@ -1360,7 +1388,13 @@ class PoolState:
# Do *not* explicitly inherit from object or Jython won't call __del__
# http://bugs.jython.org/issue1057
class Pool:
def __init__(self, address: _Address, options: PoolOptions, handshake: bool = True):
def __init__(
self,
address: _Address,
options: PoolOptions,
handshake: bool = True,
client_id: Optional[ObjectId] = None,
):
"""
:param address: a (hostname, port) tuple
:param options: a PoolOptions instance
@ -1414,13 +1448,23 @@ class Pool:
self._max_connecting_cond = threading.Condition(self.lock)
self._max_connecting = self.opts.max_connecting
self._pending = 0
self._client_id = client_id
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
self.address, self.opts.non_default_options
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_CREATED,
serverHost=self.address[0],
serverPort=self.address[1],
**self.opts.non_default_options,
)
# Similar to active_sockets but includes threads in the wait queue.
self.operation_count = 0
self.operation_count: int = 0
# Retain references to pinned connections to prevent the CPython GC
# from thinking that a cursor's pinned connection can be GC'd when the
# cursor is GC'd (see PYTHON-2751).
@ -1436,6 +1480,14 @@ class Pool:
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_ready(self.address)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_READY,
serverHost=self.address[0],
serverPort=self.address[1],
)
@property
def closed(self) -> bool:
@ -1493,6 +1545,14 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_closed(self.address)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
)
else:
if old_state != PoolState.PAUSED and self.enabled_for_cmap:
assert listeners is not None
@ -1501,6 +1561,15 @@ class Pool:
service_id=service_id,
interrupt_connections=interrupt_connections,
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.POOL_CLEARED,
serverHost=self.address[0],
serverPort=self.address[1],
serviceId=service_id,
)
for conn in sockets:
conn.close_conn(ConnectionClosedReason.STALE)
@ -1600,6 +1669,15 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_created(self.address, conn_id)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CREATED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
)
try:
sock = _configured_socket(self.address, self.opts)
@ -1609,7 +1687,17 @@ class Pool:
listeners.publish_connection_closed(
self.address, conn_id, ConnectionClosedReason.ERROR
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
if isinstance(error, (IOError, OSError, SSLError)):
details = _get_timeout_details(self.opts)
_raise_connection_failure(self.address, error, timeout_details=details)
@ -1654,12 +1742,29 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_check_out_started(self.address)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_STARTED,
serverHost=self.address[0],
serverPort=self.address[1],
)
conn = self._get_conn(handler=handler)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_checked_out(self.address, conn.id)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
)
try:
with self.lock:
self.active_contexts.add(conn.cancel_context)
@ -1696,6 +1801,17 @@ class Pool:
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="An error occurred while trying to establish a new connection",
error=ConnectionCheckOutFailedReason.CONN_ERROR,
)
details = _get_timeout_details(self.opts)
_raise_connection_failure(
self.address, AutoReconnect("connection pool paused"), timeout_details=details
@ -1715,6 +1831,16 @@ class Pool:
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.POOL_CLOSED
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="Connection pool was closed",
error=ConnectionCheckOutFailedReason.POOL_CLOSED,
)
raise _PoolClosedError(
"Attempted to check out a connection from closed connection pool"
)
@ -1795,6 +1921,16 @@ class Pool:
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="An error occurred while trying to establish a new connection",
error=ConnectionCheckOutFailedReason.CONN_ERROR,
)
raise
conn.active = True
@ -1817,6 +1953,15 @@ class Pool:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_checked_in(self.address, conn.id)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKEDIN,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
)
if self.pid != os.getpid():
self.reset_without_pause()
else:
@ -1829,6 +1974,17 @@ class Pool:
listeners.publish_connection_closed(
self.address, conn.id, ConnectionClosedReason.ERROR
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CONN_CLOSED,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
else:
with self.lock:
# Hold the lock to ensure this section does not race with
@ -1895,6 +2051,16 @@ class Pool:
listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.TIMEOUT
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
clientId=self._client_id,
message=_ConnectionStatusMessage.CHECKOUT_FAILED,
serverHost=self.address[0],
serverPort=self.address[1],
reason="Wait queue timeout elapsed without a connection becoming available",
error=ConnectionCheckOutFailedReason.TIMEOUT,
)
timeout = _csot.get_timeout() or self.opts.wait_queue_timeout
if self.opts.load_balanced:
other_ops = self.active_sockets - self.ncursors - self.ntxns

View File

@ -15,12 +15,14 @@
"""Communicate with one MongoDB server in a topology."""
from __future__ import annotations
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, ContextManager, Optional, Union
from bson import _decode_all_selective
from pymongo.errors import NotPrimaryError, OperationFailure
from pymongo.helpers import _check_command_response, _handle_reauth
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query
from pymongo.response import PinnedResponse, Response
@ -29,7 +31,7 @@ if TYPE_CHECKING:
from weakref import ReferenceType
from bson.objectid import ObjectId
from pymongo.mongo_client import _MongoClientErrorHandler
from pymongo.mongo_client import MongoClient, _MongoClientErrorHandler
from pymongo.monitor import Monitor
from pymongo.monitoring import _EventListeners
from pymongo.pool import Connection, Pool
@ -102,6 +104,7 @@ class Server:
read_preference: _ServerMode,
listeners: Optional[_EventListeners],
unpack_res: Callable[..., list[_DocumentOut]],
client: MongoClient,
) -> Response:
"""Run a _Query or _GetMore operation and return a Response object.
@ -118,8 +121,7 @@ class Server:
duration = None
assert listeners is not None
publish = listeners.enabled_for_commands
if publish:
start = datetime.now()
start = datetime.now()
use_cmd = operation.use_command(conn)
more_to_come = operation.conn_mgr and operation.conn_mgr.more_to_come
@ -129,6 +131,24 @@ class Server:
message = operation.get_message(read_preference, conn, use_cmd)
request_id, data, max_doc_size = self._split_message(message)
cmd, dbn = operation.as_command(conn)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.STARTED,
command=cmd,
commandName=next(iter(cmd)),
databaseName=dbn,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
)
if publish:
cmd, dbn = operation.as_command(conn)
if "$db" not in cmd:
@ -142,7 +162,6 @@ class Server:
conn.server_connection_id,
service_id=conn.service_id,
)
start = datetime.now()
try:
if more_to_come:
@ -170,12 +189,30 @@ class Server:
operation.client._process_response(first, operation.session)
_check_command_response(first, conn.max_wire_version)
except Exception as exc:
duration = datetime.now() - start
if isinstance(exc, (NotPrimaryError, OperationFailure)):
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.FAILED,
durationMS=duration,
failure=failure,
commandName=next(iter(cmd)),
databaseName=dbn,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
isServerSideError=isinstance(exc, OperationFailure),
)
if publish:
duration = datetime.now() - start
if isinstance(exc, (NotPrimaryError, OperationFailure)):
failure: _DocumentOut = exc.details # type: ignore[assignment]
else:
failure = _convert_exception(exc)
assert listeners is not None
listeners.publish_command_failure(
duration,
@ -188,21 +225,37 @@ class Server:
database_name=dbn,
)
raise
if publish:
duration = datetime.now() - start
# Must publish in find / getMore / explain command response
# format.
if use_cmd:
res: _DocumentOut = docs[0]
elif operation.name == "explain":
res = docs[0] if docs else {}
duration = datetime.now() - start
# Must publish in find / getMore / explain command response
# format.
if use_cmd:
res = docs[0]
elif operation.name == "explain":
res = docs[0] if docs else {}
else:
res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr]
if operation.name == "find":
res["cursor"]["firstBatch"] = docs
else:
res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr]
if operation.name == "find":
res["cursor"]["firstBatch"] = docs
else:
res["cursor"]["nextBatch"] = docs
res["cursor"]["nextBatch"] = docs
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_COMMAND_LOGGER,
clientId=client._topology_settings._topology_id,
message=_CommandStatusMessage.SUCCEEDED,
durationMS=duration,
reply=res,
commandName=next(iter(cmd)),
databaseName=dbn,
requestId=request_id,
operationId=request_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=conn.address[0],
serverPort=conn.address[1],
serviceId=conn.service_id,
)
if publish:
assert listeners is not None
listeners.publish_command_success(
duration,

View File

@ -16,6 +16,7 @@
from __future__ import annotations
import logging
import os
import queue
import random
@ -38,6 +39,12 @@ from pymongo.errors import (
)
from pymongo.hello import Hello
from pymongo.lock import _create_lock
from pymongo.logger import (
_SERVER_SELECTION_LOGGER,
_debug_log,
_info_log,
_ServerSelectionStatusMessage,
)
from pymongo.monitor import SrvMonitor
from pymongo.pool import Pool, PoolOptions
from pymongo.server import Server
@ -208,13 +215,16 @@ class Topology:
def select_servers(
self,
selector: Callable[[Selection], Selection],
operation: str,
server_selection_timeout: Optional[float] = None,
address: Optional[_Address] = None,
operation_id: Optional[int] = None,
) -> list[Server]:
"""Return a list of Servers matching selector, or time out.
:param selector: function that takes a list of Servers and returns
a subset of them.
:param operation: The name of the operation that the server is being selected for.
:param server_selection_timeout: maximum seconds to wait.
If not provided, the default value common.SERVER_SELECTION_TIMEOUT
is used.
@ -231,7 +241,9 @@ class Topology:
server_timeout = server_selection_timeout
with self._lock:
server_descriptions = self._select_servers_loop(selector, server_timeout, address)
server_descriptions = self._select_servers_loop(
selector, server_timeout, operation, operation_id, address
)
return [
cast(Server, self.get_server_by_address(sd.address)) for sd in server_descriptions
@ -241,11 +253,26 @@ class Topology:
self,
selector: Callable[[Selection], Selection],
timeout: float,
operation: str,
operation_id: Optional[int],
address: Optional[_Address],
) -> list[ServerDescription]:
"""select_servers() guts. Hold the lock when calling this."""
now = time.monotonic()
end_time = now + timeout
logged_waiting = False
if _SERVER_SELECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_SERVER_SELECTION_LOGGER,
message=_ServerSelectionStatusMessage.STARTED,
selector=selector,
operation=operation,
operationId=operation_id,
topologyDescription=self.description,
clientId=self.description._topology_settings._topology_id,
)
server_descriptions = self._description.apply_selector(
selector, address, custom_selector=self._settings.server_selector
)
@ -253,10 +280,34 @@ class Topology:
while not server_descriptions:
# No suitable servers.
if timeout == 0 or now > end_time:
if _SERVER_SELECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_SERVER_SELECTION_LOGGER,
message=_ServerSelectionStatusMessage.FAILED,
selector=selector,
operation=operation,
operationId=operation_id,
topologyDescription=self.description,
clientId=self.description._topology_settings._topology_id,
failure=self._error_message(selector),
)
raise ServerSelectionTimeoutError(
f"{self._error_message(selector)}, Timeout: {timeout}s, Topology Description: {self.description!r}"
)
if not logged_waiting:
_info_log(
_SERVER_SELECTION_LOGGER,
message=_ServerSelectionStatusMessage.WAITING,
selector=selector,
operation=operation,
operationId=operation_id,
topologyDescription=self.description,
clientId=self.description._topology_settings._topology_id,
remainingTimeMS=int(end_time - time.monotonic()),
)
logged_waiting = True
self._ensure_opened()
self._request_check_all()
@ -277,11 +328,15 @@ class Topology:
def _select_server(
self,
selector: Callable[[Selection], Selection],
operation: str,
server_selection_timeout: Optional[float] = None,
address: Optional[_Address] = None,
deprioritized_servers: Optional[list[Server]] = None,
operation_id: Optional[int] = None,
) -> Server:
servers = self.select_servers(selector, server_selection_timeout, address)
servers = self.select_servers(
selector, operation, server_selection_timeout, address, operation_id
)
servers = _filter_servers(servers, deprioritized_servers)
if len(servers) == 1:
return servers[0]
@ -294,20 +349,43 @@ class Topology:
def select_server(
self,
selector: Callable[[Selection], Selection],
operation: str,
server_selection_timeout: Optional[float] = None,
address: Optional[_Address] = None,
deprioritized_servers: Optional[list[Server]] = None,
operation_id: Optional[int] = None,
) -> Server:
"""Like select_servers, but choose a random server if several match."""
server = self._select_server(
selector, server_selection_timeout, address, deprioritized_servers
selector,
operation,
server_selection_timeout,
address,
deprioritized_servers,
operation_id=operation_id,
)
if _csot.get_timeout():
_csot.set_rtt(server.description.min_round_trip_time)
if _SERVER_SELECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_SERVER_SELECTION_LOGGER,
message=_ServerSelectionStatusMessage.SUCCEEDED,
selector=selector,
operation=operation,
operationId=operation_id,
topologyDescription=self.description,
clientId=self.description._topology_settings._topology_id,
serverHost=server.description.address[0],
serverPort=server.description.address[1],
)
return server
def select_server_by_address(
self, address: _Address, server_selection_timeout: Optional[int] = None
self,
address: _Address,
operation: str,
server_selection_timeout: Optional[int] = None,
operation_id: Optional[int] = None,
) -> Server:
"""Return a Server for "address", reconnecting if necessary.
@ -316,16 +394,24 @@ class Topology:
cannot be reached.
:param address: A (host, port) pair.
:param operation: The name of the operation that the server is being selected for.
:param server_selection_timeout: maximum seconds to wait.
If not provided, the default value
common.SERVER_SELECTION_TIMEOUT is used.
:param operation_id: The unique id of the current operation being performed. Defaults to None if not provided.
Calls self.open() if needed.
Raises exc:`ServerSelectionTimeoutError` after
`server_selection_timeout` if no matching servers are found.
"""
return self.select_server(any_server_selector, server_selection_timeout, address)
return self.select_server(
any_server_selector,
operation,
server_selection_timeout,
address,
operation_id=operation_id,
)
def _process_change(
self,
@ -775,7 +861,9 @@ class Topology:
self._servers.pop(address)
def _create_pool_for_server(self, address: _Address) -> Pool:
return self._settings.pool_class(address, self._settings.pool_options)
return self._settings.pool_class(
address, self._settings.pool_options, client_id=self._topology_id
)
def _create_pool_for_monitor(self, address: _Address) -> Pool:
options = self._settings.pool_options
@ -795,7 +883,9 @@ class Topology:
server_api=options.server_api,
)
return self._settings.pool_class(address, monitor_pool_options, handshake=False)
return self._settings.pool_class(
address, monitor_pool_options, handshake=False, client_id=self._topology_id
)
def _error_message(self, selector: Callable[[Selection], Selection]) -> str:
"""Format an error message if server selection fails.

View File

@ -236,6 +236,5 @@ test-command = "python {project}/tools/fail_if_no_c.py"
[tool.cibuildwheel.linux]
archs = "x86_64 aarch64 ppc64le s390x i686"
[tool.cibuildwheel.macos]
archs = "x86_64 arm64"

View File

@ -0,0 +1,215 @@
{
"description": "command-logging",
"schemaVersion": "1.13",
"createEntities": [
{
"client": {
"id": "client",
"observeLogMessages": {
"command": "debug"
}
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "logging-tests-collection"
}
}
],
"initialData": [
{
"collectionName": "logging-tests-collection",
"databaseName": "logging-tests",
"documents": [
{
"_id": 1,
"x": 11
}
]
}
],
"tests": [
{
"description": "A successful command",
"operations": [
{
"name": "runCommand",
"object": "database",
"arguments": {
"command": {
"ping": 1
},
"commandName": "ping"
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"databaseName": "logging-tests",
"commandName": "ping",
"command": {
"$$matchAsDocument": {
"$$matchAsRoot": {
"ping": 1,
"$db": "logging-tests"
}
}
},
"requestId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command succeeded",
"databaseName": "logging-tests",
"commandName": "ping",
"reply": {
"$$type": "string"
},
"requestId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
}
]
}
]
},
{
"description": "A failed command",
"operations": [
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"$or": true
}
},
"expectError": {
"isClientError": false
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"databaseName": "logging-tests",
"commandName": "find",
"command": {
"$$type": "string"
},
"requestId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command failed",
"databaseName": "logging-tests",
"commandName": "find",
"failure": {
"$$exists": true
},
"requestId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,146 @@
{
"description": "driver-connection-id",
"schemaVersion": "1.13",
"createEntities": [
{
"client": {
"id": "client",
"observeLogMessages": {
"command": "debug"
}
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "logging-tests-collection"
}
}
],
"initialData": [
{
"collectionName": "logging-tests-collection",
"databaseName": "logging-tests",
"documents": [
{
"_id": 1,
"x": 11
}
]
}
],
"tests": [
{
"description": "A successful command",
"operations": [
{
"name": "runCommand",
"object": "database",
"arguments": {
"command": {
"ping": 1
},
"commandName": "ping"
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"databaseName": "logging-tests",
"commandName": "ping",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command succeeded",
"commandName": "ping",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
},
{
"description": "A failed command",
"operations": [
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"$or": true
}
},
"expectError": {
"isClientError": false
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"databaseName": "logging-tests",
"commandName": "find",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command failed",
"commandName": "find",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,94 @@
{
"description": "no-handshake-command-logs",
"schemaVersion": "1.13",
"tests": [
{
"description": "Handshake commands should not generate log messages",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeLogMessages": {
"command": "debug"
},
"observeEvents": [
"connectionCreatedEvent",
"connectionReadyEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-tests"
}
}
]
}
},
{
"name": "runCommand",
"object": "database",
"arguments": {
"command": {
"ping": 1
},
"commandName": "ping"
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"connectionCreatedEvent": {}
},
"count": 1
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"connectionReadyEvent": {}
},
"count": 1
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"databaseName": "logging-tests",
"commandName": "ping"
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command succeeded",
"commandName": "ping"
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,91 @@
{
"description": "no-heartbeat-command-logs",
"schemaVersion": "1.13",
"runOnRequirements": [
{
"topologies": [
"single",
"replicaset",
"sharded"
]
}
],
"tests": [
{
"description": "Heartbeat commands should not generate log messages",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeLogMessages": {
"command": "debug"
},
"observeEvents": [
"serverDescriptionChangedEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-tests"
}
}
]
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"serverDescriptionChangedEvent": {}
},
"count": 1
}
},
{
"name": "runCommand",
"object": "database",
"arguments": {
"command": {
"ping": 1
},
"commandName": "ping"
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"databaseName": "logging-tests",
"commandName": "ping"
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command succeeded",
"commandName": "ping"
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,198 @@
{
"description": "operation-id",
"schemaVersion": "1.13",
"createEntities": [
{
"client": {
"id": "client",
"observeLogMessages": {
"command": "debug"
}
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "logging-tests-collection"
}
}
],
"initialData": [
{
"collectionName": "logging-tests-collection",
"databaseName": "logging-tests",
"documents": [
{
"_id": 1,
"x": 11
}
]
}
],
"tests": [
{
"description": "Successful bulk write command log messages include operationIds",
"operations": [
{
"name": "bulkWrite",
"object": "collection",
"arguments": {
"requests": [
{
"insertOne": {
"document": {
"x": 1
}
}
},
{
"deleteOne": {
"filter": {
"x": 1
}
}
}
]
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"databaseName": "logging-tests",
"commandName": "insert",
"operationId": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command succeeded",
"commandName": "insert",
"operationId": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"databaseName": "logging-tests",
"commandName": "delete",
"operationId": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command succeeded",
"commandName": "delete",
"operationId": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
},
{
"description": "Failed bulk write command log message includes operationId",
"operations": [
{
"name": "bulkWrite",
"object": "collection",
"arguments": {
"requests": [
{
"updateOne": {
"filter": {
"x": 1
},
"update": [
{
"$invalidOperator": true
}
]
}
}
]
},
"expectError": {
"isClientError": false
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"databaseName": "logging-tests",
"commandName": "update",
"operationId": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command failed",
"commandName": "update",
"operationId": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,119 @@
{
"description": "pre-42-server-connection-id",
"schemaVersion": "1.13",
"runOnRequirements": [
{
"maxServerVersion": "4.0.99"
}
],
"createEntities": [
{
"client": {
"id": "client",
"observeLogMessages": {
"command": "debug"
}
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-server-connection-id-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "logging-tests-collection"
}
}
],
"initialData": [
{
"databaseName": "logging-server-connection-id-tests",
"collectionName": "logging-tests-collection",
"documents": []
}
],
"tests": [
{
"description": "command log messages do not include server connection id",
"operations": [
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"x": 1
}
}
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"$or": true
}
},
"expectError": {
"isError": true
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"commandName": "insert",
"serverConnectionId": {
"$$exists": false
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command succeeded",
"commandName": "insert",
"serverConnectionId": {
"$$exists": false
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"commandName": "find",
"serverConnectionId": {
"$$exists": false
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command failed",
"commandName": "find",
"serverConnectionId": {
"$$exists": false
}
}
}
]
}
]
}
]
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,131 @@
{
"description": "server-connection-id",
"schemaVersion": "1.13",
"runOnRequirements": [
{
"minServerVersion": "4.2"
}
],
"createEntities": [
{
"client": {
"id": "client",
"observeLogMessages": {
"command": "debug"
}
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-server-connection-id-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "logging-tests-collection"
}
}
],
"initialData": [
{
"databaseName": "logging-server-connection-id-tests",
"collectionName": "logging-tests-collection",
"documents": []
}
],
"tests": [
{
"description": "command log messages include server connection id",
"operations": [
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"x": 1
}
}
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"$or": true
}
},
"expectError": {
"isError": true
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"commandName": "insert",
"serverConnectionId": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command succeeded",
"commandName": "insert",
"serverConnectionId": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"commandName": "find",
"serverConnectionId": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command failed",
"commandName": "find",
"serverConnectionId": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,207 @@
{
"description": "service-id",
"schemaVersion": "1.13",
"createEntities": [
{
"client": {
"id": "client",
"observeLogMessages": {
"command": "debug"
}
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-server-connection-id-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "logging-tests-collection"
}
}
],
"initialData": [
{
"databaseName": "logging-server-connection-id-tests",
"collectionName": "logging-tests-collection",
"documents": []
}
],
"tests": [
{
"description": "command log messages include serviceId when in LB mode",
"runOnRequirements": [
{
"topologies": [
"load-balanced"
]
}
],
"operations": [
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"x": 1
}
}
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"$or": true
}
},
"expectError": {
"isError": true
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"commandName": "insert",
"serviceId": {
"$$type": "string"
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command succeeded",
"commandName": "insert",
"serviceId": {
"$$type": "string"
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"commandName": "find",
"serviceId": {
"$$type": "string"
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command failed",
"commandName": "find",
"serviceId": {
"$$type": "string"
}
}
}
]
}
]
},
{
"description": "command log messages omit serviceId when not in LB mode",
"runOnRequirements": [
{
"topologies": [
"single",
"replicaset",
"sharded"
]
}
],
"operations": [
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"x": 1
}
}
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"$or": true
}
},
"expectError": {
"isError": true
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"commandName": "insert",
"serviceId": {
"$$exists": false
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command succeeded",
"commandName": "insert",
"serviceId": {
"$$exists": false
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"commandName": "find",
"serviceId": {
"$$exists": false
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command failed",
"commandName": "find",
"serviceId": {
"$$exists": false
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,134 @@
{
"description": "unacknowledged-write",
"schemaVersion": "1.13",
"createEntities": [
{
"client": {
"id": "client",
"observeLogMessages": {
"command": "debug"
}
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "logging-tests-collection",
"collectionOptions": {
"writeConcern": {
"w": 0
}
}
}
}
],
"initialData": [
{
"collectionName": "logging-tests-collection",
"databaseName": "logging-tests",
"documents": [
{
"_id": 1
}
]
}
],
"tests": [
{
"description": "An unacknowledged write generates a succeeded log message with ok: 1 reply",
"operations": [
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 2
}
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "command",
"data": {
"message": "Command started",
"databaseName": "logging-tests",
"commandName": "insert",
"command": {
"$$matchAsDocument": {
"$$matchAsRoot": {
"insert": "logging-tests-collection",
"$db": "logging-tests"
}
}
},
"requestId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "command",
"data": {
"message": "Command succeeded",
"commandName": "insert",
"reply": {
"$$matchAsDocument": {
"ok": 1
}
},
"requestId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,495 @@
{
"description": "connection-logging",
"schemaVersion": "1.13",
"runOnRequirements": [
{
"topologies": [
"single"
]
}
],
"createEntities": [
{
"client": {
"id": "failPointClient"
}
}
],
"tests": [
{
"description": "Create a client, run a command, and close the client",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeLogMessages": {
"connection": "debug"
}
}
}
]
}
},
{
"name": "listDatabases",
"object": "client",
"arguments": {
"filter": {}
}
},
{
"name": "close",
"object": "client"
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool created",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool ready",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection checkout started",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection created",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection ready",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection checked out",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection checked in",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection checkout started",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection checked out",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection checked in",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection closed",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
},
"reason": "Connection pool was closed"
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool closed",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
},
{
"description": "Connection checkout fails due to error establishing connection",
"runOnRequirements": [
{
"auth": true,
"minServerVersion": "4.0"
}
],
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"uriOptions": {
"retryReads": false,
"appname": "clientAppName",
"heartbeatFrequencyMS": 10000
},
"observeLogMessages": {
"connection": "debug"
}
}
}
]
}
},
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "failPointClient",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"saslContinue"
],
"closeConnection": true,
"appName": "clientAppName"
}
}
}
},
{
"name": "listDatabases",
"object": "client",
"arguments": {
"filter": {}
},
"expectError": {
"isClientError": true
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool created",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool ready",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection checkout started",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection created",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection closed",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
},
"reason": "An error occurred while using the connection",
"error": {
"$$exists": true
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection checkout failed",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
},
"reason": "An error occurred while trying to establish a new connection",
"error": {
"$$exists": true
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool cleared",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,453 @@
{
"description": "connection-pool-options",
"schemaVersion": "1.13",
"runOnRequirements": [
{
"topologies": [
"single"
]
}
],
"tests": [
{
"description": "Options should be included in connection pool created message when specified",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeEvents": [
"connectionReadyEvent"
],
"observeLogMessages": {
"connection": "debug"
},
"uriOptions": {
"minPoolSize": 1,
"maxPoolSize": 5,
"maxIdleTimeMS": 10000
}
}
}
]
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"connectionReadyEvent": {}
},
"count": 1
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool created",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
},
"minPoolSize": 1,
"maxPoolSize": 5,
"maxIdleTimeMS": 10000
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool ready",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection created",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection ready",
"driverConnectionId": {
"$$type": [
"int",
"long"
]
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
},
{
"description": "maxConnecting should be included in connection pool created message when specified",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeEvents": [
"poolReadyEvent"
],
"observeLogMessages": {
"connection": "debug"
},
"uriOptions": {
"maxConnecting": 5
}
}
}
]
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"poolReadyEvent": {}
},
"count": 1
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool created",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
},
"maxConnecting": 5
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool ready",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
},
{
"description": "waitQueueTimeoutMS should be included in connection pool created message when specified",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeEvents": [
"poolReadyEvent"
],
"observeLogMessages": {
"connection": "debug"
},
"uriOptions": {
"waitQueueTimeoutMS": 10000
}
}
}
]
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"poolReadyEvent": {}
},
"count": 1
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool created",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
},
"waitQueueTimeoutMS": 10000
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool ready",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
},
{
"description": "waitQueueSize should be included in connection pool created message when specified",
"skipReason": "PyMongo does not support waitQueueSize",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeEvents": [
"poolReadyEvent"
],
"observeLogMessages": {
"connection": "debug"
},
"uriOptions": {
"waitQueueSize": 100
}
}
}
]
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"poolReadyEvent": {}
},
"count": 1
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool created",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
},
"waitQueueSize": 100
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool ready",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
},
{
"description": "waitQueueMultiple should be included in connection pool created message when specified",
"skipReason": "PyMongo does not support waitQueueMultiple",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"observeEvents": [
"poolReadyEvent"
],
"observeLogMessages": {
"connection": "debug"
},
"uriOptions": {
"waitQueueSize": 5
}
}
}
]
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"poolReadyEvent": {}
},
"count": 1
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool created",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
},
"waitQueueMultiple": 5
}
},
{
"level": "debug",
"component": "connection",
"data": {
"message": "Connection pool ready",
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
}
]
}

View File

@ -1,7 +1,7 @@
{
"version": 1,
"style": "unit",
"description": "Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)",
"description": "Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections = false)",
"poolOptions": {
"backgroundThreadIntervalMS": 10000
},
@ -78,4 +78,4 @@
"ConnectionCheckOutStarted",
"ConnectionPoolCreated"
]
}
}

View File

@ -22,6 +22,7 @@ from operations import operations # type: ignore[import]
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
from pymongo.operations import _Op
from pymongo.server_type import SERVER_TYPE
@ -68,7 +69,7 @@ class TestResetAndRequestCheck(unittest.TestCase):
# Server is Unknown.
topology = self.client._topology
with self.assertRaises(ConnectionFailure):
topology.select_server_by_address(self.server.address, 0)
topology.select_server_by_address(self.server.address, _Op.TEST, 0)
time.sleep(0.5)
after = time.time()
@ -95,7 +96,7 @@ class TestResetAndRequestCheck(unittest.TestCase):
# Server is *not* Unknown.
topology = self.client._topology
server = topology.select_server_by_address(self.server.address, 0)
server = topology.select_server_by_address(self.server.address, _Op.TEST, 0)
assert server is not None
self.assertEqual(SERVER_TYPE.Standalone, server.description.server_type)
@ -117,7 +118,7 @@ class TestResetAndRequestCheck(unittest.TestCase):
# Server is rediscovered.
topology = self.client._topology
server = topology.select_server_by_address(self.server.address, 0)
server = topology.select_server_by_address(self.server.address, _Op.TEST, 0)
assert server is not None
self.assertEqual(SERVER_TYPE.Standalone, server.description.server_type)

View File

@ -0,0 +1,107 @@
{
"description": "server-selection-logging",
"schemaVersion": "1.13",
"runOnRequirements": [
{
"topologies": [
"load-balanced"
]
}
],
"createEntities": [
{
"client": {
"id": "client",
"uriOptions": {
"heartbeatFrequencyMS": 500
},
"observeLogMessages": {
"serverSelection": "debug"
},
"observeEvents": [
"serverDescriptionChangedEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "server-selection"
}
}
],
"tests": [
{
"description": "A successful operation - load balanced cluster",
"operations": [
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"serverDescriptionChangedEvent": {
"newDescription": {
"type": "LoadBalancer"
}
}
},
"count": 1
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"x": 1
}
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection started",
"selector": {
"$$exists": true
},
"operation": "insert",
"topologyDescription": {
"$$exists": true
}
}
},
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection succeeded",
"selector": {
"$$exists": true
},
"operation": "insert",
"topologyDescription": {
"$$exists": true
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,229 @@
{
"description": "operation-id",
"schemaVersion": "1.14",
"runOnRequirements": [
{
"topologies": [
"single"
]
}
],
"createEntities": [
{
"client": {
"id": "client",
"uriOptions": {
"retryWrites": false,
"heartbeatFrequencyMS": 500,
"appName": "loggingClient",
"serverSelectionTimeoutMS": 2000
},
"observeLogMessages": {
"serverSelection": "debug"
},
"observeEvents": [
"serverDescriptionChangedEvent",
"topologyDescriptionChangedEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "server-selection"
}
},
{
"client": {
"id": "failPointClient"
}
}
],
"tests": [
{
"description": "Successful bulkWrite operation: log messages have operationIds",
"operations": [
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"topologyDescriptionChangedEvent": {}
},
"count": 2
}
},
{
"name": "bulkWrite",
"object": "collection",
"arguments": {
"requests": [
{
"insertOne": {
"document": {
"x": 1
}
}
}
]
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection started",
"operationId": {
"$$type": [
"int",
"long"
]
},
"operation": "insert"
}
},
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection succeeded",
"operationId": {
"$$type": [
"int",
"long"
]
},
"operation": "insert"
}
}
]
}
]
},
{
"description": "Failed bulkWrite operation: log messages have operationIds",
"runOnRequirements": [
{
"minServerVersion": "4.4"
}
],
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "failPointClient",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": [
"hello",
"ismaster"
],
"appName": "loggingClient",
"closeConnection": true
}
}
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"serverDescriptionChangedEvent": {
"newDescription": {
"type": "Unknown"
}
}
},
"count": 1
}
},
{
"name": "bulkWrite",
"object": "collection",
"arguments": {
"requests": [
{
"insertOne": {
"document": {
"x": 1
}
}
}
]
},
"expectError": {
"isClientError": true
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection started",
"operationId": {
"$$type": [
"int",
"long"
]
},
"operation": "insert"
}
},
{
"level": "info",
"component": "serverSelection",
"data": {
"message": "Waiting for suitable server to become available",
"operationId": {
"$$type": [
"int",
"long"
]
},
"operation": "insert"
}
},
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection failed",
"operationId": {
"$$type": [
"int",
"long"
]
},
"operation": "insert"
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,228 @@
{
"description": "replica-set-logging",
"schemaVersion": "1.14",
"runOnRequirements": [
{
"topologies": [
"replicaset"
]
}
],
"createEntities": [
{
"client": {
"id": "client",
"uriOptions": {
"retryWrites": false,
"heartbeatFrequencyMS": 500,
"serverSelectionTimeoutMS": 2000
},
"observeLogMessages": {
"serverSelection": "debug"
},
"observeEvents": [
"serverDescriptionChangedEvent",
"topologyDescriptionChangedEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "server-selection"
}
},
{
"client": {
"id": "failPointClient"
}
},
{
"collection": {
"id": "unsatisfiableRPColl",
"database": "database",
"collectionName": "unsatisfiableRPColl",
"collectionOptions": {
"readPreference": {
"mode": "Secondary",
"tagSets": [
{
"nonexistenttag": "a"
}
]
}
}
}
}
],
"tests": [
{
"description": "A successful operation",
"operations": [
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"topologyDescriptionChangedEvent": {}
},
"count": 4
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"x": 1
}
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection started",
"selector": {
"$$exists": true
},
"operation": "insert",
"topologyDescription": {
"$$exists": true
}
}
},
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection succeeded",
"selector": {
"$$exists": true
},
"operation": "insert",
"topologyDescription": {
"$$exists": true
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
},
{
"description": "Server selection fails due to unsatisfiable read preference",
"runOnRequirements": [
{
"minServerVersion": "4.0"
}
],
"operations": [
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"topologyDescriptionChangedEvent": {}
},
"count": 4
}
},
{
"name": "find",
"object": "unsatisfiableRPColl",
"arguments": {
"filter": {
"x": 1
}
},
"expectError": {
"isClientError": true
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection started",
"selector": {
"$$exists": true
},
"operation": "find",
"topologyDescription": {
"$$exists": true
}
}
},
{
"level": "info",
"component": "serverSelection",
"data": {
"message": "Waiting for suitable server to become available",
"selector": {
"$$exists": true
},
"operation": "find",
"topologyDescription": {
"$$exists": true
},
"remainingTimeMS": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection failed",
"selector": {
"$$exists": true
},
"operation": "find",
"topologyDescription": {
"$$exists": true
},
"failure": {
"$$exists": true
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,237 @@
{
"description": "server-selection-logging",
"schemaVersion": "1.14",
"runOnRequirements": [
{
"topologies": [
"sharded"
]
}
],
"createEntities": [
{
"client": {
"id": "client",
"uriOptions": {
"retryWrites": false,
"heartbeatFrequencyMS": 500,
"appName": "loggingClient",
"serverSelectionTimeoutMS": 2000
},
"observeLogMessages": {
"serverSelection": "debug"
},
"observeEvents": [
"serverDescriptionChangedEvent",
"topologyDescriptionChangedEvent"
],
"useMultipleMongoses": false
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "logging-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "server-selection"
}
},
{
"client": {
"id": "failPointClient",
"useMultipleMongoses": false
}
}
],
"tests": [
{
"description": "A successful operation",
"operations": [
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"topologyDescriptionChangedEvent": {}
},
"count": 2
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"x": 1
}
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection started",
"selector": {
"$$exists": true
},
"operation": "insert",
"topologyDescription": {
"$$exists": true
}
}
},
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection succeeded",
"selector": {
"$$exists": true
},
"operation": "insert",
"topologyDescription": {
"$$exists": true
},
"serverHost": {
"$$type": "string"
},
"serverPort": {
"$$type": [
"int",
"long"
]
}
}
}
]
}
]
},
{
"description": "Failure due to unreachable server",
"runOnRequirements": [
{
"minServerVersion": "4.4"
}
],
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "failPointClient",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": [
"hello",
"ismaster"
],
"appName": "loggingClient",
"closeConnection": true
}
}
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"serverDescriptionChangedEvent": {
"newDescription": {
"type": "Unknown"
}
}
},
"count": 1
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"x": 1
}
},
"expectError": {
"isClientError": true
}
}
],
"expectLogMessages": [
{
"client": "client",
"messages": [
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection started",
"selector": {
"$$exists": true
},
"operation": "insert",
"topologyDescription": {
"$$exists": true
}
}
},
{
"level": "info",
"component": "serverSelection",
"data": {
"message": "Waiting for suitable server to become available",
"selector": {
"$$exists": true
},
"operation": "insert",
"topologyDescription": {
"$$exists": true
},
"remainingTimeMS": {
"$$type": [
"int",
"long"
]
}
}
},
{
"level": "debug",
"component": "serverSelection",
"data": {
"message": "Server selection failed",
"selector": {
"$$exists": true
},
"operation": "insert",
"topologyDescription": {
"$$exists": true
},
"failure": {
"$$exists": true
}
}
}
]
}
]
}
]
}

File diff suppressed because it is too large Load Diff

View File

@ -33,6 +33,8 @@ from typing import Iterable, Type, no_type_check
from unittest import mock
from unittest.mock import patch
from pymongo.operations import _Op
sys.path[0:0] = [""]
from test import (
@ -560,7 +562,7 @@ class TestClient(IntegrationTest):
with client_knobs(kill_cursor_frequency=0.1):
# Assert reaper doesn't remove connections when maxIdleTimeMS not set
client = rs_or_single_client()
server = client._get_topology().select_server(readable_server_selector)
server = client._get_topology().select_server(readable_server_selector, _Op.TEST)
with server._pool.checkout() as conn:
pass
self.assertEqual(1, len(server._pool.conns))
@ -571,7 +573,7 @@ class TestClient(IntegrationTest):
with client_knobs(kill_cursor_frequency=0.1):
# Assert reaper removes idle socket and replaces it with a new one
client = rs_or_single_client(maxIdleTimeMS=500, minPoolSize=1)
server = client._get_topology().select_server(readable_server_selector)
server = client._get_topology().select_server(readable_server_selector, _Op.TEST)
with server._pool.checkout() as conn:
pass
# When the reaper runs at the same time as the get_socket, two
@ -585,7 +587,7 @@ class TestClient(IntegrationTest):
with client_knobs(kill_cursor_frequency=0.1):
# Assert reaper respects maxPoolSize when adding new connections.
client = rs_or_single_client(maxIdleTimeMS=500, minPoolSize=1, maxPoolSize=1)
server = client._get_topology().select_server(readable_server_selector)
server = client._get_topology().select_server(readable_server_selector, _Op.TEST)
with server._pool.checkout() as conn:
pass
# When the reaper runs at the same time as the get_socket,
@ -599,7 +601,7 @@ class TestClient(IntegrationTest):
with client_knobs(kill_cursor_frequency=0.1):
# Assert reaper has removed idle socket and NOT replaced it
client = rs_or_single_client(maxIdleTimeMS=500)
server = client._get_topology().select_server(readable_server_selector)
server = client._get_topology().select_server(readable_server_selector, _Op.TEST)
with server._pool.checkout() as conn_one:
pass
# Assert that the pool does not close connections prematurely.
@ -616,12 +618,12 @@ class TestClient(IntegrationTest):
def test_min_pool_size(self):
with client_knobs(kill_cursor_frequency=0.1):
client = rs_or_single_client()
server = client._get_topology().select_server(readable_server_selector)
server = client._get_topology().select_server(readable_server_selector, _Op.TEST)
self.assertEqual(0, len(server._pool.conns))
# Assert that pool started up at minPoolSize
client = rs_or_single_client(minPoolSize=10)
server = client._get_topology().select_server(readable_server_selector)
server = client._get_topology().select_server(readable_server_selector, _Op.TEST)
wait_until(
lambda: len(server._pool.conns) == 10,
"pool initialized with 10 connections",
@ -640,7 +642,7 @@ class TestClient(IntegrationTest):
# Use high frequency to test _get_socket_no_auth.
with client_knobs(kill_cursor_frequency=99999999):
client = rs_or_single_client(maxIdleTimeMS=500)
server = client._get_topology().select_server(readable_server_selector)
server = client._get_topology().select_server(readable_server_selector, _Op.TEST)
with server._pool.checkout() as conn:
pass
self.assertEqual(1, len(server._pool.conns))
@ -654,7 +656,7 @@ class TestClient(IntegrationTest):
# Test that connections are reused if maxIdleTimeMS is not set.
client = rs_or_single_client()
server = client._get_topology().select_server(readable_server_selector)
server = client._get_topology().select_server(readable_server_selector, _Op.TEST)
with server._pool.checkout() as conn:
pass
self.assertEqual(1, len(server._pool.conns))
@ -2180,7 +2182,7 @@ class TestMongoClientFailover(MockClientTest):
# But it can reconnect.
c.revive_host("a:1")
c._get_topology().select_servers(writable_server_selector)
c._get_topology().select_servers(writable_server_selector, _Op.TEST)
self.assertEqual(c.address, ("a", 1))
def _test_network_error(self, operation_callback):
@ -2203,7 +2205,7 @@ class TestMongoClientFailover(MockClientTest):
# Set host-specific information so we can test whether it is reset.
c.set_wire_version_range("a:1", 2, 6)
c.set_wire_version_range("b:2", 2, 7)
c._get_topology().select_servers(writable_server_selector)
c._get_topology().select_servers(writable_server_selector, _Op.TEST)
wait_until(lambda: len(c.nodes) == 2, "connect")
c.kill_host("a:1")

View File

@ -0,0 +1,39 @@
# Copyright 2023-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run the command monitoring unified format spec tests."""
from __future__ import annotations
import os
import sys
sys.path[0:0] = [""]
from test import unittest
from test.unified_format import generate_test_classes
# Location of JSON test specifications.
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "command_logging")
globals().update(
generate_test_classes(
_TEST_PATH,
module=__name__,
)
)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,39 @@
# Copyright 2023-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run the connection logging unified format spec tests."""
from __future__ import annotations
import os
import sys
sys.path[0:0] = [""]
from test import unittest
from test.unified_format import generate_test_classes
# Location of JSON test specifications.
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "connection_logging")
globals().update(
generate_test_classes(
_TEST_PATH,
module=__name__,
)
)
if __name__ == "__main__":
unittest.main()

View File

@ -85,7 +85,7 @@ OBJECT_TYPES = {
class TestCMAP(IntegrationTest):
# Location of JSON test specifications.
TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "cmap")
TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "connection_monitoring")
# Test operations:

84
test/test_logger.py Normal file
View File

@ -0,0 +1,84 @@
# Copyright 2023-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from test import unittest
from test.test_client import IntegrationTest
from unittest.mock import patch
from bson import json_util
from pymongo.errors import OperationFailure
from pymongo.logger import _DEFAULT_DOCUMENT_LENGTH
# https://github.com/mongodb/specifications/tree/master/source/command-logging-and-monitoring/tests#prose-tests
class TestLogger(IntegrationTest):
def test_default_truncation_limit(self):
docs = [{"x": "y"} for _ in range(100)]
db = self.db
with self.assertLogs("pymongo.command", level="DEBUG") as cm:
db.test.insert_many(docs)
cmd_started_log = json_util.loads(cm.records[0].message)
self.assertEqual(len(cmd_started_log["command"]), _DEFAULT_DOCUMENT_LENGTH + 3)
cmd_succeeded_log = json_util.loads(cm.records[1].message)
self.assertLessEqual(len(cmd_succeeded_log["reply"]), _DEFAULT_DOCUMENT_LENGTH + 3)
with self.assertLogs("pymongo.command", level="DEBUG") as cm:
list(db.test.find({}))
cmd_succeeded_log = json_util.loads(cm.records[1].message)
self.assertEqual(len(cmd_succeeded_log["reply"]), _DEFAULT_DOCUMENT_LENGTH + 3)
def test_configured_truncation_limit(self):
cmd = {"hello": True}
db = self.db
with patch.dict("os.environ", {"MONGOB_LOG_MAX_DOCUMENT_LENGTH": "5"}):
with self.assertLogs("pymongo.command", level="DEBUG") as cm:
db.command(cmd)
cmd_started_log = json_util.loads(cm.records[0].message)
self.assertEqual(len(cmd_started_log["command"]), 5 + 3)
cmd_succeeded_log = json_util.loads(cm.records[1].message)
self.assertLessEqual(len(cmd_succeeded_log["reply"]), 5 + 3)
with self.assertRaises(OperationFailure):
db.command({"notARealCommand": True})
cmd_failed_log = json_util.loads(cm.records[-1].message)
self.assertEqual(len(cmd_failed_log["failure"]), 5 + 3)
def test_truncation_multi_byte_codepoints(self):
document_lengths = ["20000", "20001", "20002"]
multi_byte_char_str_len = 50_000
str_to_repeat = ""
multi_byte_char_str = ""
for i in range(multi_byte_char_str_len):
multi_byte_char_str += str_to_repeat
for length in document_lengths:
with patch.dict("os.environ", {"MONGOB_LOG_MAX_DOCUMENT_LENGTH": length}):
with self.assertLogs("pymongo.command", level="DEBUG") as cm:
self.db.test.insert_one({"x": multi_byte_char_str})
cmd_started_log = json_util.loads(cm.records[0].message)["command"]
cmd_started_log = cmd_started_log[:-3]
last_3_bytes = cmd_started_log.encode()[-3:].decode()
self.assertEqual(last_3_bytes, str_to_repeat)
if __name__ == "__main__":
unittest.main()

View File

@ -20,6 +20,8 @@ import sys
import time
import warnings
from pymongo.operations import _Op
sys.path[0:0] = [""]
from test import client_context, unittest
@ -113,7 +115,7 @@ class TestMaxStaleness(unittest.TestCase):
client.pymongo_test.test.insert_one({})
# Wait for the server description to be updated.
time.sleep(1)
server = client._topology.select_server(writable_server_selector)
server = client._topology.select_server(writable_server_selector, _Op.TEST)
first = server.description.last_write_date
self.assertTrue(first)
# The first last_write_date may correspond to a internal server write,
@ -122,7 +124,7 @@ class TestMaxStaleness(unittest.TestCase):
client.pymongo_test.test.insert_one({})
# Wait for the server description to be updated.
time.sleep(1)
server = client._topology.select_server(writable_server_selector)
server = client._topology.select_server(writable_server_selector, _Op.TEST)
second = server.description.last_write_date
assert first is not None

View File

@ -18,6 +18,8 @@ from __future__ import annotations
import sys
import threading
from pymongo.operations import _Op
sys.path[0:0] = [""]
from test import MockClientTest, client_context, unittest
@ -60,7 +62,8 @@ def do_simple_op(client, nthreads):
def writable_addresses(topology):
return {
server.description.address for server in topology.select_servers(writable_server_selector)
server.description.address
for server in topology.select_servers(writable_server_selector, _Op.TEST)
}

View File

@ -22,6 +22,8 @@ import random
import sys
from typing import Any
from pymongo.operations import _Op
sys.path[0:0] = [""]
from test import IntegrationTest, SkipTest, client_context, unittest
@ -267,7 +269,7 @@ class TestReadPreferences(TestReadPreferencesBase):
not_used = data_members.difference(used)
latencies = ", ".join(
"%s: %sms" % (server.description.address, server.description.round_trip_time)
for server in c._get_topology().select_servers(readable_server_selector)
for server in c._get_topology().select_servers(readable_server_selector, _Op.TEST)
)
self.assertFalse(
@ -285,8 +287,8 @@ class ReadPrefTester(MongoClient):
super().__init__(*args, **client_options)
@contextlib.contextmanager
def _conn_for_reads(self, read_preference, session):
context = super()._conn_for_reads(read_preference, session)
def _conn_for_reads(self, read_preference, session, operation):
context = super()._conn_for_reads(read_preference, session, operation)
with context as (conn, read_preference):
self.record_a_read(conn.address)
yield conn, read_preference
@ -299,7 +301,7 @@ class ReadPrefTester(MongoClient):
yield conn, read_preference
def record_a_read(self, address):
server = self._get_topology().select_server_by_address(address, 0)
server = self._get_topology().select_server_by_address(address, _Op.TEST, 0)
self.has_read_from.add(server)

View File

@ -21,6 +21,7 @@ import sys
from pymongo import MongoClient, ReadPreference
from pymongo.errors import ServerSelectionTimeoutError
from pymongo.hello import HelloCompat
from pymongo.operations import _Op
from pymongo.server_selectors import writable_server_selector
from pymongo.settings import TopologySettings
from pymongo.topology import Topology
@ -158,7 +159,7 @@ class TestCustomServerSelectorFunction(IntegrationTest):
# Invoke server selection and assert no filtering based on latency
# prior to custom server selection logic kicking in.
server = topology.select_server(ReadPreference.NEAREST)
server = topology.select_server(ReadPreference.NEAREST, _Op.TEST)
assert selector.selection is not None
self.assertEqual(len(selector.selection), len(topology.description.server_descriptions()))
@ -193,7 +194,7 @@ class TestCustomServerSelectorFunction(IntegrationTest):
# Invoke server selection and assert no calls to our custom selector.
with self.assertRaisesRegex(ServerSelectionTimeoutError, "No primary available for writes"):
topology.select_server(writable_server_selector, server_selection_timeout=0.1)
topology.select_server(writable_server_selector, _Op.TEST, server_selection_timeout=0.1)
self.assertEqual(selector.call_count, 0)

View File

@ -28,6 +28,7 @@ from test.utils import (
from test.utils_selection_tests import create_topology
from pymongo.common import clean_node
from pymongo.operations import _Op
from pymongo.read_preferences import ReadPreference
# Location of JSON test specifications.
@ -52,7 +53,7 @@ class TestAllScenarios(unittest.TestCase):
# Number of times to repeat server selection
iterations = scenario_def["iterations"]
for _ in range(iterations):
server = topology.select_server(pref, server_selection_timeout=0)
server = topology.select_server(pref, _Op.TEST, server_selection_timeout=0)
counts[server.description.address] += 1
# Verify expected_frequencies

View File

@ -0,0 +1,39 @@
# Copyright 2024-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run the server selection logging unified format spec tests."""
from __future__ import annotations
import os
import sys
sys.path[0:0] = [""]
from test import unittest
from test.unified_format import generate_test_classes
# Location of JSON test specifications.
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "server_selection_logging")
globals().update(
generate_test_classes(
_TEST_PATH,
module=__name__,
)
)
if __name__ == "__main__":
unittest.main()

View File

@ -17,6 +17,8 @@ from __future__ import annotations
import sys
from pymongo.operations import _Op
sys.path[0:0] = [""]
from test import client_knobs, unittest
@ -170,7 +172,7 @@ class TestSingleServerTopology(TopologyTest):
# Can't select a server while the only server is of type Unknown.
with self.assertRaisesRegex(ConnectionFailure, "No servers found yet"):
t.select_servers(any_server_selector, server_selection_timeout=0)
t.select_servers(any_server_selector, _Op.TEST, server_selection_timeout=0)
got_hello(t, address, hello_response)
@ -179,7 +181,7 @@ class TestSingleServerTopology(TopologyTest):
# No matter whether the server is writable,
# select_servers() returns it.
s = t.select_server(writable_server_selector)
s = t.select_server(writable_server_selector, _Op.TEST)
self.assertEqual(server_type, s.description.server_type)
# Topology type single is always readable and writable regardless
@ -217,7 +219,7 @@ class TestSingleServerTopology(TopologyTest):
t = create_mock_topology(monitor_class=TestMonitor)
self.addCleanup(t.close)
s = t.select_server(writable_server_selector)
s = t.select_server(writable_server_selector, _Op.TEST)
self.assertEqual(125, s.description.round_trip_time)
round_trip_time = 25
@ -232,7 +234,7 @@ class TestSingleServerTopology(TopologyTest):
def raises_err():
try:
t.select_server(writable_server_selector, server_selection_timeout=0.1)
t.select_server(writable_server_selector, _Op.TEST, server_selection_timeout=0.1)
except ConnectionFailure:
return True
else:
@ -535,7 +537,7 @@ class TestMultiServerTopology(TopologyTest):
self.assertEqual(server.description.min_wire_version, 1)
self.assertEqual(server.description.max_wire_version, 6)
t.select_servers(any_server_selector)
t.select_servers(any_server_selector, _Op.TEST)
# Incompatible.
got_hello(
@ -552,7 +554,7 @@ class TestMultiServerTopology(TopologyTest):
)
try:
t.select_servers(any_server_selector)
t.select_servers(any_server_selector, _Op.TEST)
except ConfigurationError as e:
# Error message should say which server failed and why.
self.assertEqual(
@ -578,7 +580,7 @@ class TestMultiServerTopology(TopologyTest):
)
try:
t.select_servers(any_server_selector)
t.select_servers(any_server_selector, _Op.TEST)
except ConfigurationError as e:
# Error message should say which server failed and why.
self.assertEqual(
@ -594,7 +596,7 @@ class TestMultiServerTopology(TopologyTest):
t = create_mock_topology(seeds=["a", "b"], replica_set_name="rs")
def write_batch_size():
s = t.select_server(writable_server_selector)
s = t.select_server(writable_server_selector, _Op.TEST)
return s.description.max_write_batch_size
got_hello(
@ -715,7 +717,7 @@ def wait_for_primary(topology):
def get_primary():
try:
return topology.select_server(writable_server_selector, 0)
return topology.select_server(writable_server_selector, _Op.TEST, 0)
except ConnectionFailure:
return None
@ -771,7 +773,7 @@ class TestTopologyErrors(TopologyTest):
# The third hello call (the immediate retry) happens sometime soon
# after the failed check triggered by request_check_all. Wait until
# the server becomes known again.
server = t.select_server(writable_server_selector, 0.250)
server = t.select_server(writable_server_selector, _Op.TEST, 0.250)
self.assertEqual(SERVER_TYPE.Standalone, server.description.server_type)
self.assertEqual(3, hello_count[0])
@ -785,13 +787,13 @@ class TestTopologyErrors(TopologyTest):
t = create_mock_topology(monitor_class=TestMonitor)
self.addCleanup(t.close)
with self.assertRaisesRegex(ConnectionFailure, "internal error"):
t.select_server(any_server_selector, server_selection_timeout=0.5)
t.select_server(any_server_selector, _Op.TEST, server_selection_timeout=0.5)
class TestServerSelectionErrors(TopologyTest):
def assertMessage(self, message, topology, selector=any_server_selector):
with self.assertRaises(ConnectionFailure) as context:
topology.select_server(selector, server_selection_timeout=0)
topology.select_server(selector, _Op.TEST, server_selection_timeout=0)
self.assertIn(message, str(context.exception))

View File

@ -29,7 +29,7 @@ import sys
import time
import traceback
import types
from collections import abc
from collections import abc, defaultdict
from test import (
AWS_CREDS,
AWS_CREDS_2,
@ -58,7 +58,7 @@ from test.utils import (
)
from test.utils_spec_runner import SpecRunnerThread
from test.version import Version
from typing import Any, Dict, List, Mapping, Optional
from typing import Any, Dict, List, Mapping, Optional, Union
import pymongo
from bson import SON, Code, DBRef, Decimal128, Int64, MaxKey, MinKey, json_util
@ -109,7 +109,11 @@ from pymongo.monitoring import (
ServerHeartbeatSucceededEvent,
ServerListener,
ServerOpeningEvent,
TopologyClosedEvent,
TopologyDescriptionChangedEvent,
TopologyEvent,
TopologyListener,
TopologyOpenedEvent,
_CommandEvent,
_ConnectionEvent,
_PoolEvent,
@ -313,7 +317,9 @@ class NonLazyCursor:
self.client = None
class EventListenerUtil(CMAPListener, CommandListener, ServerListener, ServerHeartbeatListener):
class EventListenerUtil(
CMAPListener, CommandListener, ServerListener, ServerHeartbeatListener, TopologyListener
):
def __init__(
self, observe_events, ignore_commands, observe_sensitive_commands, store_events, entity_map
):
@ -395,13 +401,18 @@ class EventListenerUtil(CMAPListener, CommandListener, ServerListener, ServerHea
else:
self.add_event(event)
def opened(self, event: ServerOpeningEvent) -> None:
def opened(self, event: Union[ServerOpeningEvent, TopologyOpenedEvent]) -> None:
self.add_event(event)
def description_changed(self, event: ServerDescriptionChangedEvent) -> None:
def description_changed(
self, event: Union[ServerDescriptionChangedEvent, TopologyDescriptionChangedEvent]
) -> None:
self.add_event(event)
def closed(self, event: ServerClosedEvent) -> None:
def topology_changed(self, event: TopologyDescriptionChangedEvent) -> None:
self.add_event(event)
def closed(self, event: Union[ServerClosedEvent, TopologyClosedEvent]) -> None:
self.add_event(event)
@ -701,6 +712,12 @@ class MatchEvaluatorUtil:
self.test.fail(f"Actual command is missing the {key_to_compare} field: {spec}")
self.test.assertLessEqual(actual[key_to_compare], spec)
def _operation_matchAsDocument(self, spec, actual, key_to_compare):
self._match_document(spec, json_util.loads(actual[key_to_compare]), False)
def _operation_matchAsRoot(self, spec, actual, key_to_compare):
self._match_document(spec, actual, True)
def _evaluate_special_operation(self, opname, spec, actual, key_to_compare):
method_name = "_operation_{}".format(opname.strip("$"))
try:
@ -909,6 +926,8 @@ class MatchEvaluatorUtil:
self.test.assertIsInstance(actual, ServerHeartbeatFailedEvent)
if "awaited" in spec:
self.test.assertEqual(actual.awaited, spec["awaited"])
elif name == "topologyDescriptionChangedEvent":
self.test.assertIsInstance(actual, TopologyDescriptionChangedEvent)
else:
raise Exception(f"Unsupported event type {name}")
@ -1698,6 +1717,48 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
else:
assert server_connection_id is None
def check_log_messages(self, operations, spec):
def format_logs(log_list):
client_to_log = defaultdict(list)
for log in log_list:
data = json_util.loads(log.message)
client = data.pop("clientId")
client_to_log[client].append(
{
"level": log.levelname.lower(),
"component": log.name.replace("pymongo.", "", 1),
"data": data,
}
)
return client_to_log
with self.assertLogs("pymongo", level="DEBUG") as cm:
self.run_operations(operations)
formatted_logs = format_logs(cm.records)
for client in spec:
components = set()
for message in client["messages"]:
components.add(message["component"])
clientid = self.entity_map[client["client"]]._topology_settings._topology_id
actual_logs = formatted_logs[clientid]
actual_logs = [log for log in actual_logs if log["component"] in components]
self.assertEqual(len(client["messages"]), len(actual_logs))
for expected_msg, actual_msg in zip(client["messages"], actual_logs):
expected_data, actual_data = expected_msg.pop("data"), actual_msg.pop("data")
if "failureIsRedacted" in expected_msg:
self.assertIn("failure", actual_data)
should_redact = expected_msg.pop("failureIsRedacted")
if should_redact:
actual_fields = set(json_util.loads(actual_data["failure"]).keys())
self.assertTrue(
{"code", "codeName", "errorLabels"}.issuperset(actual_fields)
)
self.match_evaluator.match_result(expected_data, actual_data)
self.match_evaluator.match_result(expected_msg, actual_msg)
def verify_outcome(self, spec):
for collection_data in spec:
coll_name = collection_data["collectionName"]
@ -1758,8 +1819,13 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
# process initialData
self.insert_initial_data(self.TEST_SPEC.get("initialData", []))
# process operations
self.run_operations(spec["operations"])
if "expectLogMessages" in spec:
expect_log_messages = spec["expectLogMessages"]
self.assertTrue(expect_log_messages, "expectEvents must be non-empty")
self.check_log_messages(spec["operations"], expect_log_messages)
else:
# process operations
self.run_operations(spec["operations"])
# process expectEvents
if "expectEvents" in spec:

View File

@ -54,6 +54,7 @@ from pymongo.monitoring import (
PoolCreatedEvent,
PoolReadyEvent,
)
from pymongo.operations import _Op
from pymongo.pool import _CancellationContext, _PoolGeneration
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
@ -296,7 +297,7 @@ class MockConnection:
class MockPool:
def __init__(self, address, options, handshake=True):
def __init__(self, address, options, handshake=True, client_id=None):
self.gen = _PoolGeneration()
self._lock = _create_lock()
self.opts = options
@ -867,13 +868,16 @@ class DeprecationFilter:
def get_pool(client):
"""Get the standalone, primary, or mongos pool."""
topology = client._get_topology()
server = topology.select_server(writable_server_selector)
server = topology.select_server(writable_server_selector, _Op.TEST)
return server.pool
def get_pools(client):
"""Get all pools."""
return [server.pool for server in client._get_topology().select_servers(any_server_selector)]
return [
server.pool
for server in client._get_topology().select_servers(any_server_selector, _Op.TEST)
]
# Constants for run_threads and lazy_client_trial.
@ -991,7 +995,7 @@ def parse_read_preference(pref):
mode_string = mode_string[:1].lower() + mode_string[1:]
mode = read_preferences.read_pref_mode_from_name(mode_string)
max_staleness = pref.get("maxStalenessSeconds", -1)
tag_sets = pref.get("tag_sets")
tag_sets = pref.get("tagSets") or pref.get("tag_sets")
return read_preferences.make_read_preference(
mode, tag_sets=tag_sets, max_staleness=max_staleness
)

View File

@ -19,6 +19,8 @@ import datetime
import os
import sys
from pymongo.operations import _Op
sys.path[0:0] = [""]
from test import unittest
@ -178,7 +180,7 @@ def create_test(scenario_def):
with self.assertRaises((ConfigurationError, ValueError)):
# Error can be raised when making Read Pref or selecting.
pref = parse_read_preference(pref_def)
top_latency.select_server(pref)
top_latency.select_server(pref, _Op.TEST)
return
pref = parse_read_preference(pref_def)
@ -186,18 +188,18 @@ def create_test(scenario_def):
# Select servers.
if not scenario_def.get("suitable_servers"):
with self.assertRaises(AutoReconnect):
top_suitable.select_server(pref, server_selection_timeout=0)
top_suitable.select_server(pref, _Op.TEST, server_selection_timeout=0)
return
if not scenario_def["in_latency_window"]:
with self.assertRaises(AutoReconnect):
top_latency.select_server(pref, server_selection_timeout=0)
top_latency.select_server(pref, _Op.TEST, server_selection_timeout=0)
return
actual_suitable_s = top_suitable.select_servers(pref, server_selection_timeout=0)
actual_latency_s = top_latency.select_servers(pref, server_selection_timeout=0)
actual_suitable_s = top_suitable.select_servers(pref, _Op.TEST, server_selection_timeout=0)
actual_latency_s = top_latency.select_servers(pref, _Op.TEST, server_selection_timeout=0)
expected_suitable_servers = {}
for server in scenario_def["suitable_servers"]: