From 6aaa1f71aadbc156d9f06bb79ba67b4d018db3ea Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Tue, 2 May 2017 11:57:05 -0700 Subject: [PATCH] PYTHON-1269 Kill cursors synchronously in Cursor.close and MongoClient.close. --- pymongo/cursor.py | 25 ++++---- pymongo/mongo_client.py | 134 ++++++++++++++++++++++------------------ test/test_client.py | 32 ++++++++++ test/test_cursor.py | 31 +++++++++- 4 files changed, 151 insertions(+), 71 deletions(-) diff --git a/pymongo/cursor.py b/pymongo/cursor.py index e5fc46b29..f663f6bc2 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -115,6 +115,8 @@ class Cursor(object): .. mongodoc:: cursors """ self.__id = None + self.__exhaust = False + self.__exhaust_mgr = None spec = filter if spec is None: @@ -163,8 +165,6 @@ class Cursor(object): self.__collation = validate_collation_or_none(collation) # Exhaust cursor support - self.__exhaust = False - self.__exhaust_mgr = None if cursor_type == CursorType.EXHAUST: if self.__collection.database.client.is_mongos: raise InvalidOperation('Exhaust cursors are ' @@ -214,8 +214,7 @@ class Cursor(object): return self.__retrieved def __del__(self): - if self.__id and not self.__killed: - self.__die() + self.__die() def rewind(self): """Rewind this cursor to its unevaluated state. @@ -264,7 +263,7 @@ class Cursor(object): """ return Cursor(self.__collection) - def __die(self): + def __die(self, synchronous=False): """Closes this cursor. """ if self.__id and not self.__killed: @@ -274,10 +273,14 @@ class Cursor(object): # to stop the server from sending more data. self.__exhaust_mgr.sock.close() else: - self.__collection.database.client.close_cursor( - self.__id, - _CursorAddress( - self.__address, self.__collection.full_name)) + address = _CursorAddress( + self.__address, self.__collection.full_name) + if synchronous: + self.__collection.database.client._close_cursor_now( + self.__id, address) + else: + self.__collection.database.client.close_cursor( + self.__id, address) if self.__exhaust and self.__exhaust_mgr: self.__exhaust_mgr.close() self.__killed = True @@ -287,7 +290,7 @@ class Cursor(object): other Python implementations that don't use reference counting garbage collection. """ - self.__die() + self.__die(True) def __query_spec(self): """Get the spec to use for a query. @@ -1126,7 +1129,7 @@ class Cursor(object): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.__die() + self.close() def __copy__(self): """Support function for `copy.copy()`. diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index a5c52471a..a14902174 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -778,6 +778,9 @@ class MongoClient(common.BaseObject): If this instance is used again it will be automatically re-opened and the threads restarted. """ + # Run _process_periodic_tasks to send pending killCursor requests + # before closing the topology. + self._process_periodic_tasks() self._topology.close() def set_cursor_manager(self, manager_class): @@ -1024,6 +1027,21 @@ class MongoClient(common.BaseObject): else: self.__kill_cursors_queue.append((address, [cursor_id])) + def _close_cursor_now(self, cursor_id, address=None): + """Send a kill cursors message with the given id. + + What closing the cursor actually means depends on this client's + cursor manager. If there is none, the cursor is closed synchronously + on the current thread. + """ + if not isinstance(cursor_id, integer_types): + raise TypeError("cursor_id must be an instance of (int, long)") + + if self.__cursor_manager is not None: + self.__cursor_manager.close(cursor_id, address) + else: + self._kill_cursors([cursor_id], address, self._get_topology()) + def kill_cursors(self, cursor_ids, address=None): """DEPRECATED - Send a kill cursors message soon with the given ids. @@ -1055,6 +1073,62 @@ class MongoClient(common.BaseObject): # "Atomic", needs no lock. self.__kill_cursors_queue.append((address, cursor_ids)) + def _kill_cursors(self, cursor_ids, address, topology): + """Send a kill cursors message with the given ids.""" + listeners = self._event_listeners + publish = listeners.enabled_for_commands + if address: + # address could be a tuple or _CursorAddress, but + # select_server_by_address needs (host, port). + server = topology.select_server_by_address(tuple(address)) + else: + # Application called close_cursor() with no address. + server = topology.select_server(writable_server_selector) + + try: + namespace = address.namespace + db, coll = namespace.split('.', 1) + except AttributeError: + namespace = None + db = coll = "OP_KILL_CURSORS" + + spec = SON([('killCursors', coll), ('cursors', cursor_ids)]) + with server.get_socket(self.__all_credentials) as sock_info: + if sock_info.max_wire_version >= 4 and namespace is not None: + sock_info.command(db, spec) + else: + if publish: + start = datetime.datetime.now() + request_id, msg = message.kill_cursors(cursor_ids) + if publish: + duration = datetime.datetime.now() - start + # Here and below, address could be a tuple or + # _CursorAddress. We always want to publish a + # tuple to match the rest of the monitoring + # API. + listeners.publish_command_start( + spec, db, request_id, tuple(address)) + start = datetime.datetime.now() + + try: + sock_info.send_message(msg, 0) + except Exception as exc: + if publish: + dur = ((datetime.datetime.now() - start) + duration) + listeners.publish_command_failure( + dur, message._convert_exception(exc), + 'killCursors', request_id, + tuple(address)) + raise + + if publish: + duration = ((datetime.datetime.now() - start) + duration) + # OP_KILL_CURSORS returns no reply, fake one. + reply = {'cursorsUnknown': cursor_ids, 'ok': 1} + listeners.publish_command_success( + duration, reply, 'killCursors', request_id, + tuple(address)) + # This method is run periodically by a background thread. def _process_periodic_tasks(self): """Process any pending kill cursors requests and @@ -1072,68 +1146,10 @@ class MongoClient(common.BaseObject): # Don't re-open topology if it's closed and there's no pending cursors. if address_to_cursor_ids: - listeners = self._event_listeners - publish = listeners.enabled_for_commands topology = self._get_topology() for address, cursor_ids in address_to_cursor_ids.items(): try: - if address: - # address could be a tuple or _CursorAddress, but - # select_server_by_address needs (host, port). - server = topology.select_server_by_address( - tuple(address)) - else: - # Application called close_cursor() with no address. - server = topology.select_server( - writable_server_selector) - - try: - namespace = address.namespace - db, coll = namespace.split('.', 1) - except AttributeError: - namespace = None - db = coll = "OP_KILL_CURSORS" - - spec = SON([('killCursors', coll), - ('cursors', cursor_ids)]) - with server.get_socket(self.__all_credentials) as sock_info: - if (sock_info.max_wire_version >= 4 and - namespace is not None): - sock_info.command(db, spec) - else: - if publish: - start = datetime.datetime.now() - request_id, msg = message.kill_cursors(cursor_ids) - if publish: - duration = datetime.datetime.now() - start - # Here and below, address could be a tuple or - # _CursorAddress. We always want to publish a - # tuple to match the rest of the monitoring - # API. - listeners.publish_command_start( - spec, db, request_id, tuple(address)) - start = datetime.datetime.now() - - try: - sock_info.send_message(msg, 0) - except Exception as exc: - if publish: - dur = ((datetime.datetime.now() - start) - + duration) - listeners.publish_command_failure( - dur, message._convert_exception(exc), - 'killCursors', request_id, tuple(address)) - raise - - if publish: - duration = ((datetime.datetime.now() - start) - + duration) - # OP_KILL_CURSORS returns no reply, fake one. - reply = {'cursorsUnknown': cursor_ids, 'ok': 1} - listeners.publish_command_success( - duration, reply, 'killCursors', request_id, - tuple(address)) - + self._kill_cursors(cursor_ids, address, topology) except Exception: helpers._handle_exception() try: diff --git a/test/test_client.py b/test/test_client.py index a26ce804a..c56a5e13a 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -16,6 +16,7 @@ import contextlib import datetime +import gc import os import signal import socket @@ -509,6 +510,37 @@ class TestClient(IntegrationTest): coll.count() + def test_close_kills_cursors(self): + if sys.platform.startswith('java'): + # We can't figure out how to make this test reliable with Jython. + raise SkipTest("Can't test with Jython") + # Kill any cursors possibly queued up by previous tests. + gc.collect() + self.client._process_periodic_tasks() + + # Add some test data. + coll = self.client.pymongo_test.test_close_kills_cursors + docs_inserted = 1000 + coll.insert_many([{"i": i} for i in range(docs_inserted)]) + + # Open a cursor and leave it open on the server. + cursor = coll.find().batch_size(10) + self.assertTrue(bool(next(cursor))) + self.assertLess(cursor.retrieved, docs_inserted) + del cursor + # Required for PyPy, Jython and other Python implementations that + # don't use reference counting garbage collection. + gc.collect() + + # Close the client and ensure the topology is closed. + self.assertTrue(self.client._topology._opened) + self.client.close() + self.assertFalse(self.client._topology._opened) + + # The killCursors task should not need to re-open the topology. + self.client._process_periodic_tasks() + self.assertFalse(self.client._topology._opened) + def test_bad_uri(self): with self.assertRaises(InvalidURI): MongoClient("http://localhost") diff --git a/test/test_cursor.py b/test/test_cursor.py index deb7b638d..51ad309dd 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -14,6 +14,7 @@ """Test the cursor module.""" import copy +import gc import itertools import random import re @@ -1126,7 +1127,6 @@ class TestCursor(IntegrationTest): self.assertEqual(3, db.test.count()) - def test_distinct(self): self.db.drop_collection("test") @@ -1245,6 +1245,35 @@ class TestCursor(IntegrationTest): self.assertTrue(cursor.alive) + def test_close_kills_cursor_synchronously(self): + # Kill any cursors possibly queued up by previous tests. + gc.collect() + self.client._process_periodic_tasks() + + listener = WhiteListEventListener("killCursors") + results = listener.results + client = rs_or_single_client(event_listeners=[listener]) + self.addCleanup(client.close) + coll = client[self.db.name].test_close_kills_cursors + + # Add some test data. + docs_inserted = 1000 + coll.insert_many([{"i": i} for i in range(docs_inserted)]) + + results.clear() + + # Close the cursor while it's still open on the server. + cursor = coll.find().batch_size(10) + self.assertTrue(bool(next(cursor))) + self.assertLess(cursor.retrieved, docs_inserted) + cursor.close() + + # Test that the cursor was closed. + self.assertEqual(1, len(results["started"])) + self.assertEqual("killCursors", results["started"][0].command_name) + self.assertEqual(1, len(results["succeeded"])) + self.assertEqual("killCursors", results["succeeded"][0].command_name) + if __name__ == "__main__": unittest.main()