PYTHON-1662 Add ChangeStream.try_next API

This commit is contained in:
Shane Harvey 2019-03-07 15:13:35 -08:00
parent 11967eb160
commit 92ddc09b7e
5 changed files with 225 additions and 61 deletions

View File

@ -1,5 +1,5 @@
:mod:`change_stream` -- Watch changes on a collection
=====================================================
:mod:`change_stream` -- Watch changes on a collection, database, or cluster
===========================================================================
.. automodule:: pymongo.change_stream
:members:

View File

@ -84,6 +84,9 @@ Changes in Version 3.8.0.dev0
string.
- Add the ``filter`` parameter to
:meth:`~pymongo.database.Database.list_collection_names`.
- Changes can now be requested from a ``ChangeStream`` cursor without blocking
indefinitely using the new
:meth:`pymongo.change_stream.ChangeStream.try_next` method.
Issues Resolved
...............

View File

@ -12,7 +12,7 @@
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""ChangeStream cursor to iterate over changes on a collection."""
"""Watch changes on a collection, a database, or the entire cluster."""
import copy
@ -41,14 +41,12 @@ class ChangeStream(object):
"""The internal abstract base class for change stream cursors.
Should not be called directly by application developers. Use
:meth:pymongo.collection.Collection.watch,
:meth:pymongo.database.Database.watch, or
:meth:pymongo.mongo_client.MongoClient.watch instead.
:meth:`pymongo.collection.Collection.watch`,
:meth:`pymongo.database.Database.watch`, or
:meth:`pymongo.mongo_client.MongoClient.watch` instead.
Defines the interface for change streams. Should be subclassed to
implement the `ChangeStream._create_cursor` abstract method, and
the `ChangeStream._database`and ChangeStream._aggregation_target`
abstract properties.
.. versionadded:: 3.6
.. mongodoc:: changeStreams
"""
def __init__(self, target, pipeline, full_document, resume_after,
max_await_time_ms, batch_size, collation,
@ -176,34 +174,97 @@ class ChangeStream(object):
"""Advance the cursor.
This method blocks until the next change document is returned or an
unrecoverable error is raised.
unrecoverable error is raised. This method is used when iterating over
all changes in the cursor. For example::
try:
with db.collection.watch(
[{'$match': {'operationType': 'insert'}}]) as stream:
for insert_change in stream:
print(insert_change)
except pymongo.errors.PyMongoError:
# The ChangeStream encountered an unrecoverable error or the
# resume attempt failed to recreate the cursor.
logging.error('...')
Raises :exc:`StopIteration` if this ChangeStream is closed.
"""
while True:
try:
change = self._cursor.next()
except ConnectionFailure:
self._resume()
continue
except OperationFailure as exc:
if exc.code in _NON_RESUMABLE_GETMORE_ERRORS:
raise
self._resume()
continue
try:
resume_token = change['_id']
except KeyError:
self.close()
raise InvalidOperation(
"Cannot provide resume functionality when the resume "
"token is missing.")
self._resume_token = copy.copy(resume_token)
self._start_at_operation_time = None
return change
while self.alive:
doc = self.try_next()
if doc is not None:
return doc
raise StopIteration
__next__ = next
@property
def alive(self):
"""Does this cursor have the potential to return more data?
.. note:: Even if :attr:`alive` is ``True``, :meth:`next` can raise
:exc:`StopIteration` and :meth:`try_next` can return ``None``.
.. versionadded:: 3.8
"""
return self._cursor.alive
def try_next(self):
"""Advance the cursor without blocking indefinitely.
This method returns the next change document without waiting
indefinitely for the next change. For example::
with db.collection.watch() as stream:
while stream.alive:
change = stream.try_next()
if change is not None:
print(change)
elif stream.alive:
# We end up here when there are no recent changes.
# Sleep for a while to avoid flooding the server with
# getMore requests when no changes are available.
time.sleep(10)
If no change document is cached locally then this method runs a single
getMore command. If the getMore yields any documents, the next
document is returned, otherwise, if the getMore returns no documents
(because there have been no changes) then ``None`` is returned.
:Returns:
The next change document or ``None`` when no document is available
after running a single getMore or when the cursor is closed.
.. versionadded:: 3.8
"""
# Attempt to get the next change with at most one getMore and at most
# one resume attempt.
try:
change = self._cursor._try_next(True)
except ConnectionFailure:
self._resume()
change = self._cursor._try_next(False)
except OperationFailure as exc:
if exc.code in _NON_RESUMABLE_GETMORE_ERRORS:
raise
self._resume()
change = self._cursor._try_next(False)
# No changes are available.
if change is None:
return None
try:
resume_token = change['_id']
except KeyError:
self.close()
raise InvalidOperation(
"Cannot provide resume functionality when the resume "
"token is missing.")
self._resume_token = copy.copy(resume_token)
self._start_at_operation_time = None
return change
def __enter__(self):
return self
@ -212,13 +273,12 @@ class ChangeStream(object):
class CollectionChangeStream(ChangeStream):
"""Class for creating a change stream on a collection.
"""A change stream that watches changes on a single collection.
Should not be called directly by application developers. Use
helper method :meth:`pymongo.collection.Collection.watch` instead.
.. versionadded: 3.6
.. mongodoc:: changeStreams
.. versionadded:: 3.7
"""
@property
def _aggregation_target(self):
@ -230,13 +290,12 @@ class CollectionChangeStream(ChangeStream):
class DatabaseChangeStream(ChangeStream):
"""Class for creating a change stream on all collections in a database.
"""A change stream that watches changes on all collections in a database.
Should not be called directly by application developers. Use
helper method :meth:`pymongo.database.Database.watch` instead.
.. versionadded: 3.7
.. mongodoc:: changeStreams
.. versionadded:: 3.7
"""
@property
def _aggregation_target(self):
@ -248,13 +307,12 @@ class DatabaseChangeStream(ChangeStream):
class ClusterChangeStream(DatabaseChangeStream):
"""Class for creating a change stream on all collections on a cluster.
"""A change stream that watches changes on all collections in the cluster.
Should not be called directly by application developers. Use
helper method :meth:`pymongo.mongo_client.MongoClient.watch` instead.
.. versionadded: 3.7
.. mongodoc:: changeStreams
.. versionadded:: 3.7
"""
def _pipeline_options(self):
options = super(ClusterChangeStream, self)._pipeline_options()

View File

@ -285,15 +285,24 @@ class CommandCursor(object):
def next(self):
"""Advance the cursor."""
# Block until a document is returnable.
while not len(self.__data) and not self.__killed:
while self.alive:
doc = self._try_next(True)
if doc is not None:
return doc
raise StopIteration
__next__ = next
def _try_next(self, get_more_allowed):
"""Advance the cursor blocking for at most one getMore command."""
if not len(self.__data) and not self.__killed and get_more_allowed:
self._refresh()
if len(self.__data):
coll = self.__collection
return coll.database._fix_outgoing(self.__data.popleft(), coll)
else:
raise StopIteration
__next__ = next
return None
def __enter__(self):
return self

View File

@ -50,7 +50,92 @@ from test.utils import (
)
class TestClusterChangeStream(IntegrationTest):
class ChangeStreamTryNextMixin(object):
def change_stream_with_client(self, client, *args, **kwargs):
raise NotImplementedError
def change_stream(self, *args, **kwargs):
return self.change_stream_with_client(self.client, *args, **kwargs)
def watched_collection(self):
"""Return a collection that is watched by self.change_stream()."""
raise NotImplementedError
def kill_change_stream_cursor(self, change_stream):
# Cause a cursor not found error on the next getMore.
cursor = change_stream._cursor
address = _CursorAddress(cursor.address, cursor._CommandCursor__ns)
client = self.watched_collection().database.client
client._close_cursor_now(cursor.cursor_id, address)
def test_try_next(self):
coll = self.watched_collection()
coll.insert_one({})
self.addCleanup(coll.drop)
with self.change_stream(max_await_time_ms=100) as stream:
self.assertIsNone(stream.try_next())
self.assertIsNone(stream._resume_token)
coll.insert_one({})
change = stream.try_next()
self.assertEqual(change['_id'], stream._resume_token)
self.assertIsNone(stream.try_next())
self.assertEqual(change['_id'], stream._resume_token)
def test_try_next_runs_one_getmore(self):
listener = EventListener()
client = rs_or_single_client(event_listeners=[listener])
# Connect to the cluster.
client.admin.command('ping')
listener.results.clear()
coll = self.watched_collection()
coll.drop()
# Create the watched collection before starting the change stream to
# skip any "create" events.
coll.insert_one({'_id': 1})
self.addCleanup(coll.drop)
with self.change_stream_with_client(
client, max_await_time_ms=100) as stream:
self.assertEqual(listener.started_command_names(), ["aggregate"])
listener.results.clear()
# Confirm that only a single getMore is run even when no documents
# are returned.
self.assertIsNone(stream.try_next())
self.assertEqual(listener.started_command_names(), ["getMore"])
listener.results.clear()
self.assertIsNone(stream.try_next())
self.assertEqual(listener.started_command_names(), ["getMore"])
listener.results.clear()
# Get at least one change before resuming.
coll.insert_one({'_id': 2})
change = stream.try_next()
self.assertEqual(change['_id'], stream._resume_token)
listener.results.clear()
# Cause the next request to initiate the resume process.
self.kill_change_stream_cursor(stream)
listener.results.clear()
# The sequence should be:
# - getMore, fail
# - resume with aggregate command
# - no results, return immediately without another getMore
self.assertIsNone(stream.try_next())
self.assertEqual(
listener.started_command_names(), ["getMore", "aggregate"])
listener.results.clear()
# Stream still works after a resume.
coll.insert_one({'_id': 3})
change = stream.try_next()
self.assertEqual(change['_id'], stream._resume_token)
self.assertEqual(listener.started_command_names(), ["getMore"])
self.assertIsNone(stream.try_next())
class TestClusterChangeStream(IntegrationTest, ChangeStreamTryNextMixin):
@classmethod
@client_context.require_version_min(4, 0, 0, -1)
@ -66,8 +151,11 @@ class TestClusterChangeStream(IntegrationTest):
cls.client.drop_database(db)
super(TestClusterChangeStream, cls).tearDownClass()
def change_stream(self, *args, **kwargs):
return self.client.watch(*args, **kwargs)
def change_stream_with_client(self, client, *args, **kwargs):
return client.watch(*args, **kwargs)
def watched_collection(self):
return self.db.test
def generate_unique_collnames(self, numcolls):
# Generate N collection names unique to a test.
@ -94,7 +182,7 @@ class TestClusterChangeStream(IntegrationTest):
)
class TestDatabaseChangeStream(IntegrationTest):
class TestDatabaseChangeStream(IntegrationTest, ChangeStreamTryNextMixin):
@classmethod
@client_context.require_version_min(4, 0, 0, -1)
@ -103,8 +191,11 @@ class TestDatabaseChangeStream(IntegrationTest):
def setUpClass(cls):
super(TestDatabaseChangeStream, cls).setUpClass()
def change_stream(self, *args, **kwargs):
return self.db.watch(*args, **kwargs)
def change_stream_with_client(self, client, *args, **kwargs):
return client[self.db.name].watch(*args, **kwargs)
def watched_collection(self):
return self.db.test
def generate_unique_collnames(self, numcolls):
# Generate N collection names unique to a test.
@ -145,7 +236,7 @@ class TestDatabaseChangeStream(IntegrationTest):
self.client.drop_database(other_db)
class TestCollectionChangeStream(IntegrationTest):
class TestCollectionChangeStream(IntegrationTest, ChangeStreamTryNextMixin):
@classmethod
@client_context.require_version_min(3, 5, 11)
@ -171,6 +262,12 @@ class TestCollectionChangeStream(IntegrationTest):
def tearDown(self):
self.coll.drop()
def change_stream_with_client(self, client, *args, **kwargs):
return client[self.db.name].test.watch(*args, **kwargs)
def watched_collection(self):
return self.db.test
def insert_and_check(self, change_stream, doc):
self.coll.insert_one(doc)
change = next(change_stream)
@ -319,9 +416,7 @@ class TestCollectionChangeStream(IntegrationTest):
with self.coll.watch([]) as change_stream:
self.insert_and_check(change_stream, {'_id': 1})
# Cause a cursor not found error on the next getMore.
cursor = change_stream._cursor
address = _CursorAddress(cursor.address, self.coll.full_name)
self.client._close_cursor_now(cursor.cursor_id, address)
self.kill_change_stream_cursor(change_stream)
self.insert_and_check(change_stream, {'_id': 2})
def test_does_not_resume_fatal_errors(self):
@ -330,16 +425,16 @@ class TestCollectionChangeStream(IntegrationTest):
with self.coll.watch() as change_stream:
self.coll.insert_one({})
def mock_next(*args, **kwargs):
def mock_try_next(*args, **kwargs):
change_stream._cursor.close()
raise OperationFailure('Mock server error', code=code)
original_next = change_stream._cursor.next
change_stream._cursor.next = mock_next
original_try_next = change_stream._cursor._try_next
change_stream._cursor._try_next = mock_try_next
with self.assertRaises(OperationFailure):
next(change_stream)
change_stream._cursor.next = original_next
change_stream._cursor._try_next = original_try_next
with self.assertRaises(StopIteration):
next(change_stream)
@ -368,8 +463,7 @@ class TestCollectionChangeStream(IntegrationTest):
self.insert_and_check(change_stream, {'_id': 1})
# Cause a cursor not found error on the next getMore.
cursor = change_stream._cursor
address = _CursorAddress(cursor.address, self.coll.full_name)
self.client._close_cursor_now(cursor.cursor_id, address)
self.kill_change_stream_cursor(change_stream)
cursor.close = raise_error
self.insert_and_check(change_stream, {'_id': 2})