PYTHON-3668 Use polling SDAM mode on FaaS and add sdamMode=auto/stream/poll (#1360)

Disable streaming SDAM by default on AWS Lambda and similar FaaS platforms.
Introduce the serverMonitoringMode=stream/poll/auto URI option.
Add Unified Test Format version 1.17 to add support for server heartbeat events.
This commit is contained in:
Shane Harvey 2023-10-12 13:19:16 -07:00 committed by GitHub
parent c0f463f6d3
commit 4c00227c1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 704 additions and 52 deletions

View File

@ -215,6 +215,9 @@ class ClientOptions:
self.__auto_encryption_opts = options.get("auto_encryption_opts")
self.__load_balanced = options.get("loadbalanced")
self.__timeout = options.get("timeoutms")
self.__server_monitoring_mode = options.get(
"servermonitoringmode", common.SERVER_MONITORING_MODE
)
@property
def _options(self) -> Mapping[str, Any]:
@ -284,7 +287,7 @@ class ClientOptions:
def timeout(self) -> Optional[float]:
"""The configured timeoutMS converted to seconds, or None.
.. versionadded: 4.2
.. versionadded:: 4.2
"""
return self.__timeout
@ -318,3 +321,11 @@ class ClientOptions:
"""
assert self.__pool_options._event_listeners is not None
return self.__pool_options._event_listeners.event_listeners()
@property
def server_monitoring_mode(self) -> str:
"""The configured serverMonitoringMode option.
.. versionadded:: 4.5
"""
return self.__server_monitoring_mode

View File

@ -136,6 +136,9 @@ _MAX_END_SESSIONS = 10000
# Default value for srvServiceName
SRV_SERVICE_NAME = "mongodb"
# Default value for serverMonitoringMode
SERVER_MONITORING_MODE = "auto" # poll/stream/auto
def partition_node(node: str) -> tuple[str, int]:
"""Split a host:port string into (host, int(port)) pair."""
@ -664,6 +667,15 @@ def validate_datetime_conversion(option: Any, value: Any) -> Optional[DatetimeCo
raise TypeError(f"{option} must be a str or int representing DatetimeConversion")
def validate_server_monitoring_mode(option: str, value: str) -> str:
"""Validate the serverMonitoringMode option."""
if value not in {"auto", "stream", "poll"}:
raise ValueError(
f'{option}={value!r} is invalid. Must be one of "auto", "stream", or "poll"'
)
return value
# Dictionary where keys are the names of public URI options, and values
# are lists of aliases for that option.
URI_OPTIONS_ALIAS_MAP: dict[str, list[str]] = {
@ -712,6 +724,7 @@ URI_OPTIONS_VALIDATOR_MAP: dict[str, Callable[[Any, Any], Any]] = {
"srvservicename": validate_string,
"srvmaxhosts": validate_non_negative_integer,
"timeoutms": validate_timeoutms,
"servermonitoringmode": validate_server_monitoring_mode,
}
# Dictionary where keys are the names of URI options specific to pymongo,

View File

@ -330,6 +330,8 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
- `heartbeatFrequencyMS`: (optional) The number of milliseconds
between periodic server checks, or None to accept the default
frequency of 10 seconds.
- `serverMonitoringMode`: (optional) The server monitoring mode to use.
Valid values are the strings: "auto", "stream", "poll". Defaults to "auto".
- `appname`: (string or None) The name of the application that
created this MongoClient instance. The server will log this value
upon establishing each connection. It is also recorded in the slow
@ -585,6 +587,9 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
.. seealso:: The MongoDB documentation on `connections <https://dochub.mongodb.org/core/connections>`_.
.. versionchanged:: 4.5
Added the ``serverMonitoringMode`` keyword argument.
.. versionchanged:: 4.2
Added the ``timeoutMS`` keyword argument.
@ -846,6 +851,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
load_balanced=options.load_balanced,
srv_service_name=srv_service_name,
srv_max_hosts=srv_max_hosts,
server_monitoring_mode=options.server_monitoring_mode,
)
self._init_background()

View File

@ -27,6 +27,7 @@ from pymongo.errors import NotPrimaryError, OperationFailure, _OperationCancelle
from pymongo.hello import Hello
from pymongo.lock import _create_lock
from pymongo.periodic_executor import _shutdown_executors
from pymongo.pool import _is_faas
from pymongo.read_preferences import MovingAverage
from pymongo.server_description import ServerDescription
from pymongo.srv_resolver import _SrvResolver
@ -138,7 +139,12 @@ class Monitor(MonitorBase):
topology_settings,
topology._create_pool_for_monitor(server_description.address),
)
self.heartbeater = None
if topology_settings.server_monitoring_mode == "stream":
self._stream = True
elif topology_settings.server_monitoring_mode == "poll":
self._stream = False
else:
self._stream = not _is_faas()
def cancel_check(self) -> None:
"""Cancel any concurrent hello check.
@ -200,7 +206,7 @@ class Monitor(MonitorBase):
self._server_description, reset_pool=self._server_description.error
)
if (
if self._stream and (
self._server_description.is_server_type_known
and self._server_description.topology_version
):
@ -237,7 +243,7 @@ class Monitor(MonitorBase):
address = sd.address
duration = time.monotonic() - start
if self._publish:
awaited = bool(sd.is_server_type_known and sd.topology_version)
awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version)
assert self._listeners is not None
self._listeners.publish_server_heartbeat_failed(address, duration, error, awaited)
self._reset_connection()
@ -255,7 +261,16 @@ class Monitor(MonitorBase):
address = self._server_description.address
if self._publish:
assert self._listeners is not None
self._listeners.publish_server_heartbeat_started(address)
sd = self._server_description
# XXX: "awaited" could be incorrectly set to True in the rare case
# the pool checkout closes and recreates a connection.
awaited = bool(
self._pool.conns
and self._stream
and sd.is_server_type_known
and sd.topology_version
)
self._listeners.publish_server_heartbeat_started(address, awaited)
if self._cancel_context and self._cancel_context.cancelled:
self._reset_connection()
@ -284,7 +299,9 @@ class Monitor(MonitorBase):
if conn.more_to_come:
# Read the next streaming hello (MongoDB 4.4+).
response = Hello(conn._next_reply(), awaitable=True)
elif conn.performed_handshake and self._server_description.topology_version:
elif (
self._stream and conn.performed_handshake and self._server_description.topology_version
):
# Initiate streaming hello (MongoDB 4.4+).
response = conn._hello(
cluster_time,

View File

@ -1292,10 +1292,11 @@ class TopologyClosedEvent(TopologyEvent):
class _ServerHeartbeatEvent:
"""Base class for server heartbeat events."""
__slots__ = "__connection_id"
__slots__ = ("__connection_id", "__awaited")
def __init__(self, connection_id: _Address) -> None:
def __init__(self, connection_id: _Address, awaited: bool = False) -> None:
self.__connection_id = connection_id
self.__awaited = awaited
@property
def connection_id(self) -> _Address:
@ -1304,8 +1305,16 @@ class _ServerHeartbeatEvent:
"""
return self.__connection_id
@property
def awaited(self) -> bool:
"""Whether the heartbeat was issued as an awaitable hello command.
.. versionadded:: 4.6
"""
return self.__awaited
def __repr__(self) -> str:
return f"<{self.__class__.__name__} {self.connection_id}>"
return f"<{self.__class__.__name__} {self.connection_id} awaited: {self.awaited}>"
class ServerHeartbeatStartedEvent(_ServerHeartbeatEvent):
@ -1323,15 +1332,14 @@ class ServerHeartbeatSucceededEvent(_ServerHeartbeatEvent):
.. versionadded:: 3.3
"""
__slots__ = ("__duration", "__reply", "__awaited")
__slots__ = ("__duration", "__reply")
def __init__(
self, duration: float, reply: Hello, connection_id: _Address, awaited: bool = False
) -> None:
super().__init__(connection_id)
super().__init__(connection_id, awaited)
self.__duration = duration
self.__reply = reply
self.__awaited = awaited
@property
def duration(self) -> float:
@ -1350,8 +1358,10 @@ class ServerHeartbeatSucceededEvent(_ServerHeartbeatEvent):
If true, then :meth:`duration` reflects the sum of the round trip time
to the server and the time that the server waited before sending a
response.
.. versionadded:: 3.11
"""
return self.__awaited
return super().awaited
def __repr__(self) -> str:
return "<{} {} duration: {}, awaited: {}, reply: {}>".format(
@ -1370,15 +1380,14 @@ class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent):
.. versionadded:: 3.3
"""
__slots__ = ("__duration", "__reply", "__awaited")
__slots__ = ("__duration", "__reply")
def __init__(
self, duration: float, reply: Exception, connection_id: _Address, awaited: bool = False
) -> None:
super().__init__(connection_id)
super().__init__(connection_id, awaited)
self.__duration = duration
self.__reply = reply
self.__awaited = awaited
@property
def duration(self) -> float:
@ -1397,8 +1406,10 @@ class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent):
If true, then :meth:`duration` reflects the sum of the round trip time
to the server and the time that the server waited before sending a
response.
.. versionadded:: 3.11
"""
return self.__awaited
return super().awaited
def __repr__(self) -> str:
return "<{} {} duration: {}, awaited: {}, reply: {!r}>".format(
@ -1602,14 +1613,15 @@ class _EventListeners:
except Exception:
_handle_exception()
def publish_server_heartbeat_started(self, connection_id: _Address) -> None:
def publish_server_heartbeat_started(self, connection_id: _Address, awaited: bool) -> None:
"""Publish a ServerHeartbeatStartedEvent to all server heartbeat
listeners.
:Parameters:
- `connection_id`: The address (host, port) pair of the connection.
- `awaited`: True if this heartbeat is part of an awaitable hello command.
"""
event = ServerHeartbeatStartedEvent(connection_id)
event = ServerHeartbeatStartedEvent(connection_id, awaited)
for subscriber in self.__server_heartbeat_listeners:
try:
subscriber.started(event)

View File

@ -290,6 +290,10 @@ def _is_vercel() -> bool:
return bool(os.getenv("VERCEL"))
def _is_faas() -> bool:
return _is_lambda() or _is_azure_func() or _is_gcp_func() or _is_vercel()
def _getenv_int(key: str) -> Optional[int]:
"""Like os.getenv but returns an int, or None if the value is missing/malformed."""
val = os.getenv(key)

View File

@ -46,6 +46,7 @@ class TopologySettings:
load_balanced: Optional[bool] = None,
srv_service_name: str = common.SRV_SERVICE_NAME,
srv_max_hosts: int = 0,
server_monitoring_mode: str = common.SERVER_MONITORING_MODE,
):
"""Represent MongoClient's configuration.
@ -72,6 +73,7 @@ class TopologySettings:
self._load_balanced = load_balanced
self._srv_service_name = srv_service_name
self._srv_max_hosts = srv_max_hosts or 0
self._server_monitoring_mode = server_monitoring_mode
self._topology_id = ObjectId()
# Store the allocation traceback to catch unclosed clients in the
@ -146,6 +148,11 @@ class TopologySettings:
"""The srvMaxHosts."""
return self._srv_max_hosts
@property
def server_monitoring_mode(self) -> str:
"""The serverMonitoringMode."""
return self._server_monitoring_mode
def get_topology_type(self) -> int:
if self.load_balanced:
return TOPOLOGY_TYPE.LoadBalanced

