Gracefully kill cursor on capped rollover PYTHON-637
This change does a few things: - Raises a new exception for CursorNotFound, inheriting from OperationFailure so we don't break existing code. - Catches the exception in cursor.Cursor and command_cursor.CommandCursor, setting __killed to True. - If the cursor is not tailable, re-raises the exception. This makes it easier to deal with capped collection rollover when iterating a tailable cursor.
This commit is contained in:
parent
940d73f672
commit
e3c809bd02
@ -98,11 +98,11 @@ For `Twisted <http://twistedmatrix.com/>`_, see `TxMongo
|
||||
<http://github.com/fiorix/mongo-async-python-driver>`_. Compared to PyMongo,
|
||||
TxMongo is less stable, lacks features, and is less actively maintained.
|
||||
|
||||
What does *OperationFailure* cursor id not valid at server mean?
|
||||
----------------------------------------------------------------
|
||||
What does *CursorNotFound* cursor id not valid at server mean?
|
||||
--------------------------------------------------------------
|
||||
Cursors in MongoDB can timeout on the server if they've been open for
|
||||
a long time without any operations being performed on them. This can
|
||||
lead to an :class:`~pymongo.errors.OperationFailure` exception being
|
||||
lead to an :class:`~pymongo.errors.CursorNotFound` exception being
|
||||
raised when attempting to iterate the cursor.
|
||||
|
||||
How do I change the timeout value for cursors?
|
||||
|
||||
@ -17,7 +17,7 @@
|
||||
from collections import deque
|
||||
|
||||
from pymongo import helpers, message
|
||||
from pymongo.errors import AutoReconnect
|
||||
from pymongo.errors import AutoReconnect, CursorNotFound
|
||||
|
||||
|
||||
class CommandCursor(object):
|
||||
@ -107,6 +107,9 @@ class CommandCursor(object):
|
||||
response = helpers._unpack_response(response,
|
||||
self.__id,
|
||||
*self.__decode_opts)
|
||||
except CursorNotFound:
|
||||
self.__killed = True
|
||||
raise
|
||||
except AutoReconnect:
|
||||
# Don't send kill cursors to another server after a "not master"
|
||||
# error. It's completely pointless.
|
||||
|
||||
@ -21,8 +21,9 @@ from bson.code import Code
|
||||
from bson.son import SON
|
||||
from pymongo import helpers, message, read_preferences
|
||||
from pymongo.read_preferences import ReadPreference, secondary_ok_commands
|
||||
from pymongo.errors import (InvalidOperation,
|
||||
AutoReconnect)
|
||||
from pymongo.errors import (AutoReconnect,
|
||||
CursorNotFound,
|
||||
InvalidOperation)
|
||||
|
||||
_QUERY_OPTIONS = {
|
||||
"tailable_cursor": 2,
|
||||
@ -896,6 +897,15 @@ class Cursor(object):
|
||||
self.__tz_aware,
|
||||
self.__uuid_subtype,
|
||||
self.__compile_re)
|
||||
except CursorNotFound:
|
||||
self.__killed = True
|
||||
# If this is a tailable cursor the error is likely
|
||||
# due to capped collection roll over. Setting
|
||||
# self.__killed to True ensures Cursor.alive will be
|
||||
# False. No need to re-raise.
|
||||
if self.__query_flags & _QUERY_OPTIONS["tailable_cursor"]:
|
||||
return
|
||||
raise
|
||||
except AutoReconnect:
|
||||
# Don't send kill cursors to another server after a "not master"
|
||||
# error. It's completely pointless.
|
||||
|
||||
@ -88,6 +88,14 @@ class OperationFailure(PyMongoError):
|
||||
return self.__details
|
||||
|
||||
|
||||
class CursorNotFound(OperationFailure):
|
||||
"""Raised while iterating query results if the cursor is
|
||||
invalidated on the server.
|
||||
|
||||
.. versionadded:: 2.7
|
||||
"""
|
||||
|
||||
|
||||
class ExecutionTimeout(OperationFailure):
|
||||
"""Raised when a database operation times out, exceeding the $maxTimeMS
|
||||
set in the query or command option.
|
||||
|
||||
@ -23,6 +23,7 @@ import pymongo
|
||||
from bson.binary import OLD_UUID_SUBTYPE
|
||||
from bson.son import SON
|
||||
from pymongo.errors import (AutoReconnect,
|
||||
CursorNotFound,
|
||||
DuplicateKeyError,
|
||||
OperationFailure,
|
||||
ExecutionTimeout,
|
||||
@ -92,8 +93,8 @@ def _unpack_response(response, cursor_id=None, as_class=dict,
|
||||
# Shouldn't get this response if we aren't doing a getMore
|
||||
assert cursor_id is not None
|
||||
|
||||
raise OperationFailure("cursor id '%s' not valid at server" %
|
||||
cursor_id)
|
||||
raise CursorNotFound("cursor id '%s' not valid at server" %
|
||||
cursor_id)
|
||||
elif response_flag & 2:
|
||||
error_object = bson.BSON(response[20:]).decode()
|
||||
if error_object["$err"].startswith("not master"):
|
||||
|
||||
@ -921,33 +921,44 @@ class TestCursor(unittest.TestCase):
|
||||
def test_tailable(self):
|
||||
db = self.db
|
||||
db.drop_collection("test")
|
||||
db.create_collection("test", capped=True, size=1000)
|
||||
db.create_collection("test", capped=True, size=1000, max=3)
|
||||
|
||||
cursor = db.test.find(tailable=True)
|
||||
try:
|
||||
cursor = db.test.find(tailable=True)
|
||||
|
||||
db.test.insert({"x": 1})
|
||||
count = 0
|
||||
for doc in cursor:
|
||||
count += 1
|
||||
self.assertEqual(1, doc["x"])
|
||||
self.assertEqual(1, count)
|
||||
db.test.insert({"x": 1})
|
||||
count = 0
|
||||
for doc in cursor:
|
||||
count += 1
|
||||
self.assertEqual(1, doc["x"])
|
||||
self.assertEqual(1, count)
|
||||
|
||||
db.test.insert({"x": 2})
|
||||
count = 0
|
||||
for doc in cursor:
|
||||
count += 1
|
||||
self.assertEqual(2, doc["x"])
|
||||
self.assertEqual(1, count)
|
||||
db.test.insert({"x": 2})
|
||||
count = 0
|
||||
for doc in cursor:
|
||||
count += 1
|
||||
self.assertEqual(2, doc["x"])
|
||||
self.assertEqual(1, count)
|
||||
|
||||
db.test.insert({"x": 3})
|
||||
count = 0
|
||||
for doc in cursor:
|
||||
count += 1
|
||||
self.assertEqual(3, doc["x"])
|
||||
self.assertEqual(1, count)
|
||||
db.test.insert({"x": 3})
|
||||
count = 0
|
||||
for doc in cursor:
|
||||
count += 1
|
||||
self.assertEqual(3, doc["x"])
|
||||
self.assertEqual(1, count)
|
||||
|
||||
self.assertEqual(3, db.test.count())
|
||||
db.drop_collection("test")
|
||||
# Capped rollover - the collection can never
|
||||
# have more than 3 documents. Just make sure
|
||||
# this doesn't raise...
|
||||
db.test.insert(({"x": i} for i in xrange(4, 7)))
|
||||
self.assertEqual(0, len(list(cursor)))
|
||||
|
||||
# and that the cursor doesn't think it's still alive.
|
||||
self.assertFalse(cursor.alive)
|
||||
|
||||
self.assertEqual(3, db.test.count())
|
||||
finally:
|
||||
db.drop_collection("test")
|
||||
|
||||
def test_distinct(self):
|
||||
if not version.at_least(self.db.connection, (1, 1, 3, 1)):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user