diff --git a/doc/changelog.rst b/doc/changelog.rst index 68c8ec92d..c3fd1e344 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -51,6 +51,9 @@ Version 3.9 adds support for MongoDB 4.2. Highlights include: :mod:`~pymongo.monitoring` for an example. - :meth:`pymongo.collection.Collection.aggregate` and :meth:`pymongo.database.Database.aggregate` now support the ``$merge`` pipeline + stage and use read preference + :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` if the ``$out`` or + ``$merge`` pipeline stages are used. - Support for specifying a pipeline or document in :meth:`~pymongo.collection.Collection.update_one`, :meth:`~pymongo.collection.Collection.update_many`, diff --git a/pymongo/aggregation.py b/pymongo/aggregation.py index efb6b24c9..7383a3511 100644 --- a/pymongo/aggregation.py +++ b/pymongo/aggregation.py @@ -19,6 +19,7 @@ from bson.son import SON from pymongo import common from pymongo.collation import validate_collation_or_none from pymongo.errors import ConfigurationError +from pymongo.read_preferences import ReadPreference class _AggregationCommand(object): @@ -97,6 +98,11 @@ class _AggregationCommand(object): self._result_processor( result, session, server, sock_info, slave_ok) + def get_read_preference(self, session): + if self._performs_write: + return ReadPreference.PRIMARY + return self._target._read_preference_for(session) + def get_cursor(self, session, server, sock_info, slave_ok): # Ensure command compatibility. self._check_compat(sock_info) @@ -106,9 +112,6 @@ class _AggregationCommand(object): ("pipeline", self._pipeline)]) cmd.update(self._options) - # Cache read preference for easy access. - read_preference = self._target._read_preference_for(session) - # Apply this target's read concern if: # readConcern has not been specified as a kwarg and either # - server version is >= 4.2 or @@ -134,7 +137,7 @@ class _AggregationCommand(object): self._database.name, cmd, slave_ok, - read_preference, + self.get_read_preference(session), self._target.codec_options, parse_write_concern_error=True, read_concern=read_concern, diff --git a/pymongo/collection.py b/pymongo/collection.py index f78b34c12..d47a43ae8 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -2287,7 +2287,7 @@ class Collection(common.BaseObject): self, cursor_class, pipeline, kwargs, explicit_session, user_fields={'cursor': {'firstBatch': 1}}, use_cursor=use_cursor) return self.__database.client._retryable_read( - cmd.get_cursor, self._read_preference_for(session), session, + cmd.get_cursor, cmd.get_read_preference(session), session, retryable=not cmd._performs_write) def aggregate(self, pipeline, session=None, **kwargs): @@ -2313,11 +2313,9 @@ class Collection(common.BaseObject): - `useCursor` (bool): Deprecated. Will be removed in PyMongo 4.0. The :meth:`aggregate` method obeys the :attr:`read_preference` of this - :class:`Collection`. Please note that using the ``$out`` and ``$merge`` - pipeline stages requires a read preference of - :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` (the default). - The server will raise an error if the ``$out`` or ``$merge`` pipeline - stages are used with any other read preference. + :class:`Collection`, except when ``$out`` or ``$merge`` are used, in + which case :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` + is used. .. note:: This method does not support the 'explain' option. Please use :meth:`~pymongo.database.Database.command` instead. An @@ -2337,11 +2335,12 @@ class Collection(common.BaseObject): A :class:`~pymongo.command_cursor.CommandCursor` over the result set. - .. versionchanged:: 3.9 - Added support for the ``$merge`` pipeline stage. .. versionchanged:: 3.9 Apply this collection's read concern to pipelines containing the `$out` stage when connected to MongoDB >= 4.2. + Added support for the ``$merge`` pipeline stage. + Aggregations that write always use read preference + :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`. .. versionchanged:: 3.6 Added the `session` parameter. Added the `maxAwaitTimeMS` option. Deprecated the `useCursor` option. diff --git a/pymongo/database.py b/pymongo/database.py index ac797071a..9287a2cf5 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -477,11 +477,9 @@ class Database(common.BaseObject): :class:`~pymongo.collation.Collation`. The :meth:`aggregate` method obeys the :attr:`read_preference` of this - :class:`Database`. Please note that using the ``$out`` or ``$merge`` - pipeline stages requires a read preference of - :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` (the default). - The server will raise an error if the ``$out`` or ``$merge`` pipeline - stages is used with any other read preference. + :class:`Database`, except when ``$out`` or ``$merge`` are used, in + which case :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` + is used. .. note:: This method does not support the 'explain' option. Please use :meth:`~pymongo.database.Database.command` instead. @@ -512,7 +510,7 @@ class Database(common.BaseObject): self, CommandCursor, pipeline, kwargs, session is not None, user_fields={'cursor': {'firstBatch': 1}}) return self.client._retryable_read( - cmd.get_cursor, self._read_preference_for(s), s, + cmd.get_cursor, cmd.get_read_preference(s), s, retryable=not cmd._performs_write) def watch(self, pipeline=None, full_document='default', resume_after=None, diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index c188992ce..3a7bd69c8 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -468,6 +468,11 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase): 'aggregate', [{'$project': {'_id': 1}}]) + def test_aggregate_write(self): + self._test_coll_helper(False, self.c.pymongo_test.test, + 'aggregate', + [{'$project': {'_id': 1}}, {'$out': "agg_write_test"}]) + class TestMovingAverage(unittest.TestCase): def test_moving_average(self):