PYTHON-3598 Add Command(Succeeded|Failed)Event.database_name property (#1368)

This commit is contained in:
Jib 2023-10-06 10:45:41 -04:00 committed by GitHub
parent cbd61c5236
commit 556557255a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 273 additions and 34 deletions

View File

@ -1,6 +1,13 @@
Changelog
=========
Changes in Version 4.6
----------------------
PyMongo 4.6 brings a number of improvements including:
- Added the :attr:`pymongo.monitoring.CommandSucceededEvent.database_name` property.
- Added the :attr:`pymongo.monitoring.CommandFailedEvent.database_name` property.
Changes in Version 4.5
----------------------

View File

@ -1097,6 +1097,7 @@ class _BulkWriteContext:
self.conn.address,
self.op_id,
self.conn.service_id,
database_name=self.db_name,
)
def _fail(self, request_id: int, failure: _DocumentOut, duration: timedelta) -> None:
@ -1109,6 +1110,7 @@ class _BulkWriteContext:
self.conn.address,
self.op_id,
self.conn.service_id,
database_name=self.db_name,
)

View File

@ -561,7 +561,7 @@ def _is_speculative_authenticate(command_name: str, doc: Mapping[str, Any]) -> b
class _CommandEvent:
"""Base class for command events."""
__slots__ = ("__cmd_name", "__rqst_id", "__conn_id", "__op_id", "__service_id")
__slots__ = ("__cmd_name", "__rqst_id", "__conn_id", "__op_id", "__service_id", "__db")
def __init__(
self,
@ -570,12 +570,14 @@ class _CommandEvent:
connection_id: _Address,
operation_id: Optional[int],
service_id: Optional[ObjectId] = None,
database_name: str = "",
) -> None:
self.__cmd_name = command_name
self.__rqst_id = request_id
self.__conn_id = connection_id
self.__op_id = operation_id
self.__service_id = service_id
self.__db = database_name
@property
def command_name(self) -> str:
@ -605,6 +607,14 @@ class _CommandEvent:
"""An id for this series of events or None."""
return self.__op_id
@property
def database_name(self) -> str:
"""The database_name this command was sent to, or ``""``.
.. versionadded:: 4.6
"""
return self.__db
class CommandStartedEvent(_CommandEvent):
"""Event published when a command starts.
@ -619,7 +629,7 @@ class CommandStartedEvent(_CommandEvent):
- `service_id`: The service_id this command was sent to, or ``None``.
"""
__slots__ = ("__cmd", "__db")
__slots__ = ("__cmd",)
def __init__(
self,
@ -635,14 +645,18 @@ class CommandStartedEvent(_CommandEvent):
# Command name must be first key.
command_name = next(iter(command))
super().__init__(
command_name, request_id, connection_id, operation_id, service_id=service_id
command_name,
request_id,
connection_id,
operation_id,
service_id=service_id,
database_name=database_name,
)
cmd_name = command_name.lower()
if cmd_name in _SENSITIVE_COMMANDS or _is_speculative_authenticate(cmd_name, command):
self.__cmd: _DocumentOut = {}
else:
self.__cmd = command
self.__db = database_name
@property
def command(self) -> _DocumentOut:
@ -652,7 +666,7 @@ class CommandStartedEvent(_CommandEvent):
@property
def database_name(self) -> str:
"""The name of the database this command was run against."""
return self.__db
return super().database_name
def __repr__(self) -> str:
return ("<{} {} db: {!r}, command: {!r}, operation_id: {}, service_id: {}>").format(
@ -677,6 +691,7 @@ class CommandSucceededEvent(_CommandEvent):
was sent to.
- `operation_id`: An optional identifier for a series of related events.
- `service_id`: The service_id this command was sent to, or ``None``.
- `database_name`: The database this command was sent to, or ``""``.
"""
__slots__ = ("__duration_micros", "__reply")
@ -690,9 +705,15 @@ class CommandSucceededEvent(_CommandEvent):
connection_id: _Address,
operation_id: Optional[int],
service_id: Optional[ObjectId] = None,
database_name: str = "",
) -> None:
super().__init__(
command_name, request_id, connection_id, operation_id, service_id=service_id
command_name,
request_id,
connection_id,
operation_id,
service_id=service_id,
database_name=database_name,
)
self.__duration_micros = _to_micros(duration)
cmd_name = command_name.lower()
@ -713,10 +734,11 @@ class CommandSucceededEvent(_CommandEvent):
def __repr__(self) -> str:
return (
"<{} {} command: {!r}, operation_id: {}, duration_micros: {}, service_id: {}>"
"<{} {} db: {!r}, command: {!r}, operation_id: {}, duration_micros: {}, service_id: {}>"
).format(
self.__class__.__name__,
self.connection_id,
self.database_name,
self.command_name,
self.operation_id,
self.duration_micros,
@ -736,6 +758,7 @@ class CommandFailedEvent(_CommandEvent):
was sent to.
- `operation_id`: An optional identifier for a series of related events.
- `service_id`: The service_id this command was sent to, or ``None``.
- `database_name`: The database this command was sent to, or ``""``.
"""
__slots__ = ("__duration_micros", "__failure")
@ -749,9 +772,15 @@ class CommandFailedEvent(_CommandEvent):
connection_id: _Address,
operation_id: Optional[int],
service_id: Optional[ObjectId] = None,
database_name: str = "",
) -> None:
super().__init__(
command_name, request_id, connection_id, operation_id, service_id=service_id
command_name,
request_id,
connection_id,
operation_id,
service_id=service_id,
database_name=database_name,
)
self.__duration_micros = _to_micros(duration)
self.__failure = failure
@ -768,11 +797,12 @@ class CommandFailedEvent(_CommandEvent):
def __repr__(self) -> str:
return (
"<{} {} command: {!r}, operation_id: {}, duration_micros: {}, "
"<{} {} db: {!r}, command: {!r}, operation_id: {}, duration_micros: {}, "
"failure: {!r}, service_id: {}>"
).format(
self.__class__.__name__,
self.connection_id,
self.database_name,
self.command_name,
self.operation_id,
self.duration_micros,
@ -1491,6 +1521,7 @@ class _EventListeners:
op_id: Optional[int] = None,
service_id: Optional[ObjectId] = None,
speculative_hello: bool = False,
database_name: str = "",
) -> None:
"""Publish a CommandSucceededEvent to all command listeners.
@ -1504,6 +1535,7 @@ class _EventListeners:
- `op_id`: The (optional) operation id for this operation.
- `service_id`: The service_id this command was sent to, or ``None``.
- `speculative_hello`: Was the command sent with speculative auth?
- `database_name`: The database this command was sent to, or ``""``.
"""
if op_id is None:
op_id = request_id
@ -1512,7 +1544,14 @@ class _EventListeners:
# speculativeAuthenticate.
reply = {}
event = CommandSucceededEvent(
duration, reply, command_name, request_id, connection_id, op_id, service_id
duration,
reply,
command_name,
request_id,
connection_id,
op_id,
service_id,
database_name=database_name,
)
for subscriber in self.__command_listeners:
try:
@ -1529,6 +1568,7 @@ class _EventListeners:
connection_id: _Address,
op_id: Optional[int] = None,
service_id: Optional[ObjectId] = None,
database_name: str = "",
) -> None:
"""Publish a CommandFailedEvent to all command listeners.
@ -1542,11 +1582,19 @@ class _EventListeners:
command was sent to.
- `op_id`: The (optional) operation id for this operation.
- `service_id`: The service_id this command was sent to, or ``None``.
- `database_name`: The database this command was sent to, or ``""``.
"""
if op_id is None:
op_id = request_id
event = CommandFailedEvent(
duration, failure, command_name, request_id, connection_id, op_id, service_id=service_id
duration,
failure,
command_name,
request_id,
connection_id,
op_id,
service_id=service_id,
database_name=database_name,
)
for subscriber in self.__command_listeners:
try:

