diff --git a/pymongo/collection.py b/pymongo/collection.py index 4df54f486..3114246d6 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -1642,8 +1642,18 @@ class Collection(common.BaseObject): cmd.update(kwargs) - result = self._command(sock_info, cmd, slave_ok, - read_concern=self.read_concern) + # Apply this Collection's read concern if $out is not in the + # pipeline. + if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd: + for stage in cmd['pipeline']: + if '$out' in stage: + result = self._command(sock_info, cmd, slave_ok) + break + else: + result = self._command(sock_info, cmd, slave_ok, + read_concern=self.read_concern) + else: + result = self._command(sock_info, cmd, slave_ok) if "cursor" in result: cursor = result["cursor"] @@ -1826,8 +1836,14 @@ class Collection(common.BaseObject): cmd.update(kwargs) with self._socket_for_primary_reads() as (sock_info, slave_ok): - response = self._command( - sock_info, cmd, slave_ok, ReadPreference.PRIMARY) + if (sock_info.max_wire_version >= 4 and 'readConcern' not in cmd and + 'inline' in cmd['out']): + response = self._command( + sock_info, cmd, slave_ok, ReadPreference.PRIMARY, + read_concern=self.read_concern) + else: + response = self._command( + sock_info, cmd, slave_ok, ReadPreference.PRIMARY) if full_response or not response.get('result'): return response @@ -1869,6 +1885,11 @@ class Collection(common.BaseObject): ("out", {"inline": 1})]) cmd.update(kwargs) with self._socket_for_reads() as (sock_info, slave_ok): + if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd: + res = self._command(sock_info, cmd, slave_ok, + read_concern=self.read_concern) + else: + res = self._command(sock_info, cmd, slave_ok) res = self._command(sock_info, cmd, slave_ok) if full_response: diff --git a/test/test_read_concern.py b/test/test_read_concern.py index 1468e2c65..72b33d686 100644 --- a/test/test_read_concern.py +++ b/test/test_read_concern.py @@ -18,7 +18,7 @@ import pymongo from bson.son import SON from pymongo import monitoring -from pymongo.errors import ConfigurationError +from pymongo.errors import ConfigurationError, OperationFailure from pymongo.read_concern import ReadConcern from test import client_context, pair, unittest @@ -110,3 +110,54 @@ class TestReadConcern(unittest.TestCase): self.assertEqual( {'level': 'local'}, self.listener.results['started'][0].command['readConcern']) + + def test_aggregate_out(self): + coll = self.db.get_collection('coll', read_concern=ReadConcern('local')) + try: + tuple(coll.aggregate([{'$match': {'field': 'value'}}, + {'$out': 'output_collection'}])) + except OperationFailure: + # "ns doesn't exist" + pass + self.assertNotIn('readConcern', + self.listener.results['started'][0].command) + + def test_map_reduce_out(self): + coll = self.db.get_collection('coll', read_concern=ReadConcern('local')) + try: + tuple(coll.map_reduce('function() { emit(this._id, this.value); }', + 'function(key, values) { return 42; }', + out='output_collection')) + except OperationFailure: + # "ns doesn't exist" + pass + self.assertNotIn('readConcern', + self.listener.results['started'][0].command) + + if client_context.version.at_least(3, 1, 9, -1): + self.listener.results.clear() + try: + tuple(coll.map_reduce( + 'function() { emit(this._id, this.value); }', + 'function(key, values) { return 42; }', + out={'inline': 1})) + except OperationFailure: + # "ns doesn't exist" + pass + self.assertEqual( + {'level': 'local'}, + self.listener.results['started'][0].command['readConcern']) + + @client_context.require_version_min(3, 1, 9, -1) + def test_inline_map_reduce(self): + coll = self.db.get_collection('coll', read_concern=ReadConcern('local')) + try: + tuple(coll.inline_map_reduce( + 'function() { emit(this._id, this.value); }', + 'function(key, values) { return 42; }')) + except OperationFailure: + # "ns doesn't exist" + pass + self.assertEqual( + {'level': 'local'}, + self.listener.results['started'][0].command['readConcern'])