diff --git a/doc/changelog.rst b/doc/changelog.rst index 9e1c65e10..b06bc8e11 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -4,7 +4,8 @@ Changelog Changes in Version 3.11.0b2.dev0 -------------------------------- -Version 3.11 adds support for MongoDB 4.4. Highlights include: +Version 3.11 adds support for MongoDB 4.4 and includes a number of bug fixes. +Highlights include: - Support for :ref:`OCSP` (Online Certificate Status Protocol). - Support for `PyOpenSSL <https://pypi.org/project/pyOpenSSL/>`_ as an @@ -52,6 +53,9 @@ Version 3.11 adds support for MongoDB 4.4. Highlights include: - Deprecated :meth:`pymongo.collection.Collection.reindex`. Use :meth:`~pymongo.database.Database.command` to run the ``reIndex`` command instead. +- Fixed a bug in change streams that could cause PyMongo to miss some change + documents when resuming a stream that was started without a resume token and + whose first batch did not contain any change documents. Unavoidable breaking changes: diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index 08b2043de..f742e126c 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -125,7 +125,7 @@ class ChangeStream(object): if resume_token is not None: if self._uses_start_after: options['startAfter'] = resume_token - if self._uses_resume_after: + else: options['resumeAfter'] = resume_token if self._start_at_operation_time is not None: @@ -149,17 +149,20 @@ class ChangeStream(object): return full_pipeline def _process_result(self, result, session, server, sock_info, slave_ok): - """Callback that caches the startAtOperationTime from a changeStream - aggregate command response containing an empty batch of change - documents. + """Callback that caches the postBatchResumeToken or + startAtOperationTime from a changeStream aggregate command response + containing an empty batch of change documents. This is implemented as a callback because we need access to the wire version in order to determine whether to cache this value. 
""" if not result['cursor']['firstBatch']: - if (self._start_at_operation_time is None and - self.resume_token is None and - sock_info.max_wire_version >= 7): + if 'postBatchResumeToken' in result['cursor']: + self._resume_token = result['cursor']['postBatchResumeToken'] + elif (self._start_at_operation_time is None and + self._uses_resume_after is False and + self._uses_start_after is False and + sock_info.max_wire_version >= 7): self._start_at_operation_time = result.get("operationTime") # PYTHON-2181: informative error on missing operationTime. if self._start_at_operation_time is None: diff --git a/test/test_change_stream.py b/test/test_change_stream.py index 7c55c6e96..862143a5d 100644 --- a/test/test_change_stream.py +++ b/test/test_change_stream.py @@ -127,7 +127,6 @@ class APITestsMixin(object): self.assertEqual([{'$project': {'foo': 0}}], change_stream._pipeline) self.assertEqual('updateLookup', change_stream._full_document) - self.assertIsNone(change_stream.resume_token) self.assertEqual(1000, change_stream._max_await_time_ms) self.assertEqual(100, change_stream._batch_size) self.assertIsInstance(change_stream._cursor, CommandCursor) @@ -472,8 +471,10 @@ class ProseSpecTestsMixin(object): listener is a WhiteListEventListener that listens for aggregate and getMore commands.""" if previous_change is None or stream._cursor._has_next(): - return self._get_expected_resume_token_legacy( + token = self._get_expected_resume_token_legacy( stream, listener, previous_change) + if token is not None: + return token response = listener.results['succeeded'][-1].reply return response['cursor']['postBatchResumeToken'] @@ -1061,6 +1062,8 @@ class TestAllScenarios(unittest.TestCase): fail_point = scenario_dict.get("failPoint") if fail_point is None: return + elif not client_context.test_commands_enabled: + self.skipTest("Test commands must be enabled") fail_cmd = SON([('configureFailPoint', 'failCommand')]) fail_cmd.update(fail_point)