From 4c00227c1d350a64d1af6672cb6de26072082b81 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Thu, 12 Oct 2023 13:19:16 -0700 Subject: [PATCH] PYTHON-3668 Use polling SDAM mode on FaaS and add sdamMode=auto/stream/poll (#1360) Disable streaming SDAM by default on AWS Lambda and similar FaaS platforms. Introduce the serverMonitoringMode=stream/poll/auto URI option. Add Unified Test Format version 1.17 to add support for server heartbeat events. --- pymongo/client_options.py | 13 +- pymongo/common.py | 13 + pymongo/mongo_client.py | 6 + pymongo/monitor.py | 27 +- pymongo/monitoring.py | 38 +- pymongo/pool.py | 4 + pymongo/settings.py | 7 + .../unified/auth-error.json | 2 +- .../unified/auth-misc-command-error.json | 2 +- .../unified/auth-network-error.json | 2 +- .../unified/auth-network-timeout-error.json | 2 +- .../unified/auth-shutdown-error.json | 2 +- .../unified/cancel-server-check.json | 2 +- .../unified/connectTimeoutMS.json | 2 +- .../unified/find-network-error.json | 2 +- .../unified/find-network-timeout-error.json | 2 +- .../unified/find-shutdown-error.json | 2 +- .../unified/hello-command-error.json | 2 +- .../unified/hello-network-error.json | 2 +- .../unified/hello-timeout.json | 2 +- .../unified/insert-network-error.json | 2 +- .../unified/insert-shutdown-error.json | 2 +- .../unified/minPoolSize-error.json | 2 +- .../unified/pool-cleared-error.json | 2 +- .../rediscover-quickly-after-step-down.json | 2 +- .../unified/serverMonitoringMode.json | 449 ++++++++++++++++++ test/lambda/mongodb/app.py | 12 +- test/test_discovery_and_monitoring.py | 46 ++ test/test_monitoring.py | 4 +- test/unified_format.py | 54 ++- test/uri_options/sdam-options.json | 46 ++ test/utils.py | 1 + 32 files changed, 704 insertions(+), 52 deletions(-) create mode 100644 test/discovery_and_monitoring/unified/serverMonitoringMode.json create mode 100644 test/uri_options/sdam-options.json diff --git a/pymongo/client_options.py b/pymongo/client_options.py index d1342652a..d5f9cfccc 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -215,6 +215,9 @@ class ClientOptions: self.__auto_encryption_opts = options.get("auto_encryption_opts") self.__load_balanced = options.get("loadbalanced") self.__timeout = options.get("timeoutms") + self.__server_monitoring_mode = options.get( + "servermonitoringmode", common.SERVER_MONITORING_MODE + ) @property def _options(self) -> Mapping[str, Any]: @@ -284,7 +287,7 @@ class ClientOptions: def timeout(self) -> Optional[float]: """The configured timeoutMS converted to seconds, or None. - .. versionadded: 4.2 + .. versionadded:: 4.2 """ return self.__timeout @@ -318,3 +321,11 @@ class ClientOptions: """ assert self.__pool_options._event_listeners is not None return self.__pool_options._event_listeners.event_listeners() + + @property + def server_monitoring_mode(self) -> str: + """The configured serverMonitoringMode option. + + .. versionadded:: 4.5 + """ + return self.__server_monitoring_mode diff --git a/pymongo/common.py b/pymongo/common.py index fad24030f..794b7e31a 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -136,6 +136,9 @@ _MAX_END_SESSIONS = 10000 # Default value for srvServiceName SRV_SERVICE_NAME = "mongodb" +# Default value for serverMonitoringMode +SERVER_MONITORING_MODE = "auto" # poll/stream/auto + def partition_node(node: str) -> tuple[str, int]: """Split a host:port string into (host, int(port)) pair.""" @@ -664,6 +667,15 @@ def validate_datetime_conversion(option: Any, value: Any) -> Optional[DatetimeCo raise TypeError(f"{option} must be a str or int representing DatetimeConversion") +def validate_server_monitoring_mode(option: str, value: str) -> str: + """Validate the serverMonitoringMode option.""" + if value not in {"auto", "stream", "poll"}: + raise ValueError( + f'{option}={value!r} is invalid. Must be one of "auto", "stream", or "poll"' + ) + return value + + # Dictionary where keys are the names of public URI options, and values # are lists of aliases for that option. URI_OPTIONS_ALIAS_MAP: dict[str, list[str]] = { @@ -712,6 +724,7 @@ URI_OPTIONS_VALIDATOR_MAP: dict[str, Callable[[Any, Any], Any]] = { "srvservicename": validate_string, "srvmaxhosts": validate_non_negative_integer, "timeoutms": validate_timeoutms, + "servermonitoringmode": validate_server_monitoring_mode, } # Dictionary where keys are the names of URI options specific to pymongo, diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index becddb65f..72ea671fe 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -330,6 +330,8 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): - `heartbeatFrequencyMS`: (optional) The number of milliseconds between periodic server checks, or None to accept the default frequency of 10 seconds. + - `serverMonitoringMode`: (optional) The server monitoring mode to use. + Valid values are the strings: "auto", "stream", "poll". Defaults to "auto". - `appname`: (string or None) The name of the application that created this MongoClient instance. The server will log this value upon establishing each connection. It is also recorded in the slow @@ -585,6 +587,9 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): .. seealso:: The MongoDB documentation on `connections `_. + .. versionchanged:: 4.5 + Added the ``serverMonitoringMode`` keyword argument. + .. versionchanged:: 4.2 Added the ``timeoutMS`` keyword argument. @@ -846,6 +851,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): load_balanced=options.load_balanced, srv_service_name=srv_service_name, srv_max_hosts=srv_max_hosts, + server_monitoring_mode=options.server_monitoring_mode, ) self._init_background() diff --git a/pymongo/monitor.py b/pymongo/monitor.py index d52e1e4c2..92b12f731 100644 --- a/pymongo/monitor.py +++ b/pymongo/monitor.py @@ -27,6 +27,7 @@ from pymongo.errors import NotPrimaryError, OperationFailure, _OperationCancelle from pymongo.hello import Hello from pymongo.lock import _create_lock from pymongo.periodic_executor import _shutdown_executors +from pymongo.pool import _is_faas from pymongo.read_preferences import MovingAverage from pymongo.server_description import ServerDescription from pymongo.srv_resolver import _SrvResolver @@ -138,7 +139,12 @@ class Monitor(MonitorBase): topology_settings, topology._create_pool_for_monitor(server_description.address), ) - self.heartbeater = None + if topology_settings.server_monitoring_mode == "stream": + self._stream = True + elif topology_settings.server_monitoring_mode == "poll": + self._stream = False + else: + self._stream = not _is_faas() def cancel_check(self) -> None: """Cancel any concurrent hello check. @@ -200,7 +206,7 @@ class Monitor(MonitorBase): self._server_description, reset_pool=self._server_description.error ) - if ( + if self._stream and ( self._server_description.is_server_type_known and self._server_description.topology_version ): @@ -237,7 +243,7 @@ class Monitor(MonitorBase): address = sd.address duration = time.monotonic() - start if self._publish: - awaited = bool(sd.is_server_type_known and sd.topology_version) + awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version) assert self._listeners is not None self._listeners.publish_server_heartbeat_failed(address, duration, error, awaited) self._reset_connection() @@ -255,7 +261,16 @@ class Monitor(MonitorBase): address = self._server_description.address if self._publish: assert self._listeners is not None - self._listeners.publish_server_heartbeat_started(address) + sd = self._server_description + # XXX: "awaited" could be incorrectly set to True in the rare case + # the pool checkout closes and recreates a connection. + awaited = bool( + self._pool.conns + and self._stream + and sd.is_server_type_known + and sd.topology_version + ) + self._listeners.publish_server_heartbeat_started(address, awaited) if self._cancel_context and self._cancel_context.cancelled: self._reset_connection() @@ -284,7 +299,9 @@ class Monitor(MonitorBase): if conn.more_to_come: # Read the next streaming hello (MongoDB 4.4+). response = Hello(conn._next_reply(), awaitable=True) - elif conn.performed_handshake and self._server_description.topology_version: + elif ( + self._stream and conn.performed_handshake and self._server_description.topology_version + ): # Initiate streaming hello (MongoDB 4.4+). response = conn._hello( cluster_time, diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 5bc3fda49..d8f370a12 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -1292,10 +1292,11 @@ class TopologyClosedEvent(TopologyEvent): class _ServerHeartbeatEvent: """Base class for server heartbeat events.""" - __slots__ = "__connection_id" + __slots__ = ("__connection_id", "__awaited") - def __init__(self, connection_id: _Address) -> None: + def __init__(self, connection_id: _Address, awaited: bool = False) -> None: self.__connection_id = connection_id + self.__awaited = awaited @property def connection_id(self) -> _Address: @@ -1304,8 +1305,16 @@ class _ServerHeartbeatEvent: """ return self.__connection_id + @property + def awaited(self) -> bool: + """Whether the heartbeat was issued as an awaitable hello command. + + .. versionadded:: 4.6 + """ + return self.__awaited + def __repr__(self) -> str: - return f"<{self.__class__.__name__} {self.connection_id}>" + return f"<{self.__class__.__name__} {self.connection_id} awaited: {self.awaited}>" class ServerHeartbeatStartedEvent(_ServerHeartbeatEvent): @@ -1323,15 +1332,14 @@ class ServerHeartbeatSucceededEvent(_ServerHeartbeatEvent): .. versionadded:: 3.3 """ - __slots__ = ("__duration", "__reply", "__awaited") + __slots__ = ("__duration", "__reply") def __init__( self, duration: float, reply: Hello, connection_id: _Address, awaited: bool = False ) -> None: - super().__init__(connection_id) + super().__init__(connection_id, awaited) self.__duration = duration self.__reply = reply - self.__awaited = awaited @property def duration(self) -> float: @@ -1350,8 +1358,10 @@ class ServerHeartbeatSucceededEvent(_ServerHeartbeatEvent): If true, then :meth:`duration` reflects the sum of the round trip time to the server and the time that the server waited before sending a response. + + .. versionadded:: 3.11 """ - return self.__awaited + return super().awaited def __repr__(self) -> str: return "<{} {} duration: {}, awaited: {}, reply: {}>".format( @@ -1370,15 +1380,14 @@ class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent): .. versionadded:: 3.3 """ - __slots__ = ("__duration", "__reply", "__awaited") + __slots__ = ("__duration", "__reply") def __init__( self, duration: float, reply: Exception, connection_id: _Address, awaited: bool = False ) -> None: - super().__init__(connection_id) + super().__init__(connection_id, awaited) self.__duration = duration self.__reply = reply - self.__awaited = awaited @property def duration(self) -> float: @@ -1397,8 +1406,10 @@ class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent): If true, then :meth:`duration` reflects the sum of the round trip time to the server and the time that the server waited before sending a response. + + .. versionadded:: 3.11 """ - return self.__awaited + return super().awaited def __repr__(self) -> str: return "<{} {} duration: {}, awaited: {}, reply: {!r}>".format( @@ -1602,14 +1613,15 @@ class _EventListeners: except Exception: _handle_exception() - def publish_server_heartbeat_started(self, connection_id: _Address) -> None: + def publish_server_heartbeat_started(self, connection_id: _Address, awaited: bool) -> None: """Publish a ServerHeartbeatStartedEvent to all server heartbeat listeners. :Parameters: - `connection_id`: The address (host, port) pair of the connection. + - `awaited`: True if this heartbeat is part of an awaitable hello command. """ - event = ServerHeartbeatStartedEvent(connection_id) + event = ServerHeartbeatStartedEvent(connection_id, awaited) for subscriber in self.__server_heartbeat_listeners: try: subscriber.started(event) diff --git a/pymongo/pool.py b/pymongo/pool.py index 600a42853..afe3a4313 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -290,6 +290,10 @@ def _is_vercel() -> bool: return bool(os.getenv("VERCEL")) +def _is_faas() -> bool: + return _is_lambda() or _is_azure_func() or _is_gcp_func() or _is_vercel() + + def _getenv_int(key: str) -> Optional[int]: """Like os.getenv but returns an int, or None if the value is missing/malformed.""" val = os.getenv(key) diff --git a/pymongo/settings.py b/pymongo/settings.py index a4be2295d..4a3e7be4c 100644 --- a/pymongo/settings.py +++ b/pymongo/settings.py @@ -46,6 +46,7 @@ class TopologySettings: load_balanced: Optional[bool] = None, srv_service_name: str = common.SRV_SERVICE_NAME, srv_max_hosts: int = 0, + server_monitoring_mode: str = common.SERVER_MONITORING_MODE, ): """Represent MongoClient's configuration. @@ -72,6 +73,7 @@ class TopologySettings: self._load_balanced = load_balanced self._srv_service_name = srv_service_name self._srv_max_hosts = srv_max_hosts or 0 + self._server_monitoring_mode = server_monitoring_mode self._topology_id = ObjectId() # Store the allocation traceback to catch unclosed clients in the @@ -146,6 +148,11 @@ class TopologySettings: """The srvMaxHosts.""" return self._srv_max_hosts + @property + def server_monitoring_mode(self) -> str: + """The serverMonitoringMode.""" + return self._server_monitoring_mode + def get_topology_type(self) -> int: if self.load_balanced: return TOPOLOGY_TYPE.LoadBalanced diff --git a/test/discovery_and_monitoring/unified/auth-error.json b/test/discovery_and_monitoring/unified/auth-error.json index 5c78ecfe5..62d26494c 100644 --- a/test/discovery_and_monitoring/unified/auth-error.json +++ b/test/discovery_and_monitoring/unified/auth-error.json @@ -1,6 +1,6 @@ { "description": "auth-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/auth-misc-command-error.json b/test/discovery_and_monitoring/unified/auth-misc-command-error.json index 6e1b64546..fd62fe604 100644 --- a/test/discovery_and_monitoring/unified/auth-misc-command-error.json +++ b/test/discovery_and_monitoring/unified/auth-misc-command-error.json @@ -1,6 +1,6 @@ { "description": "auth-misc-command-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/auth-network-error.json b/test/discovery_and_monitoring/unified/auth-network-error.json index 7606d2db7..84763af32 100644 --- a/test/discovery_and_monitoring/unified/auth-network-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-error.json @@ -1,6 +1,6 @@ { "description": "auth-network-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/auth-network-timeout-error.json b/test/discovery_and_monitoring/unified/auth-network-timeout-error.json index 22066e8ba..3cf9576eb 100644 --- a/test/discovery_and_monitoring/unified/auth-network-timeout-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-timeout-error.json @@ -1,6 +1,6 @@ { "description": "auth-network-timeout-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/auth-shutdown-error.json b/test/discovery_and_monitoring/unified/auth-shutdown-error.json index 5dd7b5bb6..b9e503af6 100644 --- a/test/discovery_and_monitoring/unified/auth-shutdown-error.json +++ b/test/discovery_and_monitoring/unified/auth-shutdown-error.json @@ -1,6 +1,6 @@ { "description": "auth-shutdown-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/cancel-server-check.json b/test/discovery_and_monitoring/unified/cancel-server-check.json index 896cc8d08..a60ccfcb4 100644 --- a/test/discovery_and_monitoring/unified/cancel-server-check.json +++ b/test/discovery_and_monitoring/unified/cancel-server-check.json @@ -1,6 +1,6 @@ { "description": "cancel-server-check", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.0", diff --git a/test/discovery_and_monitoring/unified/connectTimeoutMS.json b/test/discovery_and_monitoring/unified/connectTimeoutMS.json index 67a4d9da1..d3e860a9c 100644 --- a/test/discovery_and_monitoring/unified/connectTimeoutMS.json +++ b/test/discovery_and_monitoring/unified/connectTimeoutMS.json @@ -1,6 +1,6 @@ { "description": "connectTimeoutMS", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/find-network-error.json b/test/discovery_and_monitoring/unified/find-network-error.json index 651466bfa..c1b6db40c 100644 --- a/test/discovery_and_monitoring/unified/find-network-error.json +++ b/test/discovery_and_monitoring/unified/find-network-error.json @@ -1,6 +1,6 @@ { "description": "find-network-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/find-network-timeout-error.json b/test/discovery_and_monitoring/unified/find-network-timeout-error.json index 2bde6daa5..e5ac9f21a 100644 --- a/test/discovery_and_monitoring/unified/find-network-timeout-error.json +++ b/test/discovery_and_monitoring/unified/find-network-timeout-error.json @@ -1,6 +1,6 @@ { "description": "find-network-timeout-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/find-shutdown-error.json b/test/discovery_and_monitoring/unified/find-shutdown-error.json index 624ad352f..6e5a2cac0 100644 --- a/test/discovery_and_monitoring/unified/find-shutdown-error.json +++ b/test/discovery_and_monitoring/unified/find-shutdown-error.json @@ -1,6 +1,6 @@ { "description": "find-shutdown-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/hello-command-error.json b/test/discovery_and_monitoring/unified/hello-command-error.json index 7d6046b76..9afea87e7 100644 --- a/test/discovery_and_monitoring/unified/hello-command-error.json +++ b/test/discovery_and_monitoring/unified/hello-command-error.json @@ -1,6 +1,6 @@ { "description": "hello-command-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.9", diff --git a/test/discovery_and_monitoring/unified/hello-network-error.json b/test/discovery_and_monitoring/unified/hello-network-error.json index f44b26a9f..55373c90c 100644 --- a/test/discovery_and_monitoring/unified/hello-network-error.json +++ b/test/discovery_and_monitoring/unified/hello-network-error.json @@ -1,6 +1,6 @@ { "description": "hello-network-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.9", diff --git a/test/discovery_and_monitoring/unified/hello-timeout.json b/test/discovery_and_monitoring/unified/hello-timeout.json index dfa6b48d6..fe7cf4e78 100644 --- a/test/discovery_and_monitoring/unified/hello-timeout.json +++ b/test/discovery_and_monitoring/unified/hello-timeout.json @@ -1,6 +1,6 @@ { "description": "hello-timeout", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/insert-network-error.json b/test/discovery_and_monitoring/unified/insert-network-error.json index e4ba6684a..bfe41a4cb 100644 --- a/test/discovery_and_monitoring/unified/insert-network-error.json +++ b/test/discovery_and_monitoring/unified/insert-network-error.json @@ -1,6 +1,6 @@ { "description": "insert-network-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/insert-shutdown-error.json b/test/discovery_and_monitoring/unified/insert-shutdown-error.json index 3c724fa5e..af7c6c987 100644 --- a/test/discovery_and_monitoring/unified/insert-shutdown-error.json +++ b/test/discovery_and_monitoring/unified/insert-shutdown-error.json @@ -1,6 +1,6 @@ { "description": "insert-shutdown-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/minPoolSize-error.json b/test/discovery_and_monitoring/unified/minPoolSize-error.json index 0234ac992..7e294baf6 100644 --- a/test/discovery_and_monitoring/unified/minPoolSize-error.json +++ b/test/discovery_and_monitoring/unified/minPoolSize-error.json @@ -1,6 +1,6 @@ { "description": "minPoolSize-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.9", diff --git a/test/discovery_and_monitoring/unified/pool-cleared-error.json b/test/discovery_and_monitoring/unified/pool-cleared-error.json index 9a7dfd901..b7f6924f2 100644 --- a/test/discovery_and_monitoring/unified/pool-cleared-error.json +++ b/test/discovery_and_monitoring/unified/pool-cleared-error.json @@ -1,6 +1,6 @@ { "description": "pool-cleared-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.9", diff --git a/test/discovery_and_monitoring/unified/rediscover-quickly-after-step-down.json b/test/discovery_and_monitoring/unified/rediscover-quickly-after-step-down.json index c7c249485..3147a07a1 100644 --- a/test/discovery_and_monitoring/unified/rediscover-quickly-after-step-down.json +++ b/test/discovery_and_monitoring/unified/rediscover-quickly-after-step-down.json @@ -1,6 +1,6 @@ { "description": "rediscover-quickly-after-step-down", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/serverMonitoringMode.json b/test/discovery_and_monitoring/unified/serverMonitoringMode.json new file mode 100644 index 000000000..7d681b4f9 --- /dev/null +++ b/test/discovery_and_monitoring/unified/serverMonitoringMode.json @@ -0,0 +1,449 @@ +{ + "description": "serverMonitoringMode", + "schemaVersion": "1.17", + "runOnRequirements": [ + { + "topologies": [ + "single", + "sharded", + "sharded-replicaset" + ], + "serverless": "forbid" + } + ], + "tests": [ + { + "description": "connect with serverMonitoringMode=auto >=4.4", + "runOnRequirements": [ + { + "minServerVersion": "4.4.0" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "auto" + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": true + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=auto <4.4", + "runOnRequirements": [ + { + "maxServerVersion": "4.2.99" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "auto", + "heartbeatFrequencyMS": 500 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=stream >=4.4", + "runOnRequirements": [ + { + "minServerVersion": "4.4.0" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "stream" + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": true + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=stream <4.4", + "runOnRequirements": [ + { + "maxServerVersion": "4.2.99" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "stream", + "heartbeatFrequencyMS": 500 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=poll", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "poll", + "heartbeatFrequencyMS": 500 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + } + ] + } + ] + } + ] +} diff --git a/test/lambda/mongodb/app.py b/test/lambda/mongodb/app.py index 66c216467..d56fbec3a 100644 --- a/test/lambda/mongodb/app.py +++ b/test/lambda/mongodb/app.py @@ -18,6 +18,7 @@ from pymongo.monitoring import ( open_connections = 0 heartbeat_count = 0 +streaming_heartbeat_count = 0 total_heartbeat_duration = 0 total_commands = 0 total_command_duration = 0 @@ -49,9 +50,11 @@ class ServerHeartbeatHandler(ServerHeartbeatListener): print("server heartbeat started", event) def succeeded(self, event): - global heartbeat_count, total_heartbeat_duration + global heartbeat_count, total_heartbeat_duration, streaming_heartbeat_count heartbeat_count += 1 total_heartbeat_duration += event.duration + if event.awaited: + streaming_heartbeat_count += 1 print("server heartbeat succeeded", event) def failed(self, event): @@ -115,7 +118,9 @@ print("Connected") def create_response(): return dict( averageCommandDuration=total_command_duration / total_commands, - averageHeartbeatDuration=total_heartbeat_duration / heartbeat_count, + averageHeartbeatDuration=total_heartbeat_duration / heartbeat_count + if heartbeat_count + else 0, openConnections=open_connections, heartbeatCount=heartbeat_count, ) @@ -145,5 +150,8 @@ def lambda_handler(event, context): response = json.dumps(create_response()) reset() print("finished!") + assert ( + streaming_heartbeat_count == 0 + ), f"streaming_heartbeat_count was {streaming_heartbeat_count} not 0" return dict(statusCode=200, body=response) diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index accd1b703..8946f256a 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -34,6 +34,7 @@ from test.utils import ( single_client, wait_until, ) +from unittest.mock import patch from bson import Timestamp, json_util from pymongo import common, monitoring @@ -341,6 +342,51 @@ class TestPoolManagement(IntegrationTest): listener.wait_for_event(monitoring.PoolReadyEvent, 1) +class TestSdamMode(IntegrationTest): + @client_context.require_no_serverless + @client_context.require_no_load_balancer + def setUp(self): + super().setUp() + + def test_rtt_connection_is_enabled_stream(self): + client = rs_or_single_client(serverMonitoringMode="stream") + self.addCleanup(client.close) + client.admin.command("ping") + for _, server in client._topology._servers.items(): + monitor = server._monitor + self.assertTrue(monitor._stream) + if client_context.version >= (4, 4): + self.assertIsNotNone(monitor._rtt_monitor._executor._thread) + else: + self.assertIsNone(monitor._rtt_monitor._executor._thread) + + def test_rtt_connection_is_disabled_poll(self): + client = rs_or_single_client(serverMonitoringMode="poll") + self.addCleanup(client.close) + self.assert_rtt_connection_is_disabled(client) + + def test_rtt_connection_is_disabled_auto(self): + envs = [ + {"AWS_EXECUTION_ENV": "AWS_Lambda_python3.9"}, + {"FUNCTIONS_WORKER_RUNTIME": "python"}, + {"K_SERVICE": "gcpservicename"}, + {"FUNCTION_NAME": "gcpfunctionname"}, + {"VERCEL": "1"}, + ] + for env in envs: + with patch.dict("os.environ", env): + client = rs_or_single_client(serverMonitoringMode="auto") + self.addCleanup(client.close) + self.assert_rtt_connection_is_disabled(client) + + def assert_rtt_connection_is_disabled(self, client): + client.admin.command("ping") + for _, server in client._topology._servers.items(): + monitor = server._monitor + self.assertFalse(monitor._stream) + self.assertIsNone(monitor._rtt_monitor._executor._thread) + + # Generate unified tests. globals().update(generate_test_classes(os.path.join(SDAM_PATH, "unified"), module=__name__)) diff --git a/test/test_monitoring.py b/test/test_monitoring.py index 8ccc844d3..e135a52e7 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -1185,7 +1185,9 @@ class TestEventClasses(unittest.TestCase): def test_server_heartbeat_event_repr(self): connection_id = ("localhost", 27017) event = monitoring.ServerHeartbeatStartedEvent(connection_id) - self.assertEqual(repr(event), "") + self.assertEqual( + repr(event), "" + ) delta = 0.1 event = monitoring.ServerHeartbeatSucceededEvent( delta, {"ok": 1}, connection_id # type: ignore[arg-type] diff --git a/test/unified_format.py b/test/unified_format.py index 80c6f0334..a6676c601 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -100,6 +100,10 @@ from pymongo.monitoring import ( PoolReadyEvent, ServerClosedEvent, ServerDescriptionChangedEvent, + ServerHeartbeatFailedEvent, + ServerHeartbeatListener, + ServerHeartbeatStartedEvent, + ServerHeartbeatSucceededEvent, ServerListener, ServerOpeningEvent, TopologyEvent, @@ -107,6 +111,7 @@ from pymongo.monitoring import ( _ConnectionEvent, _PoolEvent, _ServerEvent, + _ServerHeartbeatEvent, ) from pymongo.operations import SearchIndexModel from pymongo.read_concern import ReadConcern @@ -288,7 +293,7 @@ class NonLazyCursor: self.client = None -class EventListenerUtil(CMAPListener, CommandListener, ServerListener): +class EventListenerUtil(CMAPListener, CommandListener, ServerListener, ServerHeartbeatListener): def __init__( self, observe_events, ignore_commands, observe_sensitive_commands, store_events, entity_map ): @@ -319,7 +324,11 @@ class EventListenerUtil(CMAPListener, CommandListener, ServerListener): return [e for e in self.events if isinstance(e, _CommandEvent)] if event_type == "cmap": return [e for e in self.events if isinstance(e, (_ConnectionEvent, _PoolEvent))] - return [e for e in self.events if isinstance(e, (_ServerEvent, TopologyEvent))] + return [ + e + for e in self.events + if isinstance(e, (_ServerEvent, TopologyEvent, _ServerHeartbeatEvent)) + ] def add_event(self, event): event_name = type(event).__name__.lower() @@ -339,23 +348,32 @@ class EventListenerUtil(CMAPListener, CommandListener, ServerListener): self.add_event(event) def started(self, event): - if event.command == {}: - # Command is redacted. Observe only if flag is set. - if self._observe_sensitive_commands: + if isinstance(event, CommandStartedEvent): + if event.command == {}: + # Command is redacted. Observe only if flag is set. + if self._observe_sensitive_commands: + self._command_event(event) + else: self._command_event(event) else: - self._command_event(event) + self.add_event(event) def succeeded(self, event): - if event.reply == {}: - # Command is redacted. Observe only if flag is set. - if self._observe_sensitive_commands: + if isinstance(event, CommandSucceededEvent): + if event.reply == {}: + # Command is redacted. Observe only if flag is set. + if self._observe_sensitive_commands: + self._command_event(event) + else: self._command_event(event) else: - self._command_event(event) + self.add_event(event) def failed(self, event): - self._command_event(event) + if isinstance(event, CommandFailedEvent): + self._command_event(event) + else: + self.add_event(event) def opened(self, event: ServerOpeningEvent) -> None: self.add_event(event) @@ -833,6 +851,18 @@ class MatchEvaluatorUtil: ) if "newDescription" in spec: self.match_server_description(actual.new_description, spec["newDescription"]) + elif name == "serverHeartbeatStartedEvent": + self.test.assertIsInstance(actual, ServerHeartbeatStartedEvent) + if "awaited" in spec: + self.test.assertEqual(actual.awaited, spec["awaited"]) + elif name == "serverHeartbeatSucceededEvent": + self.test.assertIsInstance(actual, ServerHeartbeatSucceededEvent) + if "awaited" in spec: + self.test.assertEqual(actual.awaited, spec["awaited"]) + elif name == "serverHeartbeatFailedEvent": + self.test.assertIsInstance(actual, ServerHeartbeatFailedEvent) + if "awaited" in spec: + self.test.assertEqual(actual.awaited, spec["awaited"]) else: raise Exception(f"Unsupported event type {name}") @@ -868,7 +898,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest): a class attribute ``TEST_SPEC``. """ - SCHEMA_VERSION = Version.from_string("1.15") + SCHEMA_VERSION = Version.from_string("1.17") RUN_ON_LOAD_BALANCER = True RUN_ON_SERVERLESS = True TEST_SPEC: Any diff --git a/test/uri_options/sdam-options.json b/test/uri_options/sdam-options.json new file mode 100644 index 000000000..673f5607e --- /dev/null +++ b/test/uri_options/sdam-options.json @@ -0,0 +1,46 @@ +{ + "tests": [ + { + "description": "serverMonitoringMode=auto", + "uri": "mongodb://example.com/?serverMonitoringMode=auto", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "serverMonitoringMode": "auto" + } + }, + { + "description": "serverMonitoringMode=stream", + "uri": "mongodb://example.com/?serverMonitoringMode=stream", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "serverMonitoringMode": "stream" + } + }, + { + "description": "serverMonitoringMode=poll", + "uri": "mongodb://example.com/?serverMonitoringMode=poll", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "serverMonitoringMode": "poll" + } + }, + { + "description": "invalid serverMonitoringMode", + "uri": "mongodb://example.com/?serverMonitoringMode=invalid", + "valid": true, + "warning": true, + "hosts": null, + "auth": null, + "options": {} + } + ] +} diff --git a/test/utils.py b/test/utils.py index 776aba823..51a7903c4 100644 --- a/test/utils.py +++ b/test/utils.py @@ -300,6 +300,7 @@ class MockPool: self._lock = _create_lock() self.opts = options self.operation_count = 0 + self.conns = [] def stale_generation(self, gen, service_id): return self.gen.stale(gen, service_id)