PYTHON-2767 Support snapshot reads on secondaries (#656)

Add the MongoClient.start_session snapshot option.
This commit is contained in:
Shane Harvey 2021-06-25 16:12:12 -07:00 committed by GitHub
parent a7921604f1
commit 14160aed04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1193 additions and 32 deletions

View File

@ -126,14 +126,29 @@ class SessionOptions(object):
"""Options for a new :class:`ClientSession`.
:Parameters:
- `causal_consistency` (optional): If True (the default), read
operations are causally ordered within the session.
- `causal_consistency` (optional): If True, read operations are causally
ordered within the session. Defaults to True when the ``snapshot``
option is ``False``.
- `default_transaction_options` (optional): The default
TransactionOptions to use for transactions started on this session.
- `snapshot` (optional): If True, then all reads performed using this
session will read from the same snapshot. This option is incompatible
with ``causal_consistency=True``. Defaults to ``False``.
.. versionchanged:: 3.12
Added the ``snapshot`` parameter.
"""
def __init__(self,
causal_consistency=True,
default_transaction_options=None):
causal_consistency=None,
default_transaction_options=None,
snapshot=False):
if snapshot:
if causal_consistency:
raise ConfigurationError('snapshot reads do not support '
'causal_consistency=True')
causal_consistency = False
elif causal_consistency is None:
causal_consistency = True
self._causal_consistency = causal_consistency
if default_transaction_options is not None:
if not isinstance(default_transaction_options, TransactionOptions):
@ -142,6 +157,7 @@ class SessionOptions(object):
"pymongo.client_session.TransactionOptions, not: %r" %
(default_transaction_options,))
self._default_transaction_options = default_transaction_options
self._snapshot = snapshot
@property
def causal_consistency(self):
@ -157,6 +173,14 @@ class SessionOptions(object):
"""
return self._default_transaction_options
@property
def snapshot(self):
"""Whether snapshot reads are configured.
.. versionadded:: 3.12
"""
return self._snapshot
class TransactionOptions(object):
"""Options for :meth:`ClientSession.start_transaction`.
@ -388,6 +412,7 @@ class ClientSession(object):
self._options = options
self._cluster_time = None
self._operation_time = None
self._snapshot_time = None
# Is this an implicitly created session?
self._implicit = implicit
self._transaction = _Transaction(None, client)
@ -603,6 +628,10 @@ class ClientSession(object):
"""
self._check_ended()
if self.options.snapshot:
raise InvalidOperation("Transactions are not supported in "
"snapshot sessions")
if self.in_transaction:
raise InvalidOperation("Transaction already in progress")
@ -781,6 +810,12 @@ class ClientSession(object):
"""Process a response to a command that was run with this session."""
self._advance_cluster_time(reply.get('$clusterTime'))
self._advance_operation_time(reply.get('operationTime'))
if self._options.snapshot and self._snapshot_time is None:
if 'cursor' in reply:
ct = reply['cursor'].get('atClusterTime')
else:
ct = reply.get('atClusterTime')
self._snapshot_time = ct
if self.in_transaction and self._transaction.sharded:
recovery_token = reply.get('recoveryToken')
if recovery_token:
@ -854,15 +889,9 @@ class ClientSession(object):
if self._transaction.opts.read_concern:
rc = self._transaction.opts.read_concern.document
else:
rc = {}
if (self.options.causal_consistency
and self.operation_time is not None):
rc['afterClusterTime'] = self.operation_time
if rc:
command['readConcern'] = rc
if rc:
command['readConcern'] = rc
self._update_read_concern(command)
command['txnNumber'] = self._server_session.transaction_id
command['autocommit'] = False
@ -871,6 +900,17 @@ class ClientSession(object):
self._check_ended()
self._server_session.inc_transaction_id()
def _update_read_concern(self, cmd):
if (self.options.causal_consistency
and self.operation_time is not None):
cmd.setdefault('readConcern', {})[
'afterClusterTime'] = self.operation_time
if self.options.snapshot:
rc = cmd.setdefault('readConcern', {})
rc['level'] = 'snapshot'
if self._snapshot_time is not None:
rc['atClusterTime'] = self._snapshot_time
class _ServerSession(object):
def __init__(self, generation):

View File

@ -315,12 +315,8 @@ class _Query(object):
if session:
session._apply_to(cmd, False, self.read_preference)
# Explain does not support readConcern.
if (not explain and session.options.causal_consistency
and session.operation_time is not None
and not session.in_transaction):
cmd.setdefault(
'readConcern', {})[
'afterClusterTime'] = session.operation_time
if not explain and not session.in_transaction:
session._update_read_concern(cmd)
sock_info.send_cluster_time(cmd, session, self.client)
# Support auto encryption
client = self.client

View File

@ -1658,8 +1658,9 @@ class MongoClient(common.BaseObject):
self, server_session, opts, implicit)
def start_session(self,
causal_consistency=True,
default_transaction_options=None):
causal_consistency=None,
default_transaction_options=None,
snapshot=False):
"""Start a logical session.
This method takes the same parameters as
@ -1682,7 +1683,8 @@ class MongoClient(common.BaseObject):
return self.__start_session(
False,
causal_consistency=causal_consistency,
default_transaction_options=default_transaction_options)
default_transaction_options=default_transaction_options,
snapshot=snapshot)
def _get_server_session(self):
"""Internal: start or resume a _ServerSession."""

