diff --git a/pymongo/client_session.py b/pymongo/client_session.py index 36b1da460..b76f7c05b 100644 --- a/pymongo/client_session.py +++ b/pymongo/client_session.py @@ -95,7 +95,7 @@ from pymongo.errors import (ConfigurationError, InvalidOperation, OperationFailure) from pymongo.read_concern import ReadConcern -from pymongo.read_preferences import ReadPreference +from pymongo.read_preferences import ReadPreference, _ServerMode from pymongo.write_concern import WriteConcern @@ -159,9 +159,11 @@ class TransactionOptions(object): .. versionadded:: 3.7 """ - def __init__(self, read_concern=None, write_concern=None): + def __init__(self, read_concern=None, write_concern=None, + read_preference=None): self._read_concern = read_concern self._write_concern = write_concern + self._read_preference = read_preference if read_concern is not None: if not isinstance(read_concern, ReadConcern): raise TypeError("read_concern must be an instance of " @@ -176,6 +178,11 @@ class TransactionOptions(object): raise ConfigurationError( "transactions must use an acknowledged write concern, " "not: %r" % (write_concern,)) + if read_preference is not None: + if not isinstance(read_preference, _ServerMode): + raise TypeError("%r is not valid for read_preference. See " + "pymongo.read_preferences for valid " + "options." % (read_preference,)) @property def read_concern(self): @@ -187,6 +194,11 @@ class TransactionOptions(object): """This transaction's :class:`~write_concern.WriteConcern`.""" return self._write_concern + @property + def read_preference(self): + """This transaction's :class:`~read_preference.ReadPreference`.""" + return self._read_preference + class _TransactionContext(object): """Internal transaction context manager for start_transaction.""" @@ -294,7 +306,8 @@ class ClientSession(object): return val return getattr(self.client, name) - def start_transaction(self, read_concern=None, write_concern=None): + def start_transaction(self, read_concern=None, write_concern=None, + read_preference=None): """Start a multi-statement transaction. Takes the same arguments as :class:`TransactionOptions`. @@ -308,9 +321,11 @@ class ClientSession(object): read_concern = self._inherit_option("read_concern", read_concern) write_concern = self._inherit_option("write_concern", write_concern) + read_preference = self._inherit_option( + "read_preference", read_preference) self._transaction = _Transaction(TransactionOptions( - read_concern=read_concern, write_concern=write_concern)) + read_concern, write_concern, read_preference)) self._server_session._transaction_id += 1 return _TransactionContext(self) @@ -342,13 +357,16 @@ class ClientSession(object): # Not really started. return - # TODO: retryable. And it's weird to pass parse_write_concern_error - # from outside database.py. - self._client.admin.command( - command_name, - session=self, - write_concern=self._transaction.opts.write_concern, - parse_write_concern_error=True) + # TODO: commitTransaction should be a retryable write. + # Use _command directly because commit/abort are writes and must + # always go to the primary. + with self._client._socket_for_writes() as sock_info: + return self._client.admin._command( + sock_info, + command_name, + session=self, + write_concern=self._transaction.opts.write_concern, + parse_write_concern_error=True) finally: self._transaction = None @@ -415,6 +433,12 @@ class ClientSession(object): return True return False + def _txn_read_preference(self): + """Return read preference of this transaction or None.""" + if self._in_transaction_or_auto_start(): + return self._transaction.opts.read_preference + return None + def _apply_to(self, command, is_retryable, read_preference): self._check_ended() self._in_transaction_or_auto_start() diff --git a/pymongo/collection.py b/pymongo/collection.py index 13c4042b8..f83a172ff 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -14,7 +14,6 @@ """Collection level utilities for Mongo.""" -import collections import datetime import warnings @@ -184,11 +183,14 @@ class Collection(common.BaseObject): unicode_decode_error_handler='replace', document_class=dict) - def _socket_for_reads(self): - return self.__database.client._socket_for_reads(self.read_preference) + def _socket_for_reads(self, session): + return self.__database.client._socket_for_reads( + self._read_preference_for(session)) - def _socket_for_primary_reads(self): - return self.__database.client._socket_for_reads(ReadPreference.PRIMARY) + def _socket_for_primary_reads(self, session): + read_pref = ((session and session._txn_read_preference()) + or ReadPreference.PRIMARY) + return self.__database.client._socket_for_reads(read_pref) def _socket_for_writes(self): return self.__database.client._socket_for_writes() @@ -198,7 +200,6 @@ class Collection(common.BaseObject): codec_options=None, check=True, allowable_errors=None, read_concern=None, write_concern=None, - parse_write_concern_error=False, collation=None, session=None, retryable_write=False): @@ -217,8 +218,6 @@ class Collection(common.BaseObject): - `write_concern`: An instance of :class:`~pymongo.write_concern.WriteConcern`. This option is only valid for MongoDB 3.4 and above. - - `parse_write_concern_error` (optional): Whether to parse a - ``writeConcernError`` field in the command response. - `collation` (optional) - An instance of :class:`~pymongo.collation.Collation`. - `session` (optional): a @@ -232,13 +231,13 @@ class Collection(common.BaseObject): self.__database.name, command, slave_ok, - read_preference or self.read_preference, + read_preference or self._read_preference_for(session), codec_options or self.codec_options, check, allowable_errors, read_concern=read_concern, write_concern=write_concern, - parse_write_concern_error=parse_write_concern_error, + parse_write_concern_error=True, collation=collation, session=s, client=self.__database.client, @@ -256,7 +255,6 @@ class Collection(common.BaseObject): self._command( sock_info, cmd, read_preference=ReadPreference.PRIMARY, write_concern=self.write_concern, - parse_write_concern_error=True, collation=collation, session=session) def __getattr__(self, name): @@ -1493,7 +1491,7 @@ class Collection(common.BaseObject): ('numCursors', num_cursors)]) cmd.update(kwargs) - with self._socket_for_reads() as (sock_info, slave_ok): + with self._socket_for_reads(session) as (sock_info, slave_ok): result = self._command(sock_info, cmd, slave_ok, read_concern=self.read_concern, session=session) @@ -1509,7 +1507,7 @@ class Collection(common.BaseObject): def _count(self, cmd, collation=None, session=None): """Internal count helper.""" - with self._socket_for_reads() as (sock_info, slave_ok): + with self._socket_for_reads(session) as (sock_info, slave_ok): res = self._command( sock_info, cmd, slave_ok, allowable_errors=["ns missing"], @@ -1628,7 +1626,6 @@ class Collection(common.BaseObject): sock_info, cmd, read_preference=ReadPreference.PRIMARY, codec_options=_UNICODE_REPLACE_CODEC_OPTIONS, write_concern=self.write_concern, - parse_write_concern_error=True, session=session) return names @@ -1660,7 +1657,6 @@ class Collection(common.BaseObject): sock_info, cmd, read_preference=ReadPreference.PRIMARY, codec_options=_UNICODE_REPLACE_CODEC_OPTIONS, write_concern=self.write_concern, - parse_write_concern_error=True, session=session) def create_index(self, keys, session=None, **kwargs): @@ -1880,7 +1876,6 @@ class Collection(common.BaseObject): read_preference=ReadPreference.PRIMARY, allowable_errors=["ns not found"], write_concern=self.write_concern, - parse_write_concern_error=True, session=session) def reindex(self, session=None, **kwargs): @@ -1914,7 +1909,7 @@ class Collection(common.BaseObject): with self._socket_for_writes() as sock_info: return self._command( sock_info, cmd, read_preference=ReadPreference.PRIMARY, - parse_write_concern_error=True, session=session) + session=session) def list_indexes(self, session=None): """Get a cursor over the index documents for this collection. @@ -1940,13 +1935,15 @@ class Collection(common.BaseObject): codec_options = CodecOptions(SON) coll = self.with_options(codec_options=codec_options, read_preference=ReadPreference.PRIMARY) - with self._socket_for_primary_reads() as (sock_info, slave_ok): + with self._socket_for_primary_reads(session) as (sock_info, slave_ok): cmd = SON([("listIndexes", self.__name), ("cursor", {})]) + read_pref = ((session and session._txn_read_preference()) + or ReadPreference.PRIMARY) if sock_info.max_wire_version > 2: with self.__database.client._tmp_session(session, False) as s: try: cursor = self._command(sock_info, cmd, slave_ok, - ReadPreference.PRIMARY, + read_pref, codec_options, session=s)["cursor"] except OperationFailure as exc: @@ -1962,7 +1959,7 @@ class Collection(common.BaseObject): res = message._first_batch( sock_info, self.__database.name, "system.indexes", {"ns": self.__full_name}, 0, slave_ok, codec_options, - ReadPreference.PRIMARY, cmd, + read_pref, cmd, self.database.client._event_listeners) cursor = res["cursor"] # Note that a collection can only have 64 indexes, so there @@ -2061,7 +2058,7 @@ class Collection(common.BaseObject): "batchSize", kwargs.pop("batchSize", None)) # If the server does not support the "cursor" option we # ignore useCursor and batchSize. - with self._socket_for_reads() as (sock_info, slave_ok): + with self._socket_for_reads(session) as (sock_info, slave_ok): dollar_out = pipeline and '$out' in pipeline[-1] if use_cursor: if "cursor" not in kwargs: @@ -2092,9 +2089,9 @@ class Collection(common.BaseObject): self.__database.name, cmd, slave_ok, - self.read_preference, + self._read_preference_for(session), self.codec_options, - parse_write_concern_error=dollar_out, + parse_write_concern_error=True, read_concern=read_concern, collation=collation, session=session, @@ -2350,7 +2347,7 @@ class Collection(common.BaseObject): collation = validate_collation_or_none(kwargs.pop('collation', None)) cmd.update(kwargs) - with self._socket_for_reads() as (sock_info, slave_ok): + with self._socket_for_reads(session=None) as (sock_info, slave_ok): return self._command(sock_info, cmd, slave_ok, collation=collation)["retval"] @@ -2451,7 +2448,7 @@ class Collection(common.BaseObject): kwargs["query"] = filter collation = validate_collation_or_none(kwargs.pop('collation', None)) cmd.update(kwargs) - with self._socket_for_reads() as (sock_info, slave_ok): + with self._socket_for_reads(session) as (sock_info, slave_ok): return self._command(sock_info, cmd, slave_ok, read_concern=self.read_concern, collation=collation, session=session)["values"] @@ -2523,24 +2520,21 @@ class Collection(common.BaseObject): cmd.update(kwargs) inline = 'inline' in cmd['out'] - with self._socket_for_primary_reads() as (sock_info, slave_ok): + with self._socket_for_primary_reads(session) as (sock_info, slave_ok): if (sock_info.max_wire_version >= 5 and self.write_concern and not inline): cmd['writeConcern'] = self.write_concern.document cmd.update(kwargs) if (sock_info.max_wire_version >= 4 and 'readConcern' not in cmd and inline): - # No need to parse 'writeConcernError' here, since the command - # is an inline map reduce. - response = self._command( - sock_info, cmd, slave_ok, ReadPreference.PRIMARY, - read_concern=self.read_concern, - collation=collation, session=session) + read_concern = self.read_concern else: - response = self._command( - sock_info, cmd, slave_ok, ReadPreference.PRIMARY, - parse_write_concern_error=not inline, - collation=collation, session=session) + read_concern = None + + response = self._command( + sock_info, cmd, slave_ok, self._read_preference_for(session), + read_concern=read_concern, + collation=collation, session=session) if full_response or not response.get('result'): return response @@ -2592,7 +2586,7 @@ class Collection(common.BaseObject): ("out", {"inline": 1})]) collation = validate_collation_or_none(kwargs.pop('collation', None)) cmd.update(kwargs) - with self._socket_for_reads() as (sock_info, slave_ok): + with self._socket_for_reads(session) as (sock_info, slave_ok): if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd: res = self._command(sock_info, cmd, slave_ok, read_concern=self.read_concern, diff --git a/pymongo/command_cursor.py b/pymongo/command_cursor.py index dff93af2d..e74de9e41 100644 --- a/pymongo/command_cursor.py +++ b/pymongo/command_cursor.py @@ -223,13 +223,14 @@ class CommandCursor(object): if self.__id: # Get More dbname, collname = self.__ns.split('.', 1) + read_pref = self.__collection._read_preference_for(self.session) self.__send_message( self._getmore_class(dbname, collname, self.__batch_size, self.__id, self.__collection.codec_options, - self.__collection.read_preference, + read_pref, self.__session, self.__collection.database.client, self.__max_await_time_ms)) diff --git a/pymongo/common.py b/pymongo/common.py index 78b30585e..374cb7213 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -674,6 +674,14 @@ class BaseObject(object): """ return self.__read_preference + def _read_preference_for(self, session): + """Read only access to the read preference of this instance or session. + """ + # Override this operation's read preference with the transaction's. + if session: + return session._txn_read_preference() or self.__read_preference + return self.__read_preference + @property def read_concern(self): """Read only access to the :class:`~pymongo.read_concern.ReadConcern` diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 3c40f7901..4660e9d8e 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -211,12 +211,11 @@ class Cursor(object): self.__retrieved = 0 self.__codec_options = collection.codec_options - self.__read_preference = collection.read_preference + # Read preference is set when the initial find is sent. + self.__read_preference = None self.__read_concern = collection.read_concern self.__query_flags = cursor_type - if self.__read_preference != ReadPreference.PRIMARY: - self.__query_flags |= _QUERY_OPTIONS["slave_okay"] if no_cursor_timeout: self.__query_flags |= _QUERY_OPTIONS["no_timeout"] if allow_partial_results: @@ -918,16 +917,9 @@ class Cursor(object): def duration(): return datetime.datetime.now() - start if operation: - kwargs = { - "read_preference": self.__read_preference, - "exhaust": self.__exhaust, - } - if self.__address is not None: - kwargs["address"] = self.__address - try: - response = client._send_message_with_response(operation, - **kwargs) + response = client._send_message_with_response( + operation, exhaust=self.__exhaust, address=self.__address) self.__address = response.address if self.__exhaust: # 'response' is an ExhaustResponse. @@ -1060,6 +1052,13 @@ class Cursor(object): def _unpack_response(self, response, cursor_id, codec_options): return response.unpack_response(cursor_id, codec_options) + def _read_preference(self): + if self.__read_preference is None: + # Save the read preference for getMore commands. + self.__read_preference = self.__collection._read_preference_for( + self.session) + return self.__read_preference + def _refresh(self): """Refreshes the cursor with more data from Mongo. @@ -1081,7 +1080,7 @@ class Cursor(object): self.__query_spec(), self.__projection, self.__codec_options, - self.__read_preference, + self._read_preference(), self.__limit, self.__batch_size, self.__read_concern, @@ -1106,7 +1105,7 @@ class Cursor(object): limit, self.__id, self.__codec_options, - self.__read_preference, + self._read_preference(), self.__session, self.__collection.database.client, self.__max_await_time_ms) diff --git a/pymongo/database.py b/pymongo/database.py index 57733b957..0505064ec 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -300,15 +300,6 @@ class Database(common.BaseObject): self, name, False, codec_options, read_preference, write_concern, read_concern) - def _collection_default_options(self, name, **kargs): - """Get a Collection instance with the default settings.""" - wc = (self.write_concern - if self.write_concern.acknowledged else WriteConcern()) - return self.get_collection( - name, codec_options=DEFAULT_CODEC_OPTIONS, - read_preference=ReadPreference.PRIMARY, - write_concern=wc) - def create_collection(self, name, codec_options=None, read_preference=None, write_concern=None, read_concern=None, session=None, **kwargs): @@ -525,17 +516,20 @@ class Database(common.BaseObject): .. mongodoc:: commands """ - client = self.__client - with client._socket_for_reads(read_preference) as (sock_info, slave_ok): + read_preference = ((session and session._txn_read_preference()) + or read_preference) + with self.__client._socket_for_reads( + read_preference) as (sock_info, slave_ok): return self._command(sock_info, command, slave_ok, value, check, allowable_errors, read_preference, codec_options, session=session, **kwargs) - def _list_collections(self, sock_info, slave_okay, session=None, **kwargs): + def _list_collections(self, sock_info, slave_okay, session, + read_preference, **kwargs): """Internal listCollections helper.""" coll = self.get_collection( - "$cmd", read_preference=ReadPreference.PRIMARY) + "$cmd", read_preference=read_preference) if sock_info.max_wire_version > 2: cmd = SON([("listCollections", 1), ("cursor", {})]) @@ -543,7 +537,9 @@ class Database(common.BaseObject): with self.__client._tmp_session( session, close=False) as tmp_session: cursor = self._command( - sock_info, cmd, slave_okay, session=tmp_session)["cursor"] + sock_info, cmd, slave_okay, + read_preference=read_preference, + session=tmp_session)["cursor"] return CommandCursor( coll, cursor, @@ -583,10 +579,13 @@ class Database(common.BaseObject): .. versionadded:: 3.6 """ + read_pref = ((session and session._txn_read_preference()) + or ReadPreference.PRIMARY) with self.__client._socket_for_reads( - ReadPreference.PRIMARY) as (sock_info, slave_okay): + read_pref) as (sock_info, slave_okay): return self._list_collections( - sock_info, slave_okay, session=session, **kwargs) + sock_info, slave_okay, session, read_preference=read_pref, + **kwargs) def list_collection_names(self, session=None): """Get a list of all the collection names in this database. @@ -648,10 +647,9 @@ class Database(common.BaseObject): self.__client._purge_index(self.__name, name) - with self.__client._socket_for_reads( - ReadPreference.PRIMARY) as (sock_info, slave_ok): + with self.__client._socket_for_writes() as sock_info: return self._command( - sock_info, 'drop', slave_ok, _unicode(name), + sock_info, 'drop', value=_unicode(name), allowable_errors=['ns not found'], write_concern=self.write_concern, parse_write_concern_error=True, diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index aa7231c78..5a536fd91 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -973,7 +973,7 @@ class MongoClient(common.BaseObject): @contextlib.contextmanager def _socket_for_reads(self, read_preference): - preference = read_preference or ReadPreference.PRIMARY + assert read_preference is not None, "read_preference must not be None" # Get a socket for a server matching the read preference, and yield # sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to # mongods with topology type Single. If the server type is Mongos, @@ -983,13 +983,14 @@ class MongoClient(common.BaseObject): topology = self._get_topology() single = topology.description.topology_type == TOPOLOGY_TYPE.Single server = topology.select_server(read_preference) + with self._get_socket(server) as sock_info: slave_ok = (single and not sock_info.is_mongos) or ( - preference != ReadPreference.PRIMARY) + read_preference != ReadPreference.PRIMARY) yield sock_info, slave_ok - def _send_message_with_response(self, operation, read_preference=None, - exhaust=False, address=None): + def _send_message_with_response(self, operation, exhaust=False, + address=None): """Send a message to MongoDB and return a Response. :Parameters: @@ -1011,16 +1012,14 @@ class MongoClient(common.BaseObject): raise AutoReconnect('server %s:%d no longer available' % address) else: - selector = read_preference or writable_server_selector - server = topology.select_server(selector) + server = topology.select_server(operation.read_preference) - # A _Query's slaveOk bit is already set for queries with non-primary - # read preference. If this is a direct connection to a mongod, override - # and *always* set the slaveOk bit. See bullet point 2 in - # server-selection.rst#topology-type-single. + # If this is a direct connection to a mongod, *always* set the slaveOk + # bit. See bullet point 2 in server-selection.rst#topology-type-single. set_slave_ok = ( topology.description.topology_type == TOPOLOGY_TYPE.Single - and server.description.server_type != SERVER_TYPE.Mongos) + and server.description.server_type != SERVER_TYPE.Mongos) or ( + operation.read_preference != ReadPreference.PRIMARY) return self._reset_on_error( server, @@ -1542,12 +1541,10 @@ class MongoClient(common.BaseObject): "of %s or a Database" % (string_type.__name__,)) self._purge_index(name) - with self._socket_for_reads( - ReadPreference.PRIMARY) as (sock_info, slave_ok): + with self._socket_for_writes() as sock_info: self[name]._command( sock_info, "dropDatabase", - slave_ok=slave_ok, read_preference=ReadPreference.PRIMARY, write_concern=self.write_concern, parse_write_concern_error=True, diff --git a/test/test_command_monitoring_spec.py b/test/test_command_monitoring_spec.py index b34ed3f26..3627be26b 100644 --- a/test/test_command_monitoring_spec.py +++ b/test/test_command_monitoring_spec.py @@ -25,11 +25,10 @@ import pymongo from bson import json_util from pymongo import monitoring from pymongo.errors import OperationFailure -from pymongo.read_preferences import (make_read_preference, - read_pref_mode_from_name) from pymongo.write_concern import WriteConcern from test import unittest, client_context from test.utils import single_client, wait_until, EventListener +from test.utils_selection_tests import parse_read_preference # Location of JSON test specifications. _TEST_PATH = os.path.join( @@ -82,10 +81,8 @@ def create_test(scenario_def, test): self.listener.results.clear() name = camel_to_snake(test['operation']['name']) if 'read_preference' in test['operation']: - mode = read_pref_mode_from_name( - test['operation']['read_preference']['mode']) - coll = coll.with_options( - read_preference=make_read_preference(mode, None)) + coll = coll.with_options(read_preference=parse_read_preference( + test['operation']['read_preference'])) test_args = test['operation']['arguments'] if 'writeConcern' in test_args: diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index d6616afad..f2e0e5c83 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -316,7 +316,8 @@ class ReadPrefTester(MongoClient): @contextlib.contextmanager def _socket_for_reads(self, read_preference): - context = super(ReadPrefTester, self)._socket_for_reads(read_preference) + context = super(ReadPrefTester, self)._socket_for_reads( + read_preference) with context as (sock_info, slave_ok): self.record_a_read(sock_info.address) yield sock_info, slave_ok @@ -412,18 +413,12 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase): self._test_fn(server_type, func) def test_create_collection(self): - # Collections should be created on primary, obviously + # create_collection runs listCollections on the primary to check if + # the collection already exists. self._test_primary_helper( lambda: self.c.pymongo_test.create_collection( 'some_collection%s' % random.randint(0, MAXSIZE))) - def test_drop_collection(self): - self._test_primary_helper( - lambda: self.c.pymongo_test.drop_collection('some_collection')) - - self._test_primary_helper( - lambda: self.c.pymongo_test.some_collection.drop()) - def test_group(self): with warnings.catch_warnings(): warnings.simplefilter("ignore") diff --git a/test/test_replica_set_client.py b/test/test_replica_set_client.py index dd2e51abe..d3da31d9b 100644 --- a/test/test_replica_set_client.py +++ b/test/test_replica_set_client.py @@ -134,7 +134,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase): cursor = c.pymongo_test.test.find() self.assertEqual( - ReadPreference.PRIMARY, cursor._Cursor__read_preference) + ReadPreference.PRIMARY, cursor._read_preference()) tag_sets = [{'dc': 'la', 'rack': '2'}, {'foo': 'bar'}] secondary = Secondary(tag_sets=tag_sets) @@ -155,13 +155,13 @@ class TestReplicaSetClient(TestReplicaSetClientBase): cursor = c.pymongo_test.test.find() self.assertEqual( - secondary, cursor._Cursor__read_preference) + secondary, cursor._read_preference()) nearest = Nearest(tag_sets=[{'dc': 'ny'}, {}]) cursor = c.pymongo_test.get_collection( "test", read_preference=nearest).find() - self.assertEqual(nearest, cursor._Cursor__read_preference) + self.assertEqual(nearest, cursor._read_preference()) self.assertEqual(c.max_bson_size, 16777216) c.close() diff --git a/test/test_retryable_writes.py b/test/test_retryable_writes.py index 815656c6a..1159f7497 100644 --- a/test/test_retryable_writes.py +++ b/test/test_retryable_writes.py @@ -27,7 +27,6 @@ from bson.son import SON from pymongo.errors import (ConnectionFailure, ServerSelectionTimeoutError) -from pymongo.monitoring import _SENSITIVE_COMMANDS from pymongo.mongo_client import MongoClient from pymongo.operations import (InsertOne, DeleteMany, @@ -38,7 +37,9 @@ from pymongo.operations import (InsertOne, from pymongo.write_concern import WriteConcern from test import unittest, client_context, IntegrationTest, SkipTest -from test.utils import rs_or_single_client, EventListener, DeprecationFilter +from test.utils import (rs_or_single_client, + DeprecationFilter, + OvertCommandListener) from test.test_crud import check_result, run_operation # Location of JSON test specifications. @@ -46,23 +47,6 @@ _TEST_PATH = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'retryable_writes') -class CommandListener(EventListener): - def started(self, event): - if event.command_name.lower() in _SENSITIVE_COMMANDS: - return - super(CommandListener, self).started(event) - - def succeeded(self, event): - if event.command_name.lower() in _SENSITIVE_COMMANDS: - return - super(CommandListener, self).succeeded(event) - - def failed(self, event): - if event.command_name.lower() in _SENSITIVE_COMMANDS: - return - super(CommandListener, self).failed(event) - - class TestAllScenarios(IntegrationTest): @classmethod @@ -240,7 +224,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest): @classmethod def setUpClass(cls): super(TestRetryableWrites, cls).setUpClass() - cls.listener = CommandListener() + cls.listener = OvertCommandListener() cls.client = rs_or_single_client( retryWrites=True, event_listeners=[cls.listener]) cls.db = cls.client.pymongo_test @@ -260,7 +244,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest): ('mode', 'off')])) def test_supported_single_statement_no_retry(self): - listener = CommandListener() + listener = OvertCommandListener() client = rs_or_single_client( retryWrites=False, event_listeners=[listener]) self.addCleanup(client.close) @@ -350,7 +334,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest): def test_server_selection_timeout_not_retried(self): """A ServerSelectionTimeoutError is not retried.""" - listener = CommandListener() + listener = OvertCommandListener() client = MongoClient( 'somedomainthatdoesntexist.org', serverSelectionTimeoutMS=1, @@ -370,7 +354,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest): """A ServerSelectionTimeoutError on the retry attempt raises the original error. """ - listener = CommandListener() + listener = OvertCommandListener() client = rs_or_single_client( retryWrites=True, event_listeners=[listener]) self.addCleanup(client.close) diff --git a/test/test_transactions.py b/test/test_transactions.py index 7361b8ec0..559e0e3b9 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -32,12 +32,12 @@ from pymongo.errors import (ConfigurationError, OperationFailure, PyMongoError) from pymongo.read_concern import ReadConcern -from pymongo.read_preferences import (make_read_preference, - read_pref_mode_from_name) +from pymongo.read_preferences import ReadPreference from pymongo.results import _WriteResult, BulkWriteResult from test import unittest, client_context, IntegrationTest -from test.utils import EventListener, rs_client +from test.utils import OvertCommandListener, rs_client +from test.utils_selection_tests import parse_read_preference # Location of JSON test specifications. _TEST_PATH = os.path.join( @@ -74,8 +74,10 @@ class TestTransactions(IntegrationTest): default_options = TransactionOptions() self.assertIsNone(default_options.read_concern) self.assertIsNone(default_options.write_concern) + self.assertIsNone(default_options.read_preference) TransactionOptions(read_concern=ReadConcern(), - write_concern=WriteConcern()) + write_concern=WriteConcern(), + read_preference=ReadPreference.PRIMARY) with self.assertRaisesRegex(TypeError, "read_concern must be "): TransactionOptions(read_concern={}) with self.assertRaisesRegex(TypeError, "write_concern must be "): @@ -84,6 +86,9 @@ class TestTransactions(IntegrationTest): ConfigurationError, "transactions must use an acknowledged write concern"): TransactionOptions(write_concern=WriteConcern(w=0)) + with self.assertRaisesRegex( + TypeError, "is not valid for read_preference"): + TransactionOptions(read_preference={}) # TODO: factor the following function with test_crud.py. def check_result(self, expected_result, result): @@ -138,8 +143,7 @@ class TestTransactions(IntegrationTest): arguments.update(arguments.pop("options", {})) pref = write_c = read_c = None if 'readPreference' in arguments: - pref = make_read_preference(read_pref_mode_from_name( - arguments.pop('readPreference')['mode']), tag_sets=None) + pref = parse_read_preference(arguments.pop('readPreference')) if 'writeConcern' in arguments: write_c = WriteConcern(**dict(arguments.pop('writeConcern'))) @@ -150,7 +154,8 @@ class TestTransactions(IntegrationTest): if name == 'start_transaction': cmd = partial(session.start_transaction, write_concern=write_c, - read_concern=read_c) + read_concern=read_c, + read_preference=pref) elif name in ('commit_transaction', 'abort_transaction'): cmd = getattr(session, name) else: @@ -195,7 +200,10 @@ class TestTransactions(IntegrationTest): if name == "aggregate": if arguments["pipeline"] and "$out" in arguments["pipeline"][-1]: - out = collection.database[arguments["pipeline"][-1]["$out"]] + # Read from the primary to ensure causal consistency. + out = collection.database.get_collection( + arguments["pipeline"][-1]["$out"], + read_preference=ReadPreference.PRIMARY) return out.find() if isinstance(result, Cursor) or isinstance(result, CommandCursor): @@ -279,7 +287,7 @@ def end_sessions(sessions): def create_test(scenario_def, test): def run_scenario(self): - listener = EventListener() + listener = OvertCommandListener() # New client, to avoid interference from pooled sessions. # Convert test['clientOptions'] to dict to avoid a Jython bug using "**" # with ScenarioDict. @@ -319,9 +327,16 @@ def create_test(scenario_def, test): else: write_concern = None + if 'readPreference' in txn_opts: + read_pref = parse_read_preference( + txn_opts['readPreference']) + else: + read_pref = None + txn_opts = client_session.TransactionOptions( read_concern=read_concern, - write_concern=write_concern + write_concern=write_concern, + read_preference=read_pref, ) opts['default_transaction_options'] = txn_opts @@ -339,7 +354,8 @@ def create_test(scenario_def, test): for op in test['operations']: expected_result = op.get('result') if expect_error_message(expected_result): - with self.assertRaises(PyMongoError) as context: + with self.assertRaises(PyMongoError, + msg=op.get('name')) as context: self.run_operation(sessions, collection, op.copy()) self.assertIn(expected_result['errorContains'].lower(), @@ -363,7 +379,10 @@ def create_test(scenario_def, test): # Assert final state is expected. expected_c = test['outcome'].get('collection') if expected_c is not None: - self.assertEqual(list(collection.find()), expected_c['data']) + # Read from the primary to ensure causal consistency. + primary_coll = collection.with_options( + read_preference=ReadPreference.PRIMARY) + self.assertEqual(list(primary_coll.find()), expected_c['data']) return run_scenario diff --git a/test/transactions/read-pref.json b/test/transactions/read-pref.json index ff1636ea2..b1db6bfbf 100644 --- a/test/transactions/read-pref.json +++ b/test/transactions/read-pref.json @@ -2,7 +2,7 @@ "data": [], "tests": [ { - "description": "primary", + "description": "default readPreference", "operations": [ { "name": "startTransaction", @@ -11,26 +11,42 @@ } }, { - "name": "insertOne", + "name": "insertMany", "arguments": { - "document": { - "_id": 1 - }, + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ], "session": "session0" }, "result": { - "insertedId": 1 + "insertedIds": { + "0": 1, + "1": 2, + "2": 3, + "3": 4 + } } }, { "name": "count", "arguments": { + "readPreference": { + "mode": "Secondary" + }, "filter": { "_id": 1 }, - "readPreference": { - "mode": "primary" - }, "session": "session0" }, "result": 1 @@ -38,17 +54,33 @@ { "name": "find", "arguments": { + "readPreference": { + "mode": "Secondary" + }, + "batchSize": 3, "session": "session0" }, "result": [ { "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 } ] }, { "name": "aggregate", "arguments": { + "readPreference": { + "mode": "Secondary" + }, "pipeline": [ { "$project": { @@ -62,6 +94,15 @@ "result": [ { "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 } ] }, @@ -77,18 +118,477 @@ "data": [ { "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 } ] } } }, { - "description": "write and read", + "description": "primary readPreference", "operations": [ { "name": "startTransaction", "arguments": { + "session": "session0", + "options": { + "readPreference": { + "mode": "Primary" + } + } + } + }, + { + "name": "insertMany", + "arguments": { + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ], "session": "session0" + }, + "result": { + "insertedIds": { + "0": 1, + "1": 2, + "2": 3, + "3": 4 + } + } + }, + { + "name": "count", + "arguments": { + "readPreference": { + "mode": "Secondary" + }, + "filter": { + "_id": 1 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "find", + "arguments": { + "readPreference": { + "mode": "Secondary" + }, + "batchSize": 3, + "session": "session0" + }, + "result": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + { + "name": "aggregate", + "arguments": { + "readPreference": { + "mode": "Secondary" + }, + "pipeline": [ + { + "$project": { + "_id": 1 + } + } + ], + "batchSize": 3, + "session": "session0" + }, + "result": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + { + "name": "commitTransaction", + "arguments": { + "session": "session0" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + } + }, + { + "description": "secondary readPreference", + "operations": [ + { + "name": "startTransaction", + "arguments": { + "session": "session0", + "options": { + "readPreference": { + "mode": "Secondary" + } + } + } + }, + { + "name": "insertMany", + "arguments": { + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ], + "session": "session0" + }, + "result": { + "insertedIds": { + "0": 1, + "1": 2, + "2": 3, + "3": 4 + } + } + }, + { + "name": "count", + "arguments": { + "readPreference": { + "mode": "Primary" + }, + "filter": { + "_id": 1 + }, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "find", + "arguments": { + "readPreference": { + "mode": "Primary" + }, + "batchSize": 3, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "aggregate", + "arguments": { + "readPreference": { + "mode": "Primary" + }, + "pipeline": [ + { + "$project": { + "_id": 1 + } + } + ], + "batchSize": 3, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "abortTransaction", + "arguments": { + "session": "session0" + } + } + ], + "outcome": { + "collection": { + "data": [] + } + } + }, + { + "description": "primaryPreferred readPreference", + "operations": [ + { + "name": "startTransaction", + "arguments": { + "session": "session0", + "options": { + "readPreference": { + "mode": "PrimaryPreferred" + } + } + } + }, + { + "name": "insertMany", + "arguments": { + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ], + "session": "session0" + }, + "result": { + "insertedIds": { + "0": 1, + "1": 2, + "2": 3, + "3": 4 + } + } + }, + { + "name": "count", + "arguments": { + "readPreference": { + "mode": "Primary" + }, + "filter": { + "_id": 1 + }, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "find", + "arguments": { + "readPreference": { + "mode": "Primary" + }, + "batchSize": 3, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "aggregate", + "arguments": { + "readPreference": { + "mode": "Primary" + }, + "pipeline": [ + { + "$project": { + "_id": 1 + } + } + ], + "batchSize": 3, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "abortTransaction", + "arguments": { + "session": "session0" + } + } + ], + "outcome": { + "collection": { + "data": [] + } + } + }, + { + "description": "nearest readPreference", + "operations": [ + { + "name": "startTransaction", + "arguments": { + "session": "session0", + "options": { + "readPreference": { + "mode": "Nearest" + } + } + } + }, + { + "name": "insertMany", + "arguments": { + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ], + "session": "session0" + }, + "result": { + "insertedIds": { + "0": 1, + "1": 2, + "2": 3, + "3": 4 + } + } + }, + { + "name": "count", + "arguments": { + "readPreference": { + "mode": "Primary" + }, + "filter": { + "_id": 1 + }, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "find", + "arguments": { + "readPreference": { + "mode": "Primary" + }, + "batchSize": 3, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "aggregate", + "arguments": { + "readPreference": { + "mode": "Primary" + }, + "pipeline": [ + { + "$project": { + "_id": 1 + } + } + ], + "batchSize": 3, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "abortTransaction", + "arguments": { + "session": "session0" + } + } + ], + "outcome": { + "collection": { + "data": [] + } + } + }, + { + "description": "secondary write only", + "operations": [ + { + "name": "startTransaction", + "arguments": { + "session": "session0", + "options": { + "readPreference": { + "mode": "Secondary" + } + } } }, { @@ -103,49 +603,6 @@ "insertedId": 1 } }, - { - "name": "count", - "arguments": { - "readPreference": { - "mode": "secondary" - }, - "session": "session0" - }, - "result": { - "errorContains": "read preference in a transaction must be primary" - } - }, - { - "name": "find", - "arguments": { - "readPreference": { - "mode": "secondary" - }, - "session": "session0" - }, - "result": { - "errorContains": "read preference in a transaction must be primary" - } - }, - { - "name": "aggregate", - "arguments": { - "pipeline": [ - { - "$project": { - "_id": 1 - } - } - ], - "readPreference": { - "mode": "secondary" - }, - "session": "session0" - }, - "result": { - "errorContains": "read preference in a transaction must be primary" - } - }, { "name": "commitTransaction", "arguments": { @@ -162,146 +619,6 @@ ] } } - }, - { - "description": "non-primary readPreference", - "operations": [ - { - "name": "startTransaction", - "arguments": { - "session": "session0" - } - }, - { - "name": "count", - "arguments": { - "readPreference": { - "mode": "secondary" - }, - "session": "session0" - }, - "result": { - "errorContains": "read preference in a transaction must be primary" - } - }, - { - "name": "find", - "arguments": { - "readPreference": { - "mode": "secondary" - }, - "session": "session0" - }, - "result": { - "errorContains": "read preference in a transaction must be primary" - } - }, - { - "name": "aggregate", - "arguments": { - "pipeline": [ - { - "$project": { - "_id": 1 - } - } - ], - "readPreference": { - "mode": "secondary" - }, - "session": "session0" - }, - "result": { - "errorContains": "read preference in a transaction must be primary" - } - }, - { - "name": "commitTransaction", - "arguments": { - "session": "session0" - } - } - ], - "outcome": { - "collection": { - "data": [] - } - } - }, - { - "description": "conflict", - "operations": [ - { - "name": "startTransaction", - "arguments": { - "session": "session0" - } - }, - { - "name": "count", - "arguments": { - "readPreference": { - "mode": "primary" - }, - "session": "session0" - }, - "result": 0 - }, - { - "name": "count", - "arguments": { - "readPreference": { - "mode": "secondary" - }, - "session": "session0" - }, - "result": { - "errorContains": "read preference in a transaction must be primary" - } - }, - { - "name": "find", - "arguments": { - "session": "session0", - "readPreference": { - "mode": "secondary" - } - }, - "result": { - "errorContains": "read preference in a transaction must be primary" - } - }, - { - "name": "aggregate", - "arguments": { - "pipeline": [ - { - "$project": { - "_id": 1 - } - } - ], - "readPreference": { - "mode": "secondary" - }, - "session": "session0" - }, - "result": { - "errorContains": "read preference in a transaction must be primary" - } - }, - { - "name": "commitTransaction", - "arguments": { - "session": "session0" - } - } - ], - "outcome": { - "collection": { - "data": [] - } - } } ] } diff --git a/test/transactions/transaction-options.json b/test/transactions/transaction-options.json index 618c7b9ac..a50f35fdd 100644 --- a/test/transactions/transaction-options.json +++ b/test/transactions/transaction-options.json @@ -1258,6 +1258,314 @@ ] } } + }, + { + "description": "readPreference inherited from client", + "clientOptions": { + "readPreference": "secondary" + }, + "operations": [ + { + "name": "startTransaction", + "arguments": { + "session": "session0" + } + }, + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 1 + }, + "session": "session0" + }, + "result": { + "insertedId": 1 + } + }, + { + "name": "count", + "arguments": { + "filter": { + "_id": 1 + }, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "commitTransaction", + "arguments": { + "session": "session0" + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "test", + "documents": [ + { + "_id": 1 + } + ], + "ordered": true, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": true, + "autocommit": false, + "readConcern": null, + "writeConcern": null + }, + "command_name": "insert", + "database_name": "transaction-tests" + } + }, + { + "command_started_event": { + "command": { + "commitTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "readConcern": null, + "writeConcern": null + }, + "command_name": "commitTransaction", + "database_name": "admin" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + } + ] + } + } + }, + { + "description": "readPreference inherited from defaultTransactionOptions", + "clientOptions": { + "readPreference": "primary" + }, + "sessionOptions": { + "session0": { + "defaultTransactionOptions": { + "readPreference": { + "mode": "Secondary" + } + } + } + }, + "operations": [ + { + "name": "startTransaction", + "arguments": { + "session": "session0" + } + }, + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 1 + }, + "session": "session0" + }, + "result": { + "insertedId": 1 + } + }, + { + "name": "count", + "arguments": { + "filter": { + "_id": 1 + }, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "commitTransaction", + "arguments": { + "session": "session0" + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "test", + "documents": [ + { + "_id": 1 + } + ], + "ordered": true, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": true, + "autocommit": false, + "readConcern": null, + "writeConcern": null + }, + "command_name": "insert", + "database_name": "transaction-tests" + } + }, + { + "command_started_event": { + "command": { + "commitTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "readConcern": null, + "writeConcern": null + }, + "command_name": "commitTransaction", + "database_name": "admin" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + } + ] + } + } + }, + { + "description": "startTransaction overrides readPreference", + "clientOptions": { + "readPreference": "primary" + }, + "sessionOptions": { + "session0": { + "defaultTransactionOptions": { + "readPreference": { + "mode": "Primary" + } + } + } + }, + "operations": [ + { + "name": "startTransaction", + "arguments": { + "session": "session0", + "options": { + "readPreference": { + "mode": "Secondary" + } + } + } + }, + { + "name": "insertOne", + "arguments": { + "document": { + "_id": 1 + }, + "session": "session0" + }, + "result": { + "insertedId": 1 + } + }, + { + "name": "count", + "arguments": { + "filter": { + "_id": 1 + }, + "session": "session0" + }, + "result": { + "errorContains": "read preference in a transaction must be primary" + } + }, + { + "name": "commitTransaction", + "arguments": { + "session": "session0" + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "test", + "documents": [ + { + "_id": 1 + } + ], + "ordered": true, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": true, + "autocommit": false, + "readConcern": null, + "writeConcern": null + }, + "command_name": "insert", + "database_name": "transaction-tests" + } + }, + { + "command_started_event": { + "command": { + "commitTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "readConcern": null, + "writeConcern": null + }, + "command_name": "commitTransaction", + "database_name": "admin" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + } + ] + } + } } ] } diff --git a/test/utils.py b/test/utils.py index 79582fc6d..aa13f2556 100644 --- a/test/utils.py +++ b/test/utils.py @@ -29,6 +29,7 @@ from functools import partial from pymongo import MongoClient, monitoring from pymongo.errors import AutoReconnect, OperationFailure +from pymongo.monitoring import _SENSITIVE_COMMANDS from pymongo.server_selectors import (any_server_selector, writable_server_selector) from pymongo.write_concern import WriteConcern @@ -74,6 +75,21 @@ class EventListener(monitoring.CommandListener): self.results['failed'].append(event) +class OvertCommandListener(EventListener): + """A CommandListener that ignores sensitive commands.""" + def started(self, event): + if event.command_name.lower() not in _SENSITIVE_COMMANDS: + super(OvertCommandListener, self).started(event) + + def succeeded(self, event): + if event.command_name.lower() not in _SENSITIVE_COMMANDS: + super(OvertCommandListener, self).succeeded(event) + + def failed(self, event): + if event.command_name.lower() not in _SENSITIVE_COMMANDS: + super(OvertCommandListener, self).failed(event) + + class ServerAndTopologyEventListener(monitoring.ServerListener, monitoring.TopologyListener): """Listens to all events.""" diff --git a/test/utils_selection_tests.py b/test/utils_selection_tests.py index 767655266..66d31b823 100644 --- a/test/utils_selection_tests.py +++ b/test/utils_selection_tests.py @@ -180,23 +180,14 @@ def create_test(scenario_def): else: # Make first letter lowercase to match read_pref's modes. pref_def = scenario_def['read_preference'] - mode_string = pref_def.get('mode', 'primary') - mode_string = mode_string[:1].lower() + mode_string[1:] - mode = read_preferences.read_pref_mode_from_name(mode_string) - max_staleness = pref_def.get('maxStalenessSeconds', -1) - tag_sets = pref_def.get('tag_sets') - if scenario_def.get('error'): with self.assertRaises((ConfigurationError, ValueError)): # Error can be raised when making Read Pref or selecting. - pref = read_preferences.make_read_preference( - mode, tag_sets=tag_sets, max_staleness=max_staleness) - + pref = parse_read_preference(pref_def) top_latency.select_server(pref) return - pref = read_preferences.make_read_preference( - mode, tag_sets=tag_sets, max_staleness=max_staleness) + pref = parse_read_preference(pref_def) # Select servers. if not scenario_def.get('suitable_servers'): @@ -281,3 +272,14 @@ def create_selection_tests(test_dir): setattr(TestAllScenarios, new_test.__name__, new_test) return TestAllScenarios + + +def parse_read_preference(pref): + # Make first letter lowercase to match read_pref's modes. + mode_string = pref.get('mode', 'primary') + mode_string = mode_string[:1].lower() + mode_string[1:] + mode = read_preferences.read_pref_mode_from_name(mode_string) + max_staleness = pref.get('maxStalenessSeconds', -1) + tag_sets = pref.get('tag_sets') + return read_preferences.make_read_preference( + mode, tag_sets=tag_sets, max_staleness=max_staleness)