PYTHON-1362 - Add find/aggregate_raw_batches()

Rename find_raw to find_raw_batches, and add aggregate_raw_batches.
Rename RawBSONCursor and RawBSONCommandCursor to RawBatchCursor and
RawBatchCommandCursor.
This commit is contained in:
A. Jesse Jiryu Davis 2017-08-13 16:06:42 -04:00
parent 4957589024
commit cfb30e91c0
8 changed files with 304 additions and 128 deletions

View File

@ -24,4 +24,4 @@
.. automethod:: __getitem__
.. autoclass:: pymongo.cursor.RawBSONCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, modifiers=None, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None)
.. autoclass:: pymongo.cursor.RawBatchCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, modifiers=None, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None)

View File

@ -8,6 +8,12 @@ This version drops support for MongoDB versions older than 2.6. If connecting to
a MongoDB 2.4 server or older, PyMongo now throws a
:exc:`~pymongo.errors.ConfigurationError`.
Highlights include:
- New methods :meth:`~pymongo.collection.Collection.find_raw_batches` and
:meth:`~pymongo.collection.Collection.aggregate_raw_batches` for use with
external libraries that can parse raw batches of BSON data.
Changes in Version 3.5.1
------------------------

View File

@ -30,9 +30,9 @@ from pymongo import (common,
helpers,
message)
from pymongo.bulk import BulkOperationBuilder, _Bulk
from pymongo.command_cursor import CommandCursor
from pymongo.command_cursor import CommandCursor, RawBatchCommandCursor
from pymongo.collation import validate_collation_or_none
from pymongo.cursor import Cursor, RawBSONCursor
from pymongo.cursor import Cursor, RawBatchCursor
from pymongo.errors import ConfigurationError, InvalidName, OperationFailure
from pymongo.helpers import _check_write_command_response
from pymongo.helpers import _UNICODE_REPLACE_CODEC_OPTIONS
@ -1278,11 +1278,11 @@ class Collection(common.BaseObject):
"""
return Cursor(self, *args, **kwargs)
def find_raw(self, *args, **kwargs):
def find_raw_batches(self, *args, **kwargs):
"""Query the database and retrieve batches of raw BSON.
Takes the same parameters as :meth:`find` but returns a
:class:`~pymongo.cursor.RawBSONCursor`.
:class:`~pymongo.cursor.RawBatchCursor`.
This example demonstrates how to work with raw batches, but in practice
raw batches should be passed to an external library that can decode
@ -1290,13 +1290,13 @@ class Collection(common.BaseObject):
:mod:`bson` module.
>>> import bson
>>> cursor = db.test.find_raw()
>>> cursor = db.test.find_raw_batches()
>>> for batch in cursor:
... print(bson.decode_all(batch))
.. versionadded:: 3.6
"""
return RawBSONCursor(self, *args, **kwargs)
return RawBatchCursor(self, *args, **kwargs)
def parallel_scan(self, num_cursors, **kwargs):
"""Scan this entire collection in parallel.
@ -1821,6 +1821,69 @@ class Collection(common.BaseObject):
return options
def _aggregate(self, pipeline, cursor_class, first_batch_size, **kwargs):
    """Run the ``aggregate`` command and wrap its reply in `cursor_class`.

    Shared implementation for :meth:`aggregate` and
    :meth:`aggregate_raw_batches`.

    :Parameters:
      - `pipeline`: a list of aggregation pipeline stages
      - `cursor_class`: class used to wrap the reply; it is called as
        ``cursor_class(collection, cursor_document, address)``
      - `first_batch_size`: value to send as ``cursor.batchSize`` with the
        initial command, or ``None`` to leave that option unset
      - `**kwargs`: ``collation``, ``batchSize`` and ``useCursor`` are
        consumed here; every other keyword is added to the command
        document as-is

    Raises :exc:`TypeError` if `pipeline` is not a list and
    :exc:`~pymongo.errors.ConfigurationError` if ``explain`` is passed.
    """
    if not isinstance(pipeline, list):
        raise TypeError("pipeline must be a list")

    if "explain" in kwargs:
        raise ConfigurationError("The explain option is not supported. "
                                 "Use Database.command instead.")
    collation = validate_collation_or_none(kwargs.pop('collation', None))

    cmd = SON([("aggregate", self.__name),
               ("pipeline", pipeline)])

    # Remove things that are not command options.
    batch_size = common.validate_non_negative_integer_or_none(
        "batchSize", kwargs.pop("batchSize", None))
    use_cursor = common.validate_boolean(
        "useCursor", kwargs.pop("useCursor", True))
    # If the server does not support the "cursor" option we
    # ignore useCursor and batchSize.
    with self._socket_for_reads() as (sock_info, slave_ok):
        if sock_info.max_wire_version > 0:
            if use_cursor:
                if "cursor" not in kwargs:
                    kwargs["cursor"] = {}
                if first_batch_size is not None:
                    # Build a new document rather than assigning into the
                    # existing one: a dict the caller passed as cursor=...
                    # must not be modified behind their back.
                    kwargs["cursor"] = dict(kwargs["cursor"],
                                            batchSize=first_batch_size)

        # bool() keeps this a real boolean for an empty pipeline; the bare
        # "pipeline and ..." form would evaluate to the empty list itself.
        dollar_out = bool(pipeline) and '$out' in pipeline[-1]
        if (sock_info.max_wire_version >= 5 and dollar_out and
                self.write_concern):
            cmd['writeConcern'] = self.write_concern.document

        cmd.update(kwargs)

        # Apply this Collection's read concern if $out is not in the
        # pipeline.
        if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd:
            if dollar_out:
                result = self._command(sock_info, cmd, slave_ok,
                                       parse_write_concern_error=True,
                                       collation=collation)
            else:
                result = self._command(sock_info, cmd, slave_ok,
                                       read_concern=self.read_concern,
                                       collation=collation)
        else:
            result = self._command(sock_info, cmd, slave_ok,
                                   parse_write_concern_error=dollar_out,
                                   collation=collation)

        if "cursor" in result:
            cursor = result["cursor"]
        else:
            # Pre-MongoDB 2.6. Fake a cursor.
            cursor = {
                "id": 0,
                "firstBatch": result["result"],
                "ns": self.full_name,
            }
        # batch_size (the validated "batchSize" keyword) governs later
        # batches; first_batch_size above only affects the initial command.
        return cursor_class(
            self, cursor, sock_info.address).batch_size(batch_size or 0)
def aggregate(self, pipeline, **kwargs):
"""Perform an aggregation using the aggregation framework on this
collection.
@ -1892,66 +1955,34 @@ class Collection(common.BaseObject):
.. _aggregate command:
http://docs.mongodb.org/manual/applications/aggregation
"""
if not isinstance(pipeline, list):
raise TypeError("pipeline must be a list")
return self._aggregate(pipeline,
CommandCursor,
kwargs.get('batchSize'),
**kwargs)
if "explain" in kwargs:
raise ConfigurationError("The explain option is not supported. "
"Use Database.command instead.")
collation = validate_collation_or_none(kwargs.pop('collation', None))
def aggregate_raw_batches(self, pipeline, **kwargs):
"""Perform an aggregation and retrieve batches of raw BSON.
cmd = SON([("aggregate", self.__name),
("pipeline", pipeline)])
Takes the same parameters as :meth:`aggregate` but returns a
:class:`~pymongo.command_cursor.RawBatchCommandCursor`.
# Remove things that are not command options.
batch_size = common.validate_positive_integer_or_none(
"batchSize", kwargs.pop("batchSize", None))
use_cursor = common.validate_boolean(
"useCursor", kwargs.pop("useCursor", True))
# If the server does not support the "cursor" option we
# ignore useCursor and batchSize.
with self._socket_for_reads() as (sock_info, slave_ok):
if sock_info.max_wire_version > 0:
if use_cursor:
if "cursor" not in kwargs:
kwargs["cursor"] = {}
if batch_size is not None:
kwargs["cursor"]["batchSize"] = batch_size
This example demonstrates how to work with raw batches, but in practice
raw batches should be passed to an external library that can decode
BSON into another data type, rather than used with PyMongo's
:mod:`bson` module.
dollar_out = pipeline and '$out' in pipeline[-1]
if (sock_info.max_wire_version >= 5 and dollar_out and
self.write_concern):
cmd['writeConcern'] = self.write_concern.document
>>> import bson
>>> cursor = db.test.aggregate_raw_batches([
... {'$project': {'x': {'$multiply': [2, '$x']}}}])
>>> for batch in cursor:
... print(bson.decode_all(batch))
cmd.update(kwargs)
# Apply this Collection's read concern if $out is not in the
# pipeline.
if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd:
if dollar_out:
result = self._command(sock_info, cmd, slave_ok,
parse_write_concern_error=True,
collation=collation)
else:
result = self._command(sock_info, cmd, slave_ok,
read_concern=self.read_concern,
collation=collation)
else:
result = self._command(sock_info, cmd, slave_ok,
parse_write_concern_error=dollar_out,
collation=collation)
if "cursor" in result:
cursor = result["cursor"]
else:
# Pre-MongoDB 2.6. Fake a cursor.
cursor = {
"id": 0,
"firstBatch": result["result"],
"ns": self.full_name,
}
return CommandCursor(
self, cursor, sock_info.address).batch_size(batch_size or 0)
.. versionadded:: 3.6
"""
return self._aggregate(pipeline,
RawBatchCommandCursor,
0,
**kwargs)
def group(self, key, condition, initial, reduce, finalize=None, **kwargs):
"""Perform a query similar to an SQL *group by* operation.

View File

@ -20,22 +20,29 @@ from collections import deque
from bson.py3compat import integer_types
from pymongo import helpers
from pymongo.errors import AutoReconnect, NotMasterError, OperationFailure
from pymongo.message import _CursorAddress, _GetMore, _convert_exception
from pymongo.errors import (AutoReconnect,
InvalidOperation,
NotMasterError,
OperationFailure)
from pymongo.message import (_convert_exception,
_CursorAddress,
_GetMore,
_RawBatchGetMore)
class CommandCursor(object):
"""A cursor / iterator over command cursors.
"""
"""A cursor / iterator over command cursors."""
_getmore_class = _GetMore
def __init__(self, collection, cursor_info, address, retrieved=0):
"""Create a new command cursor.
The parameter 'retrieved' is unused.
"""
self.__collection = collection
self.__id = cursor_info['id']
self.__address = address
self.__data = deque(cursor_info['firstBatch'])
self.__retrieved = retrieved
self.__batch_size = 0
self.__killed = (self.__id == 0)
@ -115,9 +122,9 @@ class CommandCursor(object):
if publish:
start = datetime.datetime.now()
try:
doc = helpers._unpack_response(response.data,
self.__id,
self.__collection.codec_options)
doc = self._unpack_response(response.data,
self.__id,
self.__collection.codec_options)
if from_command:
helpers._check_command_response(doc['data'][0])
@ -154,11 +161,9 @@ class CommandCursor(object):
cursor = doc['data'][0]['cursor']
documents = cursor['nextBatch']
self.__id = cursor['id']
self.__retrieved += len(documents)
else:
documents = doc["data"]
self.__id = doc["cursor_id"]
self.__retrieved += doc["number_returned"]
if publish:
duration = (datetime.datetime.now() - start) + cmd_duration
@ -174,6 +179,8 @@ class CommandCursor(object):
self.__killed = True
self.__data = deque(documents)
def _unpack_response(self, response, cursor_id, codec_options):
return helpers._unpack_response(response, cursor_id, codec_options)
def _refresh(self):
"""Refreshes the cursor with more data from the server.
@ -188,17 +195,21 @@ class CommandCursor(object):
if self.__id: # Get More
dbname, collname = self.__ns.split('.', 1)
self.__send_message(
_GetMore(dbname,
collname,
self.__batch_size,
self.__id,
self.__collection.codec_options))
self._getmore_class(dbname,
collname,
self.__batch_size,
self.__id,
self.__collection.codec_options))
else: # Cursor id is zero nothing else to return
self.__killed = True
return len(self.__data)
@property
def _collection(self):
return self.__collection
@property
def alive(self):
"""Does this cursor have the potential to return more data?
@ -247,3 +258,31 @@ class CommandCursor(object):
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class RawBatchCommandCursor(CommandCursor):
    """A cursor / iterator over raw batches of BSON data from a command."""

    _getmore_class = _RawBatchGetMore

    def __init__(self, collection, cursor_info, address, retrieved=0):
        """Create a new cursor / iterator over raw batches of BSON data.

        Should not be called directly by application developers -
        see :meth:`~pymongo.collection.Collection.aggregate_raw_batches`
        instead.

        Raises :exc:`~pymongo.errors.InvalidOperation` if `cursor_info`
        already carries documents in ``firstBatch``, or if the database
        has outgoing SON manipulators registered.

        .. mongodoc:: cursors
        """
        super(RawBatchCommandCursor, self).__init__(
            collection, cursor_info, address, retrieved)

        # Validate only after the base class has set the instance up, so a
        # rejected cursor is never left half-constructed.
        #
        # An explicit raise replaces the former assert: asserts are removed
        # under "python -O", which would let an already-populated first
        # batch slip through unchecked.
        if cursor_info.get('firstBatch'):
            raise InvalidOperation("Raw batches are not available: the"
                                   " command reply already contains"
                                   " documents in 'firstBatch'.")

        db = self._collection.database
        if db.outgoing_manipulators or db.outgoing_copying_manipulators:
            raise InvalidOperation("Raw batches are not compatible with"
                                   " SON manipulators.")

    def _unpack_response(self, response, cursor_id, codec_options):
        # codec_options is accepted for signature compatibility with
        # CommandCursor._unpack_response but is not used here.
        return helpers._raw_response(response, cursor_id)

    def __getitem__(self, index):
        # Name this class in the message, not RawBatchCursor.
        raise InvalidOperation(
            "Cannot call __getitem__ on RawBatchCommandCursor")

View File

@ -34,7 +34,12 @@ from pymongo.errors import (AutoReconnect,
InvalidOperation,
NotMasterError,
OperationFailure)
from pymongo.message import _CursorAddress, _GetMore, _Query, _convert_exception
from pymongo.message import (_convert_exception,
_CursorAddress,
_GetMore,
_RawBatchGetMore,
_Query,
_RawBatchQuery)
from pymongo.read_preferences import ReadPreference
_QUERY_OPTIONS = {
@ -1203,50 +1208,29 @@ class Cursor(object):
return y
class _RawQuery(_Query):
def use_command(self, socket_info, exhaust):
# Compatibility checks.
super(_RawQuery, self).use_command(socket_info, exhaust)
return False
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
# Always pass False for use_cmd.
return super(_RawQuery, self).get_message(set_slave_ok, is_mongos,
False)
class _RawGetMore(_GetMore):
def use_command(self, socket_info, exhaust):
return False
def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
# Always pass False for use_cmd.
return super(_RawGetMore, self).get_message(set_slave_ok, is_mongos,
False)
class RawBSONCursor(Cursor):
class RawBatchCursor(Cursor):
"""A cursor / iterator over raw batches of BSON data from a query result."""
_query_class = _RawQuery
_getmore_class = _RawGetMore
_query_class = _RawBatchQuery
_getmore_class = _RawBatchGetMore
def __init__(self, *args, **kwargs):
"""Create a new cursor / iterator over raw batches of BSON data.
Should not be called directly by application developers -
see :meth:`~pymongo.collection.Collection.find_raw`
see :meth:`~pymongo.collection.Collection.find_raw_batches`
instead.
.. mongodoc:: cursors
"""
if kwargs.get('manipulate'):
raise InvalidOperation(
"Cannot use RawBSONCursor with manipulate=True")
manipulate = kwargs.get('manipulate')
kwargs['manipulate'] = False
super(RawBSONCursor, self).__init__(*args, **kwargs)
super(RawBatchCursor, self).__init__(*args, **kwargs)
# Throw only after cursor's initialized, to prevent errors in __del__.
if manipulate:
raise InvalidOperation(
"Cannot use RawBatchCursor with manipulate=True")
def _unpack_response(self, response, cursor_id, codec_options):
return helpers._raw_response(response, cursor_id)
@ -1260,4 +1244,4 @@ class RawBSONCursor(Cursor):
return clone.explain()
def __getitem__(self, index):
raise InvalidOperation("Cannot call __getitem__ on RawBSONCursor")
raise InvalidOperation("Cannot call __getitem__ on RawBatchCursor")

View File

@ -353,6 +353,29 @@ class _GetMore(object):
return get_more(ns, self.ntoreturn, self.cursor_id)
class _RawBatchQuery(_Query):
    """A _Query whose use_command() is always False.

    get_message() likewise always forwards use_cmd=False to the parent.
    """

    def use_command(self, socket_info, exhaust):
        # The parent is called only for its compatibility checks; whatever
        # it returns is discarded.
        parent = super(_RawBatchQuery, self)
        parent.use_command(socket_info, exhaust)
        return False

    def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
        # The caller's use_cmd is ignored: False is always passed on.
        parent = super(_RawBatchQuery, self)
        message = parent.get_message(set_slave_ok, is_mongos, False)
        return message
class _RawBatchGetMore(_GetMore):
    """A _GetMore whose use_command() is always False.

    get_message() likewise always forwards use_cmd=False to the parent.
    """

    def use_command(self, socket_info, exhaust):
        # Unlike _RawBatchQuery, the parent implementation is not consulted.
        return False

    def get_message(self, set_slave_ok, is_mongos, use_cmd=False):
        # The caller's use_cmd is ignored: False is always passed on.
        parent = super(_RawBatchGetMore, self)
        message = parent.get_message(set_slave_ok, is_mongos, False)
        return message
class _CursorAddress(tuple):
"""The server address (host, port) of a cursor, with namespace property."""

View File

@ -1581,9 +1581,6 @@ class TestCollection(IntegrationTest):
# Force a getMore
cursor._CommandCursor__data.clear()
next(cursor)
# startingFrom for a command cursor doesn't include the initial batch
# returned by the command.
self.assertEqual(5, cursor._CommandCursor__retrieved)
# batchSize - 1
self.assertEqual(4, len(cursor._CommandCursor__data))
# Exhaust the cursor. There shouldn't be any errors.

View File

@ -42,7 +42,7 @@ from pymongo.read_concern import ReadConcern
from test import (client_context,
SkipTest,
unittest,
IntegrationTest)
IntegrationTest, Version)
from test.utils import (EventListener,
ignore_deprecations,
rs_or_single_client,
@ -1337,25 +1337,30 @@ class TestCursor(IntegrationTest):
self.assertEqual(0, len(results["started"]))
class TestRawBSONCursor(IntegrationTest):
class TestRawBatchCursor(IntegrationTest):
def test_find_raw(self):
c = self.db.test
c.drop()
docs = [{'_id': i, 'x': 3.0 * i} for i in range(10)]
c.insert_many(docs)
batches = list(c.find_raw().sort('_id'))
batches = list(c.find_raw_batches().sort('_id'))
self.assertEqual(1, len(batches))
self.assertEqual(docs, decode_all(batches[0]))
def test_manipulate(self):
c = self.db.test
with self.assertRaises(InvalidOperation):
c.find_raw_batches(manipulate=True)
def test_explain(self):
c = self.db.test
c.insert_one({})
explanation = c.find_raw().explain()
explanation = c.find_raw_batches().explain()
self.assertIsInstance(explanation, dict)
def test_clone(self):
cursor = self.db.test.find_raw()
# Copy of a RawBSONCursor is also a RawBSONCursor, not a Cursor.
cursor = self.db.test.find_raw_batches()
# Copy of a RawBatchCursor is also a RawBatchCursor, not a Cursor.
self.assertIsInstance(next(cursor.clone()), bytes)
self.assertIsInstance(next(copy.copy(cursor)), bytes)
@ -1364,39 +1369,39 @@ class TestRawBSONCursor(IntegrationTest):
c = self.db.test
c.drop()
c.insert_many({'_id': i} for i in range(200))
result = b''.join(c.find_raw(cursor_type=CursorType.EXHAUST))
result = b''.join(c.find_raw_batches(cursor_type=CursorType.EXHAUST))
self.assertEqual([{'_id': i} for i in range(200)], decode_all(result))
def test_server_error(self):
with self.assertRaises(OperationFailure) as exc:
next(self.db.test.find_raw({'x': {'$bad': 1}}))
next(self.db.test.find_raw_batches({'x': {'$bad': 1}}))
# The server response was decoded, not left raw.
self.assertIsInstance(exc.exception.details, dict)
def test_get_item(self):
with self.assertRaises(InvalidOperation):
self.db.test.find_raw()[0]
self.db.test.find_raw_batches()[0]
@client_context.require_version_min(3, 4)
def test_collation(self):
next(self.db.test.find_raw(collation=Collation('en_US')))
next(self.db.test.find_raw_batches(collation=Collation('en_US')))
@client_context.require_version_max(3, 2)
def test_collation_error(self):
with self.assertRaises(ConfigurationError):
next(self.db.test.find_raw(collation=Collation('en_US')))
next(self.db.test.find_raw_batches(collation=Collation('en_US')))
@client_context.require_version_min(3, 2)
def test_read_concern(self):
c = self.db.get_collection("test", read_concern=ReadConcern("majority"))
next(c.find_raw())
next(c.find_raw_batches())
@client_context.require_version_max(3, 1)
def test_read_concern_error(self):
c = self.db.get_collection("test", read_concern=ReadConcern("majority"))
with self.assertRaises(ConfigurationError):
next(c.find_raw())
next(c.find_raw_batches())
def test_monitoring(self):
listener = EventListener()
@ -1406,7 +1411,7 @@ class TestRawBSONCursor(IntegrationTest):
c.insert_many([{'_id': i} for i in range(10)])
listener.results.clear()
cursor = c.find_raw(batch_size=4)
cursor = c.find_raw_batches(batch_size=4)
# First raw batch of 4 documents.
next(cursor)
@ -1447,5 +1452,96 @@ class TestRawBSONCursor(IntegrationTest):
tuple(cursor)
class TestRawBatchCommandCursor(IntegrationTest):
    """Integration tests for Collection.aggregate_raw_batches()."""

    @classmethod
    def setUpClass(cls):
        super(TestRawBatchCommandCursor, cls).setUpClass()
        if client_context.version >= Version(2, 6):
            return
        raise SkipTest("Requires MongoDB 2.6 aggregation cursors")

    def test_aggregate_raw(self):
        coll = self.db.test
        coll.drop()
        expected = [{'_id': i, 'x': 3.0 * i} for i in range(10)]
        coll.insert_many(expected)
        sort_by_id = [{'$sort': {'_id': 1}}]
        raw_batches = list(coll.aggregate_raw_batches(sort_by_id))
        # Every document is expected back in a single raw batch.
        self.assertEqual(1, len(raw_batches))
        self.assertEqual(expected, decode_all(raw_batches[0]))

    def test_server_error(self):
        coll = self.db.test
        coll.drop()
        numeric_docs = [{'_id': i, 'x': 3.0 * i} for i in range(10)]
        coll.insert_many(numeric_docs)
        # The last document, in _id order, has a non-numeric 'x'.
        coll.insert_one({'_id': 10, 'x': 'not a number'})

        pipeline = [{
            '$sort': {'_id': 1},
        }, {
            '$project': {'x': {'$multiply': [2, '$x']}}
        }]
        with self.assertRaises(OperationFailure) as exc:
            list(self.db.test.aggregate_raw_batches(pipeline, batchSize=4))

        # The server response was decoded, not left raw.
        self.assertIsInstance(exc.exception.details, dict)

    def test_get_item(self):
        with self.assertRaises(InvalidOperation):
            self.db.test.aggregate_raw_batches([])[0]

    @client_context.require_version_min(3, 4)
    def test_collation(self):
        cursor = self.db.test.aggregate_raw_batches(
            [], collation=Collation('en_US'))
        next(cursor)

    @client_context.require_version_max(3, 2)
    def test_collation_error(self):
        # The call itself stays inside the context manager, as the error
        # may be raised before next() is ever reached.
        with self.assertRaises(ConfigurationError):
            next(self.db.test.aggregate_raw_batches(
                [], collation=Collation('en_US')))

    def test_monitoring(self):
        listener = EventListener()
        client = rs_or_single_client(event_listeners=[listener])
        coll = client.pymongo_test.test
        coll.drop()
        coll.insert_many([{'_id': i} for i in range(10)])

        listener.results.clear()
        cursor = coll.aggregate_raw_batches(
            [{'$sort': {'_id': 1}}], batchSize=4)

        # Start cursor, no initial batch.
        first_started = listener.results['started'][0]
        first_succeeded = listener.results['succeeded'][0]
        self.assertEqual(0, len(listener.results['failed']))
        self.assertEqual('aggregate', first_started.command_name)
        self.assertEqual('pymongo_test', first_started.database_name)
        self.assertEqual('aggregate', first_succeeded.command_name)
        reply_cursor = first_succeeded.reply["cursor"]
        self.assertEqual(reply_cursor["ns"], "pymongo_test.test")

        # First batch is empty.
        self.assertEqual(len(reply_cursor["firstBatch"]), 0)
        listener.results.clear()

        # Batches of 4 documents.
        offset = 0
        for batch in cursor:
            events = listener.results
            getmore_started = events['started'][0]
            getmore_succeeded = events['succeeded'][0]
            self.assertEqual(0, len(events['failed']))
            self.assertEqual('getMore', getmore_started.command_name)
            self.assertEqual('pymongo_test', getmore_started.database_name)
            self.assertEqual('getMore', getmore_succeeded.command_name)
            reply_cursor = getmore_succeeded.reply["cursor"]
            self.assertEqual(reply_cursor["ns"], "pymongo_test.test")
            # The published reply carries the raw batch as its one element.
            self.assertEqual(len(reply_cursor["nextBatch"]), 1)
            self.assertEqual(reply_cursor["nextBatch"][0], batch)
            upper = min(offset + 4, 10)
            self.assertEqual(decode_all(batch),
                             [{'_id': i} for i in range(offset, upper)])

            offset += 4
            listener.results.clear()
if __name__ == "__main__":
unittest.main()