diff --git a/doc/api/pymongo/change_stream.rst b/doc/api/pymongo/change_stream.rst index 0f2c584b1..ca165f890 100644 --- a/doc/api/pymongo/change_stream.rst +++ b/doc/api/pymongo/change_stream.rst @@ -1,5 +1,5 @@ -:mod:`change_stream` -- Watch changes on a collection -===================================================== +:mod:`change_stream` -- Watch changes on a collection, database, or cluster +=========================================================================== .. automodule:: pymongo.change_stream :members: diff --git a/doc/changelog.rst b/doc/changelog.rst index 8510ce72b..cda32ba40 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -42,6 +42,9 @@ Changes in Version 3.8.0.dev0 string. - Add the ``filter`` parameter to :meth:`~pymongo.database.Database.list_collection_names`. +- Changes can now be requested from a ``ChangeStream`` cursor without blocking + indefinitely using the new + :meth:`pymongo.change_stream.ChangeStream.try_next` method. Issues Resolved ............... diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index b8b94418a..0d7f517f3 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -12,7 +12,7 @@ # implied. See the License for the specific language governing # permissions and limitations under the License. -"""ChangeStream cursor to iterate over changes on a collection.""" +"""Watch changes on a collection, a database, or the entire cluster.""" import copy @@ -41,14 +41,12 @@ class ChangeStream(object): """The internal abstract base class for change stream cursors. Should not be called directly by application developers. Use - :meth:pymongo.collection.Collection.watch, - :meth:pymongo.database.Database.watch, or - :meth:pymongo.mongo_client.MongoClient.watch instead. + :meth:`pymongo.collection.Collection.watch`, + :meth:`pymongo.database.Database.watch`, or + :meth:`pymongo.mongo_client.MongoClient.watch` instead. - Defines the interface for change streams. Should be subclassed to - implement the `ChangeStream._create_cursor` abstract method, and - the `ChangeStream._database`and ChangeStream._aggregation_target` - abstract properties. + .. versionadded:: 3.6 + .. mongodoc:: changeStreams """ def __init__(self, target, pipeline, full_document, resume_after, max_await_time_ms, batch_size, collation, @@ -175,34 +173,97 @@ class ChangeStream(object): """Advance the cursor. This method blocks until the next change document is returned or an - unrecoverable error is raised. + unrecoverable error is raised. This method is used when iterating over + all changes in the cursor. For example:: + + try: + with db.collection.watch( + [{'$match': {'operationType': 'insert'}}]) as stream: + for insert_change in stream: + print(insert_change) + except pymongo.errors.PyMongoError: + # The ChangeStream encountered an unrecoverable error or the + # resume attempt failed to recreate the cursor. + logging.error('...') Raises :exc:`StopIteration` if this ChangeStream is closed. """ - while True: - try: - change = self._cursor.next() - except ConnectionFailure: - self._resume() - continue - except OperationFailure as exc: - if exc.code in _NON_RESUMABLE_GETMORE_ERRORS: - raise - self._resume() - continue - try: - resume_token = change['_id'] - except KeyError: - self.close() - raise InvalidOperation( - "Cannot provide resume functionality when the resume " - "token is missing.") - self._resume_token = copy.copy(resume_token) - self._start_at_operation_time = None - return change + while self.alive: + doc = self.try_next() + if doc is not None: + return doc + + raise StopIteration __next__ = next + @property + def alive(self): + """Does this cursor have the potential to return more data? + + .. note:: Even if :attr:`alive` is ``True``, :meth:`next` can raise + :exc:`StopIteration` and :meth:`try_next` can return ``None``. + + .. versionadded:: 3.8 + """ + return self._cursor.alive + + def try_next(self): + """Advance the cursor without blocking indefinitely. + + This method returns the next change document without waiting + indefinitely for the next change. For example:: + + with db.collection.watch() as stream: + while stream.alive: + change = stream.try_next() + if change is not None: + print(change) + elif stream.alive: + # We end up here when there are no recent changes. + # Sleep for a while to avoid flooding the server with + # getMore requests when no changes are available. + time.sleep(10) + + If no change document is cached locally then this method runs a single + getMore command. If the getMore yields any documents, the next + document is returned, otherwise, if the getMore returns no documents + (because there have been no changes) then ``None`` is returned. + + :Returns: + The next change document or ``None`` when no document is available + after running a single getMore or when the cursor is closed. + + .. versionadded:: 3.8 + """ + # Attempt to get the next change with at most one getMore and at most + # one resume attempt. + try: + change = self._cursor._try_next(True) + except ConnectionFailure: + self._resume() + change = self._cursor._try_next(False) + except OperationFailure as exc: + if exc.code in _NON_RESUMABLE_GETMORE_ERRORS: + raise + self._resume() + change = self._cursor._try_next(False) + + # No changes are available. + if change is None: + return None + + try: + resume_token = change['_id'] + except KeyError: + self.close() + raise InvalidOperation( + "Cannot provide resume functionality when the resume " + "token is missing.") + self._resume_token = copy.copy(resume_token) + self._start_at_operation_time = None + return change + def __enter__(self): return self @@ -211,13 +272,12 @@ class ChangeStream(object): class CollectionChangeStream(ChangeStream): - """Class for creating a change stream on a collection. + """A change stream that watches changes on a single collection. Should not be called directly by application developers. Use helper method :meth:`pymongo.collection.Collection.watch` instead. - .. versionadded: 3.6 - .. mongodoc:: changeStreams + .. versionadded:: 3.7 """ @property def _aggregation_target(self): @@ -229,13 +289,12 @@ class CollectionChangeStream(ChangeStream): class DatabaseChangeStream(ChangeStream): - """Class for creating a change stream on all collections in a database. + """A change stream that watches changes on all collections in a database. Should not be called directly by application developers. Use helper method :meth:`pymongo.database.Database.watch` instead. - .. versionadded: 3.7 - .. mongodoc:: changeStreams + .. versionadded:: 3.7 """ @property def _aggregation_target(self): @@ -247,13 +306,12 @@ class DatabaseChangeStream(ChangeStream): class ClusterChangeStream(DatabaseChangeStream): - """Class for creating a change stream on all collections on a cluster. + """A change stream that watches changes on all collections in the cluster. Should not be called directly by application developers. Use helper method :meth:`pymongo.mongo_client.MongoClient.watch` instead. - .. versionadded: 3.7 - .. mongodoc:: changeStreams + .. versionadded:: 3.7 """ def _pipeline_options(self): options = super(ClusterChangeStream, self)._pipeline_options() diff --git a/pymongo/command_cursor.py b/pymongo/command_cursor.py index 0875cd4e0..e44df9e04 100644 --- a/pymongo/command_cursor.py +++ b/pymongo/command_cursor.py @@ -285,15 +285,24 @@ class CommandCursor(object): def next(self): """Advance the cursor.""" # Block until a document is returnable. - while not len(self.__data) and not self.__killed: + while self.alive: + doc = self._try_next(True) + if doc is not None: + return doc + + raise StopIteration + + __next__ = next + + def _try_next(self, get_more_allowed): + """Advance the cursor blocking for at most one getMore command.""" + if not len(self.__data) and not self.__killed and get_more_allowed: self._refresh() if len(self.__data): coll = self.__collection return coll.database._fix_outgoing(self.__data.popleft(), coll) else: - raise StopIteration - - __next__ = next + return None def __enter__(self): return self diff --git a/test/test_change_stream.py b/test/test_change_stream.py index d07181a3a..58d6518df 100644 --- a/test/test_change_stream.py +++ b/test/test_change_stream.py @@ -50,7 +50,92 @@ from test.utils import ( ) -class TestClusterChangeStream(IntegrationTest): +class ChangeStreamTryNextMixin(object): + + def change_stream_with_client(self, client, *args, **kwargs): + raise NotImplementedError + + def change_stream(self, *args, **kwargs): + return self.change_stream_with_client(self.client, *args, **kwargs) + + def watched_collection(self): + """Return a collection that is watched by self.change_stream().""" + raise NotImplementedError + + def kill_change_stream_cursor(self, change_stream): + # Cause a cursor not found error on the next getMore. + cursor = change_stream._cursor + address = _CursorAddress(cursor.address, cursor._CommandCursor__ns) + client = self.watched_collection().database.client + client._close_cursor_now(cursor.cursor_id, address) + + def test_try_next(self): + coll = self.watched_collection() + coll.insert_one({}) + self.addCleanup(coll.drop) + with self.change_stream(max_await_time_ms=100) as stream: + self.assertIsNone(stream.try_next()) + self.assertIsNone(stream._resume_token) + coll.insert_one({}) + change = stream.try_next() + self.assertEqual(change['_id'], stream._resume_token) + self.assertIsNone(stream.try_next()) + self.assertEqual(change['_id'], stream._resume_token) + + def test_try_next_runs_one_getmore(self): + listener = EventListener() + client = rs_or_single_client(event_listeners=[listener]) + # Connect to the cluster. + client.admin.command('ping') + listener.results.clear() + coll = self.watched_collection() + coll.drop() + # Create the watched collection before starting the change stream to + # skip any "create" events. + coll.insert_one({'_id': 1}) + self.addCleanup(coll.drop) + with self.change_stream_with_client( + client, max_await_time_ms=100) as stream: + self.assertEqual(listener.started_command_names(), ["aggregate"]) + listener.results.clear() + + # Confirm that only a single getMore is run even when no documents + # are returned. + self.assertIsNone(stream.try_next()) + self.assertEqual(listener.started_command_names(), ["getMore"]) + listener.results.clear() + self.assertIsNone(stream.try_next()) + self.assertEqual(listener.started_command_names(), ["getMore"]) + listener.results.clear() + + # Get at least one change before resuming. + coll.insert_one({'_id': 2}) + change = stream.try_next() + self.assertEqual(change['_id'], stream._resume_token) + listener.results.clear() + + # Cause the next request to initiate the resume process. + self.kill_change_stream_cursor(stream) + listener.results.clear() + + # The sequence should be: + # - getMore, fail + # - resume with aggregate command + # - no results, return immediately without another getMore + self.assertIsNone(stream.try_next()) + self.assertEqual( + listener.started_command_names(), ["getMore", "aggregate"]) + listener.results.clear() + + # Stream still works after a resume. + coll.insert_one({'_id': 3}) + change = stream.try_next() + self.assertEqual(change['_id'], stream._resume_token) + self.assertEqual(listener.started_command_names(), ["getMore"]) + self.assertIsNone(stream.try_next()) + + +class TestClusterChangeStream(IntegrationTest, ChangeStreamTryNextMixin): @classmethod @client_context.require_version_min(4, 0, 0, -1) @@ -66,8 +151,11 @@ class TestClusterChangeStream(IntegrationTest): cls.client.drop_database(db) super(TestClusterChangeStream, cls).tearDownClass() - def change_stream(self, *args, **kwargs): - return self.client.watch(*args, **kwargs) + def change_stream_with_client(self, client, *args, **kwargs): + return client.watch(*args, **kwargs) + + def watched_collection(self): + return self.db.test def generate_unique_collnames(self, numcolls): # Generate N collection names unique to a test. @@ -94,7 +182,7 @@ class TestClusterChangeStream(IntegrationTest): ) -class TestDatabaseChangeStream(IntegrationTest): +class TestDatabaseChangeStream(IntegrationTest, ChangeStreamTryNextMixin): @classmethod @client_context.require_version_min(4, 0, 0, -1) @@ -103,8 +191,11 @@ class TestDatabaseChangeStream(IntegrationTest): def setUpClass(cls): super(TestDatabaseChangeStream, cls).setUpClass() - def change_stream(self, *args, **kwargs): - return self.db.watch(*args, **kwargs) + def change_stream_with_client(self, client, *args, **kwargs): + return client[self.db.name].watch(*args, **kwargs) + + def watched_collection(self): + return self.db.test def generate_unique_collnames(self, numcolls): # Generate N collection names unique to a test. @@ -145,7 +236,7 @@ class TestDatabaseChangeStream(IntegrationTest): self.client.drop_database(other_db) -class TestCollectionChangeStream(IntegrationTest): +class TestCollectionChangeStream(IntegrationTest, ChangeStreamTryNextMixin): @classmethod @client_context.require_version_min(3, 5, 11) @@ -171,6 +262,12 @@ class TestCollectionChangeStream(IntegrationTest): def tearDown(self): self.coll.drop() + def change_stream_with_client(self, client, *args, **kwargs): + return client[self.db.name].test.watch(*args, **kwargs) + + def watched_collection(self): + return self.db.test + def insert_and_check(self, change_stream, doc): self.coll.insert_one(doc) change = next(change_stream) @@ -319,9 +416,7 @@ class TestCollectionChangeStream(IntegrationTest): with self.coll.watch([]) as change_stream: self.insert_and_check(change_stream, {'_id': 1}) # Cause a cursor not found error on the next getMore. - cursor = change_stream._cursor - address = _CursorAddress(cursor.address, self.coll.full_name) - self.client._close_cursor_now(cursor.cursor_id, address) + self.kill_change_stream_cursor(change_stream) self.insert_and_check(change_stream, {'_id': 2}) def test_does_not_resume_fatal_errors(self): @@ -330,16 +425,16 @@ class TestCollectionChangeStream(IntegrationTest): with self.coll.watch() as change_stream: self.coll.insert_one({}) - def mock_next(*args, **kwargs): + def mock_try_next(*args, **kwargs): change_stream._cursor.close() raise OperationFailure('Mock server error', code=code) - original_next = change_stream._cursor.next - change_stream._cursor.next = mock_next + original_try_next = change_stream._cursor._try_next + change_stream._cursor._try_next = mock_try_next with self.assertRaises(OperationFailure): next(change_stream) - change_stream._cursor.next = original_next + change_stream._cursor._try_next = original_try_next with self.assertRaises(StopIteration): next(change_stream) @@ -368,8 +463,7 @@ class TestCollectionChangeStream(IntegrationTest): self.insert_and_check(change_stream, {'_id': 1}) # Cause a cursor not found error on the next getMore. cursor = change_stream._cursor - address = _CursorAddress(cursor.address, self.coll.full_name) - self.client._close_cursor_now(cursor.cursor_id, address) + self.kill_change_stream_cursor(change_stream) cursor.close = raise_error self.insert_and_check(change_stream, {'_id': 2})