PYTHON-988 - Deprecate cursor managers and kill_cursors
This commit is contained in:
parent
057429cfca
commit
e1850d8abe
@ -12,23 +12,28 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""A manager to handle when cursors are killed after they are closed.
|
||||
"""DEPRECATED - A manager to handle when cursors are killed after they are
|
||||
closed.
|
||||
|
||||
New cursor managers should be defined as subclasses of CursorManager and can be
|
||||
installed on a client by calling
|
||||
:meth:`~pymongo.mongo_client.MongoClient.set_cursor_manager`.
|
||||
|
||||
.. versionchanged:: 3.3
|
||||
Deprecated, for real this time.
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
Undeprecated. :meth:`~pymongo.cursor_manager.CursorManager.close` now
|
||||
requires an `address` argument. The ``BatchCursorManager`` class is removed.
|
||||
"""
|
||||
|
||||
import warnings
|
||||
import weakref
|
||||
from bson.py3compat import integer_types
|
||||
|
||||
|
||||
class CursorManager(object):
|
||||
"""The cursor manager base class."""
|
||||
"""DEPRECATED - The cursor manager base class."""
|
||||
|
||||
def __init__(self, client):
|
||||
"""Instantiate the manager.
|
||||
@ -36,6 +41,10 @@ class CursorManager(object):
|
||||
:Parameters:
|
||||
- `client`: a MongoClient
|
||||
"""
|
||||
warnings.warn(
|
||||
"Cursor managers are deprecated.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2)
|
||||
self.__client = weakref.ref(client)
|
||||
|
||||
def close(self, cursor_id, address):
|
||||
|
||||
@ -34,7 +34,9 @@ access:
|
||||
import contextlib
|
||||
import datetime
|
||||
import threading
|
||||
import warnings
|
||||
import weakref
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
from bson.codec_options import DEFAULT_CODEC_OPTIONS
|
||||
@ -358,7 +360,7 @@ class MongoClient(common.BaseObject):
|
||||
|
||||
self.__default_database_name = dbase
|
||||
self.__lock = threading.Lock()
|
||||
self.__cursor_manager = CursorManager(self)
|
||||
self.__cursor_manager = None
|
||||
self.__kill_cursors_queue = []
|
||||
|
||||
self._event_listeners = options.pool_options.event_listeners
|
||||
@ -712,7 +714,7 @@ class MongoClient(common.BaseObject):
|
||||
self._topology.close()
|
||||
|
||||
def set_cursor_manager(self, manager_class):
|
||||
"""Set this client's cursor manager.
|
||||
"""DEPRECATED - Set this client's cursor manager.
|
||||
|
||||
Raises :class:`TypeError` if `manager_class` is not a subclass of
|
||||
:class:`~pymongo.cursor_manager.CursorManager`. A cursor manager
|
||||
@ -723,9 +725,16 @@ class MongoClient(common.BaseObject):
|
||||
:Parameters:
|
||||
- `manager_class`: cursor manager to use
|
||||
|
||||
.. versionchanged:: 3.3
|
||||
Deprecated, for real this time.
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
Undeprecated.
|
||||
"""
|
||||
warnings.warn(
|
||||
"set_cursor_manager is Deprecated",
|
||||
DeprecationWarning,
|
||||
stacklevel=2)
|
||||
manager = manager_class(self)
|
||||
if not isinstance(manager, CursorManager):
|
||||
raise TypeError("manager_class must be a subclass of "
|
||||
@ -920,12 +929,17 @@ class MongoClient(common.BaseObject):
|
||||
return database.Database(self, name)
|
||||
|
||||
def close_cursor(self, cursor_id, address=None):
|
||||
"""Close a single database cursor.
|
||||
"""Send a kill cursors message soon with the given id.
|
||||
|
||||
Raises :class:`TypeError` if `cursor_id` is not an instance of
|
||||
``(int, long)``. What closing the cursor actually means
|
||||
depends on this client's cursor manager.
|
||||
|
||||
This method may be called from a :class:`~pymongo.cursor.Cursor`
|
||||
destructor during garbage collection, so it isn't safe to take a
|
||||
lock or do network I/O. Instead, we schedule the cursor to be closed
|
||||
soon on a background thread.
|
||||
|
||||
:Parameters:
|
||||
- `cursor_id`: id of cursor to close
|
||||
- `address` (optional): (host, port) pair of the cursor's server.
|
||||
@ -938,30 +952,36 @@ class MongoClient(common.BaseObject):
|
||||
if not isinstance(cursor_id, integer_types):
|
||||
raise TypeError("cursor_id must be an instance of (int, long)")
|
||||
|
||||
self.__cursor_manager.close(cursor_id, address)
|
||||
if self.__cursor_manager is not None:
|
||||
self.__cursor_manager.close(cursor_id, address)
|
||||
else:
|
||||
self.__kill_cursors_queue.append((address, [cursor_id]))
|
||||
|
||||
def kill_cursors(self, cursor_ids, address=None):
|
||||
"""Send a kill cursors message soon with the given ids.
|
||||
"""DEPRECATED - Send a kill cursors message soon with the given ids.
|
||||
|
||||
Raises :class:`TypeError` if `cursor_ids` is not an instance of
|
||||
``list``.
|
||||
|
||||
This method may be called from a :class:`~pymongo.cursor.Cursor`
|
||||
destructor during garbage collection, so it isn't safe to take a
|
||||
lock or do network I/O. Instead, we schedule the cursor to be closed
|
||||
soon on a background thread.
|
||||
|
||||
:Parameters:
|
||||
- `cursor_ids`: list of cursor ids to kill
|
||||
- `address` (optional): (host, port) pair of the cursor's server.
|
||||
If it is not provided, the client attempts to close the cursor on
|
||||
the primary or standalone, or a mongos server.
|
||||
|
||||
.. versionchanged:: 3.3
|
||||
Deprecated.
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
Now accepts an `address` argument. Schedules the cursors to be
|
||||
closed on a background thread instead of sending the message
|
||||
immediately.
|
||||
"""
|
||||
warnings.warn(
|
||||
"kill_cursors is deprecated.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2)
|
||||
|
||||
if not isinstance(cursor_ids, list):
|
||||
raise TypeError("cursor_ids must be a list")
|
||||
|
||||
|
||||
@ -22,7 +22,6 @@ import struct
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
import warnings
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
@ -39,10 +38,8 @@ from pymongo.errors import (AutoReconnect,
|
||||
ConnectionFailure,
|
||||
InvalidName,
|
||||
OperationFailure,
|
||||
CursorNotFound,
|
||||
NetworkTimeout,
|
||||
InvalidURI)
|
||||
from pymongo.message import _CursorAddress
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.pool import SocketInfo
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
@ -807,68 +804,6 @@ class TestClient(IntegrationTest):
|
||||
new_sock_info = next(iter(pool.sockets))
|
||||
self.assertEqual(old_sock_info, new_sock_info)
|
||||
|
||||
def test_kill_cursors_with_cursoraddress(self):
|
||||
if (client_context.is_mongos
|
||||
and not client_context.version.at_least(2, 4, 7)):
|
||||
# Old mongos sends incorrectly formatted error response when
|
||||
# cursor isn't found, see SERVER-9738.
|
||||
raise SkipTest("Can't test kill_cursors against old mongos")
|
||||
|
||||
self.collection = self.client.pymongo_test.test
|
||||
self.collection.drop()
|
||||
|
||||
self.collection.insert_many([{'_id': i} for i in range(200)])
|
||||
cursor = self.collection.find().batch_size(1)
|
||||
next(cursor)
|
||||
self.client.kill_cursors(
|
||||
[cursor.cursor_id],
|
||||
_CursorAddress(self.client.address, self.collection.full_name))
|
||||
|
||||
# Prevent killcursors from reaching the server while a getmore is in
|
||||
# progress -- the server logs "Assertion: 16089:Cannot kill active
|
||||
# cursor."
|
||||
time.sleep(2)
|
||||
|
||||
def raises_cursor_not_found():
|
||||
try:
|
||||
next(cursor)
|
||||
return False
|
||||
except CursorNotFound:
|
||||
return True
|
||||
|
||||
wait_until(raises_cursor_not_found, 'close cursor')
|
||||
|
||||
def test_kill_cursors_with_tuple(self):
|
||||
if (client_context.is_mongos
|
||||
and not client_context.version.at_least(2, 4, 7)):
|
||||
# Old mongos sends incorrectly formatted error response when
|
||||
# cursor isn't found, see SERVER-9738.
|
||||
raise SkipTest("Can't test kill_cursors against old mongos")
|
||||
|
||||
self.collection = self.client.pymongo_test.test
|
||||
self.collection.drop()
|
||||
|
||||
self.collection.insert_many([{'_id': i} for i in range(200)])
|
||||
cursor = self.collection.find().batch_size(1)
|
||||
next(cursor)
|
||||
self.client.kill_cursors(
|
||||
[cursor.cursor_id],
|
||||
self.client.address)
|
||||
|
||||
# Prevent killcursors from reaching the server while a getmore is in
|
||||
# progress -- the server logs "Assertion: 16089:Cannot kill active
|
||||
# cursor."
|
||||
time.sleep(2)
|
||||
|
||||
def raises_cursor_not_found():
|
||||
try:
|
||||
next(cursor)
|
||||
return False
|
||||
except CursorNotFound:
|
||||
return True
|
||||
|
||||
wait_until(raises_cursor_not_found, 'close cursor')
|
||||
|
||||
def test_lazy_connect_w0(self):
|
||||
# Ensure that connect-on-demand works when the first operation is
|
||||
# an unacknowledged write. This exercises _writable_max_wire_version().
|
||||
|
||||
@ -30,9 +30,7 @@ from pymongo import (MongoClient,
|
||||
DESCENDING,
|
||||
ALL,
|
||||
OFF)
|
||||
from pymongo.command_cursor import CommandCursor
|
||||
from pymongo.cursor import CursorType
|
||||
from pymongo.cursor_manager import CursorManager
|
||||
from pymongo.errors import (InvalidOperation,
|
||||
OperationFailure,
|
||||
ExecutionTimeout)
|
||||
@ -1217,38 +1215,6 @@ class TestCursor(IntegrationTest):
|
||||
next(cursor)
|
||||
self.assertRaises(InvalidOperation, cursor.comment, 'hello')
|
||||
|
||||
def test_cursor_transfer(self):
|
||||
|
||||
# This is just a test, don't try this at home...
|
||||
|
||||
client = client_context.rs_or_standalone_client
|
||||
db = client.pymongo_test
|
||||
|
||||
db.test.delete_many({})
|
||||
db.test.insert_many([{'_id': i} for i in range(200)])
|
||||
|
||||
class CManager(CursorManager):
|
||||
def __init__(self, client):
|
||||
super(CManager, self).__init__(client)
|
||||
|
||||
def close(self, dummy, dummy2):
|
||||
# Do absolutely nothing...
|
||||
pass
|
||||
|
||||
client.set_cursor_manager(CManager)
|
||||
self.addCleanup(client.set_cursor_manager, CursorManager)
|
||||
docs = []
|
||||
cursor = db.test.find().batch_size(10)
|
||||
docs.append(next(cursor))
|
||||
cursor.close()
|
||||
docs.extend(cursor)
|
||||
self.assertEqual(len(docs), 10)
|
||||
cmd_cursor = {'id': cursor.cursor_id, 'firstBatch': []}
|
||||
ccursor = CommandCursor(cursor.collection, cmd_cursor,
|
||||
cursor.address, retrieved=cursor.retrieved)
|
||||
docs.extend(ccursor)
|
||||
self.assertEqual(len(docs), 200)
|
||||
|
||||
def test_modifiers(self):
|
||||
cur = self.db.test.find()
|
||||
self.assertTrue('$query' not in cur._Cursor__query_spec())
|
||||
|
||||
@ -15,9 +15,11 @@
|
||||
"""Test the cursor_manager module."""
|
||||
|
||||
import sys
|
||||
import warnings
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
from pymongo.command_cursor import CommandCursor
|
||||
from pymongo.cursor_manager import CursorManager
|
||||
from pymongo.errors import CursorNotFound
|
||||
from pymongo.message import _CursorAddress
|
||||
@ -34,6 +36,10 @@ class TestCursorManager(IntegrationTest):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestCursorManager, cls).setUpClass()
|
||||
cls.warn_context = warnings.catch_warnings()
|
||||
cls.warn_context.__enter__()
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
|
||||
cls.collection = cls.db.test
|
||||
cls.collection.drop()
|
||||
|
||||
@ -42,6 +48,8 @@ class TestCursorManager(IntegrationTest):
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.warn_context.__exit__()
|
||||
cls.warn_context = None
|
||||
cls.collection.drop()
|
||||
|
||||
def test_cursor_manager_validation(self):
|
||||
@ -89,6 +97,37 @@ class TestCursorManager(IntegrationTest):
|
||||
wait_until(raises_cursor_not_found, 'close cursor')
|
||||
self.assertTrue(self.close_was_called)
|
||||
|
||||
def test_cursor_transfer(self):
|
||||
|
||||
# This is just a test, don't try this at home...
|
||||
|
||||
client = rs_or_single_client()
|
||||
db = client.pymongo_test
|
||||
|
||||
db.test.delete_many({})
|
||||
db.test.insert_many([{'_id': i} for i in range(200)])
|
||||
|
||||
class CManager(CursorManager):
|
||||
def __init__(self, client):
|
||||
super(CManager, self).__init__(client)
|
||||
|
||||
def close(self, dummy, dummy2):
|
||||
# Do absolutely nothing...
|
||||
pass
|
||||
|
||||
client.set_cursor_manager(CManager)
|
||||
docs = []
|
||||
cursor = db.test.find().batch_size(10)
|
||||
docs.append(next(cursor))
|
||||
cursor.close()
|
||||
docs.extend(cursor)
|
||||
self.assertEqual(len(docs), 10)
|
||||
cmd_cursor = {'id': cursor.cursor_id, 'firstBatch': []}
|
||||
ccursor = CommandCursor(cursor.collection, cmd_cursor,
|
||||
cursor.address, retrieved=cursor.retrieved)
|
||||
docs.extend(ccursor)
|
||||
self.assertEqual(len(docs), 200)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@ -28,18 +28,20 @@ from bson.objectid import ObjectId
|
||||
from bson.son import SON
|
||||
from pymongo import ASCENDING, DESCENDING
|
||||
from pymongo.errors import (ConfigurationError,
|
||||
CursorNotFound,
|
||||
DocumentTooLarge,
|
||||
DuplicateKeyError,
|
||||
InvalidDocument,
|
||||
InvalidOperation,
|
||||
OperationFailure,
|
||||
WTimeoutError)
|
||||
from pymongo.message import _CursorAddress
|
||||
from pymongo.son_manipulator import (AutoReference,
|
||||
NamespaceInjector,
|
||||
ObjectIdShuffler,
|
||||
SONManipulator)
|
||||
from pymongo.write_concern import WriteConcern
|
||||
from test import client_context, qcheck, unittest
|
||||
from test import client_context, qcheck, unittest, SkipTest
|
||||
from test.test_client import IntegrationTest
|
||||
from test.utils import (joinall,
|
||||
oid_generated_on_client,
|
||||
@ -1205,6 +1207,68 @@ class TestLegacy(IntegrationTest):
|
||||
self.assertEqual(10001, coll.count())
|
||||
coll.drop()
|
||||
|
||||
def test_kill_cursors_with_cursoraddress(self):
|
||||
if (client_context.is_mongos
|
||||
and not client_context.version.at_least(2, 4, 7)):
|
||||
# Old mongos sends incorrectly formatted error response when
|
||||
# cursor isn't found, see SERVER-9738.
|
||||
raise SkipTest("Can't test kill_cursors against old mongos")
|
||||
|
||||
coll = self.client.pymongo_test.test
|
||||
coll.drop()
|
||||
|
||||
coll.insert_many([{'_id': i} for i in range(200)])
|
||||
cursor = coll.find().batch_size(1)
|
||||
next(cursor)
|
||||
self.client.kill_cursors(
|
||||
[cursor.cursor_id],
|
||||
_CursorAddress(self.client.address, coll.full_name))
|
||||
|
||||
# Prevent killcursors from reaching the server while a getmore is in
|
||||
# progress -- the server logs "Assertion: 16089:Cannot kill active
|
||||
# cursor."
|
||||
time.sleep(2)
|
||||
|
||||
def raises_cursor_not_found():
|
||||
try:
|
||||
next(cursor)
|
||||
return False
|
||||
except CursorNotFound:
|
||||
return True
|
||||
|
||||
wait_until(raises_cursor_not_found, 'close cursor')
|
||||
|
||||
def test_kill_cursors_with_tuple(self):
|
||||
if (client_context.is_mongos
|
||||
and not client_context.version.at_least(2, 4, 7)):
|
||||
# Old mongos sends incorrectly formatted error response when
|
||||
# cursor isn't found, see SERVER-9738.
|
||||
raise SkipTest("Can't test kill_cursors against old mongos")
|
||||
|
||||
coll = self.client.pymongo_test.test
|
||||
coll.drop()
|
||||
|
||||
coll.insert_many([{'_id': i} for i in range(200)])
|
||||
cursor = coll.find().batch_size(1)
|
||||
next(cursor)
|
||||
self.client.kill_cursors(
|
||||
[cursor.cursor_id],
|
||||
self.client.address)
|
||||
|
||||
# Prevent killcursors from reaching the server while a getmore is in
|
||||
# progress -- the server logs "Assertion: 16089:Cannot kill active
|
||||
# cursor."
|
||||
time.sleep(2)
|
||||
|
||||
def raises_cursor_not_found():
|
||||
try:
|
||||
next(cursor)
|
||||
return False
|
||||
except CursorNotFound:
|
||||
return True
|
||||
|
||||
wait_until(raises_cursor_not_found, 'close cursor')
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user