From cfb30e91c018add331754b244639c5a0df6e748f Mon Sep 17 00:00:00 2001 From: "A. Jesse Jiryu Davis" Date: Sun, 13 Aug 2017 16:06:42 -0400 Subject: [PATCH] PYTHON-1362 - Add find/aggregate_raw_batches() Rename find_raw to find_raw_batches, and add aggregate_raw_batches. Rename RawBSONCursor and RawBSONCommandCursor to RawBatchCursor and RawBatchCommandCursor. --- doc/api/pymongo/cursor.rst | 2 +- doc/changelog.rst | 6 ++ pymongo/collection.py | 153 ++++++++++++++++++++++--------------- pymongo/command_cursor.py | 69 +++++++++++++---- pymongo/cursor.py | 52 +++++-------- pymongo/message.py | 23 ++++++ test/test_collection.py | 3 - test/test_cursor.py | 124 ++++++++++++++++++++++++++---- 8 files changed, 304 insertions(+), 128 deletions(-) diff --git a/doc/api/pymongo/cursor.rst b/doc/api/pymongo/cursor.rst index 19d9cb66c..4b9943f9a 100644 --- a/doc/api/pymongo/cursor.rst +++ b/doc/api/pymongo/cursor.rst @@ -24,4 +24,4 @@ .. automethod:: __getitem__ - .. autoclass:: pymongo.cursor.RawBSONCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, modifiers=None, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None) + .. autoclass:: pymongo.cursor.RawBatchCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, modifiers=None, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None) diff --git a/doc/changelog.rst b/doc/changelog.rst index 104f1dc80..2cd71484e 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -8,6 +8,12 @@ This version drops support for MongoDB versions older than 2.6. If connecting to a MongoDB 2.4 server or older, PyMongo now throws a :exc:`~pymongo.errors.ConfigurationError`. +Highlights include: + +- New methods :meth:`~pymongo.collection.Collection.find_raw_batches` and + :meth:`~pymongo.collection.Collection.aggregate_raw_batches` for use with + external libraries that can parse raw batches of BSON data. + Changes in Version 3.5.1 ------------------------ diff --git a/pymongo/collection.py b/pymongo/collection.py index 135f45ef0..e516999dd 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -30,9 +30,9 @@ from pymongo import (common, helpers, message) from pymongo.bulk import BulkOperationBuilder, _Bulk -from pymongo.command_cursor import CommandCursor +from pymongo.command_cursor import CommandCursor, RawBatchCommandCursor from pymongo.collation import validate_collation_or_none -from pymongo.cursor import Cursor, RawBSONCursor +from pymongo.cursor import Cursor, RawBatchCursor from pymongo.errors import ConfigurationError, InvalidName, OperationFailure from pymongo.helpers import _check_write_command_response from pymongo.helpers import _UNICODE_REPLACE_CODEC_OPTIONS @@ -1278,11 +1278,11 @@ class Collection(common.BaseObject): """ return Cursor(self, *args, **kwargs) - def find_raw(self, *args, **kwargs): + def find_raw_batches(self, *args, **kwargs): """Query the database and retrieve batches of raw BSON. Takes the same parameters as :meth:`find` but returns a - :class:`~pymongo.cursor.RawBSONCursor`. + :class:`~pymongo.cursor.RawBatchCursor`. This example demonstrates how to work with raw batches, but in practice raw batches should be passed to an external library that can decode @@ -1290,13 +1290,13 @@ class Collection(common.BaseObject): :mod:`bson` module. >>> import bson - >>> cursor = db.test.find_raw() + >>> cursor = db.test.find_raw_batches() >>> for batch in cursor: ... print(bson.decode_all(batch)) .. versionadded:: 3.6 """ - return RawBSONCursor(self, *args, **kwargs) + return RawBatchCursor(self, *args, **kwargs) def parallel_scan(self, num_cursors, **kwargs): """Scan this entire collection in parallel. @@ -1821,6 +1821,69 @@ class Collection(common.BaseObject): return options + def _aggregate(self, pipeline, cursor_class, first_batch_size, **kwargs): + if not isinstance(pipeline, list): + raise TypeError("pipeline must be a list") + + if "explain" in kwargs: + raise ConfigurationError("The explain option is not supported. " + "Use Database.command instead.") + collation = validate_collation_or_none(kwargs.pop('collation', None)) + + cmd = SON([("aggregate", self.__name), + ("pipeline", pipeline)]) + + # Remove things that are not command options. + batch_size = common.validate_non_negative_integer_or_none( + "batchSize", kwargs.pop("batchSize", None)) + use_cursor = common.validate_boolean( + "useCursor", kwargs.pop("useCursor", True)) + # If the server does not support the "cursor" option we + # ignore useCursor and batchSize. + with self._socket_for_reads() as (sock_info, slave_ok): + if sock_info.max_wire_version > 0: + if use_cursor: + if "cursor" not in kwargs: + kwargs["cursor"] = {} + if first_batch_size is not None: + kwargs["cursor"]["batchSize"] = first_batch_size + + dollar_out = pipeline and '$out' in pipeline[-1] + if (sock_info.max_wire_version >= 5 and dollar_out and + self.write_concern): + cmd['writeConcern'] = self.write_concern.document + + cmd.update(kwargs) + + # Apply this Collection's read concern if $out is not in the + # pipeline. + if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd: + if dollar_out: + result = self._command(sock_info, cmd, slave_ok, + parse_write_concern_error=True, + collation=collation) + else: + result = self._command(sock_info, cmd, slave_ok, + read_concern=self.read_concern, + collation=collation) + else: + result = self._command(sock_info, cmd, slave_ok, + parse_write_concern_error=dollar_out, + collation=collation) + + if "cursor" in result: + cursor = result["cursor"] + else: + # Pre-MongoDB 2.6. Fake a cursor. + cursor = { + "id": 0, + "firstBatch": result["result"], + "ns": self.full_name, + } + + return cursor_class( + self, cursor, sock_info.address).batch_size(batch_size or 0) + def aggregate(self, pipeline, **kwargs): """Perform an aggregation using the aggregation framework on this collection. @@ -1892,66 +1955,34 @@ class Collection(common.BaseObject): .. _aggregate command: http://docs.mongodb.org/manual/applications/aggregation """ - if not isinstance(pipeline, list): - raise TypeError("pipeline must be a list") + return self._aggregate(pipeline, + CommandCursor, + kwargs.get('batchSize'), + **kwargs) - if "explain" in kwargs: - raise ConfigurationError("The explain option is not supported. " - "Use Database.command instead.") - collation = validate_collation_or_none(kwargs.pop('collation', None)) + def aggregate_raw_batches(self, pipeline, **kwargs): + """Perform an aggregation and retrieve batches of raw BSON. - cmd = SON([("aggregate", self.__name), - ("pipeline", pipeline)]) + Takes the same parameters as :meth:`aggregate` but returns a + :class:`~pymongo.cursor.RawBatchCursor`. - # Remove things that are not command options. - batch_size = common.validate_positive_integer_or_none( - "batchSize", kwargs.pop("batchSize", None)) - use_cursor = common.validate_boolean( - "useCursor", kwargs.pop("useCursor", True)) - # If the server does not support the "cursor" option we - # ignore useCursor and batchSize. - with self._socket_for_reads() as (sock_info, slave_ok): - if sock_info.max_wire_version > 0: - if use_cursor: - if "cursor" not in kwargs: - kwargs["cursor"] = {} - if batch_size is not None: - kwargs["cursor"]["batchSize"] = batch_size + This example demonstrates how to work with raw batches, but in practice + raw batches should be passed to an external library that can decode + BSON into another data type, rather than used with PyMongo's + :mod:`bson` module. - dollar_out = pipeline and '$out' in pipeline[-1] - if (sock_info.max_wire_version >= 5 and dollar_out and - self.write_concern): - cmd['writeConcern'] = self.write_concern.document + >>> import bson + >>> cursor = db.test.aggregate_raw_batches([ + ... {'$project': {'x': {'$multiply': [2, '$x']}}}]) + >>> for batch in cursor: + ... print(bson.decode_all(batch)) - cmd.update(kwargs) - - # Apply this Collection's read concern if $out is not in the - # pipeline. - if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd: - if dollar_out: - result = self._command(sock_info, cmd, slave_ok, - parse_write_concern_error=True, - collation=collation) - else: - result = self._command(sock_info, cmd, slave_ok, - read_concern=self.read_concern, - collation=collation) - else: - result = self._command(sock_info, cmd, slave_ok, - parse_write_concern_error=dollar_out, - collation=collation) - - if "cursor" in result: - cursor = result["cursor"] - else: - # Pre-MongoDB 2.6. Fake a cursor. - cursor = { - "id": 0, - "firstBatch": result["result"], - "ns": self.full_name, - } - return CommandCursor( - self, cursor, sock_info.address).batch_size(batch_size or 0) + .. versionadded:: 3.6 + """ + return self._aggregate(pipeline, + RawBatchCommandCursor, + 0, + **kwargs) def group(self, key, condition, initial, reduce, finalize=None, **kwargs): """Perform a query similar to an SQL *group by* operation. diff --git a/pymongo/command_cursor.py b/pymongo/command_cursor.py index 3f98e110b..0a628c5a6 100644 --- a/pymongo/command_cursor.py +++ b/pymongo/command_cursor.py @@ -20,22 +20,29 @@ from collections import deque from bson.py3compat import integer_types from pymongo import helpers -from pymongo.errors import AutoReconnect, NotMasterError, OperationFailure -from pymongo.message import _CursorAddress, _GetMore, _convert_exception +from pymongo.errors import (AutoReconnect, + InvalidOperation, + NotMasterError, + OperationFailure) +from pymongo.message import (_convert_exception, + _CursorAddress, + _GetMore, + _RawBatchGetMore) class CommandCursor(object): - """A cursor / iterator over command cursors. - """ + """A cursor / iterator over command cursors.""" + _getmore_class = _GetMore def __init__(self, collection, cursor_info, address, retrieved=0): """Create a new command cursor. + + The parameter 'retrieved' is unused. """ self.__collection = collection self.__id = cursor_info['id'] self.__address = address self.__data = deque(cursor_info['firstBatch']) - self.__retrieved = retrieved self.__batch_size = 0 self.__killed = (self.__id == 0) @@ -115,9 +122,9 @@ class CommandCursor(object): if publish: start = datetime.datetime.now() try: - doc = helpers._unpack_response(response.data, - self.__id, - self.__collection.codec_options) + doc = self._unpack_response(response.data, + self.__id, + self.__collection.codec_options) if from_command: helpers._check_command_response(doc['data'][0]) @@ -154,11 +161,9 @@ class CommandCursor(object): cursor = doc['data'][0]['cursor'] documents = cursor['nextBatch'] self.__id = cursor['id'] - self.__retrieved += len(documents) else: documents = doc["data"] self.__id = doc["cursor_id"] - self.__retrieved += doc["number_returned"] if publish: duration = (datetime.datetime.now() - start) + cmd_duration @@ -174,6 +179,8 @@ class CommandCursor(object): self.__killed = True self.__data = deque(documents) + def _unpack_response(self, response, cursor_id, codec_options): + return helpers._unpack_response(response, cursor_id, codec_options) def _refresh(self): """Refreshes the cursor with more data from the server. @@ -188,17 +195,21 @@ class CommandCursor(object): if self.__id: # Get More dbname, collname = self.__ns.split('.', 1) self.__send_message( - _GetMore(dbname, - collname, - self.__batch_size, - self.__id, - self.__collection.codec_options)) + self._getmore_class(dbname, + collname, + self.__batch_size, + self.__id, + self.__collection.codec_options)) else: # Cursor id is zero nothing else to return self.__killed = True return len(self.__data) + @property + def _collection(self): + return self.__collection + @property def alive(self): """Does this cursor have the potential to return more data? @@ -247,3 +258,31 @@ class CommandCursor(object): def __exit__(self, exc_type, exc_val, exc_tb): self.close() + + +class RawBatchCommandCursor(CommandCursor): + _getmore_class = _RawBatchGetMore + + def __init__(self, collection, cursor_info, address, retrieved=0): + """Create a new cursor / iterator over raw batches of BSON data. + + Should not be called directly by application developers - + see :meth:`~pymongo.collection.Collection.aggregate_raw_batches` + instead. + + .. mongodoc:: cursors + """ + assert not cursor_info.get('firstBatch') + super(RawBatchCommandCursor, self).__init__( + collection, cursor_info, address, retrieved) + + db = self._collection.database + if db.outgoing_manipulators or db.outgoing_copying_manipulators: + raise InvalidOperation("Raw batches are not compatible with" + " SON manipulators.") + + def _unpack_response(self, response, cursor_id, codec_options): + return helpers._raw_response(response, cursor_id) + + def __getitem__(self, index): + raise InvalidOperation("Cannot call __getitem__ on RawBatchCursor") diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 2165353ad..137f71e19 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -34,7 +34,12 @@ from pymongo.errors import (AutoReconnect, InvalidOperation, NotMasterError, OperationFailure) -from pymongo.message import _CursorAddress, _GetMore, _Query, _convert_exception +from pymongo.message import (_convert_exception, + _CursorAddress, + _GetMore, + _RawBatchGetMore, + _Query, + _RawBatchQuery) from pymongo.read_preferences import ReadPreference _QUERY_OPTIONS = { @@ -1203,50 +1208,29 @@ class Cursor(object): return y -class _RawQuery(_Query): - def use_command(self, socket_info, exhaust): - # Compatibility checks. - super(_RawQuery, self).use_command(socket_info, exhaust) - - return False - - def get_message(self, set_slave_ok, is_mongos, use_cmd=False): - # Always pass False for use_cmd. - return super(_RawQuery, self).get_message(set_slave_ok, is_mongos, - False) - - -class _RawGetMore(_GetMore): - def use_command(self, socket_info, exhaust): - return False - - def get_message(self, set_slave_ok, is_mongos, use_cmd=False): - # Always pass False for use_cmd. - return super(_RawGetMore, self).get_message(set_slave_ok, is_mongos, - False) - - -class RawBSONCursor(Cursor): +class RawBatchCursor(Cursor): """A cursor / iterator over raw batches of BSON data from a query result.""" - _query_class = _RawQuery - _getmore_class = _RawGetMore + _query_class = _RawBatchQuery + _getmore_class = _RawBatchGetMore def __init__(self, *args, **kwargs): """Create a new cursor / iterator over raw batches of BSON data. Should not be called directly by application developers - - see :meth:`~pymongo.collection.Collection.find_raw` + see :meth:`~pymongo.collection.Collection.find_raw_batches` instead. .. mongodoc:: cursors """ - if kwargs.get('manipulate'): - raise InvalidOperation( - "Cannot use RawBSONCursor with manipulate=True") - + manipulate = kwargs.get('manipulate') kwargs['manipulate'] = False - super(RawBSONCursor, self).__init__(*args, **kwargs) + super(RawBatchCursor, self).__init__(*args, **kwargs) + + # Throw only after cursor's initialized, to prevent errors in __del__. + if manipulate: + raise InvalidOperation( + "Cannot use RawBatchCursor with manipulate=True") def _unpack_response(self, response, cursor_id, codec_options): return helpers._raw_response(response, cursor_id) @@ -1260,4 +1244,4 @@ class RawBSONCursor(Cursor): return clone.explain() def __getitem__(self, index): - raise InvalidOperation("Cannot call __getitem__ on RawBSONCursor") + raise InvalidOperation("Cannot call __getitem__ on RawBatchCursor") diff --git a/pymongo/message.py b/pymongo/message.py index db0fed2f3..80a4e44cc 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -353,6 +353,29 @@ class _GetMore(object): return get_more(ns, self.ntoreturn, self.cursor_id) +class _RawBatchQuery(_Query): + def use_command(self, socket_info, exhaust): + # Compatibility checks. + super(_RawBatchQuery, self).use_command(socket_info, exhaust) + + return False + + def get_message(self, set_slave_ok, is_mongos, use_cmd=False): + # Always pass False for use_cmd. + return super(_RawBatchQuery, self).get_message(set_slave_ok, is_mongos, + False) + + +class _RawBatchGetMore(_GetMore): + def use_command(self, socket_info, exhaust): + return False + + def get_message(self, set_slave_ok, is_mongos, use_cmd=False): + # Always pass False for use_cmd. + return super(_RawBatchGetMore, self).get_message(set_slave_ok, is_mongos, + False) + + class _CursorAddress(tuple): """The server address (host, port) of a cursor, with namespace property.""" diff --git a/test/test_collection.py b/test/test_collection.py index 142e34097..11a43ec19 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -1581,9 +1581,6 @@ class TestCollection(IntegrationTest): # Force a getMore cursor._CommandCursor__data.clear() next(cursor) - # startingFrom for a command cursor doesn't include the initial batch - # returned by the command. - self.assertEqual(5, cursor._CommandCursor__retrieved) # batchSize - 1 self.assertEqual(4, len(cursor._CommandCursor__data)) # Exhaust the cursor. There shouldn't be any errors. diff --git a/test/test_cursor.py b/test/test_cursor.py index 445166fc9..f11407d8d 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -42,7 +42,7 @@ from pymongo.read_concern import ReadConcern from test import (client_context, SkipTest, unittest, - IntegrationTest) + IntegrationTest, Version) from test.utils import (EventListener, ignore_deprecations, rs_or_single_client, @@ -1337,25 +1337,30 @@ class TestCursor(IntegrationTest): self.assertEqual(0, len(results["started"])) -class TestRawBSONCursor(IntegrationTest): +class TestRawBatchCursor(IntegrationTest): def test_find_raw(self): c = self.db.test c.drop() docs = [{'_id': i, 'x': 3.0 * i} for i in range(10)] c.insert_many(docs) - batches = list(c.find_raw().sort('_id')) + batches = list(c.find_raw_batches().sort('_id')) self.assertEqual(1, len(batches)) self.assertEqual(docs, decode_all(batches[0])) + def test_manipulate(self): + c = self.db.test + with self.assertRaises(InvalidOperation): + c.find_raw_batches(manipulate=True) + def test_explain(self): c = self.db.test c.insert_one({}) - explanation = c.find_raw().explain() + explanation = c.find_raw_batches().explain() self.assertIsInstance(explanation, dict) def test_clone(self): - cursor = self.db.test.find_raw() - # Copy of a RawBSONCursor is also a RawBSONCursor, not a Cursor. + cursor = self.db.test.find_raw_batches() + # Copy of a RawBatchCursor is also a RawBatchCursor, not a Cursor. self.assertIsInstance(next(cursor.clone()), bytes) self.assertIsInstance(next(copy.copy(cursor)), bytes) @@ -1364,39 +1369,39 @@ class TestRawBSONCursor(IntegrationTest): c = self.db.test c.drop() c.insert_many({'_id': i} for i in range(200)) - result = b''.join(c.find_raw(cursor_type=CursorType.EXHAUST)) + result = b''.join(c.find_raw_batches(cursor_type=CursorType.EXHAUST)) self.assertEqual([{'_id': i} for i in range(200)], decode_all(result)) def test_server_error(self): with self.assertRaises(OperationFailure) as exc: - next(self.db.test.find_raw({'x': {'$bad': 1}})) + next(self.db.test.find_raw_batches({'x': {'$bad': 1}})) # The server response was decoded, not left raw. self.assertIsInstance(exc.exception.details, dict) def test_get_item(self): with self.assertRaises(InvalidOperation): - self.db.test.find_raw()[0] + self.db.test.find_raw_batches()[0] @client_context.require_version_min(3, 4) def test_collation(self): - next(self.db.test.find_raw(collation=Collation('en_US'))) + next(self.db.test.find_raw_batches(collation=Collation('en_US'))) @client_context.require_version_max(3, 2) def test_collation_error(self): with self.assertRaises(ConfigurationError): - next(self.db.test.find_raw(collation=Collation('en_US'))) + next(self.db.test.find_raw_batches(collation=Collation('en_US'))) @client_context.require_version_min(3, 2) def test_read_concern(self): c = self.db.get_collection("test", read_concern=ReadConcern("majority")) - next(c.find_raw()) + next(c.find_raw_batches()) @client_context.require_version_max(3, 1) def test_read_concern_error(self): c = self.db.get_collection("test", read_concern=ReadConcern("majority")) with self.assertRaises(ConfigurationError): - next(c.find_raw()) + next(c.find_raw_batches()) def test_monitoring(self): listener = EventListener() @@ -1406,7 +1411,7 @@ class TestRawBSONCursor(IntegrationTest): c.insert_many([{'_id': i} for i in range(10)]) listener.results.clear() - cursor = c.find_raw(batch_size=4) + cursor = c.find_raw_batches(batch_size=4) # First raw batch of 4 documents. next(cursor) @@ -1447,5 +1452,96 @@ class TestRawBSONCursor(IntegrationTest): tuple(cursor) +class TestRawBatchCommandCursor(IntegrationTest): + @classmethod + def setUpClass(cls): + super(TestRawBatchCommandCursor, cls).setUpClass() + if not client_context.version >= Version(2, 6): + raise SkipTest("Requires MongoDB 2.6 aggregation cursors") + + def test_aggregate_raw(self): + c = self.db.test + c.drop() + docs = [{'_id': i, 'x': 3.0 * i} for i in range(10)] + c.insert_many(docs) + batches = list(c.aggregate_raw_batches([{'$sort': {'_id': 1}}])) + self.assertEqual(1, len(batches)) + self.assertEqual(docs, decode_all(batches[0])) + + def test_server_error(self): + c = self.db.test + c.drop() + docs = [{'_id': i, 'x': 3.0 * i} for i in range(10)] + c.insert_many(docs) + c.insert_one({'_id': 10, 'x': 'not a number'}) + + with self.assertRaises(OperationFailure) as exc: + list(self.db.test.aggregate_raw_batches([{ + '$sort': {'_id': 1}, + }, { + '$project': {'x': {'$multiply': [2, '$x']}} + }], batchSize=4)) + + # The server response was decoded, not left raw. + self.assertIsInstance(exc.exception.details, dict) + + def test_get_item(self): + with self.assertRaises(InvalidOperation): + self.db.test.aggregate_raw_batches([])[0] + + @client_context.require_version_min(3, 4) + def test_collation(self): + next(self.db.test.aggregate_raw_batches([], collation=Collation('en_US'))) + + @client_context.require_version_max(3, 2) + def test_collation_error(self): + with self.assertRaises(ConfigurationError): + next(self.db.test.aggregate_raw_batches([], collation=Collation('en_US'))) + + def test_monitoring(self): + listener = EventListener() + client = rs_or_single_client(event_listeners=[listener]) + c = client.pymongo_test.test + c.drop() + c.insert_many([{'_id': i} for i in range(10)]) + + listener.results.clear() + cursor = c.aggregate_raw_batches([{'$sort': {'_id': 1}}], batchSize=4) + + # Start cursor, no initial batch. + started = listener.results['started'][0] + succeeded = listener.results['succeeded'][0] + self.assertEqual(0, len(listener.results['failed'])) + self.assertEqual('aggregate', started.command_name) + self.assertEqual('pymongo_test', started.database_name) + self.assertEqual('aggregate', succeeded.command_name) + csr = succeeded.reply["cursor"] + self.assertEqual(csr["ns"], "pymongo_test.test") + + # First batch is empty. + self.assertEqual(len(csr["firstBatch"]), 0) + listener.results.clear() + + # Batches of 4 documents. + n = 0 + for batch in cursor: + results = listener.results + started = results['started'][0] + succeeded = results['succeeded'][0] + self.assertEqual(0, len(results['failed'])) + self.assertEqual('getMore', started.command_name) + self.assertEqual('pymongo_test', started.database_name) + self.assertEqual('getMore', succeeded.command_name) + csr = succeeded.reply["cursor"] + self.assertEqual(csr["ns"], "pymongo_test.test") + self.assertEqual(len(csr["nextBatch"]), 1) + self.assertEqual(csr["nextBatch"][0], batch) + self.assertEqual(decode_all(batch), + [{'_id': i} for i in range(n, min(n + 4, 10))]) + + n += 4 + listener.results.clear() + + if __name__ == "__main__": unittest.main()