From c4e4bd638f7b82032cbd13b8e56a7593b56e964c Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 19 Jan 2024 10:55:10 -0800 Subject: [PATCH] PYTHON-3175 Preemptively cancel in progress operations when SDAM heartbeats timeout (#1465) --- pymongo/monitor.py | 6 +- pymongo/monitoring.py | 30 +- pymongo/network.py | 56 +- pymongo/pool.py | 39 +- pymongo/topology.py | 16 +- ...lear-interrupting-pending-connections.json | 77 +++ ...e-run-interruptInUseConnections-false.json | 81 +++ .../unified/interruptInUse-pool-clear.json | 552 ++++++++++++++++++ test/test_client.py | 3 +- test/test_cmap.py | 7 +- ...dEvent-interruptInUseConnections-type.json | 23 + test/unified_format.py | 9 + test/utils.py | 2 +- 13 files changed, 848 insertions(+), 53 deletions(-) create mode 100644 test/cmap/pool-clear-interrupting-pending-connections.json create mode 100644 test/cmap/pool-clear-schedule-run-interruptInUseConnections-false.json create mode 100644 test/discovery_and_monitoring/unified/interruptInUse-pool-clear.json create mode 100644 test/unified-test-format/invalid/expectedCmapEvent-poolClearedEvent-interruptInUseConnections-type.json diff --git a/pymongo/monitor.py b/pymongo/monitor.py index 4e6f86934..bf8525fe8 100644 --- a/pymongo/monitor.py +++ b/pymongo/monitor.py @@ -23,7 +23,7 @@ from typing import TYPE_CHECKING, Any, Mapping, Optional, cast from pymongo import common, periodic_executor from pymongo._csot import MovingMinimum -from pymongo.errors import NotPrimaryError, OperationFailure, _OperationCancelled +from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _create_lock from pymongo.periodic_executor import _shutdown_executors @@ -204,7 +204,9 @@ class Monitor(MonitorBase): # Update the Topology and clear the server pool on error. self._topology.on_change( - self._server_description, reset_pool=self._server_description.error + self._server_description, + reset_pool=self._server_description.error, + interrupt_connections=isinstance(self._server_description.error, NetworkTimeout), ) if self._stream and ( diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 164b53ed2..07c7f5f47 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -874,15 +874,22 @@ class PoolClearedEvent(_PoolEvent): :param address: The address (host, port) pair of the server this Pool is attempting to connect to. :param service_id: The service_id this command was sent to, or ``None``. + :param interrupt_connections: True if all active connections were interrupted by the Pool during clearing. .. versionadded:: 3.9 """ - __slots__ = ("__service_id",) + __slots__ = ("__service_id", "__interrupt_connections") - def __init__(self, address: _Address, service_id: Optional[ObjectId] = None) -> None: + def __init__( + self, + address: _Address, + service_id: Optional[ObjectId] = None, + interrupt_connections: bool = False, + ) -> None: super().__init__(address) self.__service_id = service_id + self.__interrupt_connections = interrupt_connections @property def service_id(self) -> Optional[ObjectId]: @@ -894,8 +901,16 @@ class PoolClearedEvent(_PoolEvent): """ return self.__service_id + @property + def interrupt_connections(self) -> bool: + """If True, active connections are interrupted during clearing. + + .. versionadded:: 4.7 + """ + return self.__interrupt_connections + def __repr__(self) -> str: - return f"{self.__class__.__name__}({self.address!r}, {self.__service_id!r})" + return f"{self.__class__.__name__}({self.address!r}, {self.__service_id!r}, {self.__interrupt_connections!r})" class PoolClosedEvent(_PoolEvent): @@ -1775,9 +1790,14 @@ class _EventListeners: except Exception: _handle_exception() - def publish_pool_cleared(self, address: _Address, service_id: Optional[ObjectId]) -> None: + def publish_pool_cleared( + self, + address: _Address, + service_id: Optional[ObjectId], + interrupt_connections: bool = False, + ) -> None: """Publish a :class:`PoolClearedEvent` to all pool listeners.""" - event = PoolClearedEvent(address, service_id) + event = PoolClearedEvent(address, service_id, interrupt_connections) for subscriber in self.__cmap_listeners: try: subscriber.pool_cleared(event) diff --git a/pymongo/network.py b/pymongo/network.py index a6a308c3d..ce4c76799 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -296,35 +296,35 @@ _POLL_TIMEOUT = 0.5 def wait_for_read(conn: Connection, deadline: Optional[float]) -> None: """Block until at least one byte is read, or a timeout, or a cancel.""" - context = conn.cancel_context - # Only Monitor connections can be cancelled. - if context: - sock = conn.conn - timed_out = False - while True: - # SSLSocket can have buffered data which won't be caught by select. - if hasattr(sock, "pending") and sock.pending() > 0: - readable = True + sock = conn.conn + timed_out = False + # Check if the connection's socket has been manually closed + if sock.fileno() == -1: + return + while True: + # SSLSocket can have buffered data which won't be caught by select. + if hasattr(sock, "pending") and sock.pending() > 0: + readable = True + else: + # Wait up to 500ms for the socket to become readable and then + # check for cancellation. + if deadline: + remaining = deadline - time.monotonic() + # When the timeout has expired perform one final check to + # see if the socket is readable. This helps avoid spurious + # timeouts on AWS Lambda and other FaaS environments. + if remaining <= 0: + timed_out = True + timeout = max(min(remaining, _POLL_TIMEOUT), 0) else: - # Wait up to 500ms for the socket to become readable and then - # check for cancellation. - if deadline: - remaining = deadline - time.monotonic() - # When the timeout has expired perform one final check to - # see if the socket is readable. This helps avoid spurious - # timeouts on AWS Lambda and other FaaS environments. - if remaining <= 0: - timed_out = True - timeout = max(min(remaining, _POLL_TIMEOUT), 0) - else: - timeout = _POLL_TIMEOUT - readable = conn.socket_checker.select(sock, read=True, timeout=timeout) - if context.cancelled: - raise _OperationCancelled("hello cancelled") - if readable: - return - if timed_out: - raise socket.timeout("timed out") + timeout = _POLL_TIMEOUT + readable = conn.socket_checker.select(sock, read=True, timeout=timeout) + if conn.cancel_context.cancelled: + raise _OperationCancelled("operation cancelled") + if readable: + return + if timed_out: + raise socket.timeout("timed out") # Errors raised by sockets (and TLS sockets) when in non-blocking mode. diff --git a/pymongo/pool.py b/pymongo/pool.py index 610fa5b97..fb7b45bc5 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -739,10 +739,7 @@ class Connection: self.pool_gen = pool.gen self.generation = self.pool_gen.get_overall() self.ready = False - self.cancel_context: Optional[_CancellationContext] = None - if not pool.handshake: - # This is a Monitor connection. - self.cancel_context = _CancellationContext() + self.cancel_context: _CancellationContext = _CancellationContext() self.opts = pool.opts self.more_to_come: bool = False # For load balancer support. @@ -1112,8 +1109,7 @@ class Connection: if self.closed: return self.closed = True - if self.cancel_context: - self.cancel_context.cancel() + self.cancel_context.cancel() # Note: We catch exceptions to avoid spurious errors on interpreter # shutdown. try: @@ -1378,6 +1374,7 @@ class Pool: # and returned to pool from the left side. Stale sockets removed # from the right side. self.conns: collections.deque = collections.deque() + self.active_contexts: set[_CancellationContext] = set() self.lock = _create_lock() self.active_sockets = 0 # Monotonically increasing connection ID required for CMAP Events. @@ -1442,7 +1439,11 @@ class Pool: return self.state == PoolState.CLOSED def _reset( - self, close: bool, pause: bool = True, service_id: Optional[ObjectId] = None + self, + close: bool, + pause: bool = True, + service_id: Optional[ObjectId] = None, + interrupt_connections: bool = False, ) -> None: old_state = self.state with self.size_cond: @@ -1475,6 +1476,10 @@ class Pool: self._max_connecting_cond.notify_all() self.size_cond.notify_all() + if interrupt_connections: + for context in self.active_contexts: + context.cancel() + listeners = self.opts._event_listeners # CMAP spec says that close() MUST close sockets before publishing the # PoolClosedEvent but that reset() SHOULD close sockets *after* @@ -1488,7 +1493,11 @@ class Pool: else: if old_state != PoolState.PAUSED and self.enabled_for_cmap: assert listeners is not None - listeners.publish_pool_cleared(self.address, service_id=service_id) + listeners.publish_pool_cleared( + self.address, + service_id=service_id, + interrupt_connections=interrupt_connections, + ) for conn in sockets: conn.close_conn(ConnectionClosedReason.STALE) @@ -1501,8 +1510,10 @@ class Pool: for _socket in self.conns: _socket.update_is_writable(self.is_writable) - def reset(self, service_id: Optional[ObjectId] = None) -> None: - self._reset(close=False, service_id=service_id) + def reset( + self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False + ) -> None: + self._reset(close=False, service_id=service_id, interrupt_connections=interrupt_connections) def reset_without_pause(self) -> None: self._reset(close=False, pause=False) @@ -1558,6 +1569,7 @@ class Pool: conn.close_conn(ConnectionClosedReason.STALE) return self.conns.appendleft(conn) + self.active_contexts.discard(conn.cancel_context) finally: if incremented: # Notify after adding the socket to the pool. @@ -1602,6 +1614,8 @@ class Pool: raise conn = Connection(sock, self, self.address, conn_id) # type: ignore[arg-type] + with self.lock: + self.active_contexts.add(conn.cancel_context) try: if self.handshake: conn.hello() @@ -1644,6 +1658,8 @@ class Pool: assert listeners is not None listeners.publish_connection_checked_out(self.address, conn.id) try: + with self.lock: + self.active_contexts.add(conn.cancel_context) yield conn except BaseException: # Exception in caller. Ensure the connection gets returned. @@ -1731,7 +1747,6 @@ class Pool: with self.lock: self.active_sockets += 1 incremented = True - while conn is None: # CMAP: we MUST wait for either maxConnecting OR for a socket # to be checked back into the pool. @@ -1794,6 +1809,8 @@ class Pool: conn.pinned_cursor = False self.__pinned_sockets.discard(conn) listeners = self.opts._event_listeners + with self.lock: + self.active_contexts.discard(conn.cancel_context) if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_checked_in(self.address, conn.id) diff --git a/pymongo/topology.py b/pymongo/topology.py index 81316e3e2..b5afc31b2 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -330,7 +330,10 @@ class Topology: return self.select_server(any_server_selector, server_selection_timeout, address) def _process_change( - self, server_description: ServerDescription, reset_pool: bool = False + self, + server_description: ServerDescription, + reset_pool: bool = False, + interrupt_connections: bool = False, ) -> None: """Process a new ServerDescription on an opened topology. @@ -387,12 +390,17 @@ class Topology: if reset_pool: server = self._servers.get(server_description.address) if server: - server.pool.reset() + server.pool.reset(interrupt_connections=interrupt_connections) # Wake waiters in select_servers(). self._condition.notify_all() - def on_change(self, server_description: ServerDescription, reset_pool: bool = False) -> None: + def on_change( + self, + server_description: ServerDescription, + reset_pool: bool = False, + interrupt_connections: bool = False, + ) -> None: """Process a new ServerDescription after an hello call completes.""" # We do no I/O holding the lock. with self._lock: @@ -405,7 +413,7 @@ class Topology: # change removed it. E.g., we got a host list from the primary # that didn't include this server. if self._opened and self._description.has_server(server_description.address): - self._process_change(server_description, reset_pool) + self._process_change(server_description, reset_pool, interrupt_connections) def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None: """Process a new seedlist on an opened topology. diff --git a/test/cmap/pool-clear-interrupting-pending-connections.json b/test/cmap/pool-clear-interrupting-pending-connections.json new file mode 100644 index 000000000..05966a399 --- /dev/null +++ b/test/cmap/pool-clear-interrupting-pending-connections.json @@ -0,0 +1,77 @@ +{ + "version": 1, + "style": "integration", + "description": "clear with interruptInUseConnections = true closes pending connections", + "runOn": [ + { + "minServerVersion": "4.9.0" + } + ], + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 1000 + } + }, + "poolOptions": { + "minPoolSize": 0 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "waitForEvent", + "event": "ConnectionCreated", + "count": 1 + }, + { + "name": "clear", + "interruptInUseConnections": true + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutFailed", + "count": 1 + } + ], + "events": [ + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCreated" + }, + { + "type": "ConnectionPoolCleared", + "interruptInUseConnections": true + }, + { + "type": "ConnectionClosed" + }, + { + "type": "ConnectionCheckOutFailed" + } + ], + "ignore": [ + "ConnectionCheckedIn", + "ConnectionCheckedOut", + "ConnectionPoolCreated", + "ConnectionPoolReady" + ] +} \ No newline at end of file diff --git a/test/cmap/pool-clear-schedule-run-interruptInUseConnections-false.json b/test/cmap/pool-clear-schedule-run-interruptInUseConnections-false.json new file mode 100644 index 000000000..b84541611 --- /dev/null +++ b/test/cmap/pool-clear-schedule-run-interruptInUseConnections-false.json @@ -0,0 +1,81 @@ +{ + "version": 1, + "style": "unit", + "description": "Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)", + "poolOptions": { + "backgroundThreadIntervalMS": 10000 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "checkOut" + }, + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "checkIn", + "connection": "conn" + }, + { + "name": "clear", + "interruptInUseConnections": false + }, + { + "name": "waitForEvent", + "event": "ConnectionPoolCleared", + "count": 1, + "timeout": 1000 + }, + { + "name": "waitForEvent", + "event": "ConnectionClosed", + "count": 1, + "timeout": 1000 + }, + { + "name": "close" + } + ], + "events": [ + { + "type": "ConnectionCheckedOut", + "connectionId": 1, + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 2, + "address": 42 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 2, + "address": 42 + }, + { + "type": "ConnectionPoolCleared", + "interruptInUseConnections": false + }, + { + "type": "ConnectionClosed", + "connectionId": 2, + "reason": "stale", + "address": 42 + }, + { + "type": "ConnectionPoolClosed", + "address": 42 + } + ], + "ignore": [ + "ConnectionCreated", + "ConnectionPoolReady", + "ConnectionReady", + "ConnectionCheckOutStarted", + "ConnectionPoolCreated" + ] +} \ No newline at end of file diff --git a/test/discovery_and_monitoring/unified/interruptInUse-pool-clear.json b/test/discovery_and_monitoring/unified/interruptInUse-pool-clear.json new file mode 100644 index 000000000..6fdef55b4 --- /dev/null +++ b/test/discovery_and_monitoring/unified/interruptInUse-pool-clear.json @@ -0,0 +1,552 @@ +{ + "description": "interruptInUse", + "schemaVersion": "1.11", + "runOnRequirements": [ + { + "minServerVersion": "4.9", + "topologies": [ + "replicaset", + "sharded" + ], + "serverless": "forbid" + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "interruptInUse", + "databaseName": "sdam-tests", + "documents": [] + } + ], + "tests": [ + { + "description": "Connection pool clear uses interruptInUseConnections=true after monitor timeout", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "uriOptions": { + "connectTimeoutMS": 500, + "heartbeatFrequencyMS": 500, + "appname": "interruptInUse", + "retryReads": false, + "minPoolSize": 0 + }, + "observeEvents": [ + "poolClearedEvent", + "connectionClosedEvent", + "commandStartedEvent", + "commandSucceededEvent", + "commandFailedEvent", + "connectionCheckedOutEvent", + "connectionCheckedInEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "interruptInUse" + } + }, + { + "thread": { + "id": "thread1" + } + } + ] + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { "_id" : 1 } + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "thread": "thread1", + "operation": { + "name": "find", + "object": "collection", + "arguments": { + "filter": { "$where": "sleep(2000) || true" } + }, + "expectError": { + "isError": true + } + } + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 4 + }, + "data": { + "failCommands": [ + "hello", "isMaster" + ], + "blockConnection": true, + "blockTimeMS": 1500, + "appName": "interruptInUse" + } + }, + "client": "setupClient" + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "thread": "thread1" + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "insert" + } + }, + { + "commandStartedEvent": { + "commandName": "find" + } + }, + { + "commandFailedEvent": { + "commandName": "find" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckedOutEvent": { } + }, + { + "connectionCheckedInEvent": { } + }, + { + "connectionCheckedOutEvent": { } + }, + { + "poolClearedEvent": { + "interruptInUseConnections": true + } + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionClosedEvent": { } + } + ] + } + ], + "outcome": [{ + "collectionName": "interruptInUse", + "databaseName": "sdam-tests", + "documents": [{ "_id": 1 }] + }] + }, + { + "description": "Error returned from connection pool clear with interruptInUseConnections=true is retryable", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "uriOptions": { + "connectTimeoutMS": 500, + "heartbeatFrequencyMS": 500, + "appname": "interruptInUseRetryable", + "retryReads": true, + "minPoolSize": 0 + }, + "observeEvents": [ + "poolClearedEvent", + "connectionClosedEvent", + "commandFailedEvent", + "commandStartedEvent", + "commandSucceededEvent", + "connectionCheckedOutEvent", + "connectionCheckedInEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "interruptInUse" + } + }, + { + "thread": { + "id": "thread1" + } + } + ] + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { "_id" : 1 } + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "thread": "thread1", + "operation": { + "name": "find", + "object": "collection", + "arguments": { + "filter": { "$where": "sleep(2000) || true" } + } + } + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 4 + }, + "data": { + "failCommands": [ + "hello", "isMaster" + ], + "blockConnection": true, + "blockTimeMS": 1500, + "appName": "interruptInUseRetryable" + } + }, + "client": "setupClient" + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "thread": "thread1" + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "insert" + } + }, + { + "commandStartedEvent": { + "commandName": "find" + } + }, + { + "commandFailedEvent": { + "commandName": "find" + } + }, + { + "commandStartedEvent": { + "commandName": "find" + } + }, + { + "commandSucceededEvent": { + "commandName": "find" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckedOutEvent": { } + }, + { + "connectionCheckedInEvent": { } + }, + { + "connectionCheckedOutEvent": { } + }, + { + "poolClearedEvent": { + "interruptInUseConnections": true + } + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionClosedEvent": { } + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] + } + ], + "outcome": [{ + "collectionName": "interruptInUse", + "databaseName": "sdam-tests", + "documents": [{ "_id": 1 }] + }] + }, + { + "description": "Error returned from connection pool clear with interruptInUseConnections=true is retryable for write", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "uriOptions": { + "connectTimeoutMS": 500, + "heartbeatFrequencyMS": 500, + "appname": "interruptInUseRetryableWrite", + "retryWrites": true, + "minPoolSize": 0 + }, + "observeEvents": [ + "poolClearedEvent", + "connectionClosedEvent", + "commandFailedEvent", + "commandStartedEvent", + "commandSucceededEvent", + "connectionCheckedOutEvent", + "connectionCheckedInEvent" + ]} + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "interruptInUse" + } + }, + { + "thread": { + "id": "thread1" + } + } + ] + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { "_id": 1 } + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "thread": "thread1", + "operation": { + "name": "updateOne", + "object": "collection", + "arguments": { + "filter": { "$where": "sleep(2000) || true" }, + "update": [ { "$set": { "a": "bar" } } ] + } + } + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 4 + }, + "data": { + "failCommands": [ + "hello", "isMaster" + ], + "blockConnection": true, + "blockTimeMS": 1500, + "appName": "interruptInUseRetryableWrite" + } + }, + "client": "setupClient" + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "thread": "thread1" + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "commandName": "insert" + } + }, + { + "commandSucceededEvent": { + "commandName": "insert" + } + }, + { + "commandStartedEvent": { + "commandName": "update" + } + }, + { + "commandFailedEvent": { + "commandName": "update" + } + }, + { + "commandStartedEvent": { + "commandName": "update" + } + }, + { + "commandSucceededEvent": { + "commandName": "update" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCheckedOutEvent": { } + }, + { + "connectionCheckedInEvent": { } + }, + { + "connectionCheckedOutEvent": { } + }, + { + "poolClearedEvent": { + "interruptInUseConnections": true + } + }, + { + "connectionCheckedInEvent": {} + }, + { + "connectionClosedEvent": { } + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] + } + ], + "outcome": [{ + "collectionName": "interruptInUse", + "databaseName": "sdam-tests", + "documents": [{ "_id": 1, "a" : "bar"}] + }] + } + ] +} \ No newline at end of file diff --git a/test/test_client.py b/test/test_client.py index 089b1673b..0b94b2c3b 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -623,7 +623,8 @@ class TestClient(IntegrationTest): client = rs_or_single_client(minPoolSize=10) server = client._get_topology().select_server(readable_server_selector) wait_until( - lambda: len(server._pool.conns) == 10, "pool initialized with 10 connections" + lambda: len(server._pool.conns) == 10, + "pool initialized with 10 connections", ) # Assert that if a socket is closed, a new one takes its place diff --git a/test/test_cmap.py b/test/test_cmap.py index 59757434e..43caadfa7 100644 --- a/test/test_cmap.py +++ b/test/test_cmap.py @@ -144,7 +144,10 @@ class TestCMAP(IntegrationTest): def clear(self, op): """Run the 'clear' operation.""" - self.pool.reset() + if "interruptInUseConnections" in op: + self.pool.reset(interrupt_connections=op["interruptInUseConnections"]) + else: + self.pool.reset() def close(self, op): """Run the 'close' operation.""" @@ -173,6 +176,8 @@ class TestCMAP(IntegrationTest): if attr == "type": continue c2s = camel_to_snake(attr) + if c2s == "interrupt_in_use_connections": + c2s = "interrupt_connections" actual_val = getattr(actual, c2s) if expected_val == 42: self.assertIsNotNone(actual_val) diff --git a/test/unified-test-format/invalid/expectedCmapEvent-poolClearedEvent-interruptInUseConnections-type.json b/test/unified-test-format/invalid/expectedCmapEvent-poolClearedEvent-interruptInUseConnections-type.json new file mode 100644 index 000000000..e25955a0e --- /dev/null +++ b/test/unified-test-format/invalid/expectedCmapEvent-poolClearedEvent-interruptInUseConnections-type.json @@ -0,0 +1,23 @@ +{ + "description": "expectedCmapEvent-poolClearedEvent-interruptInUseConnections-type", + "schemaVersion": "1.11", + "tests": [ + { + "description": "foo", + "operations": [], + "expectEvents": [ + { + "client": "client0", + "eventType": "cmap", + "events": [ + { + "poolClearedEvent": { + "interruptInUseConnections": "foo" + } + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/test/unified_format.py b/test/unified_format.py index d4080bdc0..141a61bd3 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -779,6 +779,14 @@ class MatchEvaluatorUtil: else: self.test.assertIsNone(actual.service_id) + def assertHasInterruptInUseConnections(self, spec, actual): + if "interruptInUseConnections" in spec: + self.test.assertEqual( + spec.get("interruptInUseConnections"), actual.interrupt_connections + ) + else: + self.test.assertIsInstance(actual.interrupt_connections, bool) + def assertHasServerConnectionId(self, spec, actual): if "hasServerConnectionId" in spec: if spec.get("hasServerConnectionId"): @@ -836,6 +844,7 @@ class MatchEvaluatorUtil: elif name == "poolClearedEvent": self.test.assertIsInstance(actual, PoolClearedEvent) self.assertHasServiceId(spec, actual) + self.assertHasInterruptInUseConnections(spec, actual) elif name == "poolClosedEvent": self.test.assertIsInstance(actual, PoolClosedEvent) elif name == "connectionCreatedEvent": diff --git a/test/utils.py b/test/utils.py index 08d2f1128..85e952bf8 100644 --- a/test/utils.py +++ b/test/utils.py @@ -319,7 +319,7 @@ class MockPool: def ready(self): pass - def reset(self, service_id=None): + def reset(self, service_id=None, interrupt_connections=False): self._reset() def reset_without_pause(self):