PYTHON-2305 Cache postBatchResumeToken when an aggregate command returns an empty firstBatch (#456)
This commit is contained in:
parent
1c29c1a65e
commit
065001ef2e
@ -4,7 +4,8 @@ Changelog
|
||||
Changes in Version 3.11.0b2.dev0
|
||||
--------------------------------
|
||||
|
||||
Version 3.11 adds support for MongoDB 4.4. Highlights include:
|
||||
Version 3.11 adds support for MongoDB 4.4 and includes a number of bug fixes.
|
||||
Highlights include:
|
||||
|
||||
- Support for :ref:`OCSP` (Online Certificate Status Protocol).
|
||||
- Support for `PyOpenSSL <https://pypi.org/project/pyOpenSSL/>`_ as an
|
||||
@ -52,6 +53,9 @@ Version 3.11 adds support for MongoDB 4.4. Highlights include:
|
||||
- Deprecated :meth:`pymongo.collection.Collection.reindex`. Use
|
||||
:meth:`~pymongo.database.Database.command` to run the ``reIndex`` command
|
||||
instead.
|
||||
- Fixed a bug in change streams that could cause PyMongo to miss some change
|
||||
documents when resuming a stream that was started without a resume token and
|
||||
whose first batch did not contain any change documents.
|
||||
|
||||
Unavoidable breaking changes:
|
||||
|
||||
|
||||
@ -125,7 +125,7 @@ class ChangeStream(object):
|
||||
if resume_token is not None:
|
||||
if self._uses_start_after:
|
||||
options['startAfter'] = resume_token
|
||||
if self._uses_resume_after:
|
||||
else:
|
||||
options['resumeAfter'] = resume_token
|
||||
|
||||
if self._start_at_operation_time is not None:
|
||||
@ -149,17 +149,20 @@ class ChangeStream(object):
|
||||
return full_pipeline
|
||||
|
||||
def _process_result(self, result, session, server, sock_info, slave_ok):
|
||||
"""Callback that caches the startAtOperationTime from a changeStream
|
||||
aggregate command response containing an empty batch of change
|
||||
documents.
|
||||
"""Callback that caches the postBatchResumeToken or
|
||||
startAtOperationTime from a changeStream aggregate command response
|
||||
containing an empty batch of change documents.
|
||||
|
||||
This is implemented as a callback because we need access to the wire
|
||||
version in order to determine whether to cache this value.
|
||||
"""
|
||||
if not result['cursor']['firstBatch']:
|
||||
if (self._start_at_operation_time is None and
|
||||
self.resume_token is None and
|
||||
sock_info.max_wire_version >= 7):
|
||||
if 'postBatchResumeToken' in result['cursor']:
|
||||
self._resume_token = result['cursor']['postBatchResumeToken']
|
||||
elif (self._start_at_operation_time is None and
|
||||
self._uses_resume_after is False and
|
||||
self._uses_start_after is False and
|
||||
sock_info.max_wire_version >= 7):
|
||||
self._start_at_operation_time = result.get("operationTime")
|
||||
# PYTHON-2181: informative error on missing operationTime.
|
||||
if self._start_at_operation_time is None:
|
||||
|
||||
@ -127,7 +127,6 @@ class APITestsMixin(object):
|
||||
self.assertEqual([{'$project': {'foo': 0}}],
|
||||
change_stream._pipeline)
|
||||
self.assertEqual('updateLookup', change_stream._full_document)
|
||||
self.assertIsNone(change_stream.resume_token)
|
||||
self.assertEqual(1000, change_stream._max_await_time_ms)
|
||||
self.assertEqual(100, change_stream._batch_size)
|
||||
self.assertIsInstance(change_stream._cursor, CommandCursor)
|
||||
@ -472,8 +471,10 @@ class ProseSpecTestsMixin(object):
|
||||
listener is a WhiteListEventListener that listens for aggregate and
|
||||
getMore commands."""
|
||||
if previous_change is None or stream._cursor._has_next():
|
||||
return self._get_expected_resume_token_legacy(
|
||||
token = self._get_expected_resume_token_legacy(
|
||||
stream, listener, previous_change)
|
||||
if token is not None:
|
||||
return token
|
||||
|
||||
response = listener.results['succeeded'][-1].reply
|
||||
return response['cursor']['postBatchResumeToken']
|
||||
@ -1061,6 +1062,8 @@ class TestAllScenarios(unittest.TestCase):
|
||||
fail_point = scenario_dict.get("failPoint")
|
||||
if fail_point is None:
|
||||
return
|
||||
elif not client_context.test_commands_enabled:
|
||||
self.skipTest("Test commands must be enabled")
|
||||
|
||||
fail_cmd = SON([('configureFailPoint', 'failCommand')])
|
||||
fail_cmd.update(fail_point)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user