View File

@ -1,6 +1,6 @@
{
"description": "auth-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "auth-misc-command-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "auth-network-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "auth-network-timeout-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "auth-shutdown-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "cancel-server-check",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.0",

View File

@ -1,6 +1,6 @@
{
"description": "connectTimeoutMS",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "find-network-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "find-network-timeout-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "find-shutdown-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "hello-command-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.9",

View File

@ -1,6 +1,6 @@
{
"description": "hello-network-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.9",

View File

@ -1,6 +1,6 @@
{
"description": "hello-timeout",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "insert-network-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "insert-shutdown-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -1,6 +1,6 @@
{
"description": "minPoolSize-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.9",

View File

@ -1,6 +1,6 @@
{
"description": "pool-cleared-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.9",

View File

@ -1,6 +1,6 @@
{
"description": "rediscover-quickly-after-step-down",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",

View File

@ -0,0 +1,449 @@
{
"description": "serverMonitoringMode",
"schemaVersion": "1.17",
"runOnRequirements": [
{
"topologies": [
"single",
"sharded",
"sharded-replicaset"
],
"serverless": "forbid"
}
],
"tests": [
{
"description": "connect with serverMonitoringMode=auto >=4.4",
"runOnRequirements": [
{
"minServerVersion": "4.4.0"
}
],
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"uriOptions": {
"serverMonitoringMode": "auto"
},
"useMultipleMongoses": false,
"observeEvents": [
"serverHeartbeatStartedEvent",
"serverHeartbeatSucceededEvent",
"serverHeartbeatFailedEvent"
]
}
},
{
"database": {
"id": "db",
"client": "client",
"databaseName": "sdam-tests"
}
}
]
}
},
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectResult": {
"ok": 1
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"serverHeartbeatStartedEvent": {}
},
"count": 2
}
}
],
"expectEvents": [
{
"client": "client",
"eventType": "sdam",
"ignoreExtraEvents": true,
"events": [
{
"serverHeartbeatStartedEvent": {
"awaited": false
}
},
{
"serverHeartbeatSucceededEvent": {
"awaited": false
}
},
{
"serverHeartbeatStartedEvent": {
"awaited": true
}
}
]
}
]
},
{
"description": "connect with serverMonitoringMode=auto <4.4",
"runOnRequirements": [
{
"maxServerVersion": "4.2.99"
}
],
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"uriOptions": {
"serverMonitoringMode": "auto",
"heartbeatFrequencyMS": 500
},
"useMultipleMongoses": false,
"observeEvents": [
"serverHeartbeatStartedEvent",
"serverHeartbeatSucceededEvent",
"serverHeartbeatFailedEvent"
]
}
},
{
"database": {
"id": "db",
"client": "client",
"databaseName": "sdam-tests"
}
}
]
}
},
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectResult": {
"ok": 1
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"serverHeartbeatStartedEvent": {}
},
"count": 2
}
}
],
"expectEvents": [
{
"client": "client",
"eventType": "sdam",
"ignoreExtraEvents": true,
"events": [
{
"serverHeartbeatStartedEvent": {
"awaited": false
}
},
{
"serverHeartbeatSucceededEvent": {
"awaited": false
}
},
{
"serverHeartbeatStartedEvent": {
"awaited": false
}
}
]
}
]
},
{
"description": "connect with serverMonitoringMode=stream >=4.4",
"runOnRequirements": [
{
"minServerVersion": "4.4.0"
}
],
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"uriOptions": {
"serverMonitoringMode": "stream"
},
"useMultipleMongoses": false,
"observeEvents": [
"serverHeartbeatStartedEvent",
"serverHeartbeatSucceededEvent",
"serverHeartbeatFailedEvent"
]
}
},
{
"database": {
"id": "db",
"client": "client",
"databaseName": "sdam-tests"
}
}
]
}
},
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectResult": {
"ok": 1
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"serverHeartbeatStartedEvent": {}
},
"count": 2
}
}
],
"expectEvents": [
{
"client": "client",
"eventType": "sdam",
"ignoreExtraEvents": true,
"events": [
{
"serverHeartbeatStartedEvent": {
"awaited": false
}
},
{
"serverHeartbeatSucceededEvent": {
"awaited": false
}
},
{
"serverHeartbeatStartedEvent": {
"awaited": true
}
}
]
}
]
},
{
"description": "connect with serverMonitoringMode=stream <4.4",
"runOnRequirements": [
{
"maxServerVersion": "4.2.99"
}
],
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"uriOptions": {
"serverMonitoringMode": "stream",
"heartbeatFrequencyMS": 500
},
"useMultipleMongoses": false,
"observeEvents": [
"serverHeartbeatStartedEvent",
"serverHeartbeatSucceededEvent",
"serverHeartbeatFailedEvent"
]
}
},
{
"database": {
"id": "db",
"client": "client",
"databaseName": "sdam-tests"
}
}
]
}
},
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectResult": {
"ok": 1
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"serverHeartbeatStartedEvent": {}
},
"count": 2
}
}
],
"expectEvents": [
{
"client": "client",
"eventType": "sdam",
"ignoreExtraEvents": true,
"events": [
{
"serverHeartbeatStartedEvent": {
"awaited": false
}
},
{
"serverHeartbeatSucceededEvent": {
"awaited": false
}
},
{
"serverHeartbeatStartedEvent": {
"awaited": false
}
}
]
}
]
},
{
"description": "connect with serverMonitoringMode=poll",
"operations": [
{
"name": "createEntities",
"object": "testRunner",
"arguments": {
"entities": [
{
"client": {
"id": "client",
"uriOptions": {
"serverMonitoringMode": "poll",
"heartbeatFrequencyMS": 500
},
"useMultipleMongoses": false,
"observeEvents": [
"serverHeartbeatStartedEvent",
"serverHeartbeatSucceededEvent",
"serverHeartbeatFailedEvent"
]
}
},
{
"database": {
"id": "db",
"client": "client",
"databaseName": "sdam-tests"
}
}
]
}
},
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectResult": {
"ok": 1
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"client": "client",
"event": {
"serverHeartbeatStartedEvent": {}
},
"count": 2
}
}
],
"expectEvents": [
{
"client": "client",
"eventType": "sdam",
"ignoreExtraEvents": true,
"events": [
{
"serverHeartbeatStartedEvent": {
"awaited": false
}
},
{
"serverHeartbeatSucceededEvent": {
"awaited": false
}
},
{
"serverHeartbeatStartedEvent": {
"awaited": false
}
}
]
}
]
}
]
}

