PYTHON-1545 Resume more getMore errors for ChangeStreams

This commit is contained in:
Shane Harvey 2018-06-20 10:51:56 -07:00
parent 96291c88b0
commit 994cf80c7c
2 changed files with 43 additions and 20 deletions

View File

@ -21,8 +21,20 @@ from bson.son import SON
from pymongo import common
from pymongo.collation import validate_collation_or_none
from pymongo.command_cursor import CommandCursor
from pymongo.errors import (ConnectionFailure, CursorNotFound,
InvalidOperation, PyMongoError)
from pymongo.errors import (ConnectionFailure,
InvalidOperation,
OperationFailure,
PyMongoError)
# The change streams spec considers the following server errors from the
# getMore command non-resumable. All other getMore errors are resumable.
_NON_RESUMABLE_GETMORE_ERRORS = frozenset([
11601, # Interrupted
136, # CappedPositionLost
237, # CursorKilled
None, # No error code was returned.
])
class ChangeStream(object):
@ -144,6 +156,14 @@ class ChangeStream(object):
explicit_session=self._session is not None
)
def _resume(self):
"""Reestablish this change stream after a resumable error."""
try:
self._cursor.close()
except PyMongoError:
pass
self._cursor = self._create_cursor()
def close(self):
"""Close this ChangeStream."""
self._cursor.close()
@ -162,12 +182,13 @@ class ChangeStream(object):
while True:
try:
change = self._cursor.next()
except (ConnectionFailure, CursorNotFound):
try:
self._cursor.close()
except PyMongoError:
pass
self._cursor = self._create_cursor()
except ConnectionFailure:
self._resume()
continue
except OperationFailure as exc:
if exc.code in _NON_RESUMABLE_GETMORE_ERRORS:
raise
self._resume()
continue
try:
resume_token = change['_id']

View File

@ -37,6 +37,7 @@ from bson.py3compat import iteritems
from bson.raw_bson import DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument
from pymongo import monitoring
from pymongo.change_stream import _NON_RESUMABLE_GETMORE_ERRORS
from pymongo.command_cursor import CommandCursor
from pymongo.errors import (InvalidOperation, OperationFailure,
ServerSelectionTimeoutError)
@ -317,23 +318,24 @@ class TestCollectionChangeStream(IntegrationTest):
self.client._close_cursor_now(cursor.cursor_id, address)
self.insert_and_check(change_stream, {'_id': 2})
def test_does_not_resume_on_server_error(self):
"""ChangeStream will not attempt to resume on a server error."""
def mock_next(self, *args, **kwargs):
self._CommandCursor__killed = True
raise OperationFailure('Mock server error')
original_next = CommandCursor.next
CommandCursor.next = mock_next
try:
def test_does_not_resume_fatal_errors(self):
"""ChangeStream will not attempt to resume fatal server errors."""
for code in _NON_RESUMABLE_GETMORE_ERRORS:
with self.coll.watch() as change_stream:
self.coll.insert_one({})
def mock_next(*args, **kwargs):
change_stream._cursor.close()
raise OperationFailure('Mock server error', code=code)
original_next = change_stream._cursor.next
change_stream._cursor.next = mock_next
with self.assertRaises(OperationFailure):
next(change_stream)
CommandCursor.next = original_next
change_stream._cursor.next = original_next
with self.assertRaises(StopIteration):
next(change_stream)
finally:
CommandCursor.next = original_next
def test_initial_empty_batch(self):
"""Ensure that a cursor returned from an aggregate command with a