PYTHON-2554 Support aggregate $merge and $out executing on secondaries (#774)

This commit is contained in:
Shane Harvey 2021-12-07 16:26:01 -08:00 committed by GitHub
parent e154642968
commit bf992c20a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 978 additions and 16 deletions

View File

@ -4,6 +4,8 @@ Changelog
Changes in Version 4.1
----------------------
PyMongo 4.0 brings a number of improvements including:
- :meth:`pymongo.collection.Collection.update_one`,
:meth:`pymongo.collection.Collection.update_many`,
:meth:`pymongo.collection.Collection.delete_one`,
@ -15,6 +17,10 @@ Changes in Version 4.1
and :meth:`pymongo.collection.Collection.find` all support a new keyword
argument ``let`` which is a map of parameter names and values. Parameters
can then be accessed as variables in an aggregate expression context.
- :meth:`~pymongo.collection.Collection.aggregate` now supports
$merge and $out executing on secondaries on MongoDB >=5.0.
aggregate() now always obeys the collection's :attr:`read_preference` on
MongoDB >= 5.0.
Changes in Version 4.0

View File

@ -19,7 +19,7 @@ from bson.son import SON
from pymongo import common
from pymongo.collation import validate_collation_or_none
from pymongo.errors import ConfigurationError
from pymongo.read_preferences import ReadPreference
from pymongo.read_preferences import _AggWritePref, ReadPreference
class _AggregationCommand(object):
@ -70,6 +70,7 @@ class _AggregationCommand(object):
options.pop('collation', None))
self._max_await_time_ms = options.pop('maxAwaitTimeMS', None)
self._write_preference = None
@property
def _aggregation_target(self):
@ -97,9 +98,12 @@ class _AggregationCommand(object):
result, session, server, sock_info, secondary_ok)
def get_read_preference(self, session):
if self._performs_write:
return ReadPreference.PRIMARY
return self._target._read_preference_for(session)
if self._write_preference:
return self._write_preference
pref = self._target._read_preference_for(session)
if self._performs_write and pref != ReadPreference.PRIMARY:
self._write_preference = pref = _AggWritePref(pref)
return pref
def get_cursor(self, session, server, sock_info, secondary_ok):
# Serialize command.

View File

@ -1915,9 +1915,9 @@ class Collection(common.BaseObject):
collection.
The :meth:`aggregate` method obeys the :attr:`read_preference` of this
:class:`Collection`, except when ``$out`` or ``$merge`` are used, in
which case :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`
is used.
:class:`Collection`, except when ``$out`` or ``$merge`` are used on
MongoDB <5.0, in which case
:attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` is used.
.. note:: This method does not support the 'explain' option. Please
use :meth:`~pymongo.database.Database.command` instead. An
@ -1958,6 +1958,8 @@ class Collection(common.BaseObject):
.. versionchanged:: 4.1
Added ``let`` parameter.
Support $merge and $out executing on secondaries according to the
collection's :attr:`read_preference`.
.. versionchanged:: 4.0
Removed the ``useCursor`` option.
.. versionchanged:: 3.9

View File

@ -1155,7 +1155,7 @@ class MongoClient(common.BaseObject):
with self._get_socket(server, session) as sock_info:
secondary_ok = (single and not sock_info.is_mongos) or (
read_preference != ReadPreference.PRIMARY)
read_preference.mode != ReadPreference.PRIMARY.mode)
yield sock_info, secondary_ok
@contextlib.contextmanager

View File

@ -424,6 +424,45 @@ class Nearest(_ServerMode):
self.max_staleness, selection))
class _AggWritePref:
"""Agg $out/$merge write preference.
* If there are readable servers and there is any pre-5.0 server, use
primary read preference.
* Otherwise use `pref` read preference.
:Parameters:
- `pref`: The read preference to use on MongoDB 5.0+.
"""
__slots__ = ('pref', 'effective_pref')
def __init__(self, pref):
self.pref = pref
self.effective_pref = ReadPreference.PRIMARY
def selection_hook(self, topology_description):
common_wv = topology_description.common_wire_version
if (topology_description.has_readable_server(
ReadPreference.PRIMARY_PREFERRED) and
common_wv and common_wv < 13):
self.effective_pref = ReadPreference.PRIMARY
else:
self.effective_pref = self.pref
def __call__(self, selection):
"""Apply this read preference to a Selection."""
return self.effective_pref(selection)
def __repr__(self):
return "_AggWritePref(pref=%r)" % (self.pref,)
# Proxy other calls to the effective_pref so that _AggWritePref can be
# used in place of an actual read preference.
def __getattr__(self, name):
return getattr(self.effective_pref, name)
_ALL_READ_PREFERENCES = (Primary, PrimaryPreferred,
Secondary, SecondaryPreferred, Nearest)

