diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index d4482c7c3..b8b94418a 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -21,8 +21,20 @@ from bson.son import SON from pymongo import common from pymongo.collation import validate_collation_or_none from pymongo.command_cursor import CommandCursor -from pymongo.errors import (ConnectionFailure, CursorNotFound, - InvalidOperation, PyMongoError) +from pymongo.errors import (ConnectionFailure, + InvalidOperation, + OperationFailure, + PyMongoError) + + +# The change streams spec considers the following server errors from the +# getMore command non-resumable. All other getMore errors are resumable. +_NON_RESUMABLE_GETMORE_ERRORS = frozenset([ + 11601, # Interrupted + 136, # CappedPositionLost + 237, # CursorKilled + None, # No error code was returned. +]) class ChangeStream(object): @@ -144,6 +156,14 @@ class ChangeStream(object): explicit_session=self._session is not None ) + def _resume(self): + """Reestablish this change stream after a resumable error.""" + try: + self._cursor.close() + except PyMongoError: + pass + self._cursor = self._create_cursor() + def close(self): """Close this ChangeStream.""" self._cursor.close() @@ -162,12 +182,13 @@ class ChangeStream(object): while True: try: change = self._cursor.next() - except (ConnectionFailure, CursorNotFound): - try: - self._cursor.close() - except PyMongoError: - pass - self._cursor = self._create_cursor() + except ConnectionFailure: + self._resume() + continue + except OperationFailure as exc: + if exc.code in _NON_RESUMABLE_GETMORE_ERRORS: + raise + self._resume() continue try: resume_token = change['_id'] diff --git a/test/test_change_stream.py b/test/test_change_stream.py index 189008f52..10be189a3 100644 --- a/test/test_change_stream.py +++ b/test/test_change_stream.py @@ -37,6 +37,7 @@ from bson.py3compat import iteritems from bson.raw_bson import DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument from pymongo import monitoring +from pymongo.change_stream import _NON_RESUMABLE_GETMORE_ERRORS from pymongo.command_cursor import CommandCursor from pymongo.errors import (InvalidOperation, OperationFailure, ServerSelectionTimeoutError) @@ -317,23 +318,24 @@ class TestCollectionChangeStream(IntegrationTest): self.client._close_cursor_now(cursor.cursor_id, address) self.insert_and_check(change_stream, {'_id': 2}) - def test_does_not_resume_on_server_error(self): - """ChangeStream will not attempt to resume on a server error.""" - def mock_next(self, *args, **kwargs): - self._CommandCursor__killed = True - raise OperationFailure('Mock server error') - - original_next = CommandCursor.next - CommandCursor.next = mock_next - try: + def test_does_not_resume_fatal_errors(self): + """ChangeStream will not attempt to resume fatal server errors.""" + for code in _NON_RESUMABLE_GETMORE_ERRORS: with self.coll.watch() as change_stream: + self.coll.insert_one({}) + + def mock_next(*args, **kwargs): + change_stream._cursor.close() + raise OperationFailure('Mock server error', code=code) + + original_next = change_stream._cursor.next + change_stream._cursor.next = mock_next + with self.assertRaises(OperationFailure): next(change_stream) - CommandCursor.next = original_next + change_stream._cursor.next = original_next with self.assertRaises(StopIteration): next(change_stream) - finally: - CommandCursor.next = original_next def test_initial_empty_batch(self): """Ensure that a cursor returned from an aggregate command with a