Add helper for aggregate command PYTHON-366

Added collection helper method.
Created an aggregation example covering:
 * aggregate
 * map reduce
 * group
Ross Lawley 2012-07-12 09:43:55 +01:00
parent 8bd18b3cd1
commit a5432ea0f3
7 changed files with 245 additions and 122 deletions


@@ -42,6 +42,7 @@
.. automethod:: reindex
.. automethod:: index_information
.. automethod:: options
.. automethod:: aggregate
.. automethod:: group
.. automethod:: rename
.. automethod:: distinct


@@ -706,7 +706,7 @@ Changes in Version 1.2
get around some issues with queries on fields named ``query``
- enforce 4MB document limit on the client side
- added :meth:`~pymongo.collection.Collection.map_reduce` helper - see
:doc:`example <examples/map_reduce>`
:doc:`example <examples/aggregation>`
- added :meth:`~pymongo.cursor.Cursor.distinct` method on
:class:`~pymongo.cursor.Cursor` instances to allow distinct with
queries


@@ -0,0 +1,192 @@
Aggregation Examples
====================

There are several methods of performing aggregations in MongoDB. These
examples cover the new aggregation framework, map reduce, and the
group method.

.. testsetup::

  from pymongo import Connection
  connection = Connection()
  connection.drop_database('aggregation_example')

Setup
-----

To start, we'll insert some example data which we can perform
aggregations on:

.. doctest::

  >>> from pymongo import Connection
  >>> db = Connection().aggregation_example
  >>> db.things.insert({"x": 1, "tags": ["dog", "cat"]})
  ObjectId('...')
  >>> db.things.insert({"x": 2, "tags": ["cat"]})
  ObjectId('...')
  >>> db.things.insert({"x": 2, "tags": ["mouse", "cat", "dog"]})
  ObjectId('...')
  >>> db.things.insert({"x": 3, "tags": []})
  ObjectId('...')

Aggregation Framework
---------------------

This example shows how to use the
:meth:`~pymongo.collection.Collection.aggregate` method to run a pipeline
through the aggregation framework. We'll perform a simple aggregation to count
the number of occurrences for each tag in the ``tags`` array, across the
entire collection. To achieve this we need to pass three operations to the
pipeline. First we unwind the ``tags`` array, then we group by tag and sum
the occurrences, and finally we sort by count.

As Python dictionaries don't maintain order, you should use
:class:`~bson.son.SON` or :class:`collections.OrderedDict` where explicit
ordering is required, e.g. for ``$sort``:

.. note::

    aggregate requires server version **>= 2.1.1**. The PyMongo
    :meth:`~pymongo.collection.Collection.aggregate` helper requires
    PyMongo version **>= 2.2.1+**.

.. doctest::

  >>> from bson.son import SON
  >>> db.things.aggregate([
  ...         {"$unwind": "$tags"},
  ...         {"$group": {"_id": "$tags", "count": {"$sum": 1}}},
  ...         {"$sort": SON([("count", -1), ("_id", -1)])}
  ...     ])
  ...
  {u'ok': 1.0, u'result': [{u'count': 3, u'_id': u'cat'}, {u'count': 2, u'_id': u'dog'}, {u'count': 1, u'_id': u'mouse'}, {u'count': 1, u'_id': None}]}

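For readers without a server to hand, the three pipeline stages can be imitated in plain Python. This is only a rough sketch of what ``$unwind``, ``$group`` and ``$sort`` compute on the sample data, not how the server implements them. The names ``things``, ``sort_spec`` and ``rows`` are illustrative, and the sketch simply skips the empty ``tags`` array, so it does not reproduce the ``None`` row shown in the output above.

```python
from collections import Counter, OrderedDict

# Same sample documents as in the Setup section.
things = [
    {"x": 1, "tags": ["dog", "cat"]},
    {"x": 2, "tags": ["cat"]},
    {"x": 2, "tags": ["mouse", "cat", "dog"]},
    {"x": 3, "tags": []},
]

# $unwind: one entry per array element (an empty array contributes
# nothing in this sketch).
unwound = [tag for doc in things for tag in doc["tags"]]

# $group with {"$sum": 1}: count the occurrences of each tag.
counts = Counter(unwound)
rows = [{"_id": tag, "count": n} for tag, n in counts.items()]

# $sort: the order of the keys matters, hence an ordered mapping.
sort_spec = OrderedDict([("count", -1), ("_id", -1)])

# Apply the keys from least to most significant; list.sort is stable,
# so earlier orderings survive among equal elements.
for field, direction in reversed(list(sort_spec.items())):
    rows.sort(key=lambda row: row[field], reverse=(direction == -1))

print(rows)
```

Swapping the two entries in ``sort_spec`` would sort by ``_id`` first and only then by ``count``, which is why a plain ``dict`` is not a safe choice for the sort stage.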
As well as simple aggregations, the aggregation framework provides projection
capabilities to reshape the returned data. Using projections and aggregation,
you can add computed fields, create new virtual sub-objects, and extract
sub-fields into the top-level of results.

.. seealso:: The full documentation for MongoDB's `aggregation framework
             <http://docs.mongodb.org/manual/applications/aggregation>`_

Map/Reduce
----------

Another option for aggregation is to use the map reduce framework. Here we
will define **map** and **reduce** functions to also count the number of
occurrences for each tag in the ``tags`` array, across the entire collection.

Our **map** function just emits a single `(key, 1)` pair for each tag in
the array:

.. doctest::

  >>> from bson.code import Code
  >>> mapper = Code("""
  ...               function () {
  ...                 this.tags.forEach(function(z) {
  ...                   emit(z, 1);
  ...                 });
  ...               }
  ...               """)

The **reduce** function sums over all of the emitted values for a given key:

.. doctest::

  >>> reducer = Code("""
  ...                function (key, values) {
  ...                  var total = 0;
  ...                  for (var i = 0; i < values.length; i++) {
  ...                    total += values[i];
  ...                  }
  ...                  return total;
  ...                }
  ...                """)

.. note:: We can't just return ``values.length`` as the **reduce** function
   might be called iteratively on the results of other reduce steps.

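The point of the note above can be checked with a small Python sketch. It assumes a hypothetical schedule in which the first two emitted values are reduced together and the partial result is then re-reduced with the remaining value; the actual batching is up to the server. ``reduce_sum``, ``reduce_length`` and ``two_step`` are illustrative names only.

```python
def reduce_sum(key, values):
    # Mirrors the JavaScript reducer: add up the emitted values.
    return sum(values)


def reduce_length(key, values):
    # Tempting but wrong: counts the inputs instead of summing them.
    return len(values)


# Three values emitted for "cat", one per document carrying the tag.
emitted = [1, 1, 1]


def two_step(reducer):
    # Assumed schedule: reduce the first two values, then re-reduce
    # that partial result together with the remaining value.
    partial = reducer("cat", emitted[:2])
    return reducer("cat", [partial] + emitted[2:])


print(two_step(reduce_sum))     # 3
print(two_step(reduce_length))  # 2
```

Summing gives the same answer however the values are batched, whereas the length-based reducer loses a count as soon as a partial result is fed back in.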
Finally, we call :meth:`~pymongo.collection.Collection.map_reduce` and
iterate over the result collection:

.. doctest::

  >>> result = db.things.map_reduce(mapper, reducer, "myresults")
  >>> for doc in result.find():
  ...   print doc
  ...
  {u'_id': u'cat', u'value': 3.0}
  {u'_id': u'dog', u'value': 2.0}
  {u'_id': u'mouse', u'value': 1.0}

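To see how the map and reduce functions fit together without a server, the whole job can be imitated in plain Python. This is a simplified sketch: ``mapper`` and ``reducer`` below are Python stand-ins for the JavaScript functions, each key is reduced once rather than incrementally, and the counts come out as ints where the server returns floats.

```python
from collections import defaultdict

# Same sample documents as in the Setup section.
things = [
    {"x": 1, "tags": ["dog", "cat"]},
    {"x": 2, "tags": ["cat"]},
    {"x": 2, "tags": ["mouse", "cat", "dog"]},
    {"x": 3, "tags": []},
]


def mapper(doc):
    # Stand-in for the JavaScript map function: emit (tag, 1) pairs.
    for tag in doc["tags"]:
        yield tag, 1


def reducer(key, values):
    # Stand-in for the JavaScript reduce function: sum the values.
    return sum(values)


# Collect the emitted values under their key.
emitted = defaultdict(list)
for doc in things:
    for key, value in mapper(doc):
        emitted[key].append(value)

# Reduce each key once and present the results ordered by key.
results = [{"_id": key, "value": reducer(key, values)}
           for key, values in sorted(emitted.items())]

for doc in results:
    print(doc)
```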
Advanced Map/Reduce
-------------------

PyMongo's API supports all of the features of MongoDB's map/reduce engine.
One interesting feature is the ability to get more detailed results when
desired, by passing `full_response=True` to
:meth:`~pymongo.collection.Collection.map_reduce`. This returns the full
response to the map/reduce command, rather than just the result collection:

.. doctest::

  >>> db.things.map_reduce(mapper, reducer, "myresults", full_response=True)
  {u'counts': {u'input': 4, u'reduce': 2, u'emit': 6, u'output': 3}, u'timeMillis': ..., u'ok': ..., u'result': u'...'}

All of the optional map/reduce parameters are also supported; simply pass them
as keyword arguments. In this example we use the `query` parameter to limit the
documents that will be mapped over:

.. doctest::

  >>> result = db.things.map_reduce(mapper, reducer, "myresults", query={"x": {"$lt": 2}})
  >>> for doc in result.find():
  ...   print doc
  ...
  {u'_id': u'cat', u'value': 1.0}
  {u'_id': u'dog', u'value': 1.0}

With MongoDB 1.8.0 or newer you can use :class:`~bson.son.SON` or
:class:`collections.OrderedDict` to specify a different database to store the
result collection:

.. doctest::

  >>> from bson.son import SON
  >>> db.things.map_reduce(mapper, reducer, out=SON([("replace", "results"), ("db", "outdb")]), full_response=True)
  {u'counts': {u'input': 4, u'reduce': 2, u'emit': 6, u'output': 3}, u'timeMillis': ..., u'ok': ..., u'result': {u'db': ..., u'collection': ...}}

.. seealso:: The full list of options for MongoDB's `map reduce engine <http://www.mongodb.org/display/DOCS/MapReduce>`_

Group
-----

The :meth:`~pymongo.collection.Collection.group` method provides some of the
same functionality as SQL's GROUP BY. It is simpler than a map reduce: you
need to provide a key to group by, an initial value for the aggregation and a
reduce function.

.. note:: group() doesn't work with sharded MongoDB configurations; use
   aggregation or map/reduce instead.

Here we are doing a simple group and count of the occurrences of ``x`` values:

.. doctest::

  >>> reducer = Code("""
  ...                function(obj, prev){
  ...                  prev.count++;
  ...                }
  ...                """)
  ...
  >>> from bson.son import SON
  >>> results = db.things.group(key={"x":1}, condition={}, initial={"count": 0}, reduce=reducer)
  >>> for doc in results:
  ...   print doc
  {u'count': 1.0, u'x': 1.0}
  {u'count': 2.0, u'x': 2.0}
  {u'count': 1.0, u'x': 3.0}

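The roles of the key, the initial value and the reduce function can be illustrated with a plain Python sketch. The ``group`` function below is a hypothetical stand-in written for this example, not PyMongo's implementation: it takes a list of field names rather than a key document, ignores ``condition``, and returns ints where the server returns floats.

```python
import copy

# Only the "x" field of the sample documents matters here.
things = [{"x": 1}, {"x": 2}, {"x": 2}, {"x": 3}]


def group(docs, key, initial, reduce):
    # Illustrative stand-in for the server-side group command.
    buckets = {}
    order = []
    for doc in docs:
        group_key = tuple(doc[field] for field in key)
        if group_key not in buckets:
            # Each group starts from its own copy of the initial value.
            bucket = dict(zip(key, group_key))
            bucket.update(copy.deepcopy(initial))
            buckets[group_key] = bucket
            order.append(group_key)
        # The reducer mutates the running aggregate in place.
        reduce(doc, buckets[group_key])
    return [buckets[k] for k in order]


def count_reducer(obj, prev):
    # Same role as the JavaScript reducer above: prev.count++.
    prev["count"] += 1


results = group(things, key=["x"], initial={"count": 0},
                reduce=count_reducer)
for doc in results:
    print(doc)
```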
.. seealso:: The full list of options for MongoDB's `group method <http://www.mongodb.org/display/DOCS/Aggregation#Aggregation-Group>`_


@@ -17,7 +17,7 @@ MongoDB, you can start it like so:
:maxdepth: 1
gridfs
map_reduce
aggregation
geo
replica_set
custom_type


@@ -1,119 +0,0 @@
Map/Reduce Example
==================
.. testsetup::
from pymongo import Connection
connection = Connection()
connection.drop_database('map_reduce_example')
This example shows how to use the
:meth:`~pymongo.collection.Collection.map_reduce` method to perform
map/reduce style aggregations on your data.
.. note::
Map/Reduce requires server version **>= 1.1.1**. The PyMongo
:meth:`~pymongo.collection.Collection.map_reduce` helper requires
PyMongo version **>= 1.2**.
Setup
-----
To start, we'll insert some example data which we can perform
map/reduce queries on:
.. doctest::
>>> from pymongo import Connection
>>> db = Connection().map_reduce_example
>>> db.things.insert({"x": 1, "tags": ["dog", "cat"]})
ObjectId('...')
>>> db.things.insert({"x": 2, "tags": ["cat"]})
ObjectId('...')
>>> db.things.insert({"x": 3, "tags": ["mouse", "cat", "dog"]})
ObjectId('...')
>>> db.things.insert({"x": 4, "tags": []})
ObjectId('...')
Basic Map/Reduce
----------------
Now we'll define our **map** and **reduce** functions. In this case
we're performing the same operation as in the `MongoDB Map/Reduce
documentation <http://www.mongodb.org/display/DOCS/MapReduce>`_ -
counting the number of occurrences for each tag in the ``tags`` array,
across the entire collection.
Our **map** function just emits a single `(key, 1)` pair for each tag in
the array:
.. doctest::
>>> from bson.code import Code
>>> mapper = Code("""
... function () {
... this.tags.forEach(function(z) {
... emit(z, 1);
... });
... }
... """)
The **reduce** function sums over all of the emitted values for a given key:
.. doctest::
>>> reducer = Code("""
... function (key, values) {
... var total = 0;
... for (var i = 0; i < values.length; i++) {
... total += values[i];
... }
... return total;
... }
... """)
.. note:: We can't just return ``values.length`` as the **reduce** function
might be called iteratively on the results of other reduce steps.
Finally, we call :meth:`~pymongo.collection.Collection.map_reduce` and
iterate over the result collection:
.. doctest::
>>> result = db.things.map_reduce(mapper, reducer, "myresults")
>>> for doc in result.find():
... print doc
...
{u'_id': u'cat', u'value': 3.0}
{u'_id': u'dog', u'value': 2.0}
{u'_id': u'mouse', u'value': 1.0}
Advanced Map/Reduce
-------------------
PyMongo's API supports all of the features of MongoDB's map/reduce engine. One interesting feature is the ability to get more detailed results when desired, by passing `full_response=True` to :meth:`~pymongo.collection.Collection.map_reduce`. This returns the full response to the map/reduce command, rather than just the result collection:
.. doctest::
>>> db.things.map_reduce(mapper, reducer, "myresults", full_response=True)
{u'counts': {u'input': 4, u'reduce': 2, u'emit': 6, u'output': 3}, u'timeMillis': ..., u'ok': ..., u'result': u'...'}
All of the optional map/reduce parameters are also supported, simply pass them as keyword arguments. In this example we use the `query` parameter to limit the documents that will be mapped over:
.. doctest::
>>> result = db.things.map_reduce(mapper, reducer, "myresults", query={"x": {"$lt": 3}})
>>> for doc in result.find():
... print doc
...
{u'_id': u'cat', u'value': 2.0}
{u'_id': u'dog', u'value': 1.0}
With MongoDB 1.8.0 or newer you can use :class:`~bson.son.SON` to specify a different database to store the result collection:
.. doctest::
>>> from bson.son import SON
>>> db.things.map_reduce(mapper, reducer, out=SON([("replace", "results"), ("db", "outdb")]), full_response=True)
{u'counts': {u'input': 4, u'reduce': 2, u'emit': 6, u'output': 3}, u'timeMillis': ..., u'ok': ..., u'result': {u'db': ..., u'collection': ...}}
.. seealso:: The full list of options for MongoDB's `map reduce engine <http://www.mongodb.org/display/DOCS/MapReduce>`_


@@ -889,6 +889,42 @@ class Collection(common.BaseObject):
        return options

    def aggregate(self, ops):
        """Perform an aggregation using the aggregation framework on this
        collection.

        With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
        or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
        if the `read_preference` attribute of this instance is not set to
        :attr:`pymongo.ReadPreference.PRIMARY` or the (deprecated)
        `slave_okay` attribute of this instance is set to `True` the
        `aggregate command`_ will be sent to a secondary or slave.

        :Parameters:
          - `ops`: a single command or list of aggregation commands

        .. note:: Requires server version **>= 2.1.1**

        .. versionadded:: 2.2.1+

        .. _aggregate command:
            http://docs.mongodb.org/manual/applications/aggregation
        """
        if not isinstance(ops, (dict, list, tuple)):
            raise TypeError("ops must be a dict, list or tuple")

        # Accept a single pipeline stage as well as a sequence of stages.
        if isinstance(ops, dict):
            ops = [ops]

        use_master = not self.slave_okay and not self.read_preference

        return self.__database.command("aggregate", self.__name,
                                        pipeline=ops,
                                        read_preference=self.read_preference,
                                        slave_okay=self.slave_okay,
                                        _use_master=use_master)

    # TODO key and condition ought to be optional, but deprecation
    # could be painful as argument order would have to change.
    def group(self, key, condition, initial, reduce, finalize=None):
@@ -1031,7 +1067,7 @@ class Collection(common.BaseObject):
.. note:: Requires server version **>= 1.1.1**
.. seealso:: :doc:`/examples/map_reduce`
.. seealso:: :doc:`/examples/aggregation`
.. versionchanged:: 2.2
Removed deprecated arguments: merge_output and reduce_output


@@ -1047,6 +1047,19 @@ class TestCollection(unittest.TestCase):
        self.assertEqual(db.test.find({'foo': 'bar'}).count(), 1)
        self.assertEqual(db.test.find({'foo': re.compile(r'ba.*')}).count(), 2)

    def test_aggregate(self):
        db = self.db
        db.drop_collection("test")
        db.test.save({'foo': [1, 2]})

        self.assertRaises(TypeError, db.test.aggregate, "wow")

        ops = {"$project": {"_id": False, "foo": True}}
        expected = {'ok': 1.0, 'result': [{'foo': [1, 2]}]}
        self.assertEqual(expected, db.test.aggregate(ops))
        self.assertEqual(expected, db.test.aggregate([ops]))
        self.assertEqual(expected, db.test.aggregate((ops,)))

    def test_group(self):
        db = self.db
        db.drop_collection("test")
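
The input handling that ``test_aggregate`` exercises can also be tried without a running server. The sketch below copies only the type check and the dict-to-list wrapping from ``aggregate`` into a standalone function; ``normalize_pipeline`` is a hypothetical name used for illustration and is not part of PyMongo.

```python
def normalize_pipeline(ops):
    # Hypothetical helper mirroring the argument handling in aggregate().
    if not isinstance(ops, (dict, list, tuple)):
        raise TypeError("ops must be a dict, list or tuple")
    # A single stage is wrapped so the command always gets a sequence.
    if isinstance(ops, dict):
        ops = [ops]
    return ops


stage = {"$project": {"_id": False, "foo": True}}

print(normalize_pipeline(stage))
print(normalize_pipeline([stage]))
print(normalize_pipeline((stage,)))

try:
    normalize_pipeline("wow")
except TypeError as exc:
    print("rejected: %s" % exc)
```

Lists and tuples are passed through unchanged, which matches the three equivalent calls in the test.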