View File

@ -90,10 +90,8 @@ def command(sock_info, dbname, spec, slave_ok, is_mongos,
if read_concern and not (session and session.in_transaction):
if read_concern.level:
spec['readConcern'] = read_concern.document
if (session and session.options.causal_consistency
and session.operation_time is not None):
spec.setdefault(
'readConcern', {})['afterClusterTime'] = session.operation_time
if session:
session._update_read_concern(spec)
if collation is not None:
spec['collation'] = collation

View File

@ -0,0 +1,23 @@
# Copyright 2021-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path[0:0] = [""]
from test import unittest
from test.test_sessions_unified import *
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,105 @@
{
"description": "snapshot-sessions-not-supported-server-error",
"schemaVersion": "1.0",
"runOnRequirements": [
{
"minServerVersion": "3.6",
"maxServerVersion": "4.4.99",
"topologies": [
"replicaset, sharded-replicaset"
]
}
],
"createEntities": [
{
"client": {
"id": "client0",
"observeEvents": [
"commandStartedEvent",
"commandFailedEvent"
]
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "database0"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "collection0"
}
},
{
"session": {
"id": "session0",
"client": "client0",
"sessionOptions": {
"snapshot": true
}
}
}
],
"initialData": [
{
"collectionName": "collection0",
"databaseName": "database0",
"documents": [
{
"_id": 1,
"x": 11
}
]
}
],
"tests": [
{
"description": "Server returns an error on find with snapshot",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"session": "session0",
"filter": {}
},
"expectError": {
"isError": true,
"isClientError": false
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"find": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
},
"commandName": "find",
"databaseName": "database0"
}
},
{
"commandFailedEvent": {
"commandName": "find"
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,939 @@
{
"description": "snapshot-sessions",
"schemaVersion": "1.0",
"runOnRequirements": [
{
"minServerVersion": "5.0",
"topologies": [
"replicaset",
"sharded-replicaset"
]
}
],
"createEntities": [
{
"client": {
"id": "client0",
"observeEvents": [
"commandStartedEvent"
],
"ignoreCommandMonitoringEvents": [
"findAndModify",
"insert",
"update"
]
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "database0"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "collection0",
"collectionOptions": {
"writeConcern": {
"w": "majority"
}
}
}
},
{
"session": {
"id": "session0",
"client": "client0",
"sessionOptions": {
"snapshot": true
}
}
},
{
"session": {
"id": "session1",
"client": "client0",
"sessionOptions": {
"snapshot": true
}
}
}
],
"initialData": [
{
"collectionName": "collection0",
"databaseName": "database0",
"documents": [
{
"_id": 1,
"x": 11
},
{
"_id": 2,
"x": 11
}
]
}
],
"tests": [
{
"description": "Find operation with snapshot",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"session": "session0",
"filter": {
"_id": 1
}
},
"expectResult": [
{
"_id": 1,
"x": 11
}
]
},
{
"name": "findOneAndUpdate",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"update": {
"$inc": {
"x": 1
}
},
"returnDocument": "After"
},
"expectResult": {
"_id": 1,
"x": 12
}
},
{
"name": "find",
"object": "collection0",
"arguments": {
"session": "session1",
"filter": {
"_id": 1
}
},
"expectResult": [
{
"_id": 1,
"x": 12
}
]
},
{
"name": "findOneAndUpdate",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"update": {
"$inc": {
"x": 1
}
},
"returnDocument": "After"
},
"expectResult": {
"_id": 1,
"x": 13
}
},
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
}
},
"expectResult": [
{
"_id": 1,
"x": 13
}
]
},
{
"name": "find",
"object": "collection0",
"arguments": {
"session": "session0",
"filter": {
"_id": 1
}
},
"expectResult": [
{
"_id": 1,
"x": 11
}
]
},
{
"name": "find",
"object": "collection0",
"arguments": {
"session": "session1",
"filter": {
"_id": 1
}
},
"expectResult": [
{
"_id": 1,
"x": 12
}
]
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"find": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"find": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"find": "collection0",
"readConcern": {
"$$exists": false
}
}
}
},
{
"commandStartedEvent": {
"command": {
"find": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": true
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"find": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": true
}
}
}
}
}
]
}
]
},
{
"description": "Distinct operation with snapshot",
"operations": [
{
"name": "distinct",
"object": "collection0",
"arguments": {
"fieldName": "x",
"filter": {},
"session": "session0"
},
"expectResult": [
11
]
},
{
"name": "findOneAndUpdate",
"object": "collection0",
"arguments": {
"filter": {
"_id": 2
},
"update": {
"$inc": {
"x": 1
}
},
"returnDocument": "After"
},
"expectResult": {
"_id": 2,
"x": 12
}
},
{
"name": "distinct",
"object": "collection0",
"arguments": {
"fieldName": "x",
"filter": {},
"session": "session1"
},
"expectResult": [
11,
12
]
},
{
"name": "findOneAndUpdate",
"object": "collection0",
"arguments": {
"filter": {
"_id": 2
},
"update": {
"$inc": {
"x": 1
}
},
"returnDocument": "After"
},
"expectResult": {
"_id": 2,
"x": 13
}
},
{
"name": "distinct",
"object": "collection0",
"arguments": {
"fieldName": "x",
"filter": {}
},
"expectResult": [
11,
13
]
},
{
"name": "distinct",
"object": "collection0",
"arguments": {
"fieldName": "x",
"filter": {},
"session": "session0"
},
"expectResult": [
11
]
},
{
"name": "distinct",
"object": "collection0",
"arguments": {
"fieldName": "x",
"filter": {},
"session": "session1"
},
"expectResult": [
11,
12
]
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"distinct": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"distinct": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"distinct": "collection0",
"readConcern": {
"$$exists": false
}
}
}
},
{
"commandStartedEvent": {
"command": {
"distinct": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": true
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"distinct": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": true
}
}
}
}
}
]
}
]
},
{
"description": "Aggregate operation with snapshot",
"operations": [
{
"name": "aggregate",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$match": {
"_id": 1
}
}
],
"session": "session0"
},
"expectResult": [
{
"_id": 1,
"x": 11
}
]
},
{
"name": "findOneAndUpdate",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"update": {
"$inc": {
"x": 1
}
},
"returnDocument": "After"
},
"expectResult": {
"_id": 1,
"x": 12
}
},
{
"name": "aggregate",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$match": {
"_id": 1
}
}
],
"session": "session1"
},
"expectResult": [
{
"_id": 1,
"x": 12
}
]
},
{
"name": "findOneAndUpdate",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"update": {
"$inc": {
"x": 1
}
},
"returnDocument": "After"
},
"expectResult": {
"_id": 1,
"x": 13
}
},
{
"name": "aggregate",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$match": {
"_id": 1
}
}
]
},
"expectResult": [
{
"_id": 1,
"x": 13
}
]
},
{
"name": "aggregate",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$match": {
"_id": 1
}
}
],
"session": "session0"
},
"expectResult": [
{
"_id": 1,
"x": 11
}
]
},
{
"name": "aggregate",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$match": {
"_id": 1
}
}
],
"session": "session1"
},
"expectResult": [
{
"_id": 1,
"x": 12
}
]
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"aggregate": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"aggregate": "collection0",
"readConcern": {
"$$exists": false
}
}
}
},
{
"commandStartedEvent": {
"command": {
"aggregate": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": true
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"aggregate": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": true
}
}
}
}
}
]
}
]
},
{
"description": "Mixed operation with snapshot",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"session": "session0",
"filter": {
"_id": 1
}
},
"expectResult": [
{
"_id": 1,
"x": 11
}
]
},
{
"name": "findOneAndUpdate",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"update": {
"$inc": {
"x": 1
}
},
"returnDocument": "After"
},
"expectResult": {
"_id": 1,
"x": 12
}
},
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
}
},
"expectResult": [
{
"_id": 1,
"x": 12
}
]
},
{
"name": "aggregate",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$match": {
"_id": 1
}
}
],
"session": "session0"
},
"expectResult": [
{
"_id": 1,
"x": 11
}
]
},
{
"name": "distinct",
"object": "collection0",
"arguments": {
"fieldName": "x",
"filter": {},
"session": "session0"
},
"expectResult": [
11
]
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"find": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"find": "collection0",
"readConcern": {
"$$exists": false
}
}
}
},
{
"commandStartedEvent": {
"command": {
"aggregate": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": true
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"distinct": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": true
}
}
}
}
}
]
}
]
},
{
"description": "Write commands with snapshot session do not affect snapshot reads",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {},
"session": "session0"
}
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"session": "session0",
"document": {
"_id": 22,
"x": 33
}
}
},
{
"name": "updateOne",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"session": "session0",
"update": {
"$inc": {
"x": 1
}
}
}
},
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"session": "session0"
},
"expectResult": [
{
"_id": 1,
"x": 11
}
]
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"find": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandStartedEvent": {
"command": {
"find": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": true
}
}
}
}
}
]
}
]
},
{
"description": "First snapshot read does not send atClusterTime",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"filter": {},
"session": "session0"
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"find": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
},
"commandName": "find",
"databaseName": "database0"
}
}
]
}
]
},
{
"description": "StartTransaction fails in snapshot session",
"operations": [
{
"name": "startTransaction",
"object": "session0",
"expectError": {
"isError": true,
"isClientError": true,
"errorContains": "Transactions are not supported in snapshot sessions"
}
}
]
}
]
}