View File

@ -18,6 +18,7 @@ from pymongo.monitoring import (
open_connections = 0
heartbeat_count = 0
streaming_heartbeat_count = 0
total_heartbeat_duration = 0
total_commands = 0
total_command_duration = 0
@ -49,9 +50,11 @@ class ServerHeartbeatHandler(ServerHeartbeatListener):
print("server heartbeat started", event)
def succeeded(self, event):
global heartbeat_count, total_heartbeat_duration
global heartbeat_count, total_heartbeat_duration, streaming_heartbeat_count
heartbeat_count += 1
total_heartbeat_duration += event.duration
if event.awaited:
streaming_heartbeat_count += 1
print("server heartbeat succeeded", event)
def failed(self, event):
@ -115,7 +118,9 @@ print("Connected")
def create_response():
return dict(
averageCommandDuration=total_command_duration / total_commands,
averageHeartbeatDuration=total_heartbeat_duration / heartbeat_count,
averageHeartbeatDuration=total_heartbeat_duration / heartbeat_count
if heartbeat_count
else 0,
openConnections=open_connections,
heartbeatCount=heartbeat_count,
)
@ -145,5 +150,8 @@ def lambda_handler(event, context):
response = json.dumps(create_response())
reset()
print("finished!")
assert (
streaming_heartbeat_count == 0
), f"streaming_heartbeat_count was {streaming_heartbeat_count} not 0"
return dict(statusCode=200, body=response)

View File

@ -34,6 +34,7 @@ from test.utils import (
single_client,
wait_until,
)
from unittest.mock import patch
from bson import Timestamp, json_util
from pymongo import common, monitoring
@ -341,6 +342,51 @@ class TestPoolManagement(IntegrationTest):
listener.wait_for_event(monitoring.PoolReadyEvent, 1)
class TestSdamMode(IntegrationTest):
@client_context.require_no_serverless
@client_context.require_no_load_balancer
def setUp(self):
super().setUp()
def test_rtt_connection_is_enabled_stream(self):
client = rs_or_single_client(serverMonitoringMode="stream")
self.addCleanup(client.close)
client.admin.command("ping")
for _, server in client._topology._servers.items():
monitor = server._monitor
self.assertTrue(monitor._stream)
if client_context.version >= (4, 4):
self.assertIsNotNone(monitor._rtt_monitor._executor._thread)
else:
self.assertIsNone(monitor._rtt_monitor._executor._thread)
def test_rtt_connection_is_disabled_poll(self):
client = rs_or_single_client(serverMonitoringMode="poll")
self.addCleanup(client.close)
self.assert_rtt_connection_is_disabled(client)
def test_rtt_connection_is_disabled_auto(self):
envs = [
{"AWS_EXECUTION_ENV": "AWS_Lambda_python3.9"},
{"FUNCTIONS_WORKER_RUNTIME": "python"},
{"K_SERVICE": "gcpservicename"},
{"FUNCTION_NAME": "gcpfunctionname"},
{"VERCEL": "1"},
]
for env in envs:
with patch.dict("os.environ", env):
client = rs_or_single_client(serverMonitoringMode="auto")
self.addCleanup(client.close)
self.assert_rtt_connection_is_disabled(client)
def assert_rtt_connection_is_disabled(self, client):
client.admin.command("ping")
for _, server in client._topology._servers.items():
monitor = server._monitor
self.assertFalse(monitor._stream)
self.assertIsNone(monitor._rtt_monitor._executor._thread)
# Generate unified tests.
globals().update(generate_test_classes(os.path.join(SDAM_PATH, "unified"), module=__name__))

View File

@ -1185,7 +1185,9 @@ class TestEventClasses(unittest.TestCase):
def test_server_heartbeat_event_repr(self):
connection_id = ("localhost", 27017)
event = monitoring.ServerHeartbeatStartedEvent(connection_id)
self.assertEqual(repr(event), "<ServerHeartbeatStartedEvent ('localhost', 27017)>")
self.assertEqual(
repr(event), "<ServerHeartbeatStartedEvent ('localhost', 27017) awaited: False>"
)
delta = 0.1
event = monitoring.ServerHeartbeatSucceededEvent(
delta, {"ok": 1}, connection_id # type: ignore[arg-type]

View File

@ -100,6 +100,10 @@ from pymongo.monitoring import (
PoolReadyEvent,
ServerClosedEvent,
ServerDescriptionChangedEvent,
ServerHeartbeatFailedEvent,
ServerHeartbeatListener,
ServerHeartbeatStartedEvent,
ServerHeartbeatSucceededEvent,
ServerListener,
ServerOpeningEvent,
TopologyEvent,
@ -107,6 +111,7 @@ from pymongo.monitoring import (
_ConnectionEvent,
_PoolEvent,
_ServerEvent,
_ServerHeartbeatEvent,
)
from pymongo.operations import SearchIndexModel
from pymongo.read_concern import ReadConcern
@ -288,7 +293,7 @@ class NonLazyCursor:
self.client = None
class EventListenerUtil(CMAPListener, CommandListener, ServerListener):
class EventListenerUtil(CMAPListener, CommandListener, ServerListener, ServerHeartbeatListener):
def __init__(
self, observe_events, ignore_commands, observe_sensitive_commands, store_events, entity_map
):
@ -319,7 +324,11 @@ class EventListenerUtil(CMAPListener, CommandListener, ServerListener):
return [e for e in self.events if isinstance(e, _CommandEvent)]
if event_type == "cmap":
return [e for e in self.events if isinstance(e, (_ConnectionEvent, _PoolEvent))]
return [e for e in self.events if isinstance(e, (_ServerEvent, TopologyEvent))]
return [
e
for e in self.events
if isinstance(e, (_ServerEvent, TopologyEvent, _ServerHeartbeatEvent))
]
def add_event(self, event):
event_name = type(event).__name__.lower()
@ -339,23 +348,32 @@ class EventListenerUtil(CMAPListener, CommandListener, ServerListener):
self.add_event(event)
def started(self, event):
if event.command == {}:
# Command is redacted. Observe only if flag is set.
if self._observe_sensitive_commands:
if isinstance(event, CommandStartedEvent):
if event.command == {}:
# Command is redacted. Observe only if flag is set.
if self._observe_sensitive_commands:
self._command_event(event)
else:
self._command_event(event)
else:
self._command_event(event)
self.add_event(event)
def succeeded(self, event):
if event.reply == {}:
# Command is redacted. Observe only if flag is set.
if self._observe_sensitive_commands:
if isinstance(event, CommandSucceededEvent):
if event.reply == {}:
# Command is redacted. Observe only if flag is set.
if self._observe_sensitive_commands:
self._command_event(event)
else:
self._command_event(event)
else:
self._command_event(event)
self.add_event(event)
def failed(self, event):
self._command_event(event)
if isinstance(event, CommandFailedEvent):
self._command_event(event)
else:
self.add_event(event)
def opened(self, event: ServerOpeningEvent) -> None:
self.add_event(event)
@ -833,6 +851,18 @@ class MatchEvaluatorUtil:
)
if "newDescription" in spec:
self.match_server_description(actual.new_description, spec["newDescription"])
elif name == "serverHeartbeatStartedEvent":
self.test.assertIsInstance(actual, ServerHeartbeatStartedEvent)
if "awaited" in spec:
self.test.assertEqual(actual.awaited, spec["awaited"])
elif name == "serverHeartbeatSucceededEvent":
self.test.assertIsInstance(actual, ServerHeartbeatSucceededEvent)
if "awaited" in spec:
self.test.assertEqual(actual.awaited, spec["awaited"])
elif name == "serverHeartbeatFailedEvent":
self.test.assertIsInstance(actual, ServerHeartbeatFailedEvent)
if "awaited" in spec:
self.test.assertEqual(actual.awaited, spec["awaited"])
else:
raise Exception(f"Unsupported event type {name}")
@ -868,7 +898,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
a class attribute ``TEST_SPEC``.
"""
SCHEMA_VERSION = Version.from_string("1.15")
SCHEMA_VERSION = Version.from_string("1.17")
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
TEST_SPEC: Any

View File

@ -0,0 +1,46 @@
{
"tests": [
{
"description": "serverMonitoringMode=auto",
"uri": "mongodb://example.com/?serverMonitoringMode=auto",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"serverMonitoringMode": "auto"
}
},
{
"description": "serverMonitoringMode=stream",
"uri": "mongodb://example.com/?serverMonitoringMode=stream",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"serverMonitoringMode": "stream"
}
},
{
"description": "serverMonitoringMode=poll",
"uri": "mongodb://example.com/?serverMonitoringMode=poll",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"serverMonitoringMode": "poll"
}
},
{
"description": "invalid serverMonitoringMode",
"uri": "mongodb://example.com/?serverMonitoringMode=invalid",
"valid": true,
"warning": true,
"hosts": null,
"auth": null,
"options": {}
}
]
}

View File

@ -300,6 +300,7 @@ class MockPool:
self._lock = _create_lock()
self.opts = options
self.operation_count = 0
self.conns = []
def stale_generation(self, gen, service_id):
return self.gen.stale(gen, service_id)