diff --git a/pymongo/cursor_manager.py b/pymongo/cursor_manager.py index 3fd230e85..532247e1e 100644 --- a/pymongo/cursor_manager.py +++ b/pymongo/cursor_manager.py @@ -12,23 +12,28 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""A manager to handle when cursors are killed after they are closed. +"""DEPRECATED - A manager to handle when cursors are killed after they are +closed. New cursor managers should be defined as subclasses of CursorManager and can be installed on a client by calling :meth:`~pymongo.mongo_client.MongoClient.set_cursor_manager`. +.. versionchanged:: 3.3 + Deprecated, for real this time. + .. versionchanged:: 3.0 Undeprecated. :meth:`~pymongo.cursor_manager.CursorManager.close` now requires an `address` argument. The ``BatchCursorManager`` class is removed. """ +import warnings import weakref from bson.py3compat import integer_types class CursorManager(object): - """The cursor manager base class.""" + """DEPRECATED - The cursor manager base class.""" def __init__(self, client): """Instantiate the manager. @@ -36,6 +41,10 @@ class CursorManager(object): :Parameters: - `client`: a MongoClient """ + warnings.warn( + "Cursor managers are deprecated.", + DeprecationWarning, + stacklevel=2) self.__client = weakref.ref(client) def close(self, cursor_id, address): diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 7b1c9e292..39e107ab8 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -34,7 +34,9 @@ access: import contextlib import datetime import threading +import warnings import weakref + from collections import defaultdict from bson.codec_options import DEFAULT_CODEC_OPTIONS @@ -358,7 +360,7 @@ class MongoClient(common.BaseObject): self.__default_database_name = dbase self.__lock = threading.Lock() - self.__cursor_manager = CursorManager(self) + self.__cursor_manager = None self.__kill_cursors_queue = [] self._event_listeners = options.pool_options.event_listeners @@ -712,7 +714,7 @@ class MongoClient(common.BaseObject): self._topology.close() def set_cursor_manager(self, manager_class): - """Set this client's cursor manager. + """DEPRECATED - Set this client's cursor manager. Raises :class:`TypeError` if `manager_class` is not a subclass of :class:`~pymongo.cursor_manager.CursorManager`. A cursor manager @@ -723,9 +725,16 @@ class MongoClient(common.BaseObject): :Parameters: - `manager_class`: cursor manager to use + .. versionchanged:: 3.3 + Deprecated, for real this time. + .. versionchanged:: 3.0 Undeprecated. """ + warnings.warn( + "set_cursor_manager is Deprecated", + DeprecationWarning, + stacklevel=2) manager = manager_class(self) if not isinstance(manager, CursorManager): raise TypeError("manager_class must be a subclass of " @@ -920,12 +929,17 @@ class MongoClient(common.BaseObject): return database.Database(self, name) def close_cursor(self, cursor_id, address=None): - """Close a single database cursor. + """Send a kill cursors message soon with the given id. Raises :class:`TypeError` if `cursor_id` is not an instance of ``(int, long)``. What closing the cursor actually means depends on this client's cursor manager. + This method may be called from a :class:`~pymongo.cursor.Cursor` + destructor during garbage collection, so it isn't safe to take a + lock or do network I/O. Instead, we schedule the cursor to be closed + soon on a background thread. + :Parameters: - `cursor_id`: id of cursor to close - `address` (optional): (host, port) pair of the cursor's server. @@ -938,30 +952,36 @@ class MongoClient(common.BaseObject): if not isinstance(cursor_id, integer_types): raise TypeError("cursor_id must be an instance of (int, long)") - self.__cursor_manager.close(cursor_id, address) + if self.__cursor_manager is not None: + self.__cursor_manager.close(cursor_id, address) + else: + self.__kill_cursors_queue.append((address, [cursor_id])) def kill_cursors(self, cursor_ids, address=None): - """Send a kill cursors message soon with the given ids. + """DEPRECATED - Send a kill cursors message soon with the given ids. Raises :class:`TypeError` if `cursor_ids` is not an instance of ``list``. - This method may be called from a :class:`~pymongo.cursor.Cursor` - destructor during garbage collection, so it isn't safe to take a - lock or do network I/O. Instead, we schedule the cursor to be closed - soon on a background thread. - :Parameters: - `cursor_ids`: list of cursor ids to kill - `address` (optional): (host, port) pair of the cursor's server. If it is not provided, the client attempts to close the cursor on the primary or standalone, or a mongos server. + .. versionchanged:: 3.3 + Deprecated. + .. versionchanged:: 3.0 Now accepts an `address` argument. Schedules the cursors to be closed on a background thread instead of sending the message immediately. """ + warnings.warn( + "kill_cursors is deprecated.", + DeprecationWarning, + stacklevel=2) + if not isinstance(cursor_ids, list): raise TypeError("cursor_ids must be a list") diff --git a/test/test_client.py b/test/test_client.py index 2e2a11b2f..023591059 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -22,7 +22,6 @@ import struct import sys import time import traceback -import warnings sys.path[0:0] = [""] @@ -39,10 +38,8 @@ from pymongo.errors import (AutoReconnect, ConnectionFailure, InvalidName, OperationFailure, - CursorNotFound, NetworkTimeout, InvalidURI) -from pymongo.message import _CursorAddress from pymongo.mongo_client import MongoClient from pymongo.pool import SocketInfo from pymongo.read_preferences import ReadPreference @@ -807,68 +804,6 @@ class TestClient(IntegrationTest): new_sock_info = next(iter(pool.sockets)) self.assertEqual(old_sock_info, new_sock_info) - def test_kill_cursors_with_cursoraddress(self): - if (client_context.is_mongos - and not client_context.version.at_least(2, 4, 7)): - # Old mongos sends incorrectly formatted error response when - # cursor isn't found, see SERVER-9738. - raise SkipTest("Can't test kill_cursors against old mongos") - - self.collection = self.client.pymongo_test.test - self.collection.drop() - - self.collection.insert_many([{'_id': i} for i in range(200)]) - cursor = self.collection.find().batch_size(1) - next(cursor) - self.client.kill_cursors( - [cursor.cursor_id], - _CursorAddress(self.client.address, self.collection.full_name)) - - # Prevent killcursors from reaching the server while a getmore is in - # progress -- the server logs "Assertion: 16089:Cannot kill active - # cursor." - time.sleep(2) - - def raises_cursor_not_found(): - try: - next(cursor) - return False - except CursorNotFound: - return True - - wait_until(raises_cursor_not_found, 'close cursor') - - def test_kill_cursors_with_tuple(self): - if (client_context.is_mongos - and not client_context.version.at_least(2, 4, 7)): - # Old mongos sends incorrectly formatted error response when - # cursor isn't found, see SERVER-9738. - raise SkipTest("Can't test kill_cursors against old mongos") - - self.collection = self.client.pymongo_test.test - self.collection.drop() - - self.collection.insert_many([{'_id': i} for i in range(200)]) - cursor = self.collection.find().batch_size(1) - next(cursor) - self.client.kill_cursors( - [cursor.cursor_id], - self.client.address) - - # Prevent killcursors from reaching the server while a getmore is in - # progress -- the server logs "Assertion: 16089:Cannot kill active - # cursor." - time.sleep(2) - - def raises_cursor_not_found(): - try: - next(cursor) - return False - except CursorNotFound: - return True - - wait_until(raises_cursor_not_found, 'close cursor') - def test_lazy_connect_w0(self): # Ensure that connect-on-demand works when the first operation is # an unacknowledged write. This exercises _writable_max_wire_version(). diff --git a/test/test_cursor.py b/test/test_cursor.py index 7713debd4..632c889e3 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -30,9 +30,7 @@ from pymongo import (MongoClient, DESCENDING, ALL, OFF) -from pymongo.command_cursor import CommandCursor from pymongo.cursor import CursorType -from pymongo.cursor_manager import CursorManager from pymongo.errors import (InvalidOperation, OperationFailure, ExecutionTimeout) @@ -1217,38 +1215,6 @@ class TestCursor(IntegrationTest): next(cursor) self.assertRaises(InvalidOperation, cursor.comment, 'hello') - def test_cursor_transfer(self): - - # This is just a test, don't try this at home... - - client = client_context.rs_or_standalone_client - db = client.pymongo_test - - db.test.delete_many({}) - db.test.insert_many([{'_id': i} for i in range(200)]) - - class CManager(CursorManager): - def __init__(self, client): - super(CManager, self).__init__(client) - - def close(self, dummy, dummy2): - # Do absolutely nothing... - pass - - client.set_cursor_manager(CManager) - self.addCleanup(client.set_cursor_manager, CursorManager) - docs = [] - cursor = db.test.find().batch_size(10) - docs.append(next(cursor)) - cursor.close() - docs.extend(cursor) - self.assertEqual(len(docs), 10) - cmd_cursor = {'id': cursor.cursor_id, 'firstBatch': []} - ccursor = CommandCursor(cursor.collection, cmd_cursor, - cursor.address, retrieved=cursor.retrieved) - docs.extend(ccursor) - self.assertEqual(len(docs), 200) - def test_modifiers(self): cur = self.db.test.find() self.assertTrue('$query' not in cur._Cursor__query_spec()) diff --git a/test/test_cursor_manager.py b/test/test_cursor_manager.py index 1a1543f1f..9e24a6c69 100644 --- a/test/test_cursor_manager.py +++ b/test/test_cursor_manager.py @@ -15,9 +15,11 @@ """Test the cursor_manager module.""" import sys +import warnings sys.path[0:0] = [""] +from pymongo.command_cursor import CommandCursor from pymongo.cursor_manager import CursorManager from pymongo.errors import CursorNotFound from pymongo.message import _CursorAddress @@ -34,6 +36,10 @@ class TestCursorManager(IntegrationTest): @classmethod def setUpClass(cls): super(TestCursorManager, cls).setUpClass() + cls.warn_context = warnings.catch_warnings() + cls.warn_context.__enter__() + warnings.simplefilter("ignore", DeprecationWarning) + cls.collection = cls.db.test cls.collection.drop() @@ -42,6 +48,8 @@ class TestCursorManager(IntegrationTest): @classmethod def tearDownClass(cls): + cls.warn_context.__exit__() + cls.warn_context = None cls.collection.drop() def test_cursor_manager_validation(self): @@ -89,6 +97,37 @@ class TestCursorManager(IntegrationTest): wait_until(raises_cursor_not_found, 'close cursor') self.assertTrue(self.close_was_called) + def test_cursor_transfer(self): + + # This is just a test, don't try this at home... + + client = rs_or_single_client() + db = client.pymongo_test + + db.test.delete_many({}) + db.test.insert_many([{'_id': i} for i in range(200)]) + + class CManager(CursorManager): + def __init__(self, client): + super(CManager, self).__init__(client) + + def close(self, dummy, dummy2): + # Do absolutely nothing... + pass + + client.set_cursor_manager(CManager) + docs = [] + cursor = db.test.find().batch_size(10) + docs.append(next(cursor)) + cursor.close() + docs.extend(cursor) + self.assertEqual(len(docs), 10) + cmd_cursor = {'id': cursor.cursor_id, 'firstBatch': []} + ccursor = CommandCursor(cursor.collection, cmd_cursor, + cursor.address, retrieved=cursor.retrieved) + docs.extend(ccursor) + self.assertEqual(len(docs), 200) + if __name__ == "__main__": unittest.main() diff --git a/test/test_legacy_api.py b/test/test_legacy_api.py index db5d2cedc..bac3a24a2 100644 --- a/test/test_legacy_api.py +++ b/test/test_legacy_api.py @@ -28,18 +28,20 @@ from bson.objectid import ObjectId from bson.son import SON from pymongo import ASCENDING, DESCENDING from pymongo.errors import (ConfigurationError, + CursorNotFound, DocumentTooLarge, DuplicateKeyError, InvalidDocument, InvalidOperation, OperationFailure, WTimeoutError) +from pymongo.message import _CursorAddress from pymongo.son_manipulator import (AutoReference, NamespaceInjector, ObjectIdShuffler, SONManipulator) from pymongo.write_concern import WriteConcern -from test import client_context, qcheck, unittest +from test import client_context, qcheck, unittest, SkipTest from test.test_client import IntegrationTest from test.utils import (joinall, oid_generated_on_client, @@ -1205,6 +1207,68 @@ class TestLegacy(IntegrationTest): self.assertEqual(10001, coll.count()) coll.drop() + def test_kill_cursors_with_cursoraddress(self): + if (client_context.is_mongos + and not client_context.version.at_least(2, 4, 7)): + # Old mongos sends incorrectly formatted error response when + # cursor isn't found, see SERVER-9738. + raise SkipTest("Can't test kill_cursors against old mongos") + + coll = self.client.pymongo_test.test + coll.drop() + + coll.insert_many([{'_id': i} for i in range(200)]) + cursor = coll.find().batch_size(1) + next(cursor) + self.client.kill_cursors( + [cursor.cursor_id], + _CursorAddress(self.client.address, coll.full_name)) + + # Prevent killcursors from reaching the server while a getmore is in + # progress -- the server logs "Assertion: 16089:Cannot kill active + # cursor." + time.sleep(2) + + def raises_cursor_not_found(): + try: + next(cursor) + return False + except CursorNotFound: + return True + + wait_until(raises_cursor_not_found, 'close cursor') + + def test_kill_cursors_with_tuple(self): + if (client_context.is_mongos + and not client_context.version.at_least(2, 4, 7)): + # Old mongos sends incorrectly formatted error response when + # cursor isn't found, see SERVER-9738. + raise SkipTest("Can't test kill_cursors against old mongos") + + coll = self.client.pymongo_test.test + coll.drop() + + coll.insert_many([{'_id': i} for i in range(200)]) + cursor = coll.find().batch_size(1) + next(cursor) + self.client.kill_cursors( + [cursor.cursor_id], + self.client.address) + + # Prevent killcursors from reaching the server while a getmore is in + # progress -- the server logs "Assertion: 16089:Cannot kill active + # cursor." + time.sleep(2) + + def raises_cursor_not_found(): + try: + next(cursor) + return False + except CursorNotFound: + return True + + wait_until(raises_cursor_not_found, 'close cursor') + if __name__ == "__main__": unittest.main()