View File

@ -21,6 +21,8 @@ import time
from io import BytesIO
sys.path[0:0] = [""]
from bson import DBRef
from gridfs import GridFS, GridFSBucket
from pymongo import ASCENDING, InsertOne, IndexModel, OFF, monitoring
@ -712,6 +714,21 @@ class TestSession(IntegrationTest):
wait_until(drop_db, 'dropped database after w=0 writes')
def test_snapshot_incompatible_with_causal_consistency(self):
with self.client.start_session(causal_consistency=False,
snapshot=False):
pass
with self.client.start_session(causal_consistency=False,
snapshot=True):
pass
with self.client.start_session(causal_consistency=True,
snapshot=False):
pass
with self.assertRaises(ConfigurationError):
with self.client.start_session(causal_consistency=True,
snapshot=True):
pass
class TestCausalConsistency(unittest.TestCase):
@ -1153,7 +1170,7 @@ class TestClusterTime(IntegrationTest):
class TestSpec(SpecRunner):
# Location of JSON test specifications.
TEST_PATH = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'sessions')
os.path.dirname(os.path.realpath(__file__)), 'sessions', 'legacy')
def last_two_command_events(self):
"""Return the last two command started events."""
@ -1198,3 +1215,6 @@ def create_test(scenario_def, test, name):
test_creator = TestCreator(create_test, TestSpec, TestSpec.TEST_PATH)
test_creator.create_tests()
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,33 @@
# Copyright 2021-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test the Sessions unified spec tests."""
import os
import sys
sys.path[0:0] = [""]
from test import unittest
from test.unified_format import generate_test_classes
# Location of JSON test specifications.
TEST_PATH = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'sessions', 'unified')
# Generate unified tests.
globals().update(generate_test_classes(TEST_PATH, module=__name__))
if __name__ == "__main__":
unittest.main()