View File

@ -19,7 +19,7 @@ from random import sample
from pymongo import common
from pymongo.errors import ConfigurationError
from pymongo.read_preferences import ReadPreference
from pymongo.read_preferences import ReadPreference, _AggWritePref
from pymongo.server_description import ServerDescription
from pymongo.server_selectors import Selection
from pymongo.server_type import SERVER_TYPE
@ -263,21 +263,24 @@ class TopologyDescription(object):
selector.min_wire_version,
common_wv))
if isinstance(selector, _AggWritePref):
selector.selection_hook(self)
if self.topology_type == TOPOLOGY_TYPE.Unknown:
return []
elif self.topology_type in (TOPOLOGY_TYPE.Single,
TOPOLOGY_TYPE.LoadBalanced):
# Ignore selectors for standalone and load balancer mode.
return self.known_servers
elif address:
if address:
# Ignore selectors when explicit address is requested.
description = self.server_descriptions().get(address)
return [description] if description else []
elif self.topology_type == TOPOLOGY_TYPE.Sharded:
# Ignore read preference.
selection = Selection.from_topology_description(self)
else:
selection = selector(Selection.from_topology_description(self))
selection = Selection.from_topology_description(self)
# Ignore read preference for sharded clusters.
if self.topology_type != TOPOLOGY_TYPE.Sharded:
selection = selector(selection)
# Apply custom selector followed by localThresholdMS.
if custom_selector is not None and selection:

View File

