PYTHON-5642 - getMore operations should do server selection if the server is unknown (#2621)
This commit is contained in:
parent
1a434c7c59
commit
42cf3407c8
@ -18,6 +18,13 @@ PyMongo 4.16 brings a number of changes including:
|
||||
Eventlet is actively being sunset by its maintainers and has compatibility issues with PyMongo's dnspython dependency.
|
||||
- Use Zstandard support from the standard library for Python 3.14+, and use ``backports.zstd`` for older versions.
|
||||
|
||||
Changes in Version 4.15.5 (2025/XX/XX)
|
||||
--------------------------------------
|
||||
|
||||
Version 4.15.5 is a bug fix release.
|
||||
|
||||
- Fixed a bug that could cause ``AutoReconnect("connection pool paused")`` errors when cursors fetched more documents from the database after SDAM heartbeat failures.
|
||||
|
||||
Changes in Version 4.15.4 (2025/10/21)
|
||||
--------------------------------------
|
||||
|
||||
|
||||
@ -322,7 +322,7 @@ class TopologyDescription:
|
||||
if address:
|
||||
# Ignore selectors when explicit address is requested.
|
||||
description = self.server_descriptions().get(address)
|
||||
return [description] if description else []
|
||||
return [description] if description and description.is_server_type_known else []
|
||||
|
||||
# Primary selection fast path.
|
||||
if self.topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary and type(selector) is Primary:
|
||||
|
||||
@ -17,9 +17,10 @@ from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from pymongo import AsyncMongoClient, ReadPreference
|
||||
from pymongo import AsyncMongoClient, ReadPreference, monitoring
|
||||
from pymongo.asynchronous.settings import TopologySettings
|
||||
from pymongo.asynchronous.topology import Topology
|
||||
from pymongo.errors import ServerSelectionTimeoutError
|
||||
@ -30,7 +31,7 @@ from pymongo.typings import strip_optional
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
|
||||
from test.asynchronous import AsyncIntegrationTest, async_client_context, client_knobs, unittest
|
||||
from test.asynchronous.utils import async_wait_until
|
||||
from test.asynchronous.utils_selection_tests import (
|
||||
create_selection_tests,
|
||||
@ -42,6 +43,7 @@ from test.utils_selection_tests_shared import (
|
||||
)
|
||||
from test.utils_shared import (
|
||||
FunctionCallRecorder,
|
||||
HeartbeatEventListener,
|
||||
OvertCommandListener,
|
||||
)
|
||||
|
||||
@ -207,6 +209,40 @@ class TestCustomServerSelectorFunction(AsyncIntegrationTest):
|
||||
)
|
||||
self.assertEqual(selector.call_count, 0)
|
||||
|
||||
@async_client_context.require_replica_set
|
||||
@async_client_context.require_failCommand_appName
|
||||
async def test_server_selection_getMore_blocks(self):
|
||||
hb_listener = HeartbeatEventListener()
|
||||
client = await self.async_rs_client(
|
||||
event_listeners=[hb_listener], heartbeatFrequencyMS=500, appName="heartbeatFailedClient"
|
||||
)
|
||||
coll = client.db.test
|
||||
await coll.drop()
|
||||
docs = [{"x": 1} for _ in range(5)]
|
||||
await coll.insert_many(docs)
|
||||
|
||||
fail_heartbeat = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {"times": 4},
|
||||
"data": {
|
||||
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
|
||||
"closeConnection": True,
|
||||
"appName": "heartbeatFailedClient",
|
||||
},
|
||||
}
|
||||
|
||||
def hb_failed(event):
|
||||
return isinstance(event, monitoring.ServerHeartbeatFailedEvent)
|
||||
|
||||
cursor = coll.find({}, batch_size=1)
|
||||
await cursor.next() # force initial query that will pin the address for the getMore
|
||||
|
||||
async with self.fail_point(fail_heartbeat):
|
||||
await async_wait_until(
|
||||
lambda: hb_listener.matching(hb_failed), "published failed event"
|
||||
)
|
||||
self.assertEqual(len(await cursor.to_list()), 4)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@ -17,9 +17,10 @@ from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from pymongo import MongoClient, ReadPreference
|
||||
from pymongo import MongoClient, ReadPreference, monitoring
|
||||
from pymongo.errors import ServerSelectionTimeoutError
|
||||
from pymongo.hello import HelloCompat
|
||||
from pymongo.operations import _Op
|
||||
@ -30,7 +31,7 @@ from pymongo.typings import strip_optional
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
from test import IntegrationTest, client_context, unittest
|
||||
from test import IntegrationTest, client_context, client_knobs, unittest
|
||||
from test.utils import wait_until
|
||||
from test.utils_selection_tests import (
|
||||
create_selection_tests,
|
||||
@ -42,6 +43,7 @@ from test.utils_selection_tests_shared import (
|
||||
)
|
||||
from test.utils_shared import (
|
||||
FunctionCallRecorder,
|
||||
HeartbeatEventListener,
|
||||
OvertCommandListener,
|
||||
)
|
||||
|
||||
@ -205,6 +207,38 @@ class TestCustomServerSelectorFunction(IntegrationTest):
|
||||
topology.select_server(writable_server_selector, _Op.TEST, server_selection_timeout=0.1)
|
||||
self.assertEqual(selector.call_count, 0)
|
||||
|
||||
@client_context.require_replica_set
|
||||
@client_context.require_failCommand_appName
|
||||
def test_server_selection_getMore_blocks(self):
|
||||
hb_listener = HeartbeatEventListener()
|
||||
client = self.rs_client(
|
||||
event_listeners=[hb_listener], heartbeatFrequencyMS=500, appName="heartbeatFailedClient"
|
||||
)
|
||||
coll = client.db.test
|
||||
coll.drop()
|
||||
docs = [{"x": 1} for _ in range(5)]
|
||||
coll.insert_many(docs)
|
||||
|
||||
fail_heartbeat = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {"times": 4},
|
||||
"data": {
|
||||
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
|
||||
"closeConnection": True,
|
||||
"appName": "heartbeatFailedClient",
|
||||
},
|
||||
}
|
||||
|
||||
def hb_failed(event):
|
||||
return isinstance(event, monitoring.ServerHeartbeatFailedEvent)
|
||||
|
||||
cursor = coll.find({}, batch_size=1)
|
||||
cursor.next() # force initial query that will pin the address for the getMore
|
||||
|
||||
with self.fail_point(fail_heartbeat):
|
||||
wait_until(lambda: hb_listener.matching(hb_failed), "published failed event")
|
||||
self.assertEqual(len(cursor.to_list()), 4)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user