PYTHON 5212 - Use asyncio.loop.sock_connect in _async_create_connection (#2383)

This commit is contained in:
Noah Stapp 2025-06-13 15:30:10 -04:00 committed by GitHub
parent c16ef0a13e
commit 50ea82310d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 88 additions and 5 deletions

View File

@ -8,6 +8,23 @@ PyMongo 4.14 brings a number of changes including:
- Added :attr:`bson.codec_options.TypeRegistry.codecs` and :attr:`bson.codec_options.TypeRegistry.fallback_encoder` properties
to allow users to directly access the type codecs and fallback encoder for a given :class:`bson.codec_options.TypeRegistry`.
Changes in Version 4.13.2 (2025/06/17)
--------------------------------------
Version 4.13.2 is a bug fix release.
- Fixed a bug where ``AsyncMongoClient`` would block the event loop while creating new connections,
potentially significantly increasing latency for ongoing operations.
Issues Resolved
...............
See the `PyMongo 4.13.2 release notes in JIRA`_ for the list of resolved issues
in this release.
.. _PyMongo 4.13.2 release notes in JIRA: https://jira.mongodb.org/secure/ReleaseNote.jspa?projectId=10004&version=43937
Changes in Version 4.13.1 (2025/06/10)
--------------------------------------

View File

@ -206,7 +206,8 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s
# SOCK_CLOEXEC not supported for Unix sockets.
_set_non_inheritable_non_atomic(sock.fileno())
try:
sock.connect(host)
sock.setblocking(False)
await asyncio.get_running_loop().sock_connect(sock, host)
return sock
except OSError:
sock.close()
@ -241,14 +242,22 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s
timeout = options.connect_timeout
elif timeout <= 0:
raise socket.timeout("timed out")
sock.settimeout(timeout)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
_set_keepalive_times(sock)
sock.connect(sa)
# Socket needs to be non-blocking during connection to not block the event loop
sock.setblocking(False)
await asyncio.wait_for(
asyncio.get_running_loop().sock_connect(sock, sa), timeout=timeout
)
sock.settimeout(timeout)
return sock
except OSError as e:
err = e
except asyncio.TimeoutError as e:
sock.close()
err = socket.timeout("timed out")
err.__cause__ = e
except OSError as e:
sock.close()
err = e # type: ignore[assignment]
if err is not None:
raise err

View File

@ -0,0 +1,56 @@
# Copyright 2025-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test that the asynchronous API does not block the event loop."""
from __future__ import annotations
import asyncio
import time
from test.asynchronous import AsyncIntegrationTest
from pymongo.errors import ServerSelectionTimeoutError
class TestClientLoopUnblocked(AsyncIntegrationTest):
async def test_client_does_not_block_loop(self):
# Use an unreachable TEST-NET host to ensure that the client times out attempting to create a connection.
client = self.simple_client("192.0.2.1", serverSelectionTimeoutMS=500)
latencies = []
# If the loop is being blocked, at least one iteration will have a latency much more than 0.1 seconds
async def background_task():
start = time.monotonic()
try:
while True:
start = time.monotonic()
await asyncio.sleep(0.1)
latencies.append(time.monotonic() - start)
except asyncio.CancelledError:
latencies.append(time.monotonic() - start)
raise
t = asyncio.create_task(background_task())
with self.assertRaisesRegex(ServerSelectionTimeoutError, "No servers found yet"):
await client.admin.command("ping")
t.cancel()
with self.assertRaises(asyncio.CancelledError):
await t
self.assertLessEqual(
sorted(latencies, reverse=True)[0],
1.0,
"Background task was blocked from running",
)

View File

@ -186,6 +186,7 @@ def async_only_test(f: str) -> bool:
"test_async_cancellation.py",
"test_async_loop_safety.py",
"test_async_contextvars_reset.py",
"test_async_loop_unblocked.py",
]