PYTHON-3953 - PyMongo should send killCursors on MaxTimeMSExpired Error (#1372)
This commit is contained in:
parent
61269c0f89
commit
d82946334e
@ -183,8 +183,11 @@ class CommandCursor(Generic[_DocumentType]):
|
||||
if exc.code in _CURSOR_CLOSED_ERRORS:
|
||||
# Don't send killCursors because the cursor is already closed.
|
||||
self.__killed = True
|
||||
# Return the session and pinned connection, if necessary.
|
||||
self.close()
|
||||
if exc.timeout:
|
||||
self.__die(False)
|
||||
else:
|
||||
# Return the session and pinned connection, if necessary.
|
||||
self.close()
|
||||
raise
|
||||
except ConnectionFailure:
|
||||
# Don't send killCursors because the cursor is already closed.
|
||||
|
||||
@ -73,7 +73,6 @@ if TYPE_CHECKING:
|
||||
_CURSOR_CLOSED_ERRORS = frozenset(
|
||||
[
|
||||
43, # CursorNotFound
|
||||
50, # MaxTimeMSExpired
|
||||
175, # QueryPlanKilled
|
||||
237, # CursorKilled
|
||||
# On a tailable cursor, the following errors mean the capped collection
|
||||
@ -1065,7 +1064,10 @@ class Cursor(Generic[_DocumentType]):
|
||||
if exc.code in _CURSOR_CLOSED_ERRORS or self.__exhaust:
|
||||
# Don't send killCursors because the cursor is already closed.
|
||||
self.__killed = True
|
||||
self.close()
|
||||
if exc.timeout:
|
||||
self.__die(False)
|
||||
else:
|
||||
self.close()
|
||||
# If this is a tailable cursor the error is likely
|
||||
# due to capped collection roll over. Setting
|
||||
# self.__killed to True ensures Cursor.alive will be
|
||||
@ -1077,7 +1079,6 @@ class Cursor(Generic[_DocumentType]):
|
||||
return
|
||||
raise
|
||||
except ConnectionFailure:
|
||||
# Don't send killCursors because the cursor is already closed.
|
||||
self.__killed = True
|
||||
self.close()
|
||||
raise
|
||||
|
||||
@ -1961,6 +1961,10 @@ class TestExhaustCursor(IntegrationTest):
|
||||
self.assertRaises(ConnectionFailure, list, cursor)
|
||||
self.assertTrue(conn.closed)
|
||||
|
||||
wait_until(
|
||||
lambda: len(client._MongoClient__kill_cursors_queue) == 0,
|
||||
"waited for all killCursor requests to complete",
|
||||
)
|
||||
# The socket was closed and the semaphore was decremented.
|
||||
self.assertNotIn(conn, pool.conns)
|
||||
self.assertEqual(0, pool.requests)
|
||||
|
||||
@ -35,6 +35,7 @@ from test.utils import (
|
||||
OvertCommandListener,
|
||||
ignore_deprecations,
|
||||
rs_or_single_client,
|
||||
wait_until,
|
||||
)
|
||||
|
||||
from bson import decode_all
|
||||
@ -1226,6 +1227,59 @@ class TestCursor(IntegrationTest):
|
||||
else:
|
||||
self.assertEqual(0, len(listener.started_events))
|
||||
|
||||
@client_context.require_failCommand_appName
|
||||
def test_timeout_kills_cursor_asynchronously(self):
|
||||
listener = AllowListEventListener("killCursors")
|
||||
client = rs_or_single_client(event_listeners=[listener])
|
||||
self.addCleanup(client.close)
|
||||
coll = client[self.db.name].test_timeout_kills_cursor
|
||||
|
||||
# Add some test data.
|
||||
docs_inserted = 10
|
||||
coll.insert_many([{"i": i} for i in range(docs_inserted)])
|
||||
|
||||
listener.reset()
|
||||
|
||||
cursor = coll.find({}, batch_size=1)
|
||||
cursor.next()
|
||||
|
||||
# Mock getMore commands timing out.
|
||||
mock_timeout_errors = {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": "alwaysOn",
|
||||
"data": {
|
||||
"errorCode": 50,
|
||||
"failCommands": ["getMore"],
|
||||
},
|
||||
}
|
||||
|
||||
with self.fail_point(mock_timeout_errors):
|
||||
with self.assertRaises(ExecutionTimeout):
|
||||
cursor.next()
|
||||
|
||||
def assertCursorKilled():
|
||||
wait_until(
|
||||
lambda: len(client._MongoClient__kill_cursors_queue) == 0,
|
||||
"waited for all killCursor requests to complete",
|
||||
)
|
||||
|
||||
self.assertEqual(1, len(listener.started_events))
|
||||
self.assertEqual("killCursors", listener.started_events[0].command_name)
|
||||
self.assertEqual(1, len(listener.succeeded_events))
|
||||
self.assertEqual("killCursors", listener.succeeded_events[0].command_name)
|
||||
|
||||
assertCursorKilled()
|
||||
listener.reset()
|
||||
|
||||
cursor = coll.aggregate([], batchSize=1)
|
||||
cursor.next()
|
||||
|
||||
with self.fail_point(mock_timeout_errors):
|
||||
with self.assertRaises(ExecutionTimeout):
|
||||
cursor.next()
|
||||
|
||||
assertCursorKilled()
|
||||
|
||||
def test_delete_not_initialized(self):
|
||||
# Creating a cursor with invalid arguments will not run __init__
|
||||
# but will still call __del__, eg test.find(invalidKwarg=1).
|
||||
|
||||
Loading…
Reference in New Issue
Block a user