From 09897b698e0ef6cf429c93ac726c4e65dccb53f7 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 23 Apr 2025 15:13:38 -0400 Subject: [PATCH 1/4] PYTHON-5212 - Do not hold Topology lock while resetting pool (#2301) --- pymongo/asynchronous/pool.py | 31 ++++++-- pymongo/asynchronous/topology.py | 11 ++- pymongo/synchronous/pool.py | 31 ++++++-- pymongo/synchronous/topology.py | 11 ++- test/__init__.py | 8 ++ test/asynchronous/__init__.py | 8 ++ .../test_discovery_and_monitoring.py | 73 +++++++++++++++++++ test/test_discovery_and_monitoring.py | 71 ++++++++++++++++++ tools/synchro.py | 11 ++- 9 files changed, 230 insertions(+), 25 deletions(-) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a67cc5f3c..8b18ab927 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -14,6 +14,7 @@ from __future__ import annotations +import asyncio import collections import contextlib import logging @@ -860,8 +861,14 @@ class Pool: # PoolClosedEvent but that reset() SHOULD close sockets *after* # publishing the PoolClearedEvent. if close: - for conn in sockets: - await conn.close_conn(ConnectionClosedReason.POOL_CLOSED) + if not _IS_SYNC: + await asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], + return_exceptions=True, + ) + else: + for conn in sockets: + await conn.close_conn(ConnectionClosedReason.POOL_CLOSED) if self.enabled_for_cmap: assert listeners is not None listeners.publish_pool_closed(self.address) @@ -891,8 +898,14 @@ class Pool: serverPort=self.address[1], serviceId=service_id, ) - for conn in sockets: - await conn.close_conn(ConnectionClosedReason.STALE) + if not _IS_SYNC: + await asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], + return_exceptions=True, + ) + else: + for conn in sockets: + await conn.close_conn(ConnectionClosedReason.STALE) async def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the @@ -938,8 +951,14 @@ class Pool: and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds ): close_conns.append(self.conns.pop()) - for conn in close_conns: - await conn.close_conn(ConnectionClosedReason.IDLE) + if not _IS_SYNC: + await asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns], + return_exceptions=True, + ) + else: + for conn in close_conns: + await conn.close_conn(ConnectionClosedReason.IDLE) while True: async with self.size_cond: diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 32776bf7b..438dd1e35 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -529,12 +529,6 @@ class Topology: if not _IS_SYNC: self._monitor_tasks.append(self._srv_monitor) - # Clear the pool from a failed heartbeat. - if reset_pool: - server = self._servers.get(server_description.address) - if server: - await server.pool.reset(interrupt_connections=interrupt_connections) - # Wake anything waiting in select_servers(). self._condition.notify_all() @@ -557,6 +551,11 @@ class Topology: # that didn't include this server. if self._opened and self._description.has_server(server_description.address): await self._process_change(server_description, reset_pool, interrupt_connections) + # Clear the pool from a failed heartbeat, done outside the lock to avoid blocking on connection close. + if reset_pool: + server = self._servers.get(server_description.address) + if server: + await server.pool.reset(interrupt_connections=interrupt_connections) async def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None: """Process a new seedlist on an opened topology. diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 224834af3..b3eec64f2 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -14,6 +14,7 @@ from __future__ import annotations +import asyncio import collections import contextlib import logging @@ -858,8 +859,14 @@ class Pool: # PoolClosedEvent but that reset() SHOULD close sockets *after* # publishing the PoolClearedEvent. if close: - for conn in sockets: - conn.close_conn(ConnectionClosedReason.POOL_CLOSED) + if not _IS_SYNC: + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets], + return_exceptions=True, + ) + else: + for conn in sockets: + conn.close_conn(ConnectionClosedReason.POOL_CLOSED) if self.enabled_for_cmap: assert listeners is not None listeners.publish_pool_closed(self.address) @@ -889,8 +896,14 @@ class Pool: serverPort=self.address[1], serviceId=service_id, ) - for conn in sockets: - conn.close_conn(ConnectionClosedReason.STALE) + if not _IS_SYNC: + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], + return_exceptions=True, + ) + else: + for conn in sockets: + conn.close_conn(ConnectionClosedReason.STALE) def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the @@ -934,8 +947,14 @@ class Pool: and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds ): close_conns.append(self.conns.pop()) - for conn in close_conns: - conn.close_conn(ConnectionClosedReason.IDLE) + if not _IS_SYNC: + asyncio.gather( + *[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns], + return_exceptions=True, + ) + else: + for conn in close_conns: + conn.close_conn(ConnectionClosedReason.IDLE) while True: with self.size_cond: diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index df23bff28..1e99adf72 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -529,12 +529,6 @@ class Topology: if not _IS_SYNC: self._monitor_tasks.append(self._srv_monitor) - # Clear the pool from a failed heartbeat. - if reset_pool: - server = self._servers.get(server_description.address) - if server: - server.pool.reset(interrupt_connections=interrupt_connections) - # Wake anything waiting in select_servers(). self._condition.notify_all() @@ -557,6 +551,11 @@ class Topology: # that didn't include this server. if self._opened and self._description.has_server(server_description.address): self._process_change(server_description, reset_pool, interrupt_connections) + # Clear the pool from a failed heartbeat, done outside the lock to avoid blocking on connection close. + if reset_pool: + server = self._servers.get(server_description.address) + if server: + server.pool.reset(interrupt_connections=interrupt_connections) def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None: """Process a new seedlist on an opened topology. diff --git a/test/__init__.py b/test/__init__.py index 7ae343206..39b4045e6 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -826,6 +826,14 @@ class ClientContext: lambda: _IS_SYNC, "This test only works with the synchronous API", func=func ) + def require_async(self, func): + """Run a test only if using the asynchronous API.""" # unasync: off + return self._require( + lambda: not _IS_SYNC, + "This test only works with the asynchronous API", # unasync: off + func=func, + ) + def mongos_seeds(self): return ",".join("{}:{}".format(*address) for address in self.mongoses) diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index c57bf2a88..882cb6110 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -828,6 +828,14 @@ class AsyncClientContext: lambda: _IS_SYNC, "This test only works with the synchronous API", func=func ) + def require_async(self, func): + """Run a test only if using the asynchronous API.""" # unasync: off + return self._require( + lambda: not _IS_SYNC, + "This test only works with the asynchronous API", # unasync: off + func=func, + ) + def mongos_seeds(self): return ",".join("{}:{}".format(*address) for address in self.mongoses) diff --git a/test/asynchronous/test_discovery_and_monitoring.py b/test/asynchronous/test_discovery_and_monitoring.py index fa62b25dd..cf26faf24 100644 --- a/test/asynchronous/test_discovery_and_monitoring.py +++ b/test/asynchronous/test_discovery_and_monitoring.py @@ -20,10 +20,15 @@ import os import socketserver import sys import threading +import time from asyncio import StreamReader, StreamWriter from pathlib import Path from test.asynchronous.helpers import ConcurrentRunner +from pymongo.asynchronous.pool import AsyncConnection +from pymongo.operations import _Op +from pymongo.server_selectors import writable_server_selector + sys.path[0:0] = [""] from test.asynchronous import ( @@ -370,6 +375,74 @@ class TestPoolManagement(AsyncIntegrationTest): await listener.async_wait_for_event(monitoring.ServerHeartbeatSucceededEvent, 1) await listener.async_wait_for_event(monitoring.PoolReadyEvent, 1) + @async_client_context.require_failCommand_appName + @async_client_context.require_test_commands + @async_client_context.require_async + async def test_connection_close_does_not_block_other_operations(self): + listener = CMAPHeartbeatListener() + client = await self.async_single_client( + appName="SDAMConnectionCloseTest", + event_listeners=[listener], + heartbeatFrequencyMS=500, + minPoolSize=10, + ) + server = await (await client._get_topology()).select_server( + writable_server_selector, _Op.TEST + ) + await async_wait_until( + lambda: len(server._pool.conns) == 10, + "pool initialized with 10 connections", + ) + + await client.db.test.insert_one({"x": 1}) + close_delay = 0.1 + latencies = [] + should_exit = [] + + async def run_task(): + while True: + start_time = time.monotonic() + await client.db.test.find_one({}) + elapsed = time.monotonic() - start_time + latencies.append(elapsed) + if should_exit: + break + await asyncio.sleep(0.001) + + task = ConcurrentRunner(target=run_task) + await task.start() + original_close = AsyncConnection.close_conn + try: + # Artificially delay the close operation to simulate a slow close + async def mock_close(self, reason): + await asyncio.sleep(close_delay) + await original_close(self, reason) + + AsyncConnection.close_conn = mock_close + + fail_hello = { + "mode": {"times": 4}, + "data": { + "failCommands": [HelloCompat.LEGACY_CMD, "hello"], + "errorCode": 91, + "appName": "SDAMConnectionCloseTest", + }, + } + async with self.fail_point(fail_hello): + # Wait for server heartbeat to fail + await listener.async_wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1) + # Wait until all idle connections are closed to simulate real-world conditions + await listener.async_wait_for_event(monitoring.ConnectionClosedEvent, 10) + # Wait for one more find to complete after the pool has been reset, then shutdown the task + n = len(latencies) + await async_wait_until(lambda: len(latencies) >= n + 1, "run one more find") + should_exit.append(True) + await task.join() + # No operation latency should not significantly exceed close_delay + self.assertLessEqual(max(latencies), close_delay * 5.0) + finally: + AsyncConnection.close_conn = original_close + class TestServerMonitoringMode(AsyncIntegrationTest): @async_client_context.require_no_serverless diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index 07720473c..9d6c94570 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -20,10 +20,15 @@ import os import socketserver import sys import threading +import time from asyncio import StreamReader, StreamWriter from pathlib import Path from test.helpers import ConcurrentRunner +from pymongo.operations import _Op +from pymongo.server_selectors import writable_server_selector +from pymongo.synchronous.pool import Connection + sys.path[0:0] = [""] from test import ( @@ -370,6 +375,72 @@ class TestPoolManagement(IntegrationTest): listener.wait_for_event(monitoring.ServerHeartbeatSucceededEvent, 1) listener.wait_for_event(monitoring.PoolReadyEvent, 1) + @client_context.require_failCommand_appName + @client_context.require_test_commands + @client_context.require_async + def test_connection_close_does_not_block_other_operations(self): + listener = CMAPHeartbeatListener() + client = self.single_client( + appName="SDAMConnectionCloseTest", + event_listeners=[listener], + heartbeatFrequencyMS=500, + minPoolSize=10, + ) + server = (client._get_topology()).select_server(writable_server_selector, _Op.TEST) + wait_until( + lambda: len(server._pool.conns) == 10, + "pool initialized with 10 connections", + ) + + client.db.test.insert_one({"x": 1}) + close_delay = 0.1 + latencies = [] + should_exit = [] + + def run_task(): + while True: + start_time = time.monotonic() + client.db.test.find_one({}) + elapsed = time.monotonic() - start_time + latencies.append(elapsed) + if should_exit: + break + time.sleep(0.001) + + task = ConcurrentRunner(target=run_task) + task.start() + original_close = Connection.close_conn + try: + # Artificially delay the close operation to simulate a slow close + def mock_close(self, reason): + time.sleep(close_delay) + original_close(self, reason) + + Connection.close_conn = mock_close + + fail_hello = { + "mode": {"times": 4}, + "data": { + "failCommands": [HelloCompat.LEGACY_CMD, "hello"], + "errorCode": 91, + "appName": "SDAMConnectionCloseTest", + }, + } + with self.fail_point(fail_hello): + # Wait for server heartbeat to fail + listener.wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1) + # Wait until all idle connections are closed to simulate real-world conditions + listener.wait_for_event(monitoring.ConnectionClosedEvent, 10) + # Wait for one more find to complete after the pool has been reset, then shutdown the task + n = len(latencies) + wait_until(lambda: len(latencies) >= n + 1, "run one more find") + should_exit.append(True) + task.join() + # No operation latency should not significantly exceed close_delay + self.assertLessEqual(max(latencies), close_delay * 5.0) + finally: + Connection.close_conn = original_close + class TestServerMonitoringMode(IntegrationTest): @client_context.require_no_serverless diff --git a/tools/synchro.py b/tools/synchro.py index f6176e203..bfe8f7112 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -288,7 +288,8 @@ def process_files( if file in docstring_translate_files: lines = translate_docstrings(lines) if file in sync_test_files: - translate_imports(lines) + lines = translate_imports(lines) + lines = process_ignores(lines) f.seek(0) f.writelines(lines) f.truncate() @@ -390,6 +391,14 @@ def translate_docstrings(lines: list[str]) -> list[str]: return [line for line in lines if line != "DOCSTRING_REMOVED"] +def process_ignores(lines: list[str]) -> list[str]: + for i in range(len(lines)): + for k, v in replacements.items(): + if "unasync: off" in lines[i] and v in lines[i]: + lines[i] = lines[i].replace(v, k) + return lines + + def unasync_directory(files: list[str], src: str, dest: str, replacements: dict[str, str]) -> None: unasync_files( files, From 42cb70e9ab4300cd27eb4340bf6e59dbc5f973e0 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 23 Apr 2025 14:43:49 -0500 Subject: [PATCH 2/4] PYTHON-5341 Fix handling of SSL tests with Stable API (#2305) --- test/asynchronous/test_ssl.py | 6 ++++++ test/test_ssl.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/test/asynchronous/test_ssl.py b/test/asynchronous/test_ssl.py index d920b77ac..4d7566a61 100644 --- a/test/asynchronous/test_ssl.py +++ b/test/asynchronous/test_ssl.py @@ -166,11 +166,14 @@ class TestSSL(AsyncIntegrationTest): @async_client_context.require_tls async def test_simple_ssl(self): + if "PyPy" in sys.version: + self.skipTest("Test is flaky on PyPy") # Expects the server to be running with ssl and with # no --sslPEMKeyFile or with --sslWeakCertificateValidation await self.assertClientWorks(self.client) @async_client_context.require_tlsCertificateKeyFile + @async_client_context.require_no_api_version @ignore_deprecations async def test_tlsCertificateKeyFilePassword(self): # Expects the server to be running with server.pem and ca.pem @@ -376,6 +379,7 @@ class TestSSL(AsyncIntegrationTest): ) @async_client_context.require_tlsCertificateKeyFile + @async_client_context.require_no_api_version @ignore_deprecations async def test_tlsCRLFile_support(self): if not hasattr(ssl, "VERIFY_CRL_CHECK_LEAF") or _ssl.IS_PYOPENSSL: @@ -531,6 +535,7 @@ class TestSSL(AsyncIntegrationTest): @async_client_context.require_auth @async_client_context.require_tlsCertificateKeyFile + @async_client_context.require_no_api_version @ignore_deprecations async def test_mongodb_x509_auth(self): host, port = await async_client_context.host, await async_client_context.port @@ -640,6 +645,7 @@ class TestSSL(AsyncIntegrationTest): self.fail("Invalid certificate accepted.") @async_client_context.require_tlsCertificateKeyFile + @async_client_context.require_no_api_version @ignore_deprecations async def test_connect_with_ca_bundle(self): def remove(path): diff --git a/test/test_ssl.py b/test/test_ssl.py index a66fe21be..7decc8203 100644 --- a/test/test_ssl.py +++ b/test/test_ssl.py @@ -166,11 +166,14 @@ class TestSSL(IntegrationTest): @client_context.require_tls def test_simple_ssl(self): + if "PyPy" in sys.version: + self.skipTest("Test is flaky on PyPy") # Expects the server to be running with ssl and with # no --sslPEMKeyFile or with --sslWeakCertificateValidation self.assertClientWorks(self.client) @client_context.require_tlsCertificateKeyFile + @client_context.require_no_api_version @ignore_deprecations def test_tlsCertificateKeyFilePassword(self): # Expects the server to be running with server.pem and ca.pem @@ -376,6 +379,7 @@ class TestSSL(IntegrationTest): ) @client_context.require_tlsCertificateKeyFile + @client_context.require_no_api_version @ignore_deprecations def test_tlsCRLFile_support(self): if not hasattr(ssl, "VERIFY_CRL_CHECK_LEAF") or _ssl.IS_PYOPENSSL: @@ -531,6 +535,7 @@ class TestSSL(IntegrationTest): @client_context.require_auth @client_context.require_tlsCertificateKeyFile + @client_context.require_no_api_version @ignore_deprecations def test_mongodb_x509_auth(self): host, port = client_context.host, client_context.port @@ -640,6 +645,7 @@ class TestSSL(IntegrationTest): self.fail("Invalid certificate accepted.") @client_context.require_tlsCertificateKeyFile + @client_context.require_no_api_version @ignore_deprecations def test_connect_with_ca_bundle(self): def remove(path): From 1bdf035802479afd232b0d41fbe3823345773e9d Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 23 Apr 2025 16:32:08 -0400 Subject: [PATCH 3/4] PYTHON-5212 changelog update (#2306) --- doc/changelog.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/changelog.rst b/doc/changelog.rst index 4fff06c9c..46e7364f5 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -16,6 +16,8 @@ Version 4.12.1 is a bug fix release. errors such as: "NotImplementedError: Database objects do not implement truth value testing or bool()". - Removed Eventlet testing against Python versions newer than 3.9 since Eventlet is actively being sunset by its maintainers and has compatibility issues with PyMongo's dnspython dependency. +- Fixed a bug where MongoDB cluster topology changes could cause asynchronous operations to take much longer to complete + due to holding the Topology lock while closing stale connections. Issues Resolved ............... From 34f7d7ee4c2cbe8ba23c91a489a938af2cb46386 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 23 Apr 2025 16:32:39 -0400 Subject: [PATCH 4/4] =?UTF-8?q?PYTHON-5346=20-=20test=5Finit=5Fdisconnecte?= =?UTF-8?q?d=5Fwith=5Fsrv=20cannot=20run=20against=20shar=E2=80=A6=20(#230?= =?UTF-8?q?4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/asynchronous/test_client.py | 2 +- test/test_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index b9deb985b..1e1faf0a2 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -850,7 +850,7 @@ class TestClient(AsyncIntegrationTest): with self.assertRaises(ConnectionFailure): await c.pymongo_test.test.find_one() - @async_client_context.require_no_standalone + @async_client_context.require_replica_set @async_client_context.require_no_load_balancer @async_client_context.require_tls async def test_init_disconnected_with_srv(self): diff --git a/test/test_client.py b/test/test_client.py index c2df8ab2b..189e58e80 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -825,7 +825,7 @@ class TestClient(IntegrationTest): with self.assertRaises(ConnectionFailure): c.pymongo_test.test.find_one() - @client_context.require_no_standalone + @client_context.require_replica_set @client_context.require_no_load_balancer @client_context.require_tls def test_init_disconnected_with_srv(self):