diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index 07094cb1f..84602d9b4 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -26,29 +26,8 @@ class ChangeStream(object): Should not be called directly by application developers. Use :meth:`~pymongo.collection.Collection.watch` instead. - :Parameters: - - `collection`: The watched :class:`~pymongo.collection.Collection`. - - `pipeline`: A list of aggregation pipeline stages to append to an - initial `$changeStream` aggregation stage. - - `full_document` (string): The fullDocument to pass as an option - to the $changeStream pipeline stage. Allowed values: 'default', - 'updateLookup'. When set to 'updateLookup', the change notification - for partial updates will include both a delta describing the - changes to the document, as well as a copy of the entire document - that was changed from some time after the change occurred. - - `resume_after` (optional): The logical starting point for this - change stream. - - `max_await_time_ms` (optional): The maximum time in milliseconds - for the server to wait for changes before responding to a getMore - operation. - - `batch_size` (optional): The maximum number of documents to return - per batch. - - `collation` (optional): The :class:`~pymongo.collation.Collation` - to use for the aggregation. - - `session` (optional): a - :class:`~pymongo.client_session.ClientSession`. - .. versionadded: 3.6 + .. mongodoc:: changeStreams """ def __init__(self, collection, pipeline, full_document, resume_after=None, max_await_time_ms=None, batch_size=None, diff --git a/pymongo/collection.py b/pymongo/collection.py index 2b0c922e9..65fd2b003 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -2215,22 +2215,23 @@ class Collection(common.BaseObject): session=None): """Watch changes on this collection. - Performs an aggregation with an implicit initial - `$changeStream aggregation stage`_ and returns a - :class:`~pymongo.change_stream.ChangeStream` cursor that iterates over - changes on this collection. Introduced in MongoDB 3.6. + Performs an aggregation with an implicit initial ``$changeStream`` + stage and returns a :class:`~pymongo.change_stream.ChangeStream` + cursor which iterates over changes on this collection. + Introduced in MongoDB 3.6. .. code-block:: python for change in db.collection.watch(): print(change) - The ChangeStream returned automatically resumes when it - encounters a potentially recoverable error during iteration. The resume - process is transparent to the application and ensures no change stream - documents are lost; the call to - :meth:`~pymongo.change_stream.ChangeStream.next` blocks until the next - change document is returned or an unrecoverable error is raised. + The :class:`~pymongo.change_stream.ChangeStream` iterable blocks + until the next change document is returned or an error is raised. If + the :meth:`~pymongo.change_stream.ChangeStream.next` method encounters + a network error when retrieving a batch from the server, it will + automatically attempt to recreate the cursor such that no change + events are missed. Any error encountered during the resume attempt + indicates there may be an outage and will be raised. .. code-block:: python @@ -2239,29 +2240,29 @@ class Collection(common.BaseObject): [{'$match': {'operationType': 'insert'}}]): print(insert_change) except pymongo.errors.PyMongoError: - # We know for sure it's unrecoverable: + # The ChangeStream encountered an unrecoverable error or the + # resume attempt failed to recreate the cursor. log.error('...') For a precise description of the resume process see the - `Change Streams specification`_. + `change streams specification`_. .. note:: Using this helper method is preferred to directly calling :meth:`~pymongo.collection.Collection.aggregate` with a - ``$changeStream`` aggregation stage, for the purpose of supporting + ``$changeStream`` stage, for the purpose of supporting resumability. .. warning:: This Collection's :attr:`read_concern` must be ``ReadConcern("majority")`` in order to use the ``$changeStream`` - aggregation stage. + stage. :Parameters: - `pipeline` (optional): A list of aggregation pipeline stages to - append to an initial `$changeStream` aggregation stage. Not all - pipeline stages are valid after a `$changeStream` stage, see the - `$changeStream aggregation stage`_ documentation for the supported - stages. + append to an initial ``$changeStream`` stage. Not all + pipeline stages are valid after a ``$changeStream`` stage, see the + MongoDB documentation on change streams for the supported stages. - `full_document` (optional): The fullDocument to pass as an option - to the $changeStream pipeline stage. Allowed values: 'default', + to the ``$changeStream`` stage. Allowed values: 'default', 'updateLookup'. Defaults to 'default'. When set to 'updateLookup', the change notification for partial updates will include both a delta describing the changes to the @@ -2284,10 +2285,9 @@ class Collection(common.BaseObject): .. versionadded:: 3.6 - .. _$changeStream aggregation stage: - https://docs.mongodb.com/manual/reference/operator/aggregation/changeStream/ + .. mongodoc:: changeStreams - .. _Change Streams specification: + .. _change streams specification: https://github.com/mongodb/specifications/blob/master/source/change-streams.rst """ if pipeline is None: