From 4eda4ffaec80bca271769cdc98a9f5a05bbc0c10 Mon Sep 17 00:00:00 2001 From: Bernie Hackett Date: Thu, 9 Nov 2017 17:16:55 -0800 Subject: [PATCH] PYTHON-1340 - Implement causally consistent reads --- pymongo/bulk.py | 6 +- pymongo/client_session.py | 85 ++++++++++++-- pymongo/command_cursor.py | 11 +- pymongo/cursor.py | 10 +- pymongo/message.py | 14 ++- pymongo/mongo_client.py | 16 ++- pymongo/network.py | 11 +- test/test_session.py | 239 +++++++++++++++++++++++++++++++++++++- 8 files changed, 364 insertions(+), 28 deletions(-) diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 832deaad4..c056e80fd 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -28,7 +28,6 @@ from pymongo.common import (validate_is_mapping, from pymongo.collation import validate_collation_or_none from pymongo.errors import (BulkWriteError, ConfigurationError, - DocumentTooLarge, InvalidOperation, OperationFailure) from pymongo.message import (_INSERT, _UPDATE, _DELETE, @@ -261,7 +260,7 @@ class _Bulk(object): check_keys = run.op_type == _INSERT ops = islice(run.ops, idx_offset, None) # Run as many ops as possible. - client._send_cluster_time(cmd) + client._send_cluster_time(cmd, s) request_id, msg, to_send = _do_batched_write_command( self.namespace, run.op_type, cmd, ops, check_keys, self.collection.codec_options, bwc) @@ -269,6 +268,9 @@ class _Bulk(object): raise InvalidOperation("cannot do an empty bulk write") result = bwc.write_command(request_id, msg, to_send) client._receive_cluster_time(result) + if s is not None: + s._advance_cluster_time(result.get("$clusterTime")) + s._advance_operation_time(result.get("operationTime")) results.append((idx_offset, result)) if self.ordered and "writeErrors" in result: break diff --git a/pymongo/client_session.py b/pymongo/client_session.py index 2f030503a..151b701e3 100644 --- a/pymongo/client_session.py +++ b/pymongo/client_session.py @@ -23,7 +23,7 @@ Causally Consistent Reads .. code-block:: python - with client.start_session(causally_consistent_reads=True) as session: + with client.start_session(causal_consistency=True) as session: collection = client.db.collection collection.update_one({'_id': 1}, {'$set': {'x': 10}}, session=session) secondary_c = collection.with_options( @@ -32,10 +32,10 @@ Causally Consistent Reads # A secondary read waits for replication of the write. secondary_c.find_one({'_id': 1}, session=session) -If `causally_consistent_reads` is True, read operations that use the session are -causally after previous read and write operations. Using a causally consistent -session, an application can read its own writes and is guaranteed monotonic -reads, even when reading from replica set secondaries. +If `causal_consistency` is True (the default), read operations that use +the session are causally after previous read and write operations. Using a +causally consistent session, an application can read its own writes and is +guaranteed monotonic reads, even when reading from replica set secondaries. Classes ======= @@ -46,6 +46,7 @@ import uuid from bson.binary import Binary from bson.int64 import Int64 +from bson.timestamp import Timestamp from pymongo import monotonic from pymongo.errors import InvalidOperation @@ -55,16 +56,16 @@ class SessionOptions(object): """Options for a new :class:`ClientSession`. :Parameters: - - `causally_consistent_reads` (optional): If True, read operations are - causally ordered within the session. + - `causal_consistency` (optional): If True (the default), read + operations are causally ordered within the session. """ - def __init__(self, causally_consistent_reads=False): - self._causally_consistent_reads = causally_consistent_reads + def __init__(self, causal_consistency=True): + self._causal_consistency = causal_consistency @property - def causally_consistent_reads(self): - """Whether causally consistent reads are configured.""" - return self._causally_consistent_reads + def causal_consistency(self): + """Whether causal consistency is configured.""" + return self._causal_consistency class ClientSession(object): @@ -75,6 +76,8 @@ class ClientSession(object): self._server_session = server_session self._options = options self._authset = authset + self._cluster_time = None + self._operation_time = None def end_session(self): """Finish this session. @@ -117,6 +120,64 @@ class ClientSession(object): return self._server_session.session_id + @property + def cluster_time(self): + """The cluster time returned by the last operation executed + in this session. + """ + return self._cluster_time + + @property + def operation_time(self): + """The operation time returned by the last operation executed + in this session. + """ + return self._operation_time + + def _advance_cluster_time(self, cluster_time): + """Internal cluster time helper.""" + if self._cluster_time is None: + self._cluster_time = cluster_time + elif cluster_time is not None: + if cluster_time["clusterTime"] > self._cluster_time["clusterTime"]: + self._cluster_time = cluster_time + + def advance_cluster_time(self, cluster_time): + """Update the cluster time for this session. + + :Parameters: + - `cluster_time`: The + :data:`~pymongo.client_session.ClientSession.cluster_time` from + another `ClientSession` instance. + """ + if not isinstance(cluster_time, collections.Mapping): + raise TypeError( + "cluster_time must be a subclass of collections.Mapping") + if not isinstance(cluster_time.get("clusterTime"), Timestamp): + raise ValueError("Invalid cluster_time") + self._advance_cluster_time(cluster_time) + + def _advance_operation_time(self, operation_time): + """Internal operation time helper.""" + if self._operation_time is None: + self._operation_time = operation_time + elif operation_time is not None: + if operation_time > self._operation_time: + self._operation_time = operation_time + + def advance_operation_time(self, operation_time): + """Update the operation time for this session. + + :Parameters: + - `operation_time`: The + :data:`~pymongo.client_session.ClientSession.operation_time` from + another `ClientSession` instance. + """ + if not isinstance(operation_time, Timestamp): + raise TypeError("operation_time must be an instance " + "of bson.timestamp.Timestamp") + self._advance_operation_time(operation_time) + @property def has_ended(self): """True if this session is finished.""" diff --git a/pymongo/command_cursor.py b/pymongo/command_cursor.py index 1ff58ef8f..cb80f686a 100644 --- a/pymongo/command_cursor.py +++ b/pymongo/command_cursor.py @@ -42,7 +42,6 @@ class CommandCursor(object): The parameter 'retrieved' is unused. """ self.__collection = collection - self.__session = None self.__id = cursor_info['id'] self.__address = address self.__data = deque(cursor_info['firstBatch']) @@ -153,8 +152,14 @@ class CommandCursor(object): self.__id, self.__collection.codec_options) if from_command: - client._receive_cluster_time(docs[0]) - helpers._check_command_response(docs[0]) + first = docs[0] + client._receive_cluster_time(first) + if self.__session is not None: + self.__session._advance_cluster_time( + first.get('$clusterTime')) + self.__session._advance_operation_time( + first.get('operationTime')) + helpers._check_command_response(first) except OperationFailure as exc: kill() diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 37376c5d9..de4b78028 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -966,8 +966,14 @@ class Cursor(object): cursor_id=self.__id, codec_options=self.__codec_options) if from_command: - client._receive_cluster_time(docs[0]) - helpers._check_command_response(docs[0]) + first = docs[0] + client._receive_cluster_time(first) + if self.__session is not None: + self.__session._advance_cluster_time( + first.get("$clusterTime")) + self.__session._advance_operation_time( + first.get("operationTime")) + helpers._check_command_response(first) except OperationFailure as exc: self.__killed = True diff --git a/pymongo/message.py b/pymongo/message.py index 03e77cabe..cc18472c7 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -183,8 +183,12 @@ def _gen_explain_command( if session: explain['lsid'] = session._use_lsid() + if (session.options.causal_consistency + and session.operation_time is not None): + explain.setdefault( + 'readConcern', {})['afterClusterTime'] = session.operation_time - client._send_cluster_time(explain) + client._send_cluster_time(explain, session) return explain @@ -223,8 +227,12 @@ def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options, if options & val]) if session: cmd['lsid'] = session._use_lsid() + if (session.options.causal_consistency + and session.operation_time is not None): + cmd.setdefault( + 'readConcern', {})['afterClusterTime'] = session.operation_time if client: - client._send_cluster_time(cmd) + client._send_cluster_time(cmd, session) return cmd @@ -239,7 +247,7 @@ def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms, cmd['maxTimeMS'] = max_await_time_ms if session: cmd['lsid'] = session._use_lsid() - client._send_cluster_time(cmd) + client._send_cluster_time(cmd, session) return cmd diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index c1e7a376e..c4ad0d4de 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -1309,7 +1309,9 @@ class MongoClient(common.BaseObject): return session try: - return self.start_session() + # Don't make implied sessions causally consistent. Applications + # should always opt-in. + return self.start_session(causal_consistency=False) except (ConfigurationError, InvalidOperation): # Sessions not supported, or multiple users authenticated. return None @@ -1337,8 +1339,16 @@ class MongoClient(common.BaseObject): else: yield None - def _send_cluster_time(self, command): - cluster_time = self._topology.max_cluster_time() + def _send_cluster_time(self, command, session): + topology_time = self._topology.max_cluster_time() + session_time = session.cluster_time if session else None + if topology_time and session_time: + if topology_time['clusterTime'] > session_time['clusterTime']: + cluster_time = topology_time + else: + cluster_time = session_time + else: + cluster_time = topology_time or session_time if cluster_time: command['$clusterTime'] = cluster_time diff --git a/pymongo/network.py b/pymongo/network.py index ae940f2a2..64efb92b1 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -90,7 +90,7 @@ def command(sock, dbname, spec, slave_ok, is_mongos, if retryable_write: spec['txnNumber'] = session._transaction_id() if client: - client._send_cluster_time(spec) + client._send_cluster_time(spec, session) # Publish the original command document, perhaps with lsid and $clusterTime. orig = spec @@ -98,6 +98,10 @@ def command(sock, dbname, spec, slave_ok, is_mongos, spec = message._maybe_add_read_preference(spec, read_preference) if read_concern.level: spec['readConcern'] = read_concern.document + if (session and session.options.causal_consistency + and session.operation_time is not None): + spec.setdefault( + 'readConcern', {})['afterClusterTime'] = session.operation_time if collation is not None: spec['collation'] = collation @@ -126,6 +130,11 @@ def command(sock, dbname, spec, slave_ok, is_mongos, response_doc = unpacked_docs[0] if client: client._receive_cluster_time(response_doc) + if session: + session._advance_cluster_time( + response_doc.get('$clusterTime')) + session._advance_operation_time( + response_doc.get('operationTime')) if check: helpers._check_command_response( response_doc, None, allowable_errors, diff --git a/test/test_session.py b/test/test_session.py index 01f8f2762..f47a2aa85 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -20,12 +20,14 @@ import sys from bson import DBRef from bson.py3compat import StringIO from gridfs import GridFS, GridFSBucket -from pymongo import InsertOne, IndexModel, OFF, monitoring +from pymongo import ASCENDING, InsertOne, IndexModel, OFF, monitoring from pymongo.errors import (ConfigurationError, InvalidOperation, OperationFailure) from pymongo.monotonic import time as _time -from test import IntegrationTest, client_context, db_user, db_pwd, SkipTest +from pymongo.read_concern import ReadConcern +from pymongo.write_concern import WriteConcern +from test import IntegrationTest, client_context, db_user, db_pwd, unittest, SkipTest from test.utils import ignore_deprecations, rs_or_single_client, EventListener @@ -623,6 +625,239 @@ class TestSession(IntegrationTest): lambda cursor: cursor.__del__()) +class TestCausalConsistency(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.listener = SessionTestListener() + cls.client = rs_or_single_client(event_listeners=[cls.listener]) + + @client_context.require_sessions + def setUp(self): + super(TestCausalConsistency, self).setUp() + + @client_context.require_no_standalone + def test_core(self): + with self.client.start_session() as sess: + self.assertIsNone(sess.cluster_time) + self.assertIsNone(sess.operation_time) + self.listener.results.clear() + self.client.pymongo_test.test.find_one(session=sess) + started = self.listener.results['started'][0] + cmd = started.command + self.assertIsNone(cmd.get('readConcern')) + op_time = sess.operation_time + self.assertIsNotNone(op_time) + succeeded = self.listener.results['succeeded'][0] + reply = succeeded.reply + self.assertEqual(op_time, reply.get('operationTime')) + + # No explicit session + self.client.pymongo_test.test.insert_one({}) + self.assertEqual(sess.operation_time, op_time) + self.listener.results.clear() + try: + self.client.pymongo_test.command('doesntexist', session=sess) + except: + pass + # operationTime was updated from a failed command + self.assertNotEqual(op_time, sess.operation_time) + failed = self.listener.results['failed'][0] + self.assertEqual( + sess.operation_time, failed.failure.get('operationTime')) + + with self.client.start_session() as sess2: + self.assertIsNone(sess2.cluster_time) + self.assertIsNone(sess2.operation_time) + self.assertRaises(TypeError, sess2.advance_cluster_time, 1) + self.assertRaises(ValueError, sess2.advance_cluster_time, {}) + self.assertRaises(TypeError, sess2.advance_operation_time, 1) + # No error + sess2.advance_cluster_time(sess.cluster_time) + sess2.advance_operation_time(sess.operation_time) + self.assertEqual(sess.cluster_time, sess2.cluster_time) + self.assertEqual(sess.operation_time, sess2.operation_time) + + def _test_reads(self, op): + coll = self.client.pymongo_test.test + with self.client.start_session() as sess: + coll.find_one({}, session=sess) + operation_time = sess.operation_time + self.assertIsNotNone(operation_time) + self.listener.results.clear() + op(coll, sess) + act = self.listener.results['started'][0].command.get( + 'readConcern', {}).get('afterClusterTime') + self.assertEqual(operation_time, act) + + @client_context.require_no_standalone + def test_reads(self): + self._test_reads( + lambda coll, session: list( + coll.database.list_collections(session=session))) + self._test_reads( + lambda coll, session: coll.database.list_collection_names( + session=session)) + self._test_reads( + lambda coll, session: coll.database.command( + 'ismaster', session=session)) + self._test_reads( + lambda coll, session: list(coll.aggregate([], session=session))) + # PYTHON-1398 + #self._test_reads( + # lambda coll, session: list( + # coll.aggregate_raw_batches([], session=session))) + self._test_reads( + lambda coll, session: list(coll.find({}, session=session))) + # PYTHON-1398 + #self._test_reads( + # lambda coll, session: list( + # coll.find_raw_batches({}, session=session))) + self._test_reads( + lambda coll, session: coll.find_one({}, session=session)) + self._test_reads( + lambda coll, session: coll.count(session=session)) + self._test_reads( + lambda coll, session: list(coll.list_indexes(session=session))) + self._test_reads( + lambda coll, session: coll.index_information(session=session)) + self._test_reads( + lambda coll, session: coll.options(session=session)) + self._test_reads( + lambda coll, session: coll.distinct('foo', session=session)) + self._test_reads( + lambda coll, session: coll.map_reduce( + 'function() {}', 'function() {}', 'output', session=session)) + self._test_reads( + lambda coll, session: coll.inline_map_reduce( + 'function() {}', 'function() {}', session=session)) + if not client_context.is_mongos: + self._test_reads( + lambda coll, session: list( + coll.parallel_scan(1, session=session))) + + def _test_writes(self, op): + coll = self.client.pymongo_test.test + with self.client.start_session() as sess: + op(coll, sess) + operation_time = sess.operation_time + self.assertIsNotNone(operation_time) + self.listener.results.clear() + coll.find_one({}, session=sess) + act = self.listener.results['started'][0].command.get( + 'readConcern', {}).get('afterClusterTime') + self.assertEqual(operation_time, act) + + @client_context.require_no_standalone + def test_writes(self): + self._test_writes( + lambda coll, session: coll.bulk_write( + [InsertOne({})], session=session)) + self._test_writes( + lambda coll, session: coll.insert_one({}, session=session)) + self._test_writes( + lambda coll, session: coll.insert_many([{}], session=session)) + self._test_writes( + lambda coll, session: coll.replace_one( + {'_id': 1}, {'x': 1}, session=session)) + self._test_writes( + lambda coll, session: coll.update_one( + {}, {'$set': {'X': 1}}, session=session)) + self._test_writes( + lambda coll, session: coll.update_many( + {}, {'$set': {'x': 1}}, session=session)) + self._test_writes( + lambda coll, session: coll.delete_one({}, session=session)) + self._test_writes( + lambda coll, session: coll.delete_many({}, session=session)) + self._test_writes( + lambda coll, session: coll.find_one_and_replace( + {'x': 1}, {'y': 1}, session=session)) + self._test_writes( + lambda coll, session: coll.find_one_and_update( + {'y': 1}, {'$set': {'x': 1}}, session=session)) + self._test_writes( + lambda coll, session: coll.find_one_and_delete( + {'x': 1}, session=session)) + self._test_writes( + lambda coll, session: coll.create_index("foo", session=session)) + self._test_writes( + lambda coll, session: coll.create_indexes( + [IndexModel([("bar", ASCENDING)])], session=session)) + self._test_writes( + lambda coll, session: coll.drop_index("foo_1", session=session)) + self._test_writes( + lambda coll, session: coll.drop_indexes(session=session)) + self._test_writes( + lambda coll, session: coll.reindex(session=session)) + + def test_session_not_causal(self): + with self.client.start_session(causal_consistency=False) as s: + self.client.pymongo_test.test.insert_one({}, session=s) + self.listener.results.clear() + self.client.pymongo_test.test.find_one({}, session=s) + act = self.listener.results['started'][0].command.get( + 'readConcern', {}).get('afterClusterTime') + self.assertIsNone(act) + + @client_context.require_standalone + def test_server_not_causal(self): + with self.client.start_session(causal_consistency=True) as s: + self.client.pymongo_test.test.insert_one({}, session=s) + self.listener.results.clear() + self.client.pymongo_test.test.find_one({}, session=s) + act = self.listener.results['started'][0].command.get( + 'readConcern', {}).get('afterClusterTime') + self.assertIsNone(act) + + @client_context.require_no_standalone + def test_read_concern(self): + with self.client.start_session(causal_consistency=True) as s: + coll = self.client.pymongo_test.test + coll.insert_one({}, session=s) + self.listener.results.clear() + coll.find_one({}, session=s) + read_concern = self.listener.results['started'][0].command.get( + 'readConcern') + self.assertIsNotNone(read_concern) + self.assertIsNone(read_concern.get('level')) + self.assertIsNotNone(read_concern.get('afterClusterTime')) + + coll = coll.with_options(read_concern=ReadConcern("majority")) + self.listener.results.clear() + coll.find_one({}, session=s) + read_concern = self.listener.results['started'][0].command.get( + 'readConcern') + self.assertIsNotNone(read_concern) + self.assertEqual(read_concern.get('level'), 'majority') + self.assertIsNotNone(read_concern.get('afterClusterTime')) + + def test_unacknowledged(self): + with self.client.start_session(causal_consistency=True) as s: + coll = self.client.pymongo_test.get_collection( + 'test', write_concern=WriteConcern(w=0)) + coll.insert_one({}, session=s) + self.assertIsNone(s.operation_time) + + @client_context.require_no_standalone + def test_cluster_time_with_server_support(self): + self.client.pymongo_test.test.insert_one({}) + self.listener.results.clear() + self.client.pymongo_test.test.find_one({}) + after_cluster_time = self.listener.results['started'][0].command.get( + '$clusterTime') + self.assertIsNotNone(after_cluster_time) + + @client_context.require_standalone + def test_cluster_time_no_server_support(self): + self.client.pymongo_test.test.insert_one({}) + self.listener.results.clear() + self.client.pymongo_test.test.find_one({}) + after_cluster_time = self.listener.results['started'][0].command.get( + '$clusterTime') + self.assertIsNone(after_cluster_time) + + class TestSessionsMultiAuth(IntegrationTest): @client_context.require_auth @client_context.require_sessions