View File

@ -40,8 +40,8 @@ from pymongo.change_stream import ChangeStream
from pymongo.collection import Collection
from pymongo.database import Database
from pymongo.errors import (
BulkWriteError, ConnectionFailure, InvalidOperation, NotPrimaryError,
PyMongoError)
BulkWriteError, ConnectionFailure, ConfigurationError, InvalidOperation,
NotPrimaryError, PyMongoError)
from pymongo.monitoring import (
CommandFailedEvent, CommandListener, CommandStartedEvent,
CommandSucceededEvent, _SENSITIVE_COMMANDS, PoolCreatedEvent,
@ -501,8 +501,11 @@ class MatchEvaluatorUtil(object):
self.match_result(value, actual[key], in_recursive_call=True)
if not is_root:
self.test.assertEqual(
set(expectation.keys()), set(actual.keys()))
expected_keys = set(expectation.keys())
for key, value in expectation.items():
if value == {'$$exists': False}:
expected_keys.remove(key)
self.test.assertEqual(expected_keys, set(actual.keys()))
def match_result(self, expectation, actual,
in_recursive_call=False):
@ -723,6 +726,8 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
# Connection errors are considered client errors.
if isinstance(exception, ConnectionFailure):
self.assertNotIsInstance(exception, NotPrimaryError)
elif isinstance(exception, (InvalidOperation, ConfigurationError)):
pass
else:
self.assertNotIsInstance(exception, PyMongoError)