PYTHON-2822 Add server connectionId to command monitoring events (#1438)

This commit is contained in:
Noah Stapp 2023-12-01 14:33:37 -08:00 committed by GitHub
parent fa25311726
commit b1939e1470
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 298 additions and 11 deletions

View File

@ -1,6 +1,16 @@
Changelog
=========
Changes in Version 4.7
------------------------
PyMongo 4.7 brings a number of improvements including:
- Added the :class:`~pymongo.hello.Hello.server_connection_id`,
:attr:`pymongo.monitoring.CommandStartedEvent.server_connection_id`,
:attr:`pymongo.monitoring.CommandSucceededEvent.server_connection_id`, and
:attr:`pymongo.monitoring.CommandFailedEvent.server_connection_id` properties.
Changes in Version 4.6.1
------------------------

View File

@ -218,3 +218,7 @@ class Hello(Generic[_DocumentType]):
@property
def hello_ok(self) -> bool:
return self._doc.get("helloOk", False)
@property
def connection_id(self) -> Optional[int]:
return self._doc.get("connectionId")

View File

@ -1082,6 +1082,7 @@ class _BulkWriteContext:
self.db_name,
request_id,
self.conn.address,
self.conn.server_connection_id,
self.op_id,
self.conn.service_id,
)
@ -1095,6 +1096,7 @@ class _BulkWriteContext:
self.name,
request_id,
self.conn.address,
self.conn.server_connection_id,
self.op_id,
self.conn.service_id,
database_name=self.db_name,
@ -1108,6 +1110,7 @@ class _BulkWriteContext:
self.name,
request_id,
self.conn.address,
self.conn.server_connection_id,
self.op_id,
self.conn.service_id,
database_name=self.db_name,

View File

@ -537,7 +537,15 @@ def _is_speculative_authenticate(command_name: str, doc: Mapping[str, Any]) -> b
class _CommandEvent:
"""Base class for command events."""
__slots__ = ("__cmd_name", "__rqst_id", "__conn_id", "__op_id", "__service_id", "__db")
__slots__ = (
"__cmd_name",
"__rqst_id",
"__conn_id",
"__op_id",
"__service_id",
"__db",
"__server_conn_id",
)
def __init__(
self,
@ -547,6 +555,7 @@ class _CommandEvent:
operation_id: Optional[int],
service_id: Optional[ObjectId] = None,
database_name: str = "",
server_connection_id: Optional[int] = None,
) -> None:
self.__cmd_name = command_name
self.__rqst_id = request_id
@ -554,6 +563,7 @@ class _CommandEvent:
self.__op_id = operation_id
self.__service_id = service_id
self.__db = database_name
self.__server_conn_id = server_connection_id
@property
def command_name(self) -> str:
@ -591,6 +601,14 @@ class _CommandEvent:
"""
return self.__db
@property
def server_connection_id(self) -> Optional[int]:
"""The server-side connection id for the connection this command was sent on, or ``None``.
.. versionadded:: 4.7
"""
return self.__server_conn_id
class CommandStartedEvent(_CommandEvent):
"""Event published when a command starts.
@ -614,6 +632,7 @@ class CommandStartedEvent(_CommandEvent):
connection_id: _Address,
operation_id: Optional[int],
service_id: Optional[ObjectId] = None,
server_connection_id: Optional[int] = None,
) -> None:
if not command:
raise ValueError(f"{command!r} is not a valid command")
@ -626,6 +645,7 @@ class CommandStartedEvent(_CommandEvent):
operation_id,
service_id=service_id,
database_name=database_name,
server_connection_id=server_connection_id,
)
cmd_name = command_name.lower()
if cmd_name in _SENSITIVE_COMMANDS or _is_speculative_authenticate(cmd_name, command):
@ -644,13 +664,16 @@ class CommandStartedEvent(_CommandEvent):
return super().database_name
def __repr__(self) -> str:
return ("<{} {} db: {!r}, command: {!r}, operation_id: {}, service_id: {}>").format(
return (
"<{} {} db: {!r}, command: {!r}, operation_id: {}, service_id: {}, server_connection_id: {}>"
).format(
self.__class__.__name__,
self.connection_id,
self.database_name,
self.command_name,
self.operation_id,
self.service_id,
self.server_connection_id,
)
@ -680,6 +703,7 @@ class CommandSucceededEvent(_CommandEvent):
operation_id: Optional[int],
service_id: Optional[ObjectId] = None,
database_name: str = "",
server_connection_id: Optional[int] = None,
) -> None:
super().__init__(
command_name,
@ -688,6 +712,7 @@ class CommandSucceededEvent(_CommandEvent):
operation_id,
service_id=service_id,
database_name=database_name,
server_connection_id=server_connection_id,
)
self.__duration_micros = _to_micros(duration)
cmd_name = command_name.lower()
@ -708,7 +733,7 @@ class CommandSucceededEvent(_CommandEvent):
def __repr__(self) -> str:
return (
"<{} {} db: {!r}, command: {!r}, operation_id: {}, duration_micros: {}, service_id: {}>"
"<{} {} db: {!r}, command: {!r}, operation_id: {}, duration_micros: {}, service_id: {}, server_connection_id: {}>"
).format(
self.__class__.__name__,
self.connection_id,
@ -717,6 +742,7 @@ class CommandSucceededEvent(_CommandEvent):
self.operation_id,
self.duration_micros,
self.service_id,
self.server_connection_id,
)
@ -746,6 +772,7 @@ class CommandFailedEvent(_CommandEvent):
operation_id: Optional[int],
service_id: Optional[ObjectId] = None,
database_name: str = "",
server_connection_id: Optional[int] = None,
) -> None:
super().__init__(
command_name,
@ -754,6 +781,7 @@ class CommandFailedEvent(_CommandEvent):
operation_id,
service_id=service_id,
database_name=database_name,
server_connection_id=server_connection_id,
)
self.__duration_micros = _to_micros(duration)
self.__failure = failure
@ -771,7 +799,7 @@ class CommandFailedEvent(_CommandEvent):
def __repr__(self) -> str:
return (
"<{} {} db: {!r}, command: {!r}, operation_id: {}, duration_micros: {}, "
"failure: {!r}, service_id: {}>"
"failure: {!r}, service_id: {}, server_connection_id: {}>"
).format(
self.__class__.__name__,
self.connection_id,
@ -781,6 +809,7 @@ class CommandFailedEvent(_CommandEvent):
self.duration_micros,
self.failure,
self.service_id,
self.server_connection_id,
)
@ -1453,6 +1482,7 @@ class _EventListeners:
database_name: str,
request_id: int,
connection_id: _Address,
server_connection_id: Optional[int],
op_id: Optional[int] = None,
service_id: Optional[ObjectId] = None,
) -> None:
@ -1470,7 +1500,13 @@ class _EventListeners:
if op_id is None:
op_id = request_id
event = CommandStartedEvent(
command, database_name, request_id, connection_id, op_id, service_id=service_id
command,
database_name,
request_id,
connection_id,
op_id,
service_id=service_id,
server_connection_id=server_connection_id,
)
for subscriber in self.__command_listeners:
try:
@ -1485,6 +1521,7 @@ class _EventListeners:
command_name: str,
request_id: int,
connection_id: _Address,
server_connection_id: Optional[int],
op_id: Optional[int] = None,
service_id: Optional[ObjectId] = None,
speculative_hello: bool = False,
@ -1518,6 +1555,7 @@ class _EventListeners:
op_id,
service_id,
database_name=database_name,
server_connection_id=server_connection_id,
)
for subscriber in self.__command_listeners:
try:
@ -1532,6 +1570,7 @@ class _EventListeners:
command_name: str,
request_id: int,
connection_id: _Address,
server_connection_id: Optional[int],
op_id: Optional[int] = None,
service_id: Optional[ObjectId] = None,
database_name: str = "",
@ -1560,6 +1599,7 @@ class _EventListeners:
op_id,
service_id=service_id,
database_name=database_name,
server_connection_id=server_connection_id,
)
for subscriber in self.__command_listeners:
try:

View File

@ -167,7 +167,12 @@ def command(
assert listeners is not None
assert address is not None
listeners.publish_command_start(
orig, dbname, request_id, address, service_id=conn.service_id
orig,
dbname,
request_id,
address,
conn.server_connection_id,
service_id=conn.service_id,
)
start = datetime.datetime.now()
@ -209,6 +214,7 @@ def command(
name,
request_id,
address,
conn.server_connection_id,
service_id=conn.service_id,
database_name=dbname,
)
@ -223,6 +229,7 @@ def command(
name,
request_id,
address,
conn.server_connection_id,
service_id=conn.service_id,
speculative_hello=speculative_hello,
database_name=dbname,

View File

@ -762,6 +762,7 @@ class Connection:
self.more_to_come: bool = False
# For load balancer support.
self.service_id: Optional[ObjectId] = None
self.server_connection_id: Optional[int] = None
# When executing a transaction in load balancing mode, this flag is
# set to true to indicate that the session now owns the connection.
self.pinned_txn = False
@ -902,6 +903,7 @@ class Connection:
self.compression_context = ctx
self.op_msg_enabled = True
self.server_connection_id = hello.connection_id
if creds:
self.negotiated_mechs = hello.sasl_supported_mechs
if auth_ctx:

View File

@ -135,7 +135,12 @@ class Server:
cmd["$db"] = dbn
assert listeners is not None
listeners.publish_command_start(
cmd, dbn, request_id, conn.address, service_id=conn.service_id
cmd,
dbn,
request_id,
conn.address,
conn.server_connection_id,
service_id=conn.service_id,
)
start = datetime.now()
@ -178,6 +183,7 @@ class Server:
operation.name,
request_id,
conn.address,
conn.server_connection_id,
service_id=conn.service_id,
database_name=dbn,
)
@ -204,6 +210,7 @@ class Server:
operation.name,
request_id,
conn.address,
conn.server_connection_id,
service_id=conn.service_id,
database_name=dbn,
)

View File

@ -0,0 +1,101 @@
{
"description": "pre-42-server-connection-id",
"schemaVersion": "1.6",
"runOnRequirements": [
{
"maxServerVersion": "4.0.99"
}
],
"createEntities": [
{
"client": {
"id": "client",
"observeEvents": [
"commandStartedEvent",
"commandSucceededEvent",
"commandFailedEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "server-connection-id-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "coll"
}
}
],
"initialData": [
{
"databaseName": "server-connection-id-tests",
"collectionName": "coll",
"documents": []
}
],
"tests": [
{
"description": "command events do not include server connection id",
"operations": [
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"x": 1
}
}
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"$or": true
}
},
"expectError": {
"isError": true
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"commandName": "insert",
"hasServerConnectionId": false
}
},
{
"commandSucceededEvent": {
"commandName": "insert",
"hasServerConnectionId": false
}
},
{
"commandStartedEvent": {
"commandName": "find",
"hasServerConnectionId": false
}
},
{
"commandFailedEvent": {
"commandName": "find",
"hasServerConnectionId": false
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,101 @@
{
"description": "server-connection-id",
"schemaVersion": "1.6",
"runOnRequirements": [
{
"minServerVersion": "4.2"
}
],
"createEntities": [
{
"client": {
"id": "client",
"observeEvents": [
"commandStartedEvent",
"commandSucceededEvent",
"commandFailedEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "server-connection-id-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "coll"
}
}
],
"initialData": [
{
"databaseName": "server-connection-id-tests",
"collectionName": "coll",
"documents": []
}
],
"tests": [
{
"description": "command events include server connection id",
"operations": [
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"x": 1
}
}
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"$or": true
}
},
"expectError": {
"isError": true
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"commandName": "insert",
"hasServerConnectionId": true
}
},
{
"commandSucceededEvent": {
"commandName": "insert",
"hasServerConnectionId": true
}
},
{
"commandStartedEvent": {
"commandName": "find",
"hasServerConnectionId": true
}
},
{
"commandFailedEvent": {
"commandName": "find",
"hasServerConnectionId": true
}
}
]
}
]
}
]
}

View File

@ -1085,7 +1085,7 @@ class TestCommandMonitoring(IntegrationTest):
self.listener.reset()
cmd = SON([("getnonce", 1)])
listeners.publish_command_start(cmd, "pymongo_test", 12345, self.client.address) # type: ignore[arg-type]
listeners.publish_command_start(cmd, "pymongo_test", 12345, self.client.address, None) # type: ignore[arg-type]
delta = datetime.timedelta(milliseconds=100)
listeners.publish_command_success(
delta,
@ -1093,6 +1093,7 @@ class TestCommandMonitoring(IntegrationTest):
"getnonce",
12345,
self.client.address, # type: ignore[arg-type]
None,
database_name="pymongo_test",
)
started = self.listener.started_events[0]
@ -1161,7 +1162,7 @@ class TestEventClasses(unittest.TestCase):
self.assertEqual(
repr(event),
"<CommandStartedEvent ('localhost', 27017) db: 'admin', "
"command: 'ping', operation_id: 2, service_id: None>",
"command: 'ping', operation_id: 2, service_id: None, server_connection_id: None>",
)
delta = datetime.timedelta(milliseconds=100)
event = monitoring.CommandSucceededEvent(
@ -1171,7 +1172,7 @@ class TestEventClasses(unittest.TestCase):
repr(event),
"<CommandSucceededEvent ('localhost', 27017) db: 'admin', "
"command: 'ping', operation_id: 2, duration_micros: 100000, "
"service_id: None>",
"service_id: None, server_connection_id: None>",
)
event = monitoring.CommandFailedEvent(
delta, {"ok": 0}, "ping", request_id, connection_id, operation_id, database_name=db_name
@ -1180,7 +1181,7 @@ class TestEventClasses(unittest.TestCase):
repr(event),
"<CommandFailedEvent ('localhost', 27017) db: 'admin', "
"command: 'ping', operation_id: 2, duration_micros: 100000, "
"failure: {'ok': 0}, service_id: None>",
"failure: {'ok': 0}, service_id: None, server_connection_id: None>",
)
def test_server_heartbeat_event_repr(self):

View File

@ -779,6 +779,14 @@ class MatchEvaluatorUtil:
else:
self.test.assertIsNone(actual.service_id)
def assertHasServerConnectionId(self, spec, actual):
if "hasServerConnectionId" in spec:
if spec.get("hasServerConnectionId"):
self.test.assertIsNotNone(actual.server_connection_id)
self.test.assertIsInstance(actual.server_connection_id, int)
else:
self.test.assertIsNone(actual.server_connection_id)
def match_server_description(self, actual: ServerDescription, spec: dict) -> None:
if "type" in spec:
self.test.assertEqual(actual.server_type_name, spec["type"])
@ -807,6 +815,7 @@ class MatchEvaluatorUtil:
self.match_result(command, actual.command)
self.assertHasDatabaseName(spec, actual)
self.assertHasServiceId(spec, actual)
self.assertHasServerConnectionId(spec, actual)
elif name == "commandSucceededEvent":
self.test.assertIsInstance(actual, CommandSucceededEvent)
reply = spec.get("reply")
@ -814,10 +823,12 @@ class MatchEvaluatorUtil:
self.match_result(reply, actual.reply)
self.assertHasDatabaseName(spec, actual)
self.assertHasServiceId(spec, actual)
self.assertHasServerConnectionId(spec, actual)
elif name == "commandFailedEvent":
self.test.assertIsInstance(actual, CommandFailedEvent)
self.assertHasServiceId(spec, actual)
self.assertHasDatabaseName(spec, actual)
self.assertHasServerConnectionId(spec, actual)
elif name == "poolCreatedEvent":
self.test.assertIsInstance(actual, PoolCreatedEvent)
elif name == "poolReadyEvent":