From 28697df6f850f5c7eebcc7445c4f3adc50c1230e Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 28 Aug 2024 11:39:03 -0700 Subject: [PATCH 1/2] PYTHON-4691 Fix non-UTC timezones with DATETIME_CLAMP/DATETIME_AUTO (#1811) --- bson/datetime_ms.py | 34 +++++++++++++----- test/test_bson.py | 84 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 98 insertions(+), 20 deletions(-) diff --git a/bson/datetime_ms.py b/bson/datetime_ms.py index 112871a16..48e57e0d1 100644 --- a/bson/datetime_ms.py +++ b/bson/datetime_ms.py @@ -114,17 +114,40 @@ class DatetimeMS: return self._value +def _datetime_to_millis(dtm: datetime.datetime) -> int: + """Convert datetime to milliseconds since epoch UTC.""" + if dtm.utcoffset() is not None: + dtm = dtm - dtm.utcoffset() # type: ignore + return int(calendar.timegm(dtm.timetuple()) * 1000 + dtm.microsecond // 1000) + + +_MIN_UTC = datetime.datetime.min.replace(tzinfo=utc) +_MAX_UTC = datetime.datetime.max.replace(tzinfo=utc) +_MIN_UTC_MS = _datetime_to_millis(_MIN_UTC) +_MAX_UTC_MS = _datetime_to_millis(_MAX_UTC) + + # Inclusive and exclusive min and max for timezones. # Timezones are hashed by their offset, which is a timedelta # and therefore there are more than 24 possible timezones. @functools.lru_cache(maxsize=None) def _min_datetime_ms(tz: datetime.timezone = datetime.timezone.utc) -> int: - return _datetime_to_millis(datetime.datetime.min.replace(tzinfo=tz)) + delta = tz.utcoffset(_MIN_UTC) + if delta is not None: + offset_millis = (delta.days * 86400 + delta.seconds) * 1000 + delta.microseconds // 1000 + else: + offset_millis = 0 + return max(_MIN_UTC_MS, _MIN_UTC_MS - offset_millis) @functools.lru_cache(maxsize=None) def _max_datetime_ms(tz: datetime.timezone = datetime.timezone.utc) -> int: - return _datetime_to_millis(datetime.datetime.max.replace(tzinfo=tz)) + delta = tz.utcoffset(_MAX_UTC) + if delta is not None: + offset_millis = (delta.days * 86400 + delta.seconds) * 1000 + delta.microseconds // 1000 + else: + offset_millis = 0 + return min(_MAX_UTC_MS, _MAX_UTC_MS - offset_millis) def _millis_to_datetime( @@ -162,10 +185,3 @@ def _millis_to_datetime( return DatetimeMS(millis) else: raise ValueError("datetime_conversion must be an element of DatetimeConversion") - - -def _datetime_to_millis(dtm: datetime.datetime) -> int: - """Convert datetime to milliseconds since epoch UTC.""" - if dtm.utcoffset() is not None: - dtm = dtm - dtm.utcoffset() # type: ignore - return int(calendar.timegm(dtm.timetuple()) * 1000 + dtm.microsecond // 1000) diff --git a/test/test_bson.py b/test/test_bson.py index 8c8fe6018..4996c46b9 100644 --- a/test/test_bson.py +++ b/test/test_bson.py @@ -1252,54 +1252,116 @@ class TestDatetimeConversion(unittest.TestCase): def test_clamping(self): # Test clamping from below and above. - opts1 = CodecOptions( + opts = CodecOptions( datetime_conversion=DatetimeConversion.DATETIME_CLAMP, tz_aware=True, tzinfo=datetime.timezone.utc, ) below = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 1)}) - dec_below = decode(below, opts1) + dec_below = decode(below, opts) self.assertEqual( dec_below["x"], datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) ) above = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 1)}) - dec_above = decode(above, opts1) + dec_above = decode(above, opts) self.assertEqual( dec_above["x"], datetime.datetime.max.replace(tzinfo=datetime.timezone.utc, microsecond=999000), ) - def test_tz_clamping(self): + def test_tz_clamping_local(self): # Naive clamping to local tz. - opts1 = CodecOptions(datetime_conversion=DatetimeConversion.DATETIME_CLAMP, tz_aware=False) + opts = CodecOptions(datetime_conversion=DatetimeConversion.DATETIME_CLAMP, tz_aware=False) below = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 24 * 60 * 60)}) - dec_below = decode(below, opts1) + dec_below = decode(below, opts) self.assertEqual(dec_below["x"], datetime.datetime.min) above = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 24 * 60 * 60)}) - dec_above = decode(above, opts1) + dec_above = decode(above, opts) self.assertEqual( dec_above["x"], datetime.datetime.max.replace(microsecond=999000), ) - # Aware clamping. - opts2 = CodecOptions(datetime_conversion=DatetimeConversion.DATETIME_CLAMP, tz_aware=True) + def test_tz_clamping_utc(self): + # Aware clamping default utc. + opts = CodecOptions(datetime_conversion=DatetimeConversion.DATETIME_CLAMP, tz_aware=True) below = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 24 * 60 * 60)}) - dec_below = decode(below, opts2) + dec_below = decode(below, opts) self.assertEqual( dec_below["x"], datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) ) above = encode({"x": DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 24 * 60 * 60)}) - dec_above = decode(above, opts2) + dec_above = decode(above, opts) self.assertEqual( dec_above["x"], datetime.datetime.max.replace(tzinfo=datetime.timezone.utc, microsecond=999000), ) + def test_tz_clamping_non_utc(self): + for tz in [FixedOffset(60, "+1H"), FixedOffset(-60, "-1H")]: + opts = CodecOptions( + datetime_conversion=DatetimeConversion.DATETIME_CLAMP, tz_aware=True, tzinfo=tz + ) + # Min/max values in this timezone which can be represented in both BSON and datetime UTC. + try: + min_tz = datetime.datetime.min.replace(tzinfo=utc).astimezone(tz) + except OverflowError: + min_tz = datetime.datetime.min.replace(tzinfo=tz) + try: + max_tz = datetime.datetime.max.replace(tzinfo=utc, microsecond=999000).astimezone( + tz + ) + except OverflowError: + max_tz = datetime.datetime.max.replace(tzinfo=tz, microsecond=999000) + + for in_range in [ + min_tz, + min_tz + datetime.timedelta(milliseconds=1), + max_tz - datetime.timedelta(milliseconds=1), + max_tz, + ]: + doc = decode(encode({"x": in_range}), opts) + self.assertEqual(doc["x"], in_range) + + for too_low in [ + DatetimeMS(_datetime_to_millis(min_tz) - 1), + DatetimeMS(_datetime_to_millis(min_tz) - 60 * 60 * 1000), + DatetimeMS(_datetime_to_millis(min_tz) - 1 - 60 * 60 * 1000), + DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 1), + DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 60 * 60 * 1000), + DatetimeMS(_datetime_to_millis(datetime.datetime.min) - 1 - 60 * 60 * 1000), + ]: + doc = decode(encode({"x": too_low}), opts) + self.assertEqual(doc["x"], min_tz) + + for too_high in [ + DatetimeMS(_datetime_to_millis(max_tz) + 1), + DatetimeMS(_datetime_to_millis(max_tz) + 60 * 60 * 1000), + DatetimeMS(_datetime_to_millis(max_tz) + 1 + 60 * 60 * 1000), + DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 1), + DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 60 * 60 * 1000), + DatetimeMS(_datetime_to_millis(datetime.datetime.max) + 1 + 60 * 60 * 1000), + ]: + doc = decode(encode({"x": too_high}), opts) + self.assertEqual(doc["x"], max_tz) + + def test_tz_clamping_non_utc_simple(self): + dtm = datetime.datetime(2024, 8, 23) + encoded = encode({"d": dtm}) + self.assertEqual(decode(encoded)["d"], dtm) + for conversion in [ + DatetimeConversion.DATETIME, + DatetimeConversion.DATETIME_CLAMP, + DatetimeConversion.DATETIME_AUTO, + ]: + for tz in [FixedOffset(60, "+1H"), FixedOffset(-60, "-1H")]: + opts = CodecOptions(datetime_conversion=conversion, tz_aware=True, tzinfo=tz) + self.assertEqual(decode(encoded, opts)["d"], dtm.replace(tzinfo=utc).astimezone(tz)) + def test_datetime_auto(self): # Naive auto, in range. opts1 = CodecOptions(datetime_conversion=DatetimeConversion.DATETIME_AUTO) From c6967ab139cbc68099e84e2bb6f3789e65804e09 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 28 Aug 2024 14:48:49 -0400 Subject: [PATCH 2/2] PYTHON-3472 - Add log messages to SDAM spec (#1771) Co-authored-by: Jib --- pymongo/asynchronous/monitor.py | 56 +- pymongo/asynchronous/server.py | 17 +- pymongo/asynchronous/topology.py | 59 +- pymongo/logger.py | 14 +- pymongo/synchronous/monitor.py | 56 +- pymongo/synchronous/server.py | 17 +- pymongo/synchronous/topology.py | 59 +- .../rs/compatible.json | 2 +- .../rs/compatible_unknown.json | 2 +- .../sharded/compatible.json | 2 +- .../single/compatible.json | 2 +- .../single/too_old_then_upgraded.json | 4 +- .../unified/logging-loadbalanced.json | 150 +++++ .../unified/logging-replicaset.json | 606 ++++++++++++++++++ .../unified/logging-sharded.json | 492 ++++++++++++++ .../unified/logging-standalone.json | 517 +++++++++++++++ test/unified_format.py | 55 +- 17 files changed, 2064 insertions(+), 46 deletions(-) create mode 100644 test/discovery_and_monitoring/unified/logging-loadbalanced.json create mode 100644 test/discovery_and_monitoring/unified/logging-replicaset.json create mode 100644 test/discovery_and_monitoring/unified/logging-sharded.json create mode 100644 test/discovery_and_monitoring/unified/logging-standalone.json diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index d2ac8868e..f9e912b08 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -17,6 +17,7 @@ from __future__ import annotations import atexit +import logging import time import weakref from typing import TYPE_CHECKING, Any, Mapping, Optional, cast @@ -28,6 +29,7 @@ from pymongo.asynchronous.periodic_executor import _shutdown_executors from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _create_lock +from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage from pymongo.pool_options import _is_faas from pymongo.read_preferences import MovingAverage from pymongo.server_description import ServerDescription @@ -257,10 +259,21 @@ class Monitor(MonitorBase): sd = self._server_description address = sd.address duration = _monotonic_duration(start) + awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version) if self._publish: - awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version) assert self._listeners is not None self._listeners.publish_server_heartbeat_failed(address, duration, error, awaited) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology._topology_id, + serverHost=address[0], + serverPort=address[1], + awaited=awaited, + durationMS=duration * 1000, + failure=error, + message=_SDAMStatusMessage.HEARTBEAT_FAIL, + ) await self._reset_connection() if isinstance(error, _OperationCancelled): raise @@ -274,22 +287,32 @@ class Monitor(MonitorBase): Returns a ServerDescription, or raises an exception. """ address = self._server_description.address + sd = self._server_description + + # XXX: "awaited" could be incorrectly set to True in the rare case + # the pool checkout closes and recreates a connection. + awaited = bool( + self._pool.conns and self._stream and sd.is_server_type_known and sd.topology_version + ) if self._publish: assert self._listeners is not None - sd = self._server_description - # XXX: "awaited" could be incorrectly set to True in the rare case - # the pool checkout closes and recreates a connection. - awaited = bool( - self._pool.conns - and self._stream - and sd.is_server_type_known - and sd.topology_version - ) self._listeners.publish_server_heartbeat_started(address, awaited) if self._cancel_context and self._cancel_context.cancelled: await self._reset_connection() async with self._pool.checkout() as conn: + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology._topology_id, + driverConnectionId=conn.id, + serverConnectionId=conn.server_connection_id, + serverHost=address[0], + serverPort=address[1], + awaited=awaited, + message=_SDAMStatusMessage.HEARTBEAT_START, + ) + self._cancel_context = conn.cancel_context response, round_trip_time = await self._check_with_socket(conn) if not response.awaitable: @@ -302,6 +325,19 @@ class Monitor(MonitorBase): self._listeners.publish_server_heartbeat_succeeded( address, round_trip_time, response, response.awaitable ) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology._topology_id, + driverConnectionId=conn.id, + serverConnectionId=conn.server_connection_id, + serverHost=address[0], + serverPort=address[1], + awaited=awaited, + durationMS=round_trip_time * 1000, + reply=response.document, + message=_SDAMStatusMessage.HEARTBEAT_SUCCESS, + ) return sd async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]: diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index 892594c97..8d0024afd 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -30,7 +30,13 @@ from bson import _decode_all_selective from pymongo.asynchronous.helpers import _handle_reauth from pymongo.errors import NotPrimaryError, OperationFailure from pymongo.helpers_shared import _check_command_response -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log +from pymongo.logger import ( + _COMMAND_LOGGER, + _SDAM_LOGGER, + _CommandStatusMessage, + _debug_log, + _SDAMStatusMessage, +) from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query from pymongo.response import PinnedResponse, Response @@ -99,6 +105,15 @@ class Server: (self._description.address, self._topology_id), ) ) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + serverHost=self._description.address[0], + serverPort=self._description.address[1], + message=_SDAMStatusMessage.STOP_SERVER, + ) + await self._monitor.close() await self._pool.close() diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 4e55db498..2df30d244 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -46,8 +46,10 @@ from pymongo.errors import ( from pymongo.hello import Hello from pymongo.lock import _ACondition, _ALock, _create_lock from pymongo.logger import ( + _SDAM_LOGGER, _SERVER_SELECTION_LOGGER, _debug_log, + _SDAMStatusMessage, _ServerSelectionStatusMessage, ) from pymongo.pool_options import PoolOptions @@ -110,6 +112,13 @@ class Topology: if self._publish_server or self._publish_tp: self._events = queue.Queue(maxsize=100) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + message=_SDAMStatusMessage.START_TOPOLOGY, + ) + if self._publish_tp: assert self._events is not None self._events.put((self._listeners.publish_topology_opened, (self._topology_id,))) @@ -124,22 +133,38 @@ class Topology: ) self._description = topology_description + initial_td = TopologyDescription( + TOPOLOGY_TYPE.Unknown, {}, None, None, None, self._settings + ) if self._publish_tp: assert self._events is not None - initial_td = TopologyDescription( - TOPOLOGY_TYPE.Unknown, {}, None, None, None, self._settings - ) self._events.put( ( self._listeners.publish_topology_description_changed, (initial_td, self._description, self._topology_id), ) ) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + previousDescription=initial_td, + newDescription=self._description, + message=_SDAMStatusMessage.TOPOLOGY_CHANGE, + ) for seed in topology_settings.seeds: if self._publish_server: assert self._events is not None self._events.put((self._listeners.publish_server_opened, (seed, self._topology_id))) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + serverHost=seed[0], + serverPort=seed[1], + message=_SDAMStatusMessage.START_SERVER, + ) # Store the seed list to help diagnose errors in _error_message(). self._seed_addresses = list(topology_description.server_descriptions()) @@ -472,6 +497,14 @@ class Topology: (td_old, self._description, self._topology_id), ) ) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + previousDescription=td_old, + newDescription=self._description, + message=_SDAMStatusMessage.TOPOLOGY_CHANGE, + ) # Shutdown SRV polling for unsupported cluster types. # This is only applicable if the old topology was Unknown, and the @@ -530,6 +563,14 @@ class Topology: (td_old, self._description, self._topology_id), ) ) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + previousDescription=td_old, + newDescription=self._description, + message=_SDAMStatusMessage.TOPOLOGY_CHANGE, + ) async def on_srv_update(self, seedlist: list[tuple[str, Any]]) -> None: """Process a new list of nodes obtained from scanning SRV records.""" @@ -684,6 +725,18 @@ class Topology: ) ) self._events.put((self._listeners.publish_topology_closed, (self._topology_id,))) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + previousDescription=old_td, + newDescription=self._description, + message=_SDAMStatusMessage.TOPOLOGY_CHANGE, + ) + _debug_log( + _SDAM_LOGGER, topologyId=self._topology_id, message=_SDAMStatusMessage.STOP_TOPOLOGY + ) + if self._publish_server or self._publish_tp: # Make sure the events executor thread is fully closed before publishing the remaining events self.__events_executor.close() diff --git a/pymongo/logger.py b/pymongo/logger.py index 2caafa778..2ff35328b 100644 --- a/pymongo/logger.py +++ b/pymongo/logger.py @@ -53,6 +53,17 @@ class _ConnectionStatusMessage(str, enum.Enum): CHECKEDIN = "Connection checked in" +class _SDAMStatusMessage(str, enum.Enum): + START_TOPOLOGY = "Starting topology monitoring" + STOP_TOPOLOGY = "Stopped topology monitoring" + START_SERVER = "Starting server monitoring" + STOP_SERVER = "Stopped server monitoring" + TOPOLOGY_CHANGE = "Topology description changed" + HEARTBEAT_START = "Server heartbeat started" + HEARTBEAT_SUCCESS = "Server heartbeat succeeded" + HEARTBEAT_FAIL = "Server heartbeat failed" + + _DEFAULT_DOCUMENT_LENGTH = 1000 _SENSITIVE_COMMANDS = [ "authenticate", @@ -73,6 +84,7 @@ _COMMAND_LOGGER = logging.getLogger("pymongo.command") _CONNECTION_LOGGER = logging.getLogger("pymongo.connection") _SERVER_SELECTION_LOGGER = logging.getLogger("pymongo.serverSelection") _CLIENT_LOGGER = logging.getLogger("pymongo.client") +_SDAM_LOGGER = logging.getLogger("pymongo.topology") _VERBOSE_CONNECTION_ERROR_REASONS = { ConnectionClosedReason.POOL_CLOSED: "Connection pool was closed", ConnectionCheckOutFailedReason.POOL_CLOSED: "Connection pool was closed", @@ -129,7 +141,7 @@ class LogMessage: ) is_sensitive_hello = ( - self._kwargs["commandName"] in _HELLO_COMMANDS and is_speculative_authenticate + self._kwargs.get("commandName", None) in _HELLO_COMMANDS and is_speculative_authenticate ) return is_sensitive_command or is_sensitive_hello diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index e3d1f7bf2..3f9bb2ea7 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -17,6 +17,7 @@ from __future__ import annotations import atexit +import logging import time import weakref from typing import TYPE_CHECKING, Any, Mapping, Optional, cast @@ -26,6 +27,7 @@ from pymongo._csot import MovingMinimum from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _create_lock +from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage from pymongo.pool_options import _is_faas from pymongo.read_preferences import MovingAverage from pymongo.server_description import ServerDescription @@ -257,10 +259,21 @@ class Monitor(MonitorBase): sd = self._server_description address = sd.address duration = _monotonic_duration(start) + awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version) if self._publish: - awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version) assert self._listeners is not None self._listeners.publish_server_heartbeat_failed(address, duration, error, awaited) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology._topology_id, + serverHost=address[0], + serverPort=address[1], + awaited=awaited, + durationMS=duration * 1000, + failure=error, + message=_SDAMStatusMessage.HEARTBEAT_FAIL, + ) self._reset_connection() if isinstance(error, _OperationCancelled): raise @@ -274,22 +287,32 @@ class Monitor(MonitorBase): Returns a ServerDescription, or raises an exception. """ address = self._server_description.address + sd = self._server_description + + # XXX: "awaited" could be incorrectly set to True in the rare case + # the pool checkout closes and recreates a connection. + awaited = bool( + self._pool.conns and self._stream and sd.is_server_type_known and sd.topology_version + ) if self._publish: assert self._listeners is not None - sd = self._server_description - # XXX: "awaited" could be incorrectly set to True in the rare case - # the pool checkout closes and recreates a connection. - awaited = bool( - self._pool.conns - and self._stream - and sd.is_server_type_known - and sd.topology_version - ) self._listeners.publish_server_heartbeat_started(address, awaited) if self._cancel_context and self._cancel_context.cancelled: self._reset_connection() with self._pool.checkout() as conn: + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology._topology_id, + driverConnectionId=conn.id, + serverConnectionId=conn.server_connection_id, + serverHost=address[0], + serverPort=address[1], + awaited=awaited, + message=_SDAMStatusMessage.HEARTBEAT_START, + ) + self._cancel_context = conn.cancel_context response, round_trip_time = self._check_with_socket(conn) if not response.awaitable: @@ -302,6 +325,19 @@ class Monitor(MonitorBase): self._listeners.publish_server_heartbeat_succeeded( address, round_trip_time, response, response.awaitable ) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology._topology_id, + driverConnectionId=conn.id, + serverConnectionId=conn.server_connection_id, + serverHost=address[0], + serverPort=address[1], + awaited=awaited, + durationMS=round_trip_time * 1000, + reply=response.document, + message=_SDAMStatusMessage.HEARTBEAT_SUCCESS, + ) return sd def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]: diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index 347155784..ed48cc6cc 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -29,7 +29,13 @@ from typing import ( from bson import _decode_all_selective from pymongo.errors import NotPrimaryError, OperationFailure from pymongo.helpers_shared import _check_command_response -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log +from pymongo.logger import ( + _COMMAND_LOGGER, + _SDAM_LOGGER, + _CommandStatusMessage, + _debug_log, + _SDAMStatusMessage, +) from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query from pymongo.response import PinnedResponse, Response from pymongo.synchronous.helpers import _handle_reauth @@ -99,6 +105,15 @@ class Server: (self._description.address, self._topology_id), ) ) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + serverHost=self._description.address[0], + serverPort=self._description.address[1], + message=_SDAMStatusMessage.STOP_SERVER, + ) + self._monitor.close() self._pool.close() diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 8542f67bb..54a9d8a69 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -41,8 +41,10 @@ from pymongo.errors import ( from pymongo.hello import Hello from pymongo.lock import _create_lock from pymongo.logger import ( + _SDAM_LOGGER, _SERVER_SELECTION_LOGGER, _debug_log, + _SDAMStatusMessage, _ServerSelectionStatusMessage, ) from pymongo.pool_options import PoolOptions @@ -110,6 +112,13 @@ class Topology: if self._publish_server or self._publish_tp: self._events = queue.Queue(maxsize=100) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + message=_SDAMStatusMessage.START_TOPOLOGY, + ) + if self._publish_tp: assert self._events is not None self._events.put((self._listeners.publish_topology_opened, (self._topology_id,))) @@ -124,22 +133,38 @@ class Topology: ) self._description = topology_description + initial_td = TopologyDescription( + TOPOLOGY_TYPE.Unknown, {}, None, None, None, self._settings + ) if self._publish_tp: assert self._events is not None - initial_td = TopologyDescription( - TOPOLOGY_TYPE.Unknown, {}, None, None, None, self._settings - ) self._events.put( ( self._listeners.publish_topology_description_changed, (initial_td, self._description, self._topology_id), ) ) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + previousDescription=initial_td, + newDescription=self._description, + message=_SDAMStatusMessage.TOPOLOGY_CHANGE, + ) for seed in topology_settings.seeds: if self._publish_server: assert self._events is not None self._events.put((self._listeners.publish_server_opened, (seed, self._topology_id))) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + serverHost=seed[0], + serverPort=seed[1], + message=_SDAMStatusMessage.START_SERVER, + ) # Store the seed list to help diagnose errors in _error_message(). self._seed_addresses = list(topology_description.server_descriptions()) @@ -472,6 +497,14 @@ class Topology: (td_old, self._description, self._topology_id), ) ) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + previousDescription=td_old, + newDescription=self._description, + message=_SDAMStatusMessage.TOPOLOGY_CHANGE, + ) # Shutdown SRV polling for unsupported cluster types. # This is only applicable if the old topology was Unknown, and the @@ -530,6 +563,14 @@ class Topology: (td_old, self._description, self._topology_id), ) ) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + previousDescription=td_old, + newDescription=self._description, + message=_SDAMStatusMessage.TOPOLOGY_CHANGE, + ) def on_srv_update(self, seedlist: list[tuple[str, Any]]) -> None: """Process a new list of nodes obtained from scanning SRV records.""" @@ -682,6 +723,18 @@ class Topology: ) ) self._events.put((self._listeners.publish_topology_closed, (self._topology_id,))) + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _SDAM_LOGGER, + topologyId=self._topology_id, + previousDescription=old_td, + newDescription=self._description, + message=_SDAMStatusMessage.TOPOLOGY_CHANGE, + ) + _debug_log( + _SDAM_LOGGER, topologyId=self._topology_id, message=_SDAMStatusMessage.STOP_TOPOLOGY + ) + if self._publish_server or self._publish_tp: # Make sure the events executor thread is fully closed before publishing the remaining events self.__events_executor.close() diff --git a/test/discovery_and_monitoring/rs/compatible.json b/test/discovery_and_monitoring/rs/compatible.json index dfd5d57df..444b13e9d 100644 --- a/test/discovery_and_monitoring/rs/compatible.json +++ b/test/discovery_and_monitoring/rs/compatible.json @@ -16,7 +16,7 @@ "b:27017" ], "minWireVersion": 0, - "maxWireVersion": 21 + "maxWireVersion": 6 } ], [ diff --git a/test/discovery_and_monitoring/rs/compatible_unknown.json b/test/discovery_and_monitoring/rs/compatible_unknown.json index 95e03ea95..cf92dd1ed 100644 --- a/test/discovery_and_monitoring/rs/compatible_unknown.json +++ b/test/discovery_and_monitoring/rs/compatible_unknown.json @@ -16,7 +16,7 @@ "b:27017" ], "minWireVersion": 0, - "maxWireVersion": 21 + "maxWireVersion": 6 } ] ], diff --git a/test/discovery_and_monitoring/sharded/compatible.json b/test/discovery_and_monitoring/sharded/compatible.json index ceb0ec24c..e531db97f 100644 --- a/test/discovery_and_monitoring/sharded/compatible.json +++ b/test/discovery_and_monitoring/sharded/compatible.json @@ -23,7 +23,7 @@ "isWritablePrimary": true, "msg": "isdbgrid", "minWireVersion": 0, - "maxWireVersion": 21 + "maxWireVersion": 6 } ] ], diff --git a/test/discovery_and_monitoring/single/compatible.json b/test/discovery_and_monitoring/single/compatible.json index 493d9b748..302927598 100644 --- a/test/discovery_and_monitoring/single/compatible.json +++ b/test/discovery_and_monitoring/single/compatible.json @@ -11,7 +11,7 @@ "helloOk": true, "isWritablePrimary": true, "minWireVersion": 0, - "maxWireVersion": 21 + "maxWireVersion": 6 } ] ], diff --git a/test/discovery_and_monitoring/single/too_old_then_upgraded.json b/test/discovery_and_monitoring/single/too_old_then_upgraded.json index c3dd98cf6..58ae7d9de 100644 --- a/test/discovery_and_monitoring/single/too_old_then_upgraded.json +++ b/test/discovery_and_monitoring/single/too_old_then_upgraded.json @@ -1,5 +1,5 @@ { - "description": "Standalone with default maxWireVersion of 0 is upgraded to one with maxWireVersion 21", + "description": "Standalone with default maxWireVersion of 0 is upgraded to one with maxWireVersion 6", "uri": "mongodb://a", "phases": [ { @@ -35,7 +35,7 @@ "helloOk": true, "isWritablePrimary": true, "minWireVersion": 0, - "maxWireVersion": 21 + "maxWireVersion": 6 } ] ], diff --git a/test/discovery_and_monitoring/unified/logging-loadbalanced.json b/test/discovery_and_monitoring/unified/logging-loadbalanced.json new file mode 100644 index 000000000..45440d255 --- /dev/null +++ b/test/discovery_and_monitoring/unified/logging-loadbalanced.json @@ -0,0 +1,150 @@ +{ + "description": "loadbalanced-logging", + "schemaVersion": "1.16", + "runOnRequirements": [ + { + "topologies": [ + "load-balanced" + ], + "minServerVersion": "4.4" + } + ], + "tests": [ + { + "description": "Topology lifecycle", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "topology": "debug" + }, + "observeEvents": [ + "topologyDescriptionChangedEvent" + ] + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting topology monitoring", + "topologyId": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting server monitoring", + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring", + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped topology monitoring", + "topologyId": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/logging-replicaset.json b/test/discovery_and_monitoring/unified/logging-replicaset.json new file mode 100644 index 000000000..e6738225c --- /dev/null +++ b/test/discovery_and_monitoring/unified/logging-replicaset.json @@ -0,0 +1,606 @@ +{ + "description": "replicaset-logging", + "schemaVersion": "1.16", + "runOnRequirements": [ + { + "topologies": [ + "replicaset" + ], + "minServerVersion": "4.4" + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient" + } + } + ], + "tests": [ + { + "description": "Topology lifecycle", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "topology": "debug" + }, + "observeEvents": [ + "topologyDescriptionChangedEvent" + ] + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 4 + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectLogMessages": [ + { + "client": "client", + "ignoreMessages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat started" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat succeeded" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat failed" + } + } + ], + "messages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting topology monitoring", + "topologyId": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring", + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring", + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring", + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped topology monitoring", + "topologyId": { + "$$exists": true + } + } + } + ] + } + ] + }, + { + "description": "Successful heartbeat", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "topology": "debug" + }, + "observeEvents": [ + "serverHeartbeatSucceededEvent" + ] + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 3 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "ignoreExtraMessages": true, + "ignoreMessages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat started" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed" + } + } + ], + "messages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting topology monitoring", + "topologyId": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat succeeded", + "awaited": { + "$$exists": true + }, + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$exists": true + }, + "serverConnectionId": { + "$$exists": true + }, + "durationMS": { + "$$type": [ + "int", + "long" + ] + }, + "reply": { + "$$matchAsDocument": { + "$$matchAsRoot": { + "ok": 1 + } + } + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat succeeded", + "awaited": { + "$$exists": true + }, + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$exists": true + }, + "serverConnectionId": { + "$$exists": true + }, + "durationMS": { + "$$type": [ + "int", + "long" + ] + }, + "reply": { + "$$matchAsDocument": { + "$$matchAsRoot": { + "ok": 1 + } + } + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat succeeded", + "awaited": { + "$$exists": true + }, + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$exists": true + }, + "serverConnectionId": { + "$$exists": true + }, + "durationMS": { + "$$type": [ + "int", + "long" + ] + }, + "reply": { + "$$matchAsDocument": { + "$$matchAsRoot": { + "ok": 1 + } + } + } + } + } + ] + } + ] + }, + { + "description": "Failing heartbeat", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "topology": "debug" + }, + "observeEvents": [ + "serverHeartbeatFailedEvent" + ], + "uriOptions": { + "appname": "failingHeartbeatLoggingTest" + } + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "isMaster" + ], + "appName": "failingHeartbeatLoggingTest", + "closeConnection": true + } + }, + "client": "setupClient" + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatFailedEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "ignoreExtraMessages": true, + "ignoreMessages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat started" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat succeeded" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed" + } + } + ], + "messages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting topology monitoring", + "topologyId": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat failed", + "awaited": { + "$$exists": true + }, + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$exists": true + }, + "durationMS": { + "$$type": [ + "int", + "long" + ] + }, + "failure": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/logging-sharded.json b/test/discovery_and_monitoring/unified/logging-sharded.json new file mode 100644 index 000000000..61b27f5be --- /dev/null +++ b/test/discovery_and_monitoring/unified/logging-sharded.json @@ -0,0 +1,492 @@ +{ + "description": "sharded-logging", + "schemaVersion": "1.16", + "runOnRequirements": [ + { + "topologies": [ + "sharded" + ], + "minServerVersion": "4.4" + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "tests": [ + { + "description": "Topology lifecycle", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "topology": "debug" + }, + "observeEvents": [ + "topologyDescriptionChangedEvent" + ], + "useMultipleMongoses": true + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 3 + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectLogMessages": [ + { + "client": "client", + "ignoreMessages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat started" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat succeeded" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat failed" + } + } + ], + "messages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting topology monitoring", + "topologyId": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring", + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring", + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped topology monitoring", + "topologyId": { + "$$exists": true + } + } + } + ] + } + ] + }, + { + "description": "Successful heartbeat", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "topology": "debug" + }, + "observeEvents": [ + "serverHeartbeatSucceededEvent" + ] + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "ignoreExtraMessages": true, + "ignoreMessages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat started" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed" + } + } + ], + "messages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting topology monitoring", + "topologyId": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat succeeded", + "awaited": { + "$$exists": true + }, + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$exists": true + }, + "serverConnectionId": { + "$$exists": true + }, + "durationMS": { + "$$type": [ + "int", + "long" + ] + }, + "reply": { + "$$matchAsDocument": { + "$$matchAsRoot": { + "ok": 1 + } + } + } + } + } + ] + } + ] + }, + { + "description": "Failing heartbeat", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "topology": "debug" + }, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatFailedEvent" + ], + "uriOptions": { + "appname": "failingHeartbeatLoggingTest" + } + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "isMaster" + ], + "appName": "failingHeartbeatLoggingTest", + "closeConnection": true + } + }, + "client": "setupClient" + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatFailedEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "ignoreExtraMessages": true, + "ignoreMessages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat started" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat succeeded" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed" + } + } + ], + "messages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting topology monitoring", + "topologyId": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat failed", + "awaited": { + "$$exists": true + }, + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$exists": true + }, + "durationMS": { + "$$type": [ + "int", + "long" + ] + }, + "failure": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/logging-standalone.json b/test/discovery_and_monitoring/unified/logging-standalone.json new file mode 100644 index 000000000..1ee6dbe89 --- /dev/null +++ b/test/discovery_and_monitoring/unified/logging-standalone.json @@ -0,0 +1,517 @@ +{ + "description": "standalone-logging", + "schemaVersion": "1.16", + "runOnRequirements": [ + { + "topologies": [ + "single" + ], + "minServerVersion": "4.4" + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient" + } + } + ], + "tests": [ + { + "description": "Topology lifecycle", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "topology": "debug" + }, + "observeEvents": [ + "topologyDescriptionChangedEvent" + ] + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectLogMessages": [ + { + "client": "client", + "ignoreMessages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat started" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat succeeded" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat failed" + } + } + ], + "messages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting topology monitoring", + "topologyId": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting server monitoring", + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring", + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed", + "topologyId": { + "$$exists": true + }, + "previousDescription": { + "$$exists": true + }, + "newDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped topology monitoring", + "topologyId": { + "$$exists": true + } + } + } + ] + } + ] + }, + { + "description": "Successful heartbeat", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "topology": "debug" + }, + "observeEvents": [ + "serverHeartbeatSucceededEvent" + ] + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectLogMessages": [ + { + "client": "client", + "ignoreExtraMessages": true, + "ignoreMessages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped topology monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting topology monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat failed" + } + } + ], + "messages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat started", + "awaited": { + "$$exists": true + }, + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat succeeded", + "awaited": { + "$$exists": true + }, + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$exists": true + }, + "serverConnectionId": { + "$$exists": true + }, + "durationMS": { + "$$type": [ + "int", + "long" + ] + }, + "reply": { + "$$matchAsDocument": { + "$$matchAsRoot": { + "ok": 1 + } + } + } + } + } + ] + } + ] + }, + { + "description": "Failing heartbeat", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "topology": "debug" + }, + "observeEvents": [ + "serverHeartbeatFailedEvent" + ], + "uriOptions": { + "appname": "failingHeartbeatLoggingTest" + } + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "isMaster" + ], + "appName": "failingHeartbeatLoggingTest", + "closeConnection": true + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatFailedEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "ignoreExtraMessages": true, + "ignoreMessages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped topology monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Stopped server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Topology description changed" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting server monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Starting topology monitoring" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat started" + } + }, + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat succeeded" + } + } + ], + "messages": [ + { + "level": "debug", + "component": "topology", + "data": { + "message": "Server heartbeat failed", + "awaited": { + "$$exists": true + }, + "topologyId": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "driverConnectionId": { + "$$exists": true + }, + "durationMS": { + "$$type": [ + "int", + "long" + ] + }, + "failure": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/test/unified_format.py b/test/unified_format.py index d978ef84d..99fe0b169 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -45,6 +45,7 @@ from test.helpers import ( GCP_CREDS, KMIP_CREDS, LOCAL_MASTER_KEY, + client_knobs, ) from test.utils import ( CMAPListener, @@ -851,7 +852,7 @@ class MatchEvaluatorUtil: return False - def _match_document(self, expectation, actual, is_root): + def _match_document(self, expectation, actual, is_root, test=False): if self._evaluate_if_special_operation(expectation, actual): return @@ -861,35 +862,48 @@ class MatchEvaluatorUtil: continue self.test.assertIn(key, actual) - self.match_result(value, actual[key], in_recursive_call=True) + if not self.match_result(value, actual[key], in_recursive_call=True, test=test): + return False if not is_root: expected_keys = set(expectation.keys()) for key, value in expectation.items(): if value == {"$$exists": False}: expected_keys.remove(key) - self.test.assertEqual(expected_keys, set(actual.keys())) + if test: + self.test.assertEqual(expected_keys, set(actual.keys())) + else: + return set(expected_keys).issubset(set(actual.keys())) + return True - def match_result(self, expectation, actual, in_recursive_call=False): + def match_result(self, expectation, actual, in_recursive_call=False, test=True): if isinstance(expectation, abc.Mapping): - return self._match_document(expectation, actual, is_root=not in_recursive_call) + return self._match_document( + expectation, actual, is_root=not in_recursive_call, test=test + ) if isinstance(expectation, abc.MutableSequence): self.test.assertIsInstance(actual, abc.MutableSequence) for e, a in zip(expectation, actual): if isinstance(e, abc.Mapping): - self._match_document(e, a, is_root=not in_recursive_call) + self._match_document(e, a, is_root=not in_recursive_call, test=test) else: - self.match_result(e, a, in_recursive_call=True) + self.match_result(e, a, in_recursive_call=True, test=test) return None # account for flexible numerics in element-wise comparison if isinstance(expectation, int) or isinstance(expectation, float): - self.test.assertEqual(expectation, actual) + if test: + self.test.assertEqual(expectation, actual) + else: + return expectation == actual return None else: - self.test.assertIsInstance(actual, type(expectation)) - self.test.assertEqual(expectation, actual) + if test: + self.test.assertIsInstance(actual, type(expectation)) + self.test.assertEqual(expectation, actual) + else: + return isinstance(actual, type(expectation)) and expectation == actual return None def match_server_description(self, actual: ServerDescription, spec: dict) -> None: @@ -1891,6 +1905,20 @@ class UnifiedSpecTestMixinV1(IntegrationTest): else: assert server_connection_id is None + def process_ignore_messages(self, ignore_logs, actual_logs): + final_logs = [] + for log in actual_logs: + ignored = False + for ignore_log in ignore_logs: + if log["data"]["message"] == ignore_log["data"][ + "message" + ] and self.match_evaluator.match_result(ignore_log, log, test=False): + ignored = True + break + if not ignored: + final_logs.append(log) + return final_logs + def check_log_messages(self, operations, spec): def format_logs(log_list): client_to_log = defaultdict(list) @@ -1898,7 +1926,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest): if log.module == "ocsp_support": continue data = json_util.loads(log.message) - client = data.pop("clientId") + client = data.pop("clientId") if "clientId" in data else data.pop("topologyId") client_to_log[client].append( { "level": log.levelname.lower(), @@ -1919,6 +1947,11 @@ class UnifiedSpecTestMixinV1(IntegrationTest): clientid = self.entity_map[client["client"]]._topology_settings._topology_id actual_logs = formatted_logs[clientid] actual_logs = [log for log in actual_logs if log["component"] in components] + + ignore_logs = client.get("ignoreMessages", []) + if ignore_logs: + actual_logs = self.process_ignore_messages(ignore_logs, actual_logs) + if client.get("ignoreExtraMessages", False): actual_logs = actual_logs[: len(client["messages"])] self.assertEqual(len(client["messages"]), len(actual_logs))