PYTHON-1340 - Implement causally consistent reads

This commit is contained in:
Bernie Hackett 2017-11-09 17:16:55 -08:00
parent 1e802b3108
commit 4eda4ffaec
8 changed files with 364 additions and 28 deletions

View File

@ -28,7 +28,6 @@ from pymongo.common import (validate_is_mapping,
from pymongo.collation import validate_collation_or_none
from pymongo.errors import (BulkWriteError,
ConfigurationError,
DocumentTooLarge,
InvalidOperation,
OperationFailure)
from pymongo.message import (_INSERT, _UPDATE, _DELETE,
@ -261,7 +260,7 @@ class _Bulk(object):
check_keys = run.op_type == _INSERT
ops = islice(run.ops, idx_offset, None)
# Run as many ops as possible.
client._send_cluster_time(cmd)
client._send_cluster_time(cmd, s)
request_id, msg, to_send = _do_batched_write_command(
self.namespace, run.op_type, cmd, ops, check_keys,
self.collection.codec_options, bwc)
@ -269,6 +268,9 @@ class _Bulk(object):
raise InvalidOperation("cannot do an empty bulk write")
result = bwc.write_command(request_id, msg, to_send)
client._receive_cluster_time(result)
if s is not None:
s._advance_cluster_time(result.get("$clusterTime"))
s._advance_operation_time(result.get("operationTime"))
results.append((idx_offset, result))
if self.ordered and "writeErrors" in result:
break

View File

@ -23,7 +23,7 @@ Causally Consistent Reads
.. code-block:: python
with client.start_session(causally_consistent_reads=True) as session:
with client.start_session(causal_consistency=True) as session:
collection = client.db.collection
collection.update_one({'_id': 1}, {'$set': {'x': 10}}, session=session)
secondary_c = collection.with_options(
@ -32,10 +32,10 @@ Causally Consistent Reads
# A secondary read waits for replication of the write.
secondary_c.find_one({'_id': 1}, session=session)
If `causally_consistent_reads` is True, read operations that use the session are
causally after previous read and write operations. Using a causally consistent
session, an application can read its own writes and is guaranteed monotonic
reads, even when reading from replica set secondaries.
If `causal_consistency` is True (the default), read operations that use
the session are causally after previous read and write operations. Using a
causally consistent session, an application can read its own writes and is
guaranteed monotonic reads, even when reading from replica set secondaries.
Classes
=======
@ -46,6 +46,7 @@ import uuid
from bson.binary import Binary
from bson.int64 import Int64
from bson.timestamp import Timestamp
from pymongo import monotonic
from pymongo.errors import InvalidOperation
@ -55,16 +56,16 @@ class SessionOptions(object):
"""Options for a new :class:`ClientSession`.
:Parameters:
- `causally_consistent_reads` (optional): If True, read operations are
causally ordered within the session.
- `causal_consistency` (optional): If True (the default), read
operations are causally ordered within the session.
"""
def __init__(self, causally_consistent_reads=False):
self._causally_consistent_reads = causally_consistent_reads
def __init__(self, causal_consistency=True):
self._causal_consistency = causal_consistency
@property
def causally_consistent_reads(self):
"""Whether causally consistent reads are configured."""
return self._causally_consistent_reads
def causal_consistency(self):
"""Whether causal consistency is configured."""
return self._causal_consistency
class ClientSession(object):
@ -75,6 +76,8 @@ class ClientSession(object):
self._server_session = server_session
self._options = options
self._authset = authset
self._cluster_time = None
self._operation_time = None
def end_session(self):
"""Finish this session.
@ -117,6 +120,64 @@ class ClientSession(object):
return self._server_session.session_id
@property
def cluster_time(self):
"""The cluster time returned by the last operation executed
in this session.
"""
return self._cluster_time
@property
def operation_time(self):
"""The operation time returned by the last operation executed
in this session.
"""
return self._operation_time
def _advance_cluster_time(self, cluster_time):
"""Internal cluster time helper."""
if self._cluster_time is None:
self._cluster_time = cluster_time
elif cluster_time is not None:
if cluster_time["clusterTime"] > self._cluster_time["clusterTime"]:
self._cluster_time = cluster_time
def advance_cluster_time(self, cluster_time):
"""Update the cluster time for this session.
:Parameters:
- `cluster_time`: The
:data:`~pymongo.client_session.ClientSession.cluster_time` from
another `ClientSession` instance.
"""
if not isinstance(cluster_time, collections.Mapping):
raise TypeError(
"cluster_time must be a subclass of collections.Mapping")
if not isinstance(cluster_time.get("clusterTime"), Timestamp):
raise ValueError("Invalid cluster_time")
self._advance_cluster_time(cluster_time)
def _advance_operation_time(self, operation_time):
"""Internal operation time helper."""
if self._operation_time is None:
self._operation_time = operation_time
elif operation_time is not None:
if operation_time > self._operation_time:
self._operation_time = operation_time
def advance_operation_time(self, operation_time):
"""Update the operation time for this session.
:Parameters:
- `operation_time`: The
:data:`~pymongo.client_session.ClientSession.operation_time` from
another `ClientSession` instance.
"""
if not isinstance(operation_time, Timestamp):
raise TypeError("operation_time must be an instance "
"of bson.timestamp.Timestamp")
self._advance_operation_time(operation_time)
@property
def has_ended(self):
"""True if this session is finished."""

View File

@ -42,7 +42,6 @@ class CommandCursor(object):
The parameter 'retrieved' is unused.
"""
self.__collection = collection
self.__session = None
self.__id = cursor_info['id']
self.__address = address
self.__data = deque(cursor_info['firstBatch'])
@ -153,8 +152,14 @@ class CommandCursor(object):
self.__id,
self.__collection.codec_options)
if from_command:
client._receive_cluster_time(docs[0])
helpers._check_command_response(docs[0])
first = docs[0]
client._receive_cluster_time(first)
if self.__session is not None:
self.__session._advance_cluster_time(
first.get('$clusterTime'))
self.__session._advance_operation_time(
first.get('operationTime'))
helpers._check_command_response(first)
except OperationFailure as exc:
kill()

View File

@ -966,8 +966,14 @@ class Cursor(object):
cursor_id=self.__id,
codec_options=self.__codec_options)
if from_command:
client._receive_cluster_time(docs[0])
helpers._check_command_response(docs[0])
first = docs[0]
client._receive_cluster_time(first)
if self.__session is not None:
self.__session._advance_cluster_time(
first.get("$clusterTime"))
self.__session._advance_operation_time(
first.get("operationTime"))
helpers._check_command_response(first)
except OperationFailure as exc:
self.__killed = True

View File

@ -183,8 +183,12 @@ def _gen_explain_command(
if session:
explain['lsid'] = session._use_lsid()
if (session.options.causal_consistency
and session.operation_time is not None):
explain.setdefault(
'readConcern', {})['afterClusterTime'] = session.operation_time
client._send_cluster_time(explain)
client._send_cluster_time(explain, session)
return explain
@ -223,8 +227,12 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options,
if options & val])
if session:
cmd['lsid'] = session._use_lsid()
if (session.options.causal_consistency
and session.operation_time is not None):
cmd.setdefault(
'readConcern', {})['afterClusterTime'] = session.operation_time
if client:
client._send_cluster_time(cmd)
client._send_cluster_time(cmd, session)
return cmd
@ -239,7 +247,7 @@ def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms,
cmd['maxTimeMS'] = max_await_time_ms
if session:
cmd['lsid'] = session._use_lsid()
client._send_cluster_time(cmd)
client._send_cluster_time(cmd, session)
return cmd

