PYTHON-1662 Add ChangeStream.try_next API
(cherry picked from commit 92ddc09b7e)
This commit is contained in:
parent
86463345e6
commit
c339077ec9
@ -1,5 +1,5 @@
|
||||
:mod:`change_stream` -- Watch changes on a collection
|
||||
=====================================================
|
||||
:mod:`change_stream` -- Watch changes on a collection, database, or cluster
|
||||
===========================================================================
|
||||
|
||||
.. automodule:: pymongo.change_stream
|
||||
:members:
|
||||
|
||||
@ -42,6 +42,9 @@ Changes in Version 3.8.0.dev0
|
||||
string.
|
||||
- Add the ``filter`` parameter to
|
||||
:meth:`~pymongo.database.Database.list_collection_names`.
|
||||
- Changes can now be requested from a ``ChangeStream`` cursor without blocking
|
||||
indefinitely using the new
|
||||
:meth:`pymongo.change_stream.ChangeStream.try_next` method.
|
||||
|
||||
Issues Resolved
|
||||
...............
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
# implied. See the License for the specific language governing
|
||||
# permissions and limitations under the License.
|
||||
|
||||
"""ChangeStream cursor to iterate over changes on a collection."""
|
||||
"""Watch changes on a collection, a database, or the entire cluster."""
|
||||
|
||||
import copy
|
||||
|
||||
@ -41,14 +41,12 @@ class ChangeStream(object):
|
||||
"""The internal abstract base class for change stream cursors.
|
||||
|
||||
Should not be called directly by application developers. Use
|
||||
:meth:pymongo.collection.Collection.watch,
|
||||
:meth:pymongo.database.Database.watch, or
|
||||
:meth:pymongo.mongo_client.MongoClient.watch instead.
|
||||
:meth:`pymongo.collection.Collection.watch`,
|
||||
:meth:`pymongo.database.Database.watch`, or
|
||||
:meth:`pymongo.mongo_client.MongoClient.watch` instead.
|
||||
|
||||
Defines the interface for change streams. Should be subclassed to
|
||||
implement the `ChangeStream._create_cursor` abstract method, and
|
||||
the `ChangeStream._database`and ChangeStream._aggregation_target`
|
||||
abstract properties.
|
||||
.. versionadded:: 3.6
|
||||
.. mongodoc:: changeStreams
|
||||
"""
|
||||
def __init__(self, target, pipeline, full_document, resume_after,
|
||||
max_await_time_ms, batch_size, collation,
|
||||
@ -175,34 +173,97 @@ class ChangeStream(object):
|
||||
"""Advance the cursor.
|
||||
|
||||
This method blocks until the next change document is returned or an
|
||||
unrecoverable error is raised.
|
||||
unrecoverable error is raised. This method is used when iterating over
|
||||
all changes in the cursor. For example::
|
||||
|
||||
try:
|
||||
with db.collection.watch(
|
||||
[{'$match': {'operationType': 'insert'}}]) as stream:
|
||||
for insert_change in stream:
|
||||
print(insert_change)
|
||||
except pymongo.errors.PyMongoError:
|
||||
# The ChangeStream encountered an unrecoverable error or the
|
||||
# resume attempt failed to recreate the cursor.
|
||||
logging.error('...')
|
||||
|
||||
Raises :exc:`StopIteration` if this ChangeStream is closed.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
change = self._cursor.next()
|
||||
except ConnectionFailure:
|
||||
self._resume()
|
||||
continue
|
||||
except OperationFailure as exc:
|
||||
if exc.code in _NON_RESUMABLE_GETMORE_ERRORS:
|
||||
raise
|
||||
self._resume()
|
||||
continue
|
||||
try:
|
||||
resume_token = change['_id']
|
||||
except KeyError:
|
||||
self.close()
|
||||
raise InvalidOperation(
|
||||
"Cannot provide resume functionality when the resume "
|
||||
"token is missing.")
|
||||
self._resume_token = copy.copy(resume_token)
|
||||
self._start_at_operation_time = None
|
||||
return change
|
||||
while self.alive:
|
||||
doc = self.try_next()
|
||||
if doc is not None:
|
||||
return doc
|
||||
|
||||
raise StopIteration
|
||||
|
||||
__next__ = next
|
||||
|
||||
@property
|
||||
def alive(self):
|
||||
"""Does this cursor have the potential to return more data?
|
||||
|
||||
.. note:: Even if :attr:`alive` is ``True``, :meth:`next` can raise
|
||||
:exc:`StopIteration` and :meth:`try_next` can return ``None``.
|
||||
|
||||
.. versionadded:: 3.8
|
||||
"""
|
||||
return self._cursor.alive
|
||||
|
||||
def try_next(self):
|
||||
"""Advance the cursor without blocking indefinitely.
|
||||
|
||||
This method returns the next change document without waiting
|
||||
indefinitely for the next change. For example::
|
||||
|
||||
with db.collection.watch() as stream:
|
||||
while stream.alive:
|
||||
change = stream.try_next()
|
||||
if change is not None:
|
||||
print(change)
|
||||
elif stream.alive:
|
||||
# We end up here when there are no recent changes.
|
||||
# Sleep for a while to avoid flooding the server with
|
||||
# getMore requests when no changes are available.
|
||||
time.sleep(10)
|
||||
|
||||
If no change document is cached locally then this method runs a single
|
||||
getMore command. If the getMore yields any documents, the next
|
||||
document is returned, otherwise, if the getMore returns no documents
|
||||
(because there have been no changes) then ``None`` is returned.
|
||||
|
||||
:Returns:
|
||||
The next change document or ``None`` when no document is available
|
||||
after running a single getMore or when the cursor is closed.
|
||||
|
||||
.. versionadded:: 3.8
|
||||
"""
|
||||
# Attempt to get the next change with at most one getMore and at most
|
||||
# one resume attempt.
|
||||
try:
|
||||
change = self._cursor._try_next(True)
|
||||
except ConnectionFailure:
|
||||
self._resume()
|
||||
change = self._cursor._try_next(False)
|
||||
except OperationFailure as exc:
|
||||
if exc.code in _NON_RESUMABLE_GETMORE_ERRORS:
|
||||
raise
|
||||
self._resume()
|
||||
change = self._cursor._try_next(False)
|
||||
|
||||
# No changes are available.
|
||||
if change is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
resume_token = change['_id']
|
||||
except KeyError:
|
||||
self.close()
|
||||
raise InvalidOperation(
|
||||
"Cannot provide resume functionality when the resume "
|
||||
"token is missing.")
|
||||
self._resume_token = copy.copy(resume_token)
|
||||
self._start_at_operation_time = None
|
||||
return change
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
@ -211,13 +272,12 @@ class ChangeStream(object):
|
||||
|
||||
|
||||
class CollectionChangeStream(ChangeStream):
|
||||
"""Class for creating a change stream on a collection.
|
||||
"""A change stream that watches changes on a single collection.
|
||||
|
||||
Should not be called directly by application developers. Use
|
||||
helper method :meth:`pymongo.collection.Collection.watch` instead.
|
||||
|
||||
.. versionadded: 3.6
|
||||
.. mongodoc:: changeStreams
|
||||
.. versionadded:: 3.7
|
||||
"""
|
||||
@property
|
||||
def _aggregation_target(self):
|
||||
@ -229,13 +289,12 @@ class CollectionChangeStream(ChangeStream):
|
||||
|
||||
|
||||
class DatabaseChangeStream(ChangeStream):
|
||||
"""Class for creating a change stream on all collections in a database.
|
||||
"""A change stream that watches changes on all collections in a database.
|
||||
|
||||
Should not be called directly by application developers. Use
|
||||
helper method :meth:`pymongo.database.Database.watch` instead.
|
||||
|
||||
.. versionadded: 3.7
|
||||
.. mongodoc:: changeStreams
|
||||
.. versionadded:: 3.7
|
||||
"""
|
||||
@property
|
||||
def _aggregation_target(self):
|
||||
@ -247,13 +306,12 @@ class DatabaseChangeStream(ChangeStream):
|
||||
|
||||
|
||||
class ClusterChangeStream(DatabaseChangeStream):
|
||||
"""Class for creating a change stream on all collections on a cluster.
|
||||
"""A change stream that watches changes on all collections in the cluster.
|
||||
|
||||
Should not be called directly by application developers. Use
|
||||
helper method :meth:`pymongo.mongo_client.MongoClient.watch` instead.
|
||||
|
||||
.. versionadded: 3.7
|
||||
.. mongodoc:: changeStreams
|
||||
.. versionadded:: 3.7
|
||||
"""
|
||||
def _pipeline_options(self):
|
||||
options = super(ClusterChangeStream, self)._pipeline_options()
|
||||
|
||||
@ -285,15 +285,24 @@ class CommandCursor(object):
|
||||
def next(self):
|
||||
"""Advance the cursor."""
|
||||
# Block until a document is returnable.
|
||||
while not len(self.__data) and not self.__killed:
|
||||
while self.alive:
|
||||
doc = self._try_next(True)
|
||||
if doc is not None:
|
||||
return doc
|
||||
|
||||
raise StopIteration
|
||||
|
||||
__next__ = next
|
||||
|
||||
def _try_next(self, get_more_allowed):
|
||||
"""Advance the cursor blocking for at most one getMore command."""
|
||||
if not len(self.__data) and not self.__killed and get_more_allowed:
|
||||
self._refresh()
|
||||
if len(self.__data):
|
||||
coll = self.__collection
|
||||
return coll.database._fix_outgoing(self.__data.popleft(), coll)
|
||||
else:
|
||||
raise StopIteration
|
||||
|
||||
__next__ = next
|
||||
return None
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
@ -50,7 +50,92 @@ from test.utils import (
|
||||
)
|
||||
|
||||
|
||||
class TestClusterChangeStream(IntegrationTest):
|
||||
class ChangeStreamTryNextMixin(object):
|
||||
|
||||
def change_stream_with_client(self, client, *args, **kwargs):
|
||||
raise NotImplementedError
|
||||
|
||||
def change_stream(self, *args, **kwargs):
|
||||
return self.change_stream_with_client(self.client, *args, **kwargs)
|
||||
|
||||
def watched_collection(self):
|
||||
"""Return a collection that is watched by self.change_stream()."""
|
||||
raise NotImplementedError
|
||||
|
||||
def kill_change_stream_cursor(self, change_stream):
|
||||
# Cause a cursor not found error on the next getMore.
|
||||
cursor = change_stream._cursor
|
||||
address = _CursorAddress(cursor.address, cursor._CommandCursor__ns)
|
||||
client = self.watched_collection().database.client
|
||||
client._close_cursor_now(cursor.cursor_id, address)
|
||||
|
||||
def test_try_next(self):
|
||||
coll = self.watched_collection()
|
||||
coll.insert_one({})
|
||||
self.addCleanup(coll.drop)
|
||||
with self.change_stream(max_await_time_ms=100) as stream:
|
||||
self.assertIsNone(stream.try_next())
|
||||
self.assertIsNone(stream._resume_token)
|
||||
coll.insert_one({})
|
||||
change = stream.try_next()
|
||||
self.assertEqual(change['_id'], stream._resume_token)
|
||||
self.assertIsNone(stream.try_next())
|
||||
self.assertEqual(change['_id'], stream._resume_token)
|
||||
|
||||
def test_try_next_runs_one_getmore(self):
|
||||
listener = EventListener()
|
||||
client = rs_or_single_client(event_listeners=[listener])
|
||||
# Connect to the cluster.
|
||||
client.admin.command('ping')
|
||||
listener.results.clear()
|
||||
coll = self.watched_collection()
|
||||
coll.drop()
|
||||
# Create the watched collection before starting the change stream to
|
||||
# skip any "create" events.
|
||||
coll.insert_one({'_id': 1})
|
||||
self.addCleanup(coll.drop)
|
||||
with self.change_stream_with_client(
|
||||
client, max_await_time_ms=100) as stream:
|
||||
self.assertEqual(listener.started_command_names(), ["aggregate"])
|
||||
listener.results.clear()
|
||||
|
||||
# Confirm that only a single getMore is run even when no documents
|
||||
# are returned.
|
||||
self.assertIsNone(stream.try_next())
|
||||
self.assertEqual(listener.started_command_names(), ["getMore"])
|
||||
listener.results.clear()
|
||||
self.assertIsNone(stream.try_next())
|
||||
self.assertEqual(listener.started_command_names(), ["getMore"])
|
||||
listener.results.clear()
|
||||
|
||||
# Get at least one change before resuming.
|
||||
coll.insert_one({'_id': 2})
|
||||
change = stream.try_next()
|
||||
self.assertEqual(change['_id'], stream._resume_token)
|
||||
listener.results.clear()
|
||||
|
||||
# Cause the next request to initiate the resume process.
|
||||
self.kill_change_stream_cursor(stream)
|
||||
listener.results.clear()
|
||||
|
||||
# The sequence should be:
|
||||
# - getMore, fail
|
||||
# - resume with aggregate command
|
||||
# - no results, return immediately without another getMore
|
||||
self.assertIsNone(stream.try_next())
|
||||
self.assertEqual(
|
||||
listener.started_command_names(), ["getMore", "aggregate"])
|
||||
listener.results.clear()
|
||||
|
||||
# Stream still works after a resume.
|
||||
coll.insert_one({'_id': 3})
|
||||
change = stream.try_next()
|
||||
self.assertEqual(change['_id'], stream._resume_token)
|
||||
self.assertEqual(listener.started_command_names(), ["getMore"])
|
||||
self.assertIsNone(stream.try_next())
|
||||
|
||||
|
||||
class TestClusterChangeStream(IntegrationTest, ChangeStreamTryNextMixin):
|
||||
|
||||
@classmethod
|
||||
@client_context.require_version_min(4, 0, 0, -1)
|
||||
@ -66,8 +151,11 @@ class TestClusterChangeStream(IntegrationTest):
|
||||
cls.client.drop_database(db)
|
||||
super(TestClusterChangeStream, cls).tearDownClass()
|
||||
|
||||
def change_stream(self, *args, **kwargs):
|
||||
return self.client.watch(*args, **kwargs)
|
||||
def change_stream_with_client(self, client, *args, **kwargs):
|
||||
return client.watch(*args, **kwargs)
|
||||
|
||||
def watched_collection(self):
|
||||
return self.db.test
|
||||
|
||||
def generate_unique_collnames(self, numcolls):
|
||||
# Generate N collection names unique to a test.
|
||||
@ -94,7 +182,7 @@ class TestClusterChangeStream(IntegrationTest):
|
||||
)
|
||||
|
||||
|
||||
class TestDatabaseChangeStream(IntegrationTest):
|
||||
class TestDatabaseChangeStream(IntegrationTest, ChangeStreamTryNextMixin):
|
||||
|
||||
@classmethod
|
||||
@client_context.require_version_min(4, 0, 0, -1)
|
||||
@ -103,8 +191,11 @@ class TestDatabaseChangeStream(IntegrationTest):
|
||||
def setUpClass(cls):
|
||||
super(TestDatabaseChangeStream, cls).setUpClass()
|
||||
|
||||
def change_stream(self, *args, **kwargs):
|
||||
return self.db.watch(*args, **kwargs)
|
||||
def change_stream_with_client(self, client, *args, **kwargs):
|
||||
return client[self.db.name].watch(*args, **kwargs)
|
||||
|
||||
def watched_collection(self):
|
||||
return self.db.test
|
||||
|
||||
def generate_unique_collnames(self, numcolls):
|
||||
# Generate N collection names unique to a test.
|
||||
@ -145,7 +236,7 @@ class TestDatabaseChangeStream(IntegrationTest):
|
||||
self.client.drop_database(other_db)
|
||||
|
||||
|
||||
class TestCollectionChangeStream(IntegrationTest):
|
||||
class TestCollectionChangeStream(IntegrationTest, ChangeStreamTryNextMixin):
|
||||
|
||||
@classmethod
|
||||
@client_context.require_version_min(3, 5, 11)
|
||||
@ -171,6 +262,12 @@ class TestCollectionChangeStream(IntegrationTest):
|
||||
def tearDown(self):
|
||||
self.coll.drop()
|
||||
|
||||
def change_stream_with_client(self, client, *args, **kwargs):
|
||||
return client[self.db.name].test.watch(*args, **kwargs)
|
||||
|
||||
def watched_collection(self):
|
||||
return self.db.test
|
||||
|
||||
def insert_and_check(self, change_stream, doc):
|
||||
self.coll.insert_one(doc)
|
||||
change = next(change_stream)
|
||||
@ -319,9 +416,7 @@ class TestCollectionChangeStream(IntegrationTest):
|
||||
with self.coll.watch([]) as change_stream:
|
||||
self.insert_and_check(change_stream, {'_id': 1})
|
||||
# Cause a cursor not found error on the next getMore.
|
||||
cursor = change_stream._cursor
|
||||
address = _CursorAddress(cursor.address, self.coll.full_name)
|
||||
self.client._close_cursor_now(cursor.cursor_id, address)
|
||||
self.kill_change_stream_cursor(change_stream)
|
||||
self.insert_and_check(change_stream, {'_id': 2})
|
||||
|
||||
def test_does_not_resume_fatal_errors(self):
|
||||
@ -330,16 +425,16 @@ class TestCollectionChangeStream(IntegrationTest):
|
||||
with self.coll.watch() as change_stream:
|
||||
self.coll.insert_one({})
|
||||
|
||||
def mock_next(*args, **kwargs):
|
||||
def mock_try_next(*args, **kwargs):
|
||||
change_stream._cursor.close()
|
||||
raise OperationFailure('Mock server error', code=code)
|
||||
|
||||
original_next = change_stream._cursor.next
|
||||
change_stream._cursor.next = mock_next
|
||||
original_try_next = change_stream._cursor._try_next
|
||||
change_stream._cursor._try_next = mock_try_next
|
||||
|
||||
with self.assertRaises(OperationFailure):
|
||||
next(change_stream)
|
||||
change_stream._cursor.next = original_next
|
||||
change_stream._cursor._try_next = original_try_next
|
||||
with self.assertRaises(StopIteration):
|
||||
next(change_stream)
|
||||
|
||||
@ -368,8 +463,7 @@ class TestCollectionChangeStream(IntegrationTest):
|
||||
self.insert_and_check(change_stream, {'_id': 1})
|
||||
# Cause a cursor not found error on the next getMore.
|
||||
cursor = change_stream._cursor
|
||||
address = _CursorAddress(cursor.address, self.coll.full_name)
|
||||
self.client._close_cursor_now(cursor.cursor_id, address)
|
||||
self.kill_change_stream_cursor(change_stream)
|
||||
cursor.close = raise_error
|
||||
self.insert_and_check(change_stream, {'_id': 2})
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user