@ -0,0 +1,460 @@
{
"description": "aggregate-write-readPreference",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "3.6",
"topologies": [
"replicaset",
"sharded",
"load-balanced"
]
}
],
"_yamlAnchors": {
"readConcern": {
"level": "local"
},
"writeConcern": {
"w": 1
}
},
"createEntities": [
{
"client": {
"id": "client0",
"observeEvents": [
"commandStartedEvent"
],
"uriOptions": {
"readConcernLevel": "local",
"w": 1
}
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "db0"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "coll0",
"collectionOptions": {
"readPreference": {
"mode": "secondaryPreferred",
"maxStalenessSeconds": 600
}
}
}
},
{
"collection": {
"id": "collection1",
"database": "database0",
"collectionName": "coll1"
}
}
],
"initialData": [
{
"collectionName": "coll0",
"databaseName": "db0",
"documents": [
{
"_id": 1,
"x": 11
},
{
"_id": 2,
"x": 22
},
{
"_id": 3,
"x": 33
}
]
},
{
"collectionName": "coll1",
"databaseName": "db0",
"documents": []
}
],
"tests": [
{
"description": "Aggregate with $out includes read preference for 5.0+ server",
"runOnRequirements": [
{
"minServerVersion": "5.0",
"serverless": "forbid"
}
],
"operations": [
{
"object": "collection0",
"name": "aggregate",
"arguments": {
"pipeline": [
{
"$match": {
"_id": {
"$gt": 1
}
}
},
{
"$sort": {
"x": 1
}
},
{
"$out": "coll1"
}
]
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": "coll0",
"pipeline": [
{
"$match": {
"_id": {
"$gt": 1
}
}
},
{
"$sort": {
"x": 1
}
},
{
"$out": "coll1"
}
],
"$readPreference": {
"mode": "secondaryPreferred",
"maxStalenessSeconds": 600
},
"readConcern": {
"level": "local"
},
"writeConcern": {
"w": 1
}
}
}
}
]
}
],
"outcome": [
{
"collectionName": "coll1",
"databaseName": "db0",
"documents": [
{
"_id": 2,
"x": 22
},
{
"_id": 3,
"x": 33
}
]
}
]
},
{
"description": "Aggregate with $out omits read preference for pre-5.0 server",
"runOnRequirements": [
{
"minServerVersion": "4.2",
"maxServerVersion": "4.4.99",
"serverless": "forbid"
}
],
"operations": [
{
"object": "collection0",
"name": "aggregate",
"arguments": {
"pipeline": [
{
"$match": {
"_id": {
"$gt": 1
}
}
},
{
"$sort": {
"x": 1
}
},
{
"$out": "coll1"
}
]
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": "coll0",
"pipeline": [
{
"$match": {
"_id": {
"$gt": 1
}
}
},
{
"$sort": {
"x": 1
}
},
{
"$out": "coll1"
}
],
"$readPreference": {
"mode": "primary"
},
"readConcern": {
"level": "local"
},
"writeConcern": {
"w": 1
}
}
}
}
]
}
],
"outcome": [
{
"collectionName": "coll1",
"databaseName": "db0",
"documents": [
{
"_id": 2,
"x": 22
},
{
"_id": 3,
"x": 33
}
]
}
]
},
{
"description": "Aggregate with $merge includes read preference for 5.0+ server",
"runOnRequirements": [
{
"minServerVersion": "5.0"
}
],
"operations": [
{
"object": "collection0",
"name": "aggregate",
"arguments": {
"pipeline": [
{
"$match": {
"_id": {
"$gt": 1
}
}
},
{
"$sort": {
"x": 1
}
},
{
"$merge": {
"into": "coll1"
}
}
]
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": "coll0",
"pipeline": [
{
"$match": {
"_id": {
"$gt": 1
}
}
},
{
"$sort": {
"x": 1
}
},
{
"$merge": {
"into": "coll1"
}
}
],
"$readPreference": {
"mode": "secondaryPreferred",
"maxStalenessSeconds": 600
},
"readConcern": {
"level": "local"
},
"writeConcern": {
"w": 1
}
}
}
}
]
}
],
"outcome": [
{
"collectionName": "coll1",
"databaseName": "db0",
"documents": [
{
"_id": 2,
"x": 22
},
{
"_id": 3,
"x": 33
}
]
}
]
},
{
"description": "Aggregate with $merge omits read preference for pre-5.0 server",
"runOnRequirements": [
{
"minServerVersion": "4.2",
"maxServerVersion": "4.4.99"
}
],
"operations": [
{
"object": "collection0",
"name": "aggregate",
"arguments": {
"pipeline": [
{
"$match": {
"_id": {
"$gt": 1
}
}
},
{
"$sort": {
"x": 1
}
},
{
"$merge": {
"into": "coll1"
}
}
]
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": "coll0",
"pipeline": [
{
"$match": {
"_id": {
"$gt": 1
}
}
},
{
"$sort": {
"x": 1
}
},
{
"$merge": {
"into": "coll1"
}
}
],
"$readPreference": {
"mode": "primary"
},
"readConcern": {
"level": "local"
},
"writeConcern": {
"w": 1
}
}
}
}
]
}
],
"outcome": [
{
"collectionName": "coll1",
"databaseName": "db0",
"documents": [
{
"_id": 2,
"x": 22
},
{
"_id": 3,
"x": 33
}
]
}
]
}
]
}

View File