View File

@ -1309,7 +1309,9 @@ class MongoClient(common.BaseObject):
return session
try:
return self.start_session()
# Don't make implied sessions causally consistent. Applications
# should always opt-in.
return self.start_session(causal_consistency=False)
except (ConfigurationError, InvalidOperation):
# Sessions not supported, or multiple users authenticated.
return None
@ -1337,8 +1339,16 @@ class MongoClient(common.BaseObject):
else:
yield None
def _send_cluster_time(self, command):
cluster_time = self._topology.max_cluster_time()
def _send_cluster_time(self, command, session):
topology_time = self._topology.max_cluster_time()
session_time = session.cluster_time if session else None
if topology_time and session_time:
if topology_time['clusterTime'] > session_time['clusterTime']:
cluster_time = topology_time
else:
cluster_time = session_time
else:
cluster_time = topology_time or session_time
if cluster_time:
command['$clusterTime'] = cluster_time

View File

@ -90,7 +90,7 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
if retryable_write:
spec['txnNumber'] = session._transaction_id()
if client:
client._send_cluster_time(spec)
client._send_cluster_time(spec, session)
# Publish the original command document, perhaps with lsid and $clusterTime.
orig = spec
@ -98,6 +98,10 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
spec = message._maybe_add_read_preference(spec, read_preference)
if read_concern.level:
spec['readConcern'] = read_concern.document
if (session and session.options.causal_consistency
and session.operation_time is not None):
spec.setdefault(
'readConcern', {})['afterClusterTime'] = session.operation_time
if collation is not None:
spec['collation'] = collation
@ -126,6 +130,11 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
response_doc = unpacked_docs[0]
if client:
client._receive_cluster_time(response_doc)
if session:
session._advance_cluster_time(
response_doc.get('$clusterTime'))
session._advance_operation_time(
response_doc.get('operationTime'))
if check:
helpers._check_command_response(
response_doc, None, allowable_errors,

View File

@ -20,12 +20,14 @@ import sys
from bson import DBRef
from bson.py3compat import StringIO
from gridfs import GridFS, GridFSBucket
from pymongo import InsertOne, IndexModel, OFF, monitoring
from pymongo import ASCENDING, InsertOne, IndexModel, OFF, monitoring
from pymongo.errors import (ConfigurationError,
InvalidOperation,
OperationFailure)
from pymongo.monotonic import time as _time
from test import IntegrationTest, client_context, db_user, db_pwd, SkipTest
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from test import IntegrationTest, client_context, db_user, db_pwd, unittest, SkipTest
from test.utils import ignore_deprecations, rs_or_single_client, EventListener
@ -623,6 +625,239 @@ class TestSession(IntegrationTest):
lambda cursor: cursor.__del__())
class TestCausalConsistency(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.listener = SessionTestListener()
cls.client = rs_or_single_client(event_listeners=[cls.listener])
@client_context.require_sessions
def setUp(self):
super(TestCausalConsistency, self).setUp()
@client_context.require_no_standalone
def test_core(self):
with self.client.start_session() as sess:
self.assertIsNone(sess.cluster_time)
self.assertIsNone(sess.operation_time)
self.listener.results.clear()
self.client.pymongo_test.test.find_one(session=sess)
started = self.listener.results['started'][0]
cmd = started.command
self.assertIsNone(cmd.get('readConcern'))
op_time = sess.operation_time
self.assertIsNotNone(op_time)
succeeded = self.listener.results['succeeded'][0]
reply = succeeded.reply
self.assertEqual(op_time, reply.get('operationTime'))
# No explicit session
self.client.pymongo_test.test.insert_one({})
self.assertEqual(sess.operation_time, op_time)
self.listener.results.clear()
try:
self.client.pymongo_test.command('doesntexist', session=sess)
except:
pass
# operationTime was updated from a failed command
self.assertNotEqual(op_time, sess.operation_time)
failed = self.listener.results['failed'][0]
self.assertEqual(
sess.operation_time, failed.failure.get('operationTime'))
with self.client.start_session() as sess2:
self.assertIsNone(sess2.cluster_time)
self.assertIsNone(sess2.operation_time)
self.assertRaises(TypeError, sess2.advance_cluster_time, 1)
self.assertRaises(ValueError, sess2.advance_cluster_time, {})
self.assertRaises(TypeError, sess2.advance_operation_time, 1)
# No error
sess2.advance_cluster_time(sess.cluster_time)
sess2.advance_operation_time(sess.operation_time)
self.assertEqual(sess.cluster_time, sess2.cluster_time)
self.assertEqual(sess.operation_time, sess2.operation_time)
def _test_reads(self, op):
coll = self.client.pymongo_test.test
with self.client.start_session() as sess:
coll.find_one({}, session=sess)
operation_time = sess.operation_time
self.assertIsNotNone(operation_time)
self.listener.results.clear()
op(coll, sess)
act = self.listener.results['started'][0].command.get(
'readConcern', {}).get('afterClusterTime')
self.assertEqual(operation_time, act)
@client_context.require_no_standalone
def test_reads(self):
self._test_reads(
lambda coll, session: list(
coll.database.list_collections(session=session)))
self._test_reads(
lambda coll, session: coll.database.list_collection_names(
session=session))
self._test_reads(
lambda coll, session: coll.database.command(
'ismaster', session=session))
self._test_reads(
lambda coll, session: list(coll.aggregate([], session=session)))
# PYTHON-1398
#self._test_reads(
# lambda coll, session: list(
# coll.aggregate_raw_batches([], session=session)))
self._test_reads(
lambda coll, session: list(coll.find({}, session=session)))
# PYTHON-1398
#self._test_reads(
# lambda coll, session: list(
# coll.find_raw_batches({}, session=session)))
self._test_reads(
lambda coll, session: coll.find_one({}, session=session))
self._test_reads(
lambda coll, session: coll.count(session=session))
self._test_reads(
lambda coll, session: list(coll.list_indexes(session=session)))
self._test_reads(
lambda coll, session: coll.index_information(session=session))
self._test_reads(
lambda coll, session: coll.options(session=session))
self._test_reads(
lambda coll, session: coll.distinct('foo', session=session))
self._test_reads(
lambda coll, session: coll.map_reduce(
'function() {}', 'function() {}', 'output', session=session))
self._test_reads(
lambda coll, session: coll.inline_map_reduce(
'function() {}', 'function() {}', session=session))
if not client_context.is_mongos:
self._test_reads(
lambda coll, session: list(
coll.parallel_scan(1, session=session)))
def _test_writes(self, op):
coll = self.client.pymongo_test.test
with self.client.start_session() as sess:
op(coll, sess)
operation_time = sess.operation_time
self.assertIsNotNone(operation_time)
self.listener.results.clear()
coll.find_one({}, session=sess)
act = self.listener.results['started'][0].command.get(
'readConcern', {}).get('afterClusterTime')
self.assertEqual(operation_time, act)
@client_context.require_no_standalone
def test_writes(self):
self._test_writes(
lambda coll, session: coll.bulk_write(
[InsertOne({})], session=session))
self._test_writes(
lambda coll, session: coll.insert_one({}, session=session))
self._test_writes(
lambda coll, session: coll.insert_many([{}], session=session))
self._test_writes(
lambda coll, session: coll.replace_one(
{'_id': 1}, {'x': 1}, session=session))
self._test_writes(
lambda coll, session: coll.update_one(
{}, {'$set': {'X': 1}}, session=session))
self._test_writes(
lambda coll, session: coll.update_many(
{}, {'$set': {'x': 1}}, session=session))
self._test_writes(
lambda coll, session: coll.delete_one({}, session=session))
self._test_writes(
lambda coll, session: coll.delete_many({}, session=session))
self._test_writes(
lambda coll, session: coll.find_one_and_replace(
{'x': 1}, {'y': 1}, session=session))
self._test_writes(
lambda coll, session: coll.find_one_and_update(
{'y': 1}, {'$set': {'x': 1}}, session=session))
self._test_writes(
lambda coll, session: coll.find_one_and_delete(
{'x': 1}, session=session))
self._test_writes(
lambda coll, session: coll.create_index("foo", session=session))
self._test_writes(
lambda coll, session: coll.create_indexes(
[IndexModel([("bar", ASCENDING)])], session=session))
self._test_writes(
lambda coll, session: coll.drop_index("foo_1", session=session))
self._test_writes(
lambda coll, session: coll.drop_indexes(session=session))
self._test_writes(
lambda coll, session: coll.reindex(session=session))
def test_session_not_causal(self):
with self.client.start_session(causal_consistency=False) as s:
self.client.pymongo_test.test.insert_one({}, session=s)
self.listener.results.clear()
self.client.pymongo_test.test.find_one({}, session=s)
act = self.listener.results['started'][0].command.get(
'readConcern', {}).get('afterClusterTime')
self.assertIsNone(act)
@client_context.require_standalone
def test_server_not_causal(self):
with self.client.start_session(causal_consistency=True) as s:
self.client.pymongo_test.test.insert_one({}, session=s)
self.listener.results.clear()
self.client.pymongo_test.test.find_one({}, session=s)
act = self.listener.results['started'][0].command.get(
'readConcern', {}).get('afterClusterTime')
self.assertIsNone(act)
@client_context.require_no_standalone
def test_read_concern(self):
with self.client.start_session(causal_consistency=True) as s:
coll = self.client.pymongo_test.test
coll.insert_one({}, session=s)
self.listener.results.clear()
coll.find_one({}, session=s)
read_concern = self.listener.results['started'][0].command.get(
'readConcern')
self.assertIsNotNone(read_concern)
self.assertIsNone(read_concern.get('level'))
self.assertIsNotNone(read_concern.get('afterClusterTime'))
coll = coll.with_options(read_concern=ReadConcern("majority"))
self.listener.results.clear()
coll.find_one({}, session=s)
read_concern = self.listener.results['started'][0].command.get(
'readConcern')
self.assertIsNotNone(read_concern)
self.assertEqual(read_concern.get('level'), 'majority')
self.assertIsNotNone(read_concern.get('afterClusterTime'))
def test_unacknowledged(self):
with self.client.start_session(causal_consistency=True) as s:
coll = self.client.pymongo_test.get_collection(
'test', write_concern=WriteConcern(w=0))
coll.insert_one({}, session=s)
self.assertIsNone(s.operation_time)
@client_context.require_no_standalone
def test_cluster_time_with_server_support(self):
self.client.pymongo_test.test.insert_one({})
self.listener.results.clear()
self.client.pymongo_test.test.find_one({})
after_cluster_time = self.listener.results['started'][0].command.get(
'$clusterTime')
self.assertIsNotNone(after_cluster_time)
@client_context.require_standalone
def test_cluster_time_no_server_support(self):
self.client.pymongo_test.test.insert_one({})
self.listener.results.clear()
self.client.pymongo_test.test.find_one({})
after_cluster_time = self.listener.results['started'][0].command.get(
'$clusterTime')
self.assertIsNone(after_cluster_time)
class TestSessionsMultiAuth(IntegrationTest):
@client_context.require_auth
@client_context.require_sessions