diff --git a/doc/api/pymongo/cursor.rst b/doc/api/pymongo/cursor.rst index ea3aca50f..009c040d7 100644 --- a/doc/api/pymongo/cursor.rst +++ b/doc/api/pymongo/cursor.rst @@ -4,7 +4,7 @@ .. automodule:: pymongo.cursor :synopsis: Tools for iterating over MongoDB query results - .. autoclass:: pymongo.cursor.Cursor(collection, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, read_preference=None, exhaust=False, compile_re=True) + .. autoclass:: pymongo.cursor.Cursor(collection, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, read_preference=None, tag_sets=None, secondary_acceptable_latency_ms=None, exhaust=False, compile_re=True) :members: .. describe:: c[index] diff --git a/doc/examples/high_availability.rst b/doc/examples/high_availability.rst index 0218d20ee..80c546b7d 100644 --- a/doc/examples/high_availability.rst +++ b/doc/examples/high_availability.rst @@ -200,7 +200,7 @@ per-query basis, e.g.:: >>> db.collection.find_one(read_preference=ReadPreference.PRIMARY) Reads are configured using three options: **read_preference**, **tag_sets**, -and **acceptableLatencyMS**. +and *latency_threshold_ms**. **read_preference**: @@ -210,18 +210,18 @@ and **acceptableLatencyMS**. - ``PRIMARY_PREFERRED``: Read from the primary if available, or if there is none, read from a secondary matching your choice of ``tag_sets`` and - ``acceptableLatencyMS``. + ``latency_threshold_ms``. - ``SECONDARY``: Read from a secondary matching your choice of ``tag_sets`` and - ``acceptableLatencyMS``. If no matching secondary is available, + ``latency_threshold_ms``. If no matching secondary is available, raise :class:`~pymongo.errors.AutoReconnect`. - ``SECONDARY_PREFERRED``: Read from a secondary matching your choice of - ``tag_sets`` and ``acceptableLatencyMS`` if available, otherwise + ``tag_sets`` and ``latency_threshold_ms`` if available, otherwise from primary (regardless of the primary's tags and latency). - ``NEAREST``: Read from any member matching your choice of ``tag_sets`` and - ``acceptableLatencyMS``. + ``latency_threshold_ms``. **tag_sets**: @@ -251,18 +251,18 @@ from any member that matches the mode, ignoring tags." See :mod:`~pymongo.read_preferences` for more information. -**acceptableLatencyMS**: +**latency_threshold_ms**: If multiple members match the mode and tag sets, MongoReplicaSetClient reads from among the nearest members, chosen according to ping time. By default, only members whose ping times are within 15 milliseconds of the nearest are used for queries. You can choose to distribute reads among members with -higher latencies by setting ``acceptableLatencyMS`` to a larger +higher latencies by setting ``latency_threshold_ms`` to a larger number. In that case, MongoReplicaSetClient distributes reads among matching -members within ``acceptableLatencyMS`` of the closest member's +members within ``latency_threshold_ms`` of the closest member's ping time. -.. note:: ``acceptableLatencyMS`` is ignored when talking to a +.. note:: ``latency_threshold_ms`` is ignored when talking to a replica set *through* a mongos. The equivalent is the localThreshold_ command line option. diff --git a/gridfs/__init__.py b/gridfs/__init__.py index 3eac49c79..9da271072 100644 --- a/gridfs/__init__.py +++ b/gridfs/__init__.py @@ -324,6 +324,8 @@ class GridFS(object): examined when performing the query - `read_preference` (optional): The read preference for this query. + - `tag_sets` **DEPRECATED** + - `secondary_acceptable_latency_ms` **DEPRECATED** - `compile_re` (optional): if ``False``, don't attempt to compile BSON regex objects into Python regexes. Return instances of :class:`~bson.regex.Regex` instead. diff --git a/gridfs/grid_file.py b/gridfs/grid_file.py index 42a320d60..fe8b9ff61 100644 --- a/gridfs/grid_file.py +++ b/gridfs/grid_file.py @@ -628,7 +628,8 @@ class GridOutCursor(Cursor): """ def __init__(self, collection, spec=None, skip=0, limit=0, timeout=True, sort=None, max_scan=None, - read_preference=None, compile_re=True): + read_preference=None, tag_sets=None, + secondary_acceptable_latency_ms=None, compile_re=True): """Create a new cursor, similar to the normal :class:`~pymongo.cursor.Cursor`. @@ -648,7 +649,8 @@ class GridOutCursor(Cursor): super(GridOutCursor, self).__init__( collection.files, spec, skip=skip, limit=limit, timeout=timeout, sort=sort, max_scan=max_scan, read_preference=read_preference, - compile_re=compile_re) + secondary_acceptable_latency_ms=secondary_acceptable_latency_ms, + tag_sets=tag_sets, compile_re=compile_re) def next(self): """Get next GridOut object from cursor. diff --git a/pymongo/collection.py b/pymongo/collection.py index 21065278a..533b4418b 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -823,6 +823,8 @@ class Collection(common.BaseObject): outgoing SON manipulators before returning. - `read_preference` (optional): The read preference for this query. + - `tag_sets` **DEPRECATED** + - `secondary_acceptable_latency_ms` **DEPRECATED** - `compile_re` (optional): if ``False``, don't attempt to compile BSON regex objects into Python regexes. Return instances of :class:`~bson.regex.Regex` instead. @@ -857,7 +859,8 @@ class Collection(common.BaseObject): version **>= 1.5.1** .. versionchanged:: 3.0 - Removed the `network_timeout`, `tag_sets`, and + Removed the `network_timeout` parameter. + Deprecated the `tag_sets`, and `secondary_acceptable_latency_ms` parameters. .. versionadded:: 2.7 diff --git a/pymongo/common.py b/pymongo/common.py index ac692d25e..585d7b910 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -14,11 +14,16 @@ """Functions and classes common to multiple pymongo modules.""" + import sys -from pymongo import read_preferences +import warnings from pymongo.auth import MECHANISMS from pymongo.errors import ConfigurationError +from pymongo.read_preferences import (make_read_preference, + read_pref_mode_from_name, + ReadPreference, + ServerMode) from pymongo.write_concern import WriteConcern from bson.binary import (OLD_UUID_SUBTYPE, UUID_SUBTYPE, JAVA_LEGACY, CSHARP_LEGACY) @@ -194,7 +199,7 @@ def validate_timeout_or_none(option, value): def validate_read_preference(dummy, value): """Validate a read preference. """ - if not isinstance(value, read_preferences.ServerMode): + if not isinstance(value, ServerMode): raise ConfigurationError("%r is not a " "valid read preference." % (value,)) return value @@ -204,7 +209,7 @@ def validate_read_preference_mode(dummy, name): """Validate read preference mode for a MongoReplicaSetClient. """ try: - return read_preferences.read_pref_mode_from_name(name) + return read_pref_mode_from_name(name) except ValueError: raise ConfigurationError("Not a valid read preference") @@ -278,7 +283,8 @@ VALIDATORS = { 'read_preference': validate_read_preference, 'readpreference': validate_read_preference_mode, 'readpreferencetags': validate_read_preference_tags, - 'acceptablelatencyms': validate_positive_float, + 'latencythresholdms': validate_positive_float, + 'secondaryacceptablelatencyms': validate_positive_float, 'auto_start_request': validate_boolean, 'use_greenlets': validate_boolean, 'authmechanism': validate_auth_mechanism, @@ -329,25 +335,24 @@ class BaseObject(object): def __init__(self, **options): - self.__read_pref = read_preferences.ReadPreference.PRIMARY + self.__read_pref = None self.__uuid_subtype = OLD_UUID_SUBTYPE self.__write_concern = None self.__set_options(options) + if 'read_preference' not in options: + mode = options.get('readpreference', 0) + latency = options.get('secondaryacceptablelatencyms', + options.get('latencythresholdms', 15)) + tags = options.get('readpreferencetags') + self.__read_pref = make_read_preference(mode, latency, tags) + def __set_options(self, options): """Validates and sets all options passed to this object.""" wc_opts = {} for option, value in iteritems(options): if option == 'read_preference': self.__read_pref = validate_read_preference(option, value) - elif option == 'readpreference': - klass = read_preferences.read_pref_class_from_mode(value) - if value == 0: - # Primary, no tags - self.__read_pref = klass() - continue - tags = options.get('readpreferencetags', None) - self.__read_pref = klass(tags) elif option == 'uuidrepresentation': self.__uuid_subtype = validate_uuid_subtype(option, value) elif option in WRITE_CONCERN_OPTIONS: @@ -436,6 +441,30 @@ class BaseObject(object): read_preference = property(__get_read_pref, __set_read_pref) + def __get_latency(self): + return self.__read_pref.latency_threshold_ms + + def __set_latency(self, latency): + warnings.warn("The secondary_acceptable_latency_ms attribute is " + "deprecated", DeprecationWarning, stacklevel=2) + mode = self.__read_pref.mode + tag_sets = self.__read_pref.tag_sets + self.__read_pref = make_read_preference(mode, latency, tag_sets) + + secondary_acceptable_latency_ms = property(__get_latency, __set_latency) + + def __get_tags(self): + return self.__read_pref.tag_sets + + def __set_tags(self, tag_sets): + warnings.warn("The tag_sets attribute is deprecated", + DeprecationWarning, stacklevel=2) + mode = self.__read_pref.mode + latency = self.__read_pref.latency_threshold_ms + self.__read_pref = make_read_preference(mode, latency, tag_sets) + + tag_sets = property(__get_tags, __set_tags) + def __get_uuid_subtype(self): """This attribute specifies which BSON Binary subtype is used when storing UUIDs. Historically UUIDs have been stored as BSON Binary diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 093bbf546..15e016e79 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -14,6 +14,8 @@ """Cursor class to iterate over Mongo query results.""" import copy +import warnings + from collections import deque from bson import RE_TYPE @@ -71,8 +73,9 @@ class Cursor(object): timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, - read_preference=None, exhaust=False, compile_re=True, - _uuid_subtype=None): + read_preference=None, tag_sets=None, + secondary_acceptable_latency_ms=None, + exhaust=False, compile_re=True, _uuid_subtype=None): """Create a new cursor. Should not be called directly by application developers - see @@ -148,7 +151,6 @@ class Cursor(object): self.__comment = None self.__as_class = as_class self.__manipulate = manipulate - self.__read_preference = read_preference or collection.read_preference self.__tz_aware = collection.database.connection.tz_aware self.__compile_re = compile_re self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype @@ -158,10 +160,21 @@ class Cursor(object): self.__retrieved = 0 self.__killed = False + self.__read_preference = read_preference or collection.read_preference + if secondary_acceptable_latency_ms or tag_sets: + warnings.warn("The secondary_acceptable_latency_ms " + "and tag_sets options are deprecated", + DeprecationWarning, stacklevel=3) + mode = self.__read_preference.mode + tags = tag_sets or self.__read_preference.tag_sets + latency = (secondary_acceptable_latency_ms or + self.__read_preference.latency_threshold_ms) + self.__read_preference = make_read_preference(mode, latency, tags) + self.__query_flags = 0 if tailable: self.__query_flags |= _QUERY_OPTIONS["tailable_cursor"] - if self.__read_preference != ReadPreference.PRIMARY: + if self.__read_preference.mode != ReadPreference.PRIMARY.mode: self.__query_flags |= _QUERY_OPTIONS["slave_okay"] if not timeout: self.__query_flags |= _QUERY_OPTIONS["no_timeout"] @@ -300,7 +313,7 @@ class Cursor(object): # PRIMARY to avoid problems with mongos versions that # don't support read preferences. if (self.__collection.database.connection.is_mongos and - self.__read_preference != ReadPreference.PRIMARY): + self.__read_preference.mode != ReadPreference.PRIMARY.mode): # For maximum backwards compatibility, don't set $readPreference # for SECONDARY_PREFERRED unless tags are in use. Just rely on diff --git a/pymongo/database.py b/pymongo/database.py index 05998179c..11d7e6c1d 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -295,25 +295,36 @@ class Database(common.BaseObject): fields = helpers._fields_list_to_dict(fields) command.update(kwargs) - orig = mode = read_preference or self.read_preference + orig = pref = read_preference or self.read_preference + latency = kwargs.pop('secondary_acceptable_latency_ms', None) + tags = kwargs.pop('tags_sets', None) + if latency or tags: + warnings.warn("The secondary_acceptable_latency_ms " + "and tag_sets options are deprecated", + DeprecationWarning, stacklevel=3) + mode = orig.mode + tags = tags or orig.tag_sets + latency = latency or orig.latency_threshold_ms + orig = make_read_preference(mode, latency, tags) + if command_name not in SECONDARY_OK_COMMANDS: - mode = ReadPreference.PRIMARY + pref = ReadPreference.PRIMARY # Special-case: mapreduce can go to secondaries only if inline elif command_name == 'mapreduce': out = command.get('out') if not isinstance(out, dict) or not out.get('inline'): - mode = ReadPreference.PRIMARY + pref = ReadPreference.PRIMARY # Special-case: aggregate with $out cannot go to secondaries. elif command_name == 'aggregate': for stage in command.get('pipeline', []): if '$out' in stage: - mode = ReadPreference.PRIMARY + pref = ReadPreference.PRIMARY break # Warn if mode will override read_preference. - if mode != orig: + if pref.mode != orig.mode: warnings.warn("%s does not support %s read preference " "and will be routed to the primary instead." % (command_name, orig.name), UserWarning, stacklevel=3) @@ -323,7 +334,7 @@ class Database(common.BaseObject): fields=fields, limit=-1, as_class=as_class, - read_preference=mode, + read_preference=pref, compile_re=compile_re, _uuid_subtype=uuid_subtype) for doc in cursor: @@ -391,11 +402,13 @@ class Database(common.BaseObject): Python-incompatible regular expressions, for example from ``currentOp`` - `read_preference`: The read preference for this operation. + - `tag_sets` **DEPRECATED** + - `secondary_acceptable_latency_ms` **DEPRECATED** - `**kwargs` (optional): additional keyword arguments will be added to the command document before it is sent .. versionchanged:: 3.0 - Removed the `tag_sets` and `secondary_acceptable_latency_ms` + Deprecated the `tag_sets` and `secondary_acceptable_latency_ms` options. .. versionchanged:: 2.7 Added ``compile_re`` option. diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index ffa496011..f33d333ba 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -203,10 +203,6 @@ class MongoClient(common.BaseObject): :class:`~pymongo.errors.AutoReconnect` "not master". See :class:`~pymongo.read_preferences.ReadPreference` for all available read preference options. - - `acceptableLatencyMS`: (integer) When used with mongos - high availability, any mongos whose ping time is within - acceptable_latency_ms of the nearest member may be chosen - as the new primary during a failover. Default 15 milliseconds. | **SSL configuration:** @@ -299,7 +295,6 @@ class MongoClient(common.BaseObject): self.__repl = options.get('replicaset') self.__direct = len(seeds) == 1 and not self.__repl - self.__acceptable_latency = options.get('acceptablelatencyms', 15) self.__net_timeout = options.get('sockettimeoutms') self.__conn_timeout = options.get('connecttimeoutms') self.__wait_queue_timeout = options.get('waitqueuetimeoutms') @@ -679,22 +674,6 @@ class MongoClient(common.BaseObject): return self.__member_property( 'max_write_batch_size', common.MAX_WRITE_BATCH_SIZE) - @property - def acceptable_latency_ms(self): - """When used with mongos high availability, any mongos whose ping time - is within acceptable_latency_ms of the nearest mongos may be - chosen as the new primary during a failover. Default 15 milliseconds. - - .. versionadded:: 2.3 - - .. note:: ``acceptable_latency_ms`` is ignored when talking - to a replica set *through* a mongos. The equivalent is the - localThreshold_ command line option. - - .. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption--localThreshold - """ - return self.__acceptable_latency - def __simple_command(self, sock_info, dbname, spec): """Send a command to the server. """ @@ -773,7 +752,7 @@ class MongoClient(common.BaseObject): Doesn't modify state. """ - latency = self.__acceptable_latency + latency = self.read_preference.latency_threshold_ms # Only used for mongos high availability, ping_time is in seconds. fastest = min([ member.ping_time for member in candidates]) diff --git a/pymongo/mongo_replica_set_client.py b/pymongo/mongo_replica_set_client.py index 96a1774ff..5ec00509f 100644 --- a/pymongo/mongo_replica_set_client.py +++ b/pymongo/mongo_replica_set_client.py @@ -553,9 +553,6 @@ class MongoReplicaSetClient(common.BaseObject): - `read_preference`: The read preference for this client. See :class:`~pymongo.read_preferences.ReadPreference` for available options. - - `acceptableLatencyMS`: (integer) Any replica-set member - whose ping time is within acceptable_latency_ms of the - nearest member may accept reads. Default 15 milliseconds. | **SSL configuration:** @@ -646,7 +643,6 @@ class MongoReplicaSetClient(common.BaseObject): raise ConfigurationError("the replicaSet " "keyword parameter is required.") - self.__acceptable_latency = self.__opts.get('acceptablelatencyms', 15) self.__net_timeout = self.__opts.get('sockettimeoutms') self.__conn_timeout = self.__opts.get('connecttimeoutms') self.__wait_queue_timeout = self.__opts.get('waitqueuetimeoutms') @@ -1006,18 +1002,6 @@ class MongoReplicaSetClient(common.BaseObject): return rs_state.primary_member.max_write_batch_size return common.MAX_WRITE_BATCH_SIZE - @property - def acceptable_latency_ms(self): - """Any replica-set member whose ping time is within - acceptable_latency_ms of the nearest member may accept - reads. Defaults to 15 milliseconds. - - See :class:`~pymongo.read_preferences.ReadPreference`. - - .. versionadded:: 2.3 - """ - return self.__acceptable_latency - @property def auto_start_request(self): """Is auto_start_request enabled? @@ -1662,7 +1646,7 @@ class MongoReplicaSetClient(common.BaseObject): members=members, mode=pref.mode, tag_sets=pref.tag_sets, - latency=self.__acceptable_latency) + latency=pref.latency_threshold_ms) if not member: # Ran out of members to try diff --git a/pymongo/read_preferences.py b/pymongo/read_preferences.py index dee176157..503ed2a9a 100644 --- a/pymongo/read_preferences.py +++ b/pymongo/read_preferences.py @@ -63,14 +63,20 @@ class ServerMode(object): """Base class for all read preferences. """ - __slots__ = ("__mode", "__mongos_mode", "__tag_sets") + __slots__ = ("__mongos_mode", "__mode", "__latency", "__tag_sets") - def __init__(self, mode, tag_sets=None): + def __init__(self, mode, latency_threshold_ms=15, tag_sets=None): if mode == _PRIMARY and tag_sets is not None: - raise ConfigurationError("PRIMARY cannot be combined with tags") - self.__mode = mode + raise ConfigurationError("Read preference primary " + "cannot be combined with tags") self.__mongos_mode = _MONGOS_MODES[mode] + self.__mode = mode self.__tag_sets = _validate_tag_sets(tag_sets) + try: + self.__latency = float(latency_threshold_ms) + except (ValueError, TypeError): + raise ConfigurationError("latency_threshold_ms must " + "be a positive integer or float") @property def name(self): @@ -92,6 +98,17 @@ class ServerMode(object): """ return self.__mode + @property + def latency_threshold_ms(self): + """int - Any replica-set member whose ping time is within + `~latency_threshold_ms` of the nearest member may accept reads. + When used with mongos high availability, any mongos whose ping + time is within `~latency_threshold_ms` of the nearest mongos + may be chosen as the new mongos during a failover. Default 15 + milliseconds. + """ + return self.__latency + @property def tag_sets(self): """Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to @@ -108,10 +125,13 @@ class ServerMode(object): return self.__tag_sets or [{}] def __repr__(self): - return "%s(tag_sets=%r)" % (self.name, self.__tag_sets) + return "%s(latency_threshold_ms=%d, tag_sets=%r)" % ( + self.name, self.__latency, self.__tag_sets) def __eq__(self, other): - return self.mode == other.mode and self.tag_sets == other.tag_sets + return (self.mode == other.mode and + self.latency_threshold_ms == other.latency_threshold_ms and + self.tag_sets == other.tag_sets) def __ne__(self, other): return not self == other @@ -125,19 +145,17 @@ class Primary(ServerMode): * When connected to a mongos queries are sent to the primary of a shard. * When connected to a replica set queries are sent to the primary of the replica set. + + :Parameters: + - `latency_threshold_ms`: Used for mongos high availability. The + :attr:`~latency_threshold_ms` when selecting a failover mongos. """ - def __init__(self): - super(Primary, self).__init__(_PRIMARY) + def __init__(self, latency_threshold_ms=15): + super(Primary, self).__init__(_PRIMARY, latency_threshold_ms) def __repr__(self): - return "Primary()" - - def __eq__(self, other): - return other.mode == _PRIMARY - - def __ne__(self, other): - return other.mode != _PRIMARY + return "Primary(latency_threshold_ms=%d)" % self.latency_threshold_ms class PrimaryPreferred(ServerMode): @@ -151,12 +169,15 @@ class PrimaryPreferred(ServerMode): available, otherwise a secondary. :Parameters: + - `latency_threshold_ms`: The :attr:`~latency_threshold_ms` when + selecting a secondary. - `tag_sets`: The :attr:`~tag_sets` to use if the primary is not available. """ - def __init__(self, tag_sets=None): - super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED, tag_sets) + def __init__(self, latency_threshold_ms=15, tag_sets=None): + super(PrimaryPreferred, self).__init__( + _PRIMARY_PREFERRED, latency_threshold_ms, tag_sets) class Secondary(ServerMode): @@ -170,11 +191,14 @@ class Secondary(ServerMode): secondaries. An error is raised if no secondaries are available. :Parameters: + - `latency_threshold_ms`: The :attr:`~latency_threshold_ms` when + selecting a secondary. - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference """ - def __init__(self, tag_sets=None): - super(Secondary, self).__init__(_SECONDARY, tag_sets) + def __init__(self, latency_threshold_ms=15, tag_sets=None): + super(Secondary, self).__init__( + _SECONDARY, latency_threshold_ms, tag_sets) class SecondaryPreferred(ServerMode): @@ -188,11 +212,14 @@ class SecondaryPreferred(ServerMode): secondaries, or the primary if no secondary is available. :Parameters: + - `latency_threshold_ms`: The :attr:`~latency_threshold_ms` when + selecting a secondary. - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference """ - def __init__(self, tag_sets=None): - super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED, tag_sets) + def __init__(self, latency_threshold_ms=15, tag_sets=None): + super(SecondaryPreferred, self).__init__( + _SECONDARY_PREFERRED, latency_threshold_ms, tag_sets) class Nearest(ServerMode): @@ -206,21 +233,26 @@ class Nearest(ServerMode): members. :Parameters: + - `latency_threshold_ms`: The :attr:`~latency_threshold_ms` when + selecting a secondary. - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference """ - def __init__(self, tag_sets=None): - super(Nearest, self).__init__(_NEAREST, tag_sets) + def __init__(self, latency_threshold_ms=15, tag_sets=None): + super(Nearest, self).__init__( + _NEAREST, latency_threshold_ms, tag_sets) _ALL_READ_PREFERENCES = (Primary, PrimaryPreferred, Secondary, SecondaryPreferred, Nearest) - -def read_pref_class_from_mode(mode): - """Get the read preference class for a specific mode. - """ - return _ALL_READ_PREFERENCES[mode] +def make_read_preference(mode, latency_threshold_ms, tag_sets): + if mode == _PRIMARY: + if tag_sets not in (None, [{}]): + raise ConfigurationError("Read preference primary " + "cannot be combined with tags") + return Primary(latency_threshold_ms) + return _ALL_READ_PREFERENCES[mode](latency_threshold_ms, tag_sets) _MODES = ( diff --git a/test/high_availability/test_ha.py b/test/high_availability/test_ha.py index 712c7bb49..30bbf858b 100644 --- a/test/high_availability/test_ha.py +++ b/test/high_availability/test_ha.py @@ -653,6 +653,11 @@ class TestReadPreference(HATestCase): assertReadFrom(other_secondary, NEAREST) + # High secondaryAcceptableLatencyMS, should read from all members + assertReadFromAll( + [primary, secondary, other_secondary], + NEAREST, secondary_acceptable_latency_ms=1000*1000) + self.clear_ping_times() assertReadFromAll([primary, other_secondary], NEAREST, [{'dc': 'ny'}]) @@ -815,6 +820,18 @@ class TestReadPreference(HATestCase): "Changing from tags %s to tags %s never unpinned" % ( tags0, tags1)) + # Finally, verify changing the secondary_acceptable_latency_ms unpins + # the member. + for _ in range(1000): + host = utils.read_from_which_host(c, SECONDARY, None, 15) + new_host = utils.read_from_which_host(c, SECONDARY, None, 20) + if host != new_host: + break + else: + self.fail( + "Changing secondary_acceptable_latency_ms from 15 to 20" + " never unpinned") + def tearDown(self): self.c.close() super(TestReadPreference, self).tearDown() diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index 66bcd27f7..fd560e747 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -95,15 +95,15 @@ class TestReadPreferences(TestReadPreferencesBase): self.assertRaises(ConfigurationError, ServerMode, 0, tag_sets=[{}]) - S = Secondary([{}]) + S = Secondary(tag_sets=[{}]) self.assertEqual([{}], self._get_client(read_preference=S).read_preference.tag_sets) - S = Secondary([{'k': 'v'}]) + S = Secondary(tag_sets=[{'k': 'v'}]) self.assertEqual([{'k': 'v'}], self._get_client(read_preference=S).read_preference.tag_sets) - S = Secondary([{'k': 'v'}, {}]) + S = Secondary(tag_sets=[{'k': 'v'}, {}]) self.assertEqual([{'k': 'v'}, {}], self._get_client(read_preference=S).read_preference.tag_sets) @@ -118,16 +118,16 @@ class TestReadPreferences(TestReadPreferencesBase): def test_latency_validation(self): self.assertEqual(17, self._get_client( - acceptableLatencyMS=17 - ).acceptable_latency_ms) + latencyThresholdMS=17 + ).read_preference.latency_threshold_ms) self.assertEqual(42, self._get_client( - acceptableLatencyMS=42 - ).acceptable_latency_ms) + latencyThresholdMS=42 + ).read_preference.latency_threshold_ms) self.assertEqual(666, self._get_client( - acceptablelatencyms=666 - ).acceptable_latency_ms) + latencythresholdms=666 + ).read_preference.latency_threshold_ms) def test_primary(self): self.assertReadsFrom('primary', @@ -151,11 +151,11 @@ class TestReadPreferences(TestReadPreferencesBase): read_preference=ReadPreference.SECONDARY_PREFERRED) def test_nearest(self): - # With high acceptableLatencyMS, expect to read from any + # With high latencyThresholdMS, expect to read from any # member c = self._get_client( read_preference=ReadPreference.NEAREST, - acceptableLatencyMS=10000, # 10 seconds + latencyThresholdMS=10000, # 10 seconds auto_start_request=False) data_members = set(self.hosts).difference(set(self.arbiters)) @@ -203,7 +203,7 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase): replicaSet=self.name, auto_start_request=False, # Effectively ignore members' ping times so we can test the effect # of ReadPreference modes only - acceptableLatencyMS=1000*1000) + latencyThresholdMS=1000*1000) self.client_version = Version.from_client(self.c) def tearDown(self): @@ -538,7 +538,7 @@ class TestMongosConnection(unittest.TestCase): None, [{}] ): # Create a client e.g. with read_preference=NEAREST - c = get_client(read_preference=mode(tag_sets)) + c = get_client(read_preference=mode(tag_sets=tag_sets)) self.assertEqual(is_mongos, c.is_mongos) cursor = c.pymongo_test.test.find() @@ -577,7 +577,7 @@ class TestMongosConnection(unittest.TestCase): [{'dc': 'la'}, {'dc': 'sf'}], [{'dc': 'la'}, {'dc': 'sf'}, {}], ): - c = get_client(read_preference=mode(tag_sets)) + c = get_client(read_preference=mode(tag_sets=tag_sets)) self.assertEqual(is_mongos, c.is_mongos) cursor = c.pymongo_test.test.find() diff --git a/test/test_replica_set_client.py b/test/test_replica_set_client.py index 6e6360bfa..ea421264e 100644 --- a/test/test_replica_set_client.py +++ b/test/test_replica_set_client.py @@ -238,11 +238,11 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): ReadPreference.PRIMARY, cursor._Cursor__read_preference) tag_sets = [{'dc': 'la', 'rack': '2'}, {'foo': 'bar'}] - secondary = Secondary(tag_sets) + secondary = Secondary(tag_sets=tag_sets) c = MongoReplicaSetClient(pair, replicaSet=self.name, max_pool_size=25, document_class=SON, tz_aware=True, read_preference=secondary, - acceptablelatencyms=77) + secondaryacceptablelatencyms=77) self.assertEqual(c.primary, self.primary) self.assertEqual(c.hosts, self.hosts) self.assertEqual(c.arbiters, self.arbiters) @@ -257,7 +257,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): self.assertEqual( secondary, cursor._Cursor__read_preference) - nearest = Nearest([{'dc': 'ny'}, {}]) + nearest = Nearest(tag_sets=[{'dc': 'ny'}, {}]) cursor = c.pymongo_test.test.find(read_preference=nearest) self.assertEqual( @@ -1023,7 +1023,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): def test_pinned_member(self): latency = 1000 * 1000 - client = self._get_client(acceptablelatencyms=latency) + client = self._get_client(secondaryacceptablelatencyms=latency) host = read_from_which_host(client, ReadPreference.SECONDARY) self.assertTrue(host in client.secondaries) diff --git a/test/utils.py b/test/utils.py index f24286b27..8a7214f8e 100644 --- a/test/utils.py +++ b/test/utils.py @@ -276,8 +276,9 @@ class RendezvousThread(threading.Thread): def read_from_which_host( rsc, - mode, + pref, tag_sets=None, + secondary_acceptable_latency_ms=None ): """Read from a MongoReplicaSetClient with the given Read Preference mode, tags, and acceptable latency. Return the 'host:port' which was read from. @@ -288,10 +289,16 @@ def read_from_which_host( - `tag_sets`: List of dicts of tags for data-center-aware reads """ db = rsc.pymongo_test - db.read_preference = mode + if isinstance(tag_sets, dict): tag_sets = [tag_sets] - db.tag_sets = tag_sets or [{}] + if tag_sets or secondary_acceptable_latency_ms: + mode = pref.mode + latency = secondary_acceptable_latency_ms or pref.latency_threshold_ms + tags = tag_sets or pref.tag_sets + pref = pref.__class__(mode, latency, tags) + + db.read_preference = pref cursor = db.test.find() try: