diff --git a/doc/api/pymongo/collection.rst b/doc/api/pymongo/collection.rst index 600a401bd..606071de2 100644 --- a/doc/api/pymongo/collection.rst +++ b/doc/api/pymongo/collection.rst @@ -42,6 +42,7 @@ .. automethod:: reindex .. automethod:: index_information .. automethod:: options + .. automethod:: aggregate .. automethod:: group .. automethod:: rename .. automethod:: distinct diff --git a/doc/changelog.rst b/doc/changelog.rst index 423187434..97fabc4b2 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -706,7 +706,7 @@ Changes in Version 1.2 get around some issues with queries on fields named ``query`` - enforce 4MB document limit on the client side - added :meth:`~pymongo.collection.Collection.map_reduce` helper - see - :doc:`example ` + :doc:`example ` - added :meth:`~pymongo.cursor.Cursor.distinct` method on :class:`~pymongo.cursor.Cursor` instances to allow distinct with queries diff --git a/doc/examples/aggregation.rst b/doc/examples/aggregation.rst new file mode 100644 index 000000000..4b306fc55 --- /dev/null +++ b/doc/examples/aggregation.rst @@ -0,0 +1,192 @@ +Aggregation Examples +==================== + +There are several methods of performing aggregations in MongoDB. These +examples cover the new aggregation framework, using map reduce and using the +group method. + +.. testsetup:: + + >>> from pymongo import Connection + >>> connection = Connection() + >>> connection.drop_database('aggregation_example') + +Setup +----- +To start, we'll insert some example data which we can perform +aggregations on: + +.. doctest:: + + >>> from pymongo import Connection + >>> db = Connection().aggregation_example + >>> db.things.insert({"x": 1, "tags": ["dog", "cat"]}) + ObjectId('...') + >>> db.things.insert({"x": 2, "tags": ["cat"]}) + ObjectId('...') + >>> db.things.insert({"x": 2, "tags": ["mouse", "cat", "dog"]}) + ObjectId('...') + >>> db.things.insert({"x": 3, "tags": []}) + ObjectId('...') + +Aggregation Framework +--------------------- + +This example shows how to use the +:meth:`~pymongo.collection.Collection.aggregate` method to use the aggregation +framework. We'll perform a simple aggregation to count the number of +occurrences for each tag in the ``tags`` array, across the entire collection. +To achieve this we need to pass in three operations to the pipeline. +First, we need to unwind the ``tags`` array, then group by the tags and +sum them up, finally we sort by count. + +As python dictionaries don't maintain order you should use :class:`~bson.son.SON` +or :class:`collections.OrderedDict` where explicit ordering is required +eg "$sort": + +.. note:: + + aggregate requires server version **>= 2.1.1**. The PyMongo + :meth:`~pymongo.collection.Collection.aggregate` helper requires + PyMongo version **>= 2.2.1+**. + +.. doctest:: + + >>> from bson.son import SON + >>> db.things.aggregate([ + ... {"$unwind": "$tags"}, + ... {"$group": {"_id": "$tags", "count": {"$sum": 1}}}, + ... {"$sort": SON([("count", -1), ("_id", -1)])} + ... ]) + ... + {u'ok': 1.0, u'result': [{u'count': 3, u'_id': u'cat'}, {u'count': 2, u'_id': u'dog'}, {u'count': 1, u'_id': u'mouse'}, {u'count': 1, u'_id': None}]} + + +As well as simple aggregations the aggregation framework provides projection +capabilities to reshape the returned data. Using projections and aggregation, +you can add computed fields, create new virtual sub-objects, and extract +sub-fields into the top-level of results. + +.. seealso:: The full documentation for MongoDB's `aggregation framework + `_ + +Map/Reduce +---------- + +Another option for aggregation is to use the map reduce framework. Here we +will define **map** and **reduce** functions to also count he number of +occurrences for each tag in the ``tags`` array, across the entire collection. + +Our **map** function just emits a single `(key, 1)` pair for each tag in +the array: + +.. doctest:: + + >>> from bson.code import Code + >>> mapper = Code(""" + ... function () { + ... this.tags.forEach(function(z) { + ... emit(z, 1); + ... }); + ... } + ... """) + +The **reduce** function sums over all of the emitted values for a given key: + +.. doctest:: + + >>> reducer = Code(""" + ... function (key, values) { + ... var total = 0; + ... for (var i = 0; i < values.length; i++) { + ... total += values[i]; + ... } + ... return total; + ... } + ... """) + +.. note:: We can't just return ``values.length`` as the **reduce** function + might be called iteratively on the results of other reduce steps. + +Finally, we call :meth:`~pymongo.collection.Collection.map_reduce` and +iterate over the result collection: + +.. doctest:: + + >>> result = db.things.map_reduce(mapper, reducer, "myresults") + >>> for doc in result.find(): + ... print doc + ... + {u'_id': u'cat', u'value': 3.0} + {u'_id': u'dog', u'value': 2.0} + {u'_id': u'mouse', u'value': 1.0} + +Advanced Map/Reduce +------------------- + +PyMongo's API supports all of the features of MongoDB's map/reduce engine. +One interesting feature is the ability to get more detailed results when +desired, by passing `full_response=True` to +:meth:`~pymongo.collection.Collection.map_reduce`. This returns the full +response to the map/reduce command, rather than just the result collection: + +.. doctest:: + + >>> db.things.map_reduce(mapper, reducer, "myresults", full_response=True) + {u'counts': {u'input': 4, u'reduce': 2, u'emit': 6, u'output': 3}, u'timeMillis': ..., u'ok': ..., u'result': u'...'} + +All of the optional map/reduce parameters are also supported, simply pass them +as keyword arguments. In this example we use the `query` parameter to limit the +documents that will be mapped over: + +.. doctest:: + + >>> result = db.things.map_reduce(mapper, reducer, "myresults", query={"x": {"$lt": 2}}) + >>> for doc in result.find(): + ... print doc + ... + {u'_id': u'cat', u'value': 1.0} + {u'_id': u'dog', u'value': 1.0} + +With MongoDB 1.8.0 or newer you can use :class:`~bson.son.SON` or +:class:`collections.OrderedDict` to specify a different database to store the +result collection: + +.. doctest:: + + >>> from bson.son import SON + >>> db.things.map_reduce(mapper, reducer, out=SON([("replace", "results"), ("db", "outdb")]), full_response=True) + {u'counts': {u'input': 4, u'reduce': 2, u'emit': 6, u'output': 3}, u'timeMillis': ..., u'ok': ..., u'result': {u'db': ..., u'collection': ...}} + +.. seealso:: The full list of options for MongoDB's `map reduce engine `_ + +Group +----- + +The :meth:`~pymongo.collection.Collection.group` method provides some of the +same functionality as SQL's GROUP BY. Simpler than a map reduce you need to +provide a key to group by, an initial value for the aggregation and a +reduce function. + +.. note:: Doesn't work with sharded MongoDB configurations, use aggregation or + map/reduce instead of group(). + +Here we are doing a simple group and count of the occurrences ``x`` values: + +.. doctest:: + + >>> reducer = Code(""" + ... function(obj, prev){ + ... prev.count++; + ... } + ... """) + ... + >>> from bson.son import SON + >>> results = db.things.group(key={"x":1}, condition={}, initial={"count": 0}, reduce=reducer) + >>> for doc in results: + ... print doc + {u'count': 1.0, u'x': 1.0} + {u'count': 2.0, u'x': 2.0} + {u'count': 1.0, u'x': 3.0} + +.. seealso:: The full list of options for MongoDB's `group method `_ \ No newline at end of file diff --git a/doc/examples/index.rst b/doc/examples/index.rst index ee50e1e48..831cb161a 100644 --- a/doc/examples/index.rst +++ b/doc/examples/index.rst @@ -17,7 +17,7 @@ MongoDB, you can start it like so: :maxdepth: 1 gridfs - map_reduce + aggregation geo replica_set custom_type diff --git a/doc/examples/map_reduce.rst b/doc/examples/map_reduce.rst deleted file mode 100644 index b4347e5c9..000000000 --- a/doc/examples/map_reduce.rst +++ /dev/null @@ -1,119 +0,0 @@ -Map/Reduce Example -================== - -.. testsetup:: - - from pymongo import Connection - connection = Connection() - connection.drop_database('map_reduce_example') - -This example shows how to use the -:meth:`~pymongo.collection.Collection.map_reduce` method to perform -map/reduce style aggregations on your data. - -.. note:: - - Map/Reduce requires server version **>= 1.1.1**. The PyMongo - :meth:`~pymongo.collection.Collection.map_reduce` helper requires - PyMongo version **>= 1.2**. - -Setup ------ -To start, we'll insert some example data which we can perform -map/reduce queries on: - -.. doctest:: - - >>> from pymongo import Connection - >>> db = Connection().map_reduce_example - >>> db.things.insert({"x": 1, "tags": ["dog", "cat"]}) - ObjectId('...') - >>> db.things.insert({"x": 2, "tags": ["cat"]}) - ObjectId('...') - >>> db.things.insert({"x": 3, "tags": ["mouse", "cat", "dog"]}) - ObjectId('...') - >>> db.things.insert({"x": 4, "tags": []}) - ObjectId('...') - -Basic Map/Reduce ----------------- -Now we'll define our **map** and **reduce** functions. In this case -we're performing the same operation as in the `MongoDB Map/Reduce -documentation `_ - -counting the number of occurrences for each tag in the ``tags`` array, -across the entire collection. - -Our **map** function just emits a single `(key, 1)` pair for each tag in -the array: - -.. doctest:: - - >>> from bson.code import Code - >>> mapper = Code(""" - ... function () { - ... this.tags.forEach(function(z) { - ... emit(z, 1); - ... }); - ... } - ... """) - -The **reduce** function sums over all of the emitted values for a given key: - -.. doctest:: - - >>> reducer = Code(""" - ... function (key, values) { - ... var total = 0; - ... for (var i = 0; i < values.length; i++) { - ... total += values[i]; - ... } - ... return total; - ... } - ... """) - -.. note:: We can't just return ``values.length`` as the **reduce** function - might be called iteratively on the results of other reduce steps. - -Finally, we call :meth:`~pymongo.collection.Collection.map_reduce` and -iterate over the result collection: - -.. doctest:: - - >>> result = db.things.map_reduce(mapper, reducer, "myresults") - >>> for doc in result.find(): - ... print doc - ... - {u'_id': u'cat', u'value': 3.0} - {u'_id': u'dog', u'value': 2.0} - {u'_id': u'mouse', u'value': 1.0} - -Advanced Map/Reduce -------------------- - -PyMongo's API supports all of the features of MongoDB's map/reduce engine. One interesting feature is the ability to get more detailed results when desired, by passing `full_response=True` to :meth:`~pymongo.collection.Collection.map_reduce`. This returns the full response to the map/reduce command, rather than just the result collection: - -.. doctest:: - - >>> db.things.map_reduce(mapper, reducer, "myresults", full_response=True) - {u'counts': {u'input': 4, u'reduce': 2, u'emit': 6, u'output': 3}, u'timeMillis': ..., u'ok': ..., u'result': u'...'} - -All of the optional map/reduce parameters are also supported, simply pass them as keyword arguments. In this example we use the `query` parameter to limit the documents that will be mapped over: - -.. doctest:: - - >>> result = db.things.map_reduce(mapper, reducer, "myresults", query={"x": {"$lt": 3}}) - >>> for doc in result.find(): - ... print doc - ... - {u'_id': u'cat', u'value': 2.0} - {u'_id': u'dog', u'value': 1.0} - -With MongoDB 1.8.0 or newer you can use :class:`~bson.son.SON` to specify a different database to store the result collection: - -.. doctest:: - - >>> from bson.son import SON - >>> db.things.map_reduce(mapper, reducer, out=SON([("replace", "results"), ("db", "outdb")]), full_response=True) - {u'counts': {u'input': 4, u'reduce': 2, u'emit': 6, u'output': 3}, u'timeMillis': ..., u'ok': ..., u'result': {u'db': ..., u'collection': ...}} - -.. seealso:: The full list of options for MongoDB's `map reduce engine `_ diff --git a/pymongo/collection.py b/pymongo/collection.py index 3af411bac..60fbc225a 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -889,6 +889,42 @@ class Collection(common.BaseObject): return options + def aggregate(self, ops): + """Perform an aggregation using the aggregation framework on this + collection. + + With :class:`~pymongo.replica_set_connection.ReplicaSetConnection` + or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`, + if the `read_preference` attribute of this instance is not set to + :attr:`pymongo.ReadPreference.PRIMARY` or the (deprecated) + `slave_okay` attribute of this instance is set to `True` the + `aggregate command`_. will be sent to a secondary or slave. + + :Parameters: + - `ops`: a single command or list of aggregation commands + + .. note:: Requires server version **>= 2.1.1** + + .. versionadded:: 2.2.1+ + + .. _aggregate command: + http://docs.mongodb.org/manual/applications/aggregation + """ + if not isinstance(ops, (dict, list, tuple)): + raise TypeError("ops must be a dict, list or tuple") + + if isinstance(ops, dict): + ops = [ops] + + use_master = not self.slave_okay and not self.read_preference + + return self.__database.command("aggregate", self.__name, + pipeline=ops, + read_preference=self.read_preference, + slave_okay=self.slave_okay, + _use_master=use_master) + + # TODO key and condition ought to be optional, but deprecation # could be painful as argument order would have to change. def group(self, key, condition, initial, reduce, finalize=None): @@ -1031,7 +1067,7 @@ class Collection(common.BaseObject): .. note:: Requires server version **>= 1.1.1** - .. seealso:: :doc:`/examples/map_reduce` + .. seealso:: :doc:`/examples/aggregation` .. versionchanged:: 2.2 Removed deprecated arguments: merge_output and reduce_output diff --git a/test/test_collection.py b/test/test_collection.py index cb407e7f6..1ffc36489 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -1047,6 +1047,19 @@ class TestCollection(unittest.TestCase): self.assertEqual(db.test.find({'foo': 'bar'}).count(), 1) self.assertEqual(db.test.find({'foo': re.compile(r'ba.*')}).count(), 2) + def test_aggregate(self): + db = self.db + db.drop_collection("test") + db.test.save({'foo': [1, 2]}) + + self.assertRaises(TypeError, db.test.aggregate, "wow") + + ops = {"$project": {"_id": False, "foo": True}} + expected = {'ok': 1.0, 'result': [{'foo': [1, 2]}]} + self.assertEqual(expected, db.test.aggregate(ops)) + self.assertEqual(expected, db.test.aggregate([ops])) + self.assertEqual(expected, db.test.aggregate((ops,))) + def test_group(self): db = self.db db.drop_collection("test")