@ -0,0 +1,446 @@
{
"description": "db-aggregate-write-readPreference",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "3.6",
"topologies": [
"replicaset"
],
"serverless": "forbid"
}
],
"_yamlAnchors": {
"readConcern": {
"level": "local"
},
"writeConcern": {
"w": 1
}
},
"createEntities": [
{
"client": {
"id": "client0",
"observeEvents": [
"commandStartedEvent"
],
"uriOptions": {
"readConcernLevel": "local",
"w": 1
}
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "db0",
"databaseOptions": {
"readPreference": {
"mode": "secondaryPreferred",
"maxStalenessSeconds": 600
}
}
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "coll0"
}
}
],
"initialData": [
{
"collectionName": "coll0",
"databaseName": "db0",
"documents": []
}
],
"tests": [
{
"description": "Database-level aggregate with $out includes read preference for 5.0+ server",
"runOnRequirements": [
{
"minServerVersion": "5.0",
"serverless": "forbid"
}
],
"operations": [
{
"object": "database0",
"name": "aggregate",
"arguments": {
"pipeline": [
{
"$listLocalSessions": {}
},
{
"$limit": 1
},
{
"$addFields": {
"_id": 1
}
},
{
"$project": {
"_id": 1
}
},
{
"$out": "coll0"
}
]
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": 1,
"pipeline": [
{
"$listLocalSessions": {}
},
{
"$limit": 1
},
{
"$addFields": {
"_id": 1
}
},
{
"$project": {
"_id": 1
}
},
{
"$out": "coll0"
}
],
"$readPreference": {
"mode": "secondaryPreferred",
"maxStalenessSeconds": 600
},
"readConcern": {
"level": "local"
},
"writeConcern": {
"w": 1
}
}
}
}
]
}
],
"outcome": [
{
"collectionName": "coll0",
"databaseName": "db0",
"documents": [
{
"_id": 1
}
]
}
]
},
{
"description": "Database-level aggregate with $out omits read preference for pre-5.0 server",
"runOnRequirements": [
{
"minServerVersion": "4.2",
"maxServerVersion": "4.4.99",
"serverless": "forbid"
}
],
"operations": [
{
"object": "database0",
"name": "aggregate",
"arguments": {
"pipeline": [
{
"$listLocalSessions": {}
},
{
"$limit": 1
},
{
"$addFields": {
"_id": 1
}
},
{
"$project": {
"_id": 1
}
},
{
"$out": "coll0"
}
]
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": 1,
"pipeline": [
{
"$listLocalSessions": {}
},
{
"$limit": 1
},
{
"$addFields": {
"_id": 1
}
},
{
"$project": {
"_id": 1
}
},
{
"$out": "coll0"
}
],
"$readPreference": {
"mode": "primary"
},
"readConcern": {
"level": "local"
},
"writeConcern": {
"w": 1
}
}
}
}
]
}
],
"outcome": [
{
"collectionName": "coll0",
"databaseName": "db0",
"documents": [
{
"_id": 1
}
]
}
]
},
{
"description": "Database-level aggregate with $merge includes read preference for 5.0+ server",
"runOnRequirements": [
{
"minServerVersion": "5.0"
}
],
"operations": [
{
"object": "database0",
"name": "aggregate",
"arguments": {
"pipeline": [
{
"$listLocalSessions": {}
},
{
"$limit": 1
},
{
"$addFields": {
"_id": 1
}
},
{
"$project": {
"_id": 1
}
},
{
"$merge": {
"into": "coll0"
}
}
]
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": 1,
"pipeline": [
{
"$listLocalSessions": {}
},
{
"$limit": 1
},
{
"$addFields": {
"_id": 1
}
},
{
"$project": {
"_id": 1
}
},
{
"$merge": {
"into": "coll0"
}
}
],
"$readPreference": {
"mode": "secondaryPreferred",
"maxStalenessSeconds": 600
},
"readConcern": {
"level": "local"
},
"writeConcern": {
"w": 1
}
}
}
}
]
}
],
"outcome": [
{
"collectionName": "coll0",
"databaseName": "db0",
"documents": [
{
"_id": 1
}
]
}
]
},
{
"description": "Database-level aggregate with $merge omits read preference for pre-5.0 server",
"runOnRequirements": [
{
"minServerVersion": "4.2",
"maxServerVersion": "4.4.99"
}
],
"operations": [
{
"object": "database0",
"name": "aggregate",
"arguments": {
"pipeline": [
{
"$listLocalSessions": {}
},
{
"$limit": 1
},
{
"$addFields": {
"_id": 1
}
},
{
"$project": {
"_id": 1
}
},
{
"$merge": {
"into": "coll0"
}
}
]
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": 1,
"pipeline": [
{
"$listLocalSessions": {}
},
{
"$limit": 1
},
{
"$addFields": {
"_id": 1
}
},
{
"$project": {
"_id": 1
}
},
{
"$merge": {
"into": "coll0"
}
}
],
"$readPreference": {
"mode": "primary"
},
"readConcern": {
"level": "local"
},
"writeConcern": {
"w": 1
}
}
}
}
]
}
],
"outcome": [
{
"collectionName": "coll0",
"databaseName": "db0",
"documents": [
{
"_id": 1
}
]
}
]
}
]
}

View File

@ -433,7 +433,9 @@ class TestCommandAndReadPreference(IntegrationTest):
[{'$project': {'_id': 1}}])
def test_aggregate_write(self):
self._test_coll_helper(False, self.c.pymongo_test.test,
# 5.0 servers support $out on secondaries.
secondary_ok = client_context.version.at_least(5, 0)
self._test_coll_helper(secondary_ok, self.c.pymongo_test.test,
'aggregate',
[{'$project': {'_id': 1}}, {'$out': "agg_write_test"}])