From d82946334eaf6209a0c40afd23257c6f581deaa0 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 20 Oct 2023 13:42:59 -0700 Subject: [PATCH] PYTHON-3953 - PyMongo should send killCursors on MaxTimeMSExpired Error (#1372) --- pymongo/command_cursor.py | 7 +++-- pymongo/cursor.py | 7 ++--- test/test_client.py | 4 +++ test/test_cursor.py | 54 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 67 insertions(+), 5 deletions(-) diff --git a/pymongo/command_cursor.py b/pymongo/command_cursor.py index e0258f90f..42becece2 100644 --- a/pymongo/command_cursor.py +++ b/pymongo/command_cursor.py @@ -183,8 +183,11 @@ class CommandCursor(Generic[_DocumentType]): if exc.code in _CURSOR_CLOSED_ERRORS: # Don't send killCursors because the cursor is already closed. self.__killed = True - # Return the session and pinned connection, if necessary. - self.close() + if exc.timeout: + self.__die(False) + else: + # Return the session and pinned connection, if necessary. + self.close() raise except ConnectionFailure: # Don't send killCursors because the cursor is already closed. diff --git a/pymongo/cursor.py b/pymongo/cursor.py index df154067a..6dfb3ba90 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -73,7 +73,6 @@ if TYPE_CHECKING: _CURSOR_CLOSED_ERRORS = frozenset( [ 43, # CursorNotFound - 50, # MaxTimeMSExpired 175, # QueryPlanKilled 237, # CursorKilled # On a tailable cursor, the following errors mean the capped collection @@ -1065,7 +1064,10 @@ class Cursor(Generic[_DocumentType]): if exc.code in _CURSOR_CLOSED_ERRORS or self.__exhaust: # Don't send killCursors because the cursor is already closed. self.__killed = True - self.close() + if exc.timeout: + self.__die(False) + else: + self.close() # If this is a tailable cursor the error is likely # due to capped collection roll over. Setting # self.__killed to True ensures Cursor.alive will be @@ -1077,7 +1079,6 @@ class Cursor(Generic[_DocumentType]): return raise except ConnectionFailure: - # Don't send killCursors because the cursor is already closed. self.__killed = True self.close() raise diff --git a/test/test_client.py b/test/test_client.py index c929b7525..c9d1de7e2 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1961,6 +1961,10 @@ class TestExhaustCursor(IntegrationTest): self.assertRaises(ConnectionFailure, list, cursor) self.assertTrue(conn.closed) + wait_until( + lambda: len(client._MongoClient__kill_cursors_queue) == 0, + "waited for all killCursor requests to complete", + ) # The socket was closed and the semaphore was decremented. self.assertNotIn(conn, pool.conns) self.assertEqual(0, pool.requests) diff --git a/test/test_cursor.py b/test/test_cursor.py index 284a5ac97..212de4d28 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -35,6 +35,7 @@ from test.utils import ( OvertCommandListener, ignore_deprecations, rs_or_single_client, + wait_until, ) from bson import decode_all @@ -1226,6 +1227,59 @@ class TestCursor(IntegrationTest): else: self.assertEqual(0, len(listener.started_events)) + @client_context.require_failCommand_appName + def test_timeout_kills_cursor_asynchronously(self): + listener = AllowListEventListener("killCursors") + client = rs_or_single_client(event_listeners=[listener]) + self.addCleanup(client.close) + coll = client[self.db.name].test_timeout_kills_cursor + + # Add some test data. + docs_inserted = 10 + coll.insert_many([{"i": i} for i in range(docs_inserted)]) + + listener.reset() + + cursor = coll.find({}, batch_size=1) + cursor.next() + + # Mock getMore commands timing out. + mock_timeout_errors = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "errorCode": 50, + "failCommands": ["getMore"], + }, + } + + with self.fail_point(mock_timeout_errors): + with self.assertRaises(ExecutionTimeout): + cursor.next() + + def assertCursorKilled(): + wait_until( + lambda: len(client._MongoClient__kill_cursors_queue) == 0, + "waited for all killCursor requests to complete", + ) + + self.assertEqual(1, len(listener.started_events)) + self.assertEqual("killCursors", listener.started_events[0].command_name) + self.assertEqual(1, len(listener.succeeded_events)) + self.assertEqual("killCursors", listener.succeeded_events[0].command_name) + + assertCursorKilled() + listener.reset() + + cursor = coll.aggregate([], batchSize=1) + cursor.next() + + with self.fail_point(mock_timeout_errors): + with self.assertRaises(ExecutionTimeout): + cursor.next() + + assertCursorKilled() + def test_delete_not_initialized(self): # Creating a cursor with invalid arguments will not run __init__ # but will still call __del__, eg test.find(invalidKwarg=1).