diff --git a/doc/changelog.rst b/doc/changelog.rst index 192b45661..062104bc8 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -4,6 +4,8 @@ Changelog Changes in Version 4.1 ---------------------- +PyMongo 4.0 brings a number of improvements including: + - :meth:`pymongo.collection.Collection.update_one`, :meth:`pymongo.collection.Collection.update_many`, :meth:`pymongo.collection.Collection.delete_one`, @@ -15,6 +17,10 @@ Changes in Version 4.1 and :meth:`pymongo.collection.Collection.find` all support a new keyword argument ``let`` which is a map of parameter names and values. Parameters can then be accessed as variables in an aggregate expression context. +- :meth:`~pymongo.collection.Collection.aggregate` now supports + $merge and $out executing on secondaries on MongoDB >=5.0. + aggregate() now always obeys the collection's :attr:`read_preference` on + MongoDB >= 5.0. Changes in Version 4.0 diff --git a/pymongo/aggregation.py b/pymongo/aggregation.py index f0be39e67..a5a7abaed 100644 --- a/pymongo/aggregation.py +++ b/pymongo/aggregation.py @@ -19,7 +19,7 @@ from bson.son import SON from pymongo import common from pymongo.collation import validate_collation_or_none from pymongo.errors import ConfigurationError -from pymongo.read_preferences import ReadPreference +from pymongo.read_preferences import _AggWritePref, ReadPreference class _AggregationCommand(object): @@ -70,6 +70,7 @@ class _AggregationCommand(object): options.pop('collation', None)) self._max_await_time_ms = options.pop('maxAwaitTimeMS', None) + self._write_preference = None @property def _aggregation_target(self): @@ -97,9 +98,12 @@ class _AggregationCommand(object): result, session, server, sock_info, secondary_ok) def get_read_preference(self, session): - if self._performs_write: - return ReadPreference.PRIMARY - return self._target._read_preference_for(session) + if self._write_preference: + return self._write_preference + pref = self._target._read_preference_for(session) + if self._performs_write and pref != ReadPreference.PRIMARY: + self._write_preference = pref = _AggWritePref(pref) + return pref def get_cursor(self, session, server, sock_info, secondary_ok): # Serialize command. diff --git a/pymongo/collection.py b/pymongo/collection.py index 092163c40..ea11875ce 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -1915,9 +1915,9 @@ class Collection(common.BaseObject): collection. The :meth:`aggregate` method obeys the :attr:`read_preference` of this - :class:`Collection`, except when ``$out`` or ``$merge`` are used, in - which case :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` - is used. + :class:`Collection`, except when ``$out`` or ``$merge`` are used on + MongoDB <5.0, in which case + :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` is used. .. note:: This method does not support the 'explain' option. Please use :meth:`~pymongo.database.Database.command` instead. An @@ -1958,6 +1958,8 @@ class Collection(common.BaseObject): .. versionchanged:: 4.1 Added ``let`` parameter. + Support $merge and $out executing on secondaries according to the + collection's :attr:`read_preference`. .. versionchanged:: 4.0 Removed the ``useCursor`` option. .. versionchanged:: 3.9 diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 41e701706..9c98e5d21 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -1155,7 +1155,7 @@ class MongoClient(common.BaseObject): with self._get_socket(server, session) as sock_info: secondary_ok = (single and not sock_info.is_mongos) or ( - read_preference != ReadPreference.PRIMARY) + read_preference.mode != ReadPreference.PRIMARY.mode) yield sock_info, secondary_ok @contextlib.contextmanager diff --git a/pymongo/read_preferences.py b/pymongo/read_preferences.py index c60240822..2471d5834 100644 --- a/pymongo/read_preferences.py +++ b/pymongo/read_preferences.py @@ -424,6 +424,45 @@ class Nearest(_ServerMode): self.max_staleness, selection)) +class _AggWritePref: + """Agg $out/$merge write preference. + + * If there are readable servers and there is any pre-5.0 server, use + primary read preference. + * Otherwise use `pref` read preference. + + :Parameters: + - `pref`: The read preference to use on MongoDB 5.0+. + """ + + __slots__ = ('pref', 'effective_pref') + + def __init__(self, pref): + self.pref = pref + self.effective_pref = ReadPreference.PRIMARY + + def selection_hook(self, topology_description): + common_wv = topology_description.common_wire_version + if (topology_description.has_readable_server( + ReadPreference.PRIMARY_PREFERRED) and + common_wv and common_wv < 13): + self.effective_pref = ReadPreference.PRIMARY + else: + self.effective_pref = self.pref + + def __call__(self, selection): + """Apply this read preference to a Selection.""" + return self.effective_pref(selection) + + def __repr__(self): + return "_AggWritePref(pref=%r)" % (self.pref,) + + # Proxy other calls to the effective_pref so that _AggWritePref can be + # used in place of an actual read preference. + def __getattr__(self, name): + return getattr(self.effective_pref, name) + + _ALL_READ_PREFERENCES = (Primary, PrimaryPreferred, Secondary, SecondaryPreferred, Nearest) diff --git a/pymongo/topology_description.py b/pymongo/topology_description.py index 4fe897dce..c13d00a64 100644 --- a/pymongo/topology_description.py +++ b/pymongo/topology_description.py @@ -19,7 +19,7 @@ from random import sample from pymongo import common from pymongo.errors import ConfigurationError -from pymongo.read_preferences import ReadPreference +from pymongo.read_preferences import ReadPreference, _AggWritePref from pymongo.server_description import ServerDescription from pymongo.server_selectors import Selection from pymongo.server_type import SERVER_TYPE @@ -263,21 +263,24 @@ class TopologyDescription(object): selector.min_wire_version, common_wv)) + if isinstance(selector, _AggWritePref): + selector.selection_hook(self) + if self.topology_type == TOPOLOGY_TYPE.Unknown: return [] elif self.topology_type in (TOPOLOGY_TYPE.Single, TOPOLOGY_TYPE.LoadBalanced): # Ignore selectors for standalone and load balancer mode. return self.known_servers - elif address: + if address: # Ignore selectors when explicit address is requested. description = self.server_descriptions().get(address) return [description] if description else [] - elif self.topology_type == TOPOLOGY_TYPE.Sharded: - # Ignore read preference. - selection = Selection.from_topology_description(self) - else: - selection = selector(Selection.from_topology_description(self)) + + selection = Selection.from_topology_description(self) + # Ignore read preference for sharded clusters. + if self.topology_type != TOPOLOGY_TYPE.Sharded: + selection = selector(selection) # Apply custom selector followed by localThresholdMS. if custom_selector is not None and selection: diff --git a/test/crud/unified/aggregate-write-readPreference.json b/test/crud/unified/aggregate-write-readPreference.json new file mode 100644 index 000000000..28327e8d8 --- /dev/null +++ b/test/crud/unified/aggregate-write-readPreference.json @@ -0,0 +1,460 @@ +{ + "description": "aggregate-write-readPreference", + "schemaVersion": "1.4", + "runOnRequirements": [ + { + "minServerVersion": "3.6", + "topologies": [ + "replicaset", + "sharded", + "load-balanced" + ] + } + ], + "_yamlAnchors": { + "readConcern": { + "level": "local" + }, + "writeConcern": { + "w": 1 + } + }, + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent" + ], + "uriOptions": { + "readConcernLevel": "local", + "w": 1 + } + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "db0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "coll0", + "collectionOptions": { + "readPreference": { + "mode": "secondaryPreferred", + "maxStalenessSeconds": 600 + } + } + } + }, + { + "collection": { + "id": "collection1", + "database": "database0", + "collectionName": "coll1" + } + } + ], + "initialData": [ + { + "collectionName": "coll0", + "databaseName": "db0", + "documents": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ] + }, + { + "collectionName": "coll1", + "databaseName": "db0", + "documents": [] + } + ], + "tests": [ + { + "description": "Aggregate with $out includes read preference for 5.0+ server", + "runOnRequirements": [ + { + "minServerVersion": "5.0", + "serverless": "forbid" + } + ], + "operations": [ + { + "object": "collection0", + "name": "aggregate", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": { + "$gt": 1 + } + } + }, + { + "$sort": { + "x": 1 + } + }, + { + "$out": "coll1" + } + ] + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "coll0", + "pipeline": [ + { + "$match": { + "_id": { + "$gt": 1 + } + } + }, + { + "$sort": { + "x": 1 + } + }, + { + "$out": "coll1" + } + ], + "$readPreference": { + "mode": "secondaryPreferred", + "maxStalenessSeconds": 600 + }, + "readConcern": { + "level": "local" + }, + "writeConcern": { + "w": 1 + } + } + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "coll1", + "databaseName": "db0", + "documents": [ + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ] + } + ] + }, + { + "description": "Aggregate with $out omits read preference for pre-5.0 server", + "runOnRequirements": [ + { + "minServerVersion": "4.2", + "maxServerVersion": "4.4.99", + "serverless": "forbid" + } + ], + "operations": [ + { + "object": "collection0", + "name": "aggregate", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": { + "$gt": 1 + } + } + }, + { + "$sort": { + "x": 1 + } + }, + { + "$out": "coll1" + } + ] + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "coll0", + "pipeline": [ + { + "$match": { + "_id": { + "$gt": 1 + } + } + }, + { + "$sort": { + "x": 1 + } + }, + { + "$out": "coll1" + } + ], + "$readPreference": { + "mode": "primary" + }, + "readConcern": { + "level": "local" + }, + "writeConcern": { + "w": 1 + } + } + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "coll1", + "databaseName": "db0", + "documents": [ + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ] + } + ] + }, + { + "description": "Aggregate with $merge includes read preference for 5.0+ server", + "runOnRequirements": [ + { + "minServerVersion": "5.0" + } + ], + "operations": [ + { + "object": "collection0", + "name": "aggregate", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": { + "$gt": 1 + } + } + }, + { + "$sort": { + "x": 1 + } + }, + { + "$merge": { + "into": "coll1" + } + } + ] + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "coll0", + "pipeline": [ + { + "$match": { + "_id": { + "$gt": 1 + } + } + }, + { + "$sort": { + "x": 1 + } + }, + { + "$merge": { + "into": "coll1" + } + } + ], + "$readPreference": { + "mode": "secondaryPreferred", + "maxStalenessSeconds": 600 + }, + "readConcern": { + "level": "local" + }, + "writeConcern": { + "w": 1 + } + } + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "coll1", + "databaseName": "db0", + "documents": [ + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ] + } + ] + }, + { + "description": "Aggregate with $merge omits read preference for pre-5.0 server", + "runOnRequirements": [ + { + "minServerVersion": "4.2", + "maxServerVersion": "4.4.99" + } + ], + "operations": [ + { + "object": "collection0", + "name": "aggregate", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": { + "$gt": 1 + } + } + }, + { + "$sort": { + "x": 1 + } + }, + { + "$merge": { + "into": "coll1" + } + } + ] + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "coll0", + "pipeline": [ + { + "$match": { + "_id": { + "$gt": 1 + } + } + }, + { + "$sort": { + "x": 1 + } + }, + { + "$merge": { + "into": "coll1" + } + } + ], + "$readPreference": { + "mode": "primary" + }, + "readConcern": { + "level": "local" + }, + "writeConcern": { + "w": 1 + } + } + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "coll1", + "databaseName": "db0", + "documents": [ + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ] + } + ] + } + ] +} diff --git a/test/crud/unified/db-aggregate-write-readPreference.json b/test/crud/unified/db-aggregate-write-readPreference.json new file mode 100644 index 000000000..269299e3c --- /dev/null +++ b/test/crud/unified/db-aggregate-write-readPreference.json @@ -0,0 +1,446 @@ +{ + "description": "db-aggregate-write-readPreference", + "schemaVersion": "1.4", + "runOnRequirements": [ + { + "minServerVersion": "3.6", + "topologies": [ + "replicaset" + ], + "serverless": "forbid" + } + ], + "_yamlAnchors": { + "readConcern": { + "level": "local" + }, + "writeConcern": { + "w": 1 + } + }, + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent" + ], + "uriOptions": { + "readConcernLevel": "local", + "w": 1 + } + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "db0", + "databaseOptions": { + "readPreference": { + "mode": "secondaryPreferred", + "maxStalenessSeconds": 600 + } + } + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "coll0" + } + } + ], + "initialData": [ + { + "collectionName": "coll0", + "databaseName": "db0", + "documents": [] + } + ], + "tests": [ + { + "description": "Database-level aggregate with $out includes read preference for 5.0+ server", + "runOnRequirements": [ + { + "minServerVersion": "5.0", + "serverless": "forbid" + } + ], + "operations": [ + { + "object": "database0", + "name": "aggregate", + "arguments": { + "pipeline": [ + { + "$listLocalSessions": {} + }, + { + "$limit": 1 + }, + { + "$addFields": { + "_id": 1 + } + }, + { + "$project": { + "_id": 1 + } + }, + { + "$out": "coll0" + } + ] + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": 1, + "pipeline": [ + { + "$listLocalSessions": {} + }, + { + "$limit": 1 + }, + { + "$addFields": { + "_id": 1 + } + }, + { + "$project": { + "_id": 1 + } + }, + { + "$out": "coll0" + } + ], + "$readPreference": { + "mode": "secondaryPreferred", + "maxStalenessSeconds": 600 + }, + "readConcern": { + "level": "local" + }, + "writeConcern": { + "w": 1 + } + } + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "coll0", + "databaseName": "db0", + "documents": [ + { + "_id": 1 + } + ] + } + ] + }, + { + "description": "Database-level aggregate with $out omits read preference for pre-5.0 server", + "runOnRequirements": [ + { + "minServerVersion": "4.2", + "maxServerVersion": "4.4.99", + "serverless": "forbid" + } + ], + "operations": [ + { + "object": "database0", + "name": "aggregate", + "arguments": { + "pipeline": [ + { + "$listLocalSessions": {} + }, + { + "$limit": 1 + }, + { + "$addFields": { + "_id": 1 + } + }, + { + "$project": { + "_id": 1 + } + }, + { + "$out": "coll0" + } + ] + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": 1, + "pipeline": [ + { + "$listLocalSessions": {} + }, + { + "$limit": 1 + }, + { + "$addFields": { + "_id": 1 + } + }, + { + "$project": { + "_id": 1 + } + }, + { + "$out": "coll0" + } + ], + "$readPreference": { + "mode": "primary" + }, + "readConcern": { + "level": "local" + }, + "writeConcern": { + "w": 1 + } + } + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "coll0", + "databaseName": "db0", + "documents": [ + { + "_id": 1 + } + ] + } + ] + }, + { + "description": "Database-level aggregate with $merge includes read preference for 5.0+ server", + "runOnRequirements": [ + { + "minServerVersion": "5.0" + } + ], + "operations": [ + { + "object": "database0", + "name": "aggregate", + "arguments": { + "pipeline": [ + { + "$listLocalSessions": {} + }, + { + "$limit": 1 + }, + { + "$addFields": { + "_id": 1 + } + }, + { + "$project": { + "_id": 1 + } + }, + { + "$merge": { + "into": "coll0" + } + } + ] + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": 1, + "pipeline": [ + { + "$listLocalSessions": {} + }, + { + "$limit": 1 + }, + { + "$addFields": { + "_id": 1 + } + }, + { + "$project": { + "_id": 1 + } + }, + { + "$merge": { + "into": "coll0" + } + } + ], + "$readPreference": { + "mode": "secondaryPreferred", + "maxStalenessSeconds": 600 + }, + "readConcern": { + "level": "local" + }, + "writeConcern": { + "w": 1 + } + } + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "coll0", + "databaseName": "db0", + "documents": [ + { + "_id": 1 + } + ] + } + ] + }, + { + "description": "Database-level aggregate with $merge omits read preference for pre-5.0 server", + "runOnRequirements": [ + { + "minServerVersion": "4.2", + "maxServerVersion": "4.4.99" + } + ], + "operations": [ + { + "object": "database0", + "name": "aggregate", + "arguments": { + "pipeline": [ + { + "$listLocalSessions": {} + }, + { + "$limit": 1 + }, + { + "$addFields": { + "_id": 1 + } + }, + { + "$project": { + "_id": 1 + } + }, + { + "$merge": { + "into": "coll0" + } + } + ] + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": 1, + "pipeline": [ + { + "$listLocalSessions": {} + }, + { + "$limit": 1 + }, + { + "$addFields": { + "_id": 1 + } + }, + { + "$project": { + "_id": 1 + } + }, + { + "$merge": { + "into": "coll0" + } + } + ], + "$readPreference": { + "mode": "primary" + }, + "readConcern": { + "level": "local" + }, + "writeConcern": { + "w": 1 + } + } + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "coll0", + "databaseName": "db0", + "documents": [ + { + "_id": 1 + } + ] + } + ] + } + ] +} diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index 18dbd0bee..bbc89b9d1 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -433,7 +433,9 @@ class TestCommandAndReadPreference(IntegrationTest): [{'$project': {'_id': 1}}]) def test_aggregate_write(self): - self._test_coll_helper(False, self.c.pymongo_test.test, + # 5.0 servers support $out on secondaries. + secondary_ok = client_context.version.at_least(5, 0) + self._test_coll_helper(secondary_ok, self.c.pymongo_test.test, 'aggregate', [{'$project': {'_id': 1}}, {'$out': "agg_write_test"}])