PYTHON-836 - Fix command routing for aggregate and map_reduce helpers.
This commit is contained in:
parent
c2ab84d117
commit
eeef679228
@ -1369,6 +1369,13 @@ class Collection(common.BaseObject):
|
||||
of useCursor, if provided, will be obeyed regardless of server
|
||||
version.
|
||||
|
||||
The :meth:`aggregate` method obeys the :attr:`read_preference` of this
|
||||
:class:`Collection`. Please note that using the ``$out`` pipeline stage
|
||||
requires a read preference of
|
||||
:attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` (the default).
|
||||
The server will raise an error if the ``$out`` pipeline stage is used
|
||||
with any other read preference.
|
||||
|
||||
.. warning:: When upgrading a pre-MongoDB 2.6 sharded cluster to any
|
||||
newer version the useCursor option **must** be set to ``False``
|
||||
until all shards have been upgraded.
|
||||
@ -1377,9 +1384,6 @@ class Collection(common.BaseObject):
|
||||
use :meth:`~pymongo.database.Database.command` instead. An
|
||||
example is included in the :ref:`aggregate-examples` documentation.
|
||||
|
||||
The :meth:`aggregate` method obeys the :attr:`read_preference` of this
|
||||
:class:`Collection`.
|
||||
|
||||
:Parameters:
|
||||
- `pipeline`: a list of aggregation pipeline stages
|
||||
- `**kwargs` (optional): See list of options above.
|
||||
@ -1399,6 +1403,8 @@ class Collection(common.BaseObject):
|
||||
Added cursor support.
|
||||
.. versionadded:: 2.3
|
||||
|
||||
.. seealso:: :doc:`/examples/aggregation`
|
||||
|
||||
.. _aggregate command:
|
||||
http://docs.mongodb.org/manual/applications/aggregation
|
||||
"""
|
||||
@ -1434,14 +1440,7 @@ class Collection(common.BaseObject):
|
||||
|
||||
cmd.update(kwargs)
|
||||
|
||||
# XXX: Keep doing this automatically?
|
||||
read_preference = self.read_preference
|
||||
for stage in pipeline:
|
||||
if '$out' in stage:
|
||||
read_preference = ReadPreference.PRIMARY
|
||||
break
|
||||
|
||||
result, address = self._command(cmd, read_preference)
|
||||
result, address = self._command(cmd)
|
||||
|
||||
if "cursor" in result:
|
||||
cursor_info = result["cursor"]
|
||||
@ -1579,9 +1578,6 @@ class Collection(common.BaseObject):
|
||||
the results of the operation. Otherwise, returns the full
|
||||
response from the server to the `map reduce command`_.
|
||||
|
||||
The :meth:`map_reduce` method obeys the :attr:`read_preference` of this
|
||||
:class:`Collection`.
|
||||
|
||||
:Parameters:
|
||||
- `map`: map function (as a JavaScript string)
|
||||
- `reduce`: reduce function (as a JavaScript string)
|
||||
@ -1598,6 +1594,11 @@ class Collection(common.BaseObject):
|
||||
|
||||
>>> db.test.map_reduce(map, reduce, "myresults", limit=2)
|
||||
|
||||
.. note:: The :meth:`map_reduce` method does **not** obey the
|
||||
:attr:`read_preference` of this :class:`Collection`. To run
|
||||
mapReduce on a secondary use the :meth:`inline_map_reduce` method
|
||||
instead.
|
||||
|
||||
.. seealso:: :doc:`/examples/aggregation`
|
||||
|
||||
.. versionchanged:: 2.2
|
||||
@ -1617,12 +1618,7 @@ class Collection(common.BaseObject):
|
||||
("out", out)])
|
||||
cmd.update(kwargs)
|
||||
|
||||
# XXX: Keep doing this automatically?
|
||||
read_preference = self.read_preference
|
||||
if not isinstance(out, collections.Mapping) or not out.get('inline'):
|
||||
read_preference = ReadPreference.PRIMARY
|
||||
|
||||
response = self._command(cmd, read_preference)[0]
|
||||
response = self._command(cmd, ReadPreference.PRIMARY)[0]
|
||||
|
||||
if full_response or not response.get('result'):
|
||||
return response
|
||||
|
||||
@ -325,25 +325,6 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase):
|
||||
read_preference=mode())
|
||||
self._test_fn(server_type, func)
|
||||
|
||||
@client_context.require_version_min(2, 5, 2)
|
||||
def test_aggregate_command_with_out(self):
|
||||
# Tests aggregate command when pipeline contains $out.
|
||||
db = self.c.get_database(
|
||||
"pymongo_test", write_concern=WriteConcern(w=self.w))
|
||||
db.test.insert_one({"x": 1, "y": 1})
|
||||
db.test.insert_one({"x": 1, "y": 2})
|
||||
db.test.insert_one({"x": 2, "y": 1})
|
||||
db.test.insert_one({"x": 2, "y": 2})
|
||||
|
||||
# Test aggregate when sent through the collection aggregate
|
||||
# function. Aggregate with $out always goes to primary, doesn't obey
|
||||
# read prefs.
|
||||
self._test_coll_helper(False, self.c.pymongo_test.test, 'aggregate',
|
||||
[{"$match": {"x": 2}}, {"$out": "agg_out"}])
|
||||
|
||||
self.c.pymongo_test.drop_collection("test")
|
||||
self.c.pymongo_test.drop_collection("agg_out")
|
||||
|
||||
def test_create_collection(self):
|
||||
# Collections should be created on primary, obviously
|
||||
self._test_primary_helper(
|
||||
@ -370,7 +351,7 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase):
|
||||
self._test_coll_helper(False, self.c.pymongo_test.test, 'map_reduce',
|
||||
'function() { }', 'function() { }', 'mr_out')
|
||||
|
||||
self._test_coll_helper(True, self.c.pymongo_test.test, 'map_reduce',
|
||||
self._test_coll_helper(False, self.c.pymongo_test.test, 'map_reduce',
|
||||
'function() { }', 'function() { }',
|
||||
{'inline': 1})
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user