View File

@ -205,7 +205,13 @@ def command(
assert listeners is not None
assert address is not None
listeners.publish_command_failure(
duration, failure, name, request_id, address, service_id=conn.service_id
duration,
failure,
name,
request_id,
address,
service_id=conn.service_id,
database_name=dbname,
)
raise
if publish:
@ -220,6 +226,7 @@ def command(
address,
service_id=conn.service_id,
speculative_hello=speculative_hello,
database_name=dbname,
)
if client and client._encrypter and reply:

View File

@ -178,6 +178,7 @@ class Server:
request_id,
conn.address,
service_id=conn.service_id,
database_name=dbn,
)
raise
@ -203,6 +204,7 @@ class Server:
request_id,
conn.address,
service_id=conn.service_id,
database_name=dbn,
)
# Decrypt response.

View File

@ -1,6 +1,6 @@
{
"description": "find",
"schemaVersion": "1.1",
"schemaVersion": "1.15",
"createEntities": [
{
"client": {
@ -103,7 +103,8 @@
]
}
},
"commandName": "find"
"commandName": "find",
"databaseName": "command-monitoring-tests"
}
}
]
@ -198,7 +199,8 @@
]
}
},
"commandName": "find"
"commandName": "find",
"databaseName": "command-monitoring-tests"
}
}
]
@ -262,7 +264,8 @@
]
}
},
"commandName": "find"
"commandName": "find",
"databaseName": "command-monitoring-tests"
}
}
]
@ -338,7 +341,8 @@
]
}
},
"commandName": "find"
"commandName": "find",
"databaseName": "command-monitoring-tests"
}
},
{
@ -376,7 +380,8 @@
]
}
},
"commandName": "getMore"
"commandName": "getMore",
"databaseName": "command-monitoring-tests"
}
}
]
@ -464,7 +469,8 @@
]
}
},
"commandName": "find"
"commandName": "find",
"databaseName": "command-monitoring-tests"
}
},
{
@ -498,7 +504,8 @@
]
}
},
"commandName": "getMore"
"commandName": "getMore",
"databaseName": "command-monitoring-tests"
}
}
]
@ -539,7 +546,8 @@
},
{
"commandFailedEvent": {
"commandName": "find"
"commandName": "find",
"databaseName": "command-monitoring-tests"
}
}
]

View File

@ -0,0 +1,155 @@
{
"description": "writeConcernError",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.1.0",
"topologies": [
"replicaset"
],
"serverless": "forbid"
}
],
"createEntities": [
{
"client": {
"id": "client",
"observeEvents": [
"commandStartedEvent",
"commandSucceededEvent",
"commandFailedEvent"
]
}
},
{
"database": {
"id": "database",
"client": "client",
"databaseName": "command-monitoring-tests"
}
},
{
"collection": {
"id": "collection",
"database": "database",
"collectionName": "test"
}
}
],
"initialData": [
{
"collectionName": "test",
"databaseName": "command-monitoring-tests",
"documents": [
{
"_id": 1,
"x": 11
}
]
}
],
"tests": [
{
"description": "A retryable write with write concern errors publishes success event",
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "client",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"insert"
],
"writeConcernError": {
"code": 91,
"errorLabels": [
"RetryableWriteError"
]
}
}
}
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 2,
"x": 22
}
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"command": {
"insert": "test",
"documents": [
{
"_id": 2,
"x": 22
}
],
"ordered": true
},
"commandName": "insert",
"databaseName": "command-monitoring-tests"
}
},
{
"commandSucceededEvent": {
"reply": {
"ok": 1,
"n": 1,
"writeConcernError": {
"code": 91,
"errorLabels": [
"RetryableWriteError"
]
}
},
"commandName": "insert"
}
},
{
"commandStartedEvent": {
"command": {
"insert": "test",
"documents": [
{
"_id": 2,
"x": 22
}
],
"ordered": true
},
"commandName": "insert",
"databaseName": "command-monitoring-tests"
}
},
{
"commandSucceededEvent": {
"reply": {
"ok": 1,
"n": 1
},
"commandName": "insert"
}
}
]
}
]
}
]
}

View File

@ -1087,7 +1087,12 @@ class TestCommandMonitoring(IntegrationTest):
listeners.publish_command_start(cmd, "pymongo_test", 12345, self.client.address) # type: ignore[arg-type]
delta = datetime.timedelta(milliseconds=100)
listeners.publish_command_success(
delta, {"nonce": "e474f4561c5eb40b", "ok": 1.0}, "getnonce", 12345, self.client.address # type: ignore[arg-type]
delta,
{"nonce": "e474f4561c5eb40b", "ok": 1.0},
"getnonce",
12345,
self.client.address, # type: ignore[arg-type]
database_name="pymongo_test",
)
started = self.listener.started_events[0]
succeeded = self.listener.succeeded_events[0]
@ -1148,9 +1153,9 @@ class TestGlobalListener(IntegrationTest):
class TestEventClasses(unittest.TestCase):
def test_command_event_repr(self):
request_id, connection_id, operation_id = 1, ("localhost", 27017), 2
request_id, connection_id, operation_id, db_name = 1, ("localhost", 27017), 2, "admin"
event = monitoring.CommandStartedEvent(
{"ping": 1}, "admin", request_id, connection_id, operation_id
{"ping": 1}, db_name, request_id, connection_id, operation_id
)
self.assertEqual(
repr(event),
@ -1159,20 +1164,20 @@ class TestEventClasses(unittest.TestCase):
)
delta = datetime.timedelta(milliseconds=100)
event = monitoring.CommandSucceededEvent(
delta, {"ok": 1}, "ping", request_id, connection_id, operation_id
delta, {"ok": 1}, "ping", request_id, connection_id, operation_id, database_name=db_name
)
self.assertEqual(
repr(event),
"<CommandSucceededEvent ('localhost', 27017) "
"<CommandSucceededEvent ('localhost', 27017) db: 'admin', "
"command: 'ping', operation_id: 2, duration_micros: 100000, "
"service_id: None>",
)
event = monitoring.CommandFailedEvent(
delta, {"ok": 0}, "ping", request_id, connection_id, operation_id
delta, {"ok": 0}, "ping", request_id, connection_id, operation_id, database_name=db_name
)
self.assertEqual(
repr(event),
"<CommandFailedEvent ('localhost', 27017) "
"<CommandFailedEvent ('localhost', 27017) db: 'admin', "
"command: 'ping', operation_id: 2, duration_micros: 100000, "
"failure: {'ok': 0}, service_id: None>",
)

View File

@ -134,14 +134,14 @@ KMS_TLS_OPTS = {
# Build up a placeholder map.
PLACEHOLDER_MAP = {}
for (provider_name, provider_data) in [
for provider_name, provider_data in [
("local", {"key": LOCAL_MASTER_KEY}),
("aws", AWS_CREDS),
("azure", AZURE_CREDS),
("gcp", GCP_CREDS),
("kmip", KMIP_CREDS),
]:
for (key, value) in provider_data.items():
for key, value in provider_data.items():
placeholder = f"/clientEncryptionOpts/kmsProviders/{provider_name}/{key}"
PLACEHOLDER_MAP[placeholder] = value
@ -156,6 +156,7 @@ def with_metaclass(meta, *bases):
Vendored from six: https://github.com/benjaminp/six/blob/master/six.py
"""
# This requires a bit of explanation: the basic idea is to make a dummy
# metaclass for one level of class instantiation that replaces itself with
# the actual metaclass.
@ -746,6 +747,10 @@ class MatchEvaluatorUtil:
self.test.assertEqual(expectation, actual)
return None
def assertHasDatabaseName(self, spec, actual):
if "databaseName" in spec:
self.test.assertEqual(spec["databaseName"], actual.database_name)
def assertHasServiceId(self, spec, actual):
if "hasServiceId" in spec:
if spec.get("hasServiceId"):
@ -778,21 +783,21 @@ class MatchEvaluatorUtil:
if name == "commandStartedEvent":
self.test.assertIsInstance(actual, CommandStartedEvent)
command = spec.get("command")
database_name = spec.get("databaseName")
if command:
self.match_result(command, actual.command)
if database_name:
self.test.assertEqual(database_name, actual.database_name)
self.assertHasDatabaseName(spec, actual)
self.assertHasServiceId(spec, actual)
elif name == "commandSucceededEvent":
self.test.assertIsInstance(actual, CommandSucceededEvent)
reply = spec.get("reply")
if reply:
self.match_result(reply, actual.reply)
self.assertHasDatabaseName(spec, actual)
self.assertHasServiceId(spec, actual)
elif name == "commandFailedEvent":
self.test.assertIsInstance(actual, CommandFailedEvent)
self.assertHasServiceId(spec, actual)
self.assertHasDatabaseName(spec, actual)
elif name == "poolCreatedEvent":
self.test.assertIsInstance(actual, PoolCreatedEvent)
elif name == "poolReadyEvent":
@ -863,7 +868,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
a class attribute ``TEST_SPEC``.
"""
SCHEMA_VERSION = Version.from_string("1.12")
SCHEMA_VERSION = Version.from_string("1.15")
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
TEST_SPEC: Any