diff --git a/doc/api/pymongo/mongo_client.rst b/doc/api/pymongo/mongo_client.rst index 9df5029f2..345404877 100644 --- a/doc/api/pymongo/mongo_client.rst +++ b/doc/api/pymongo/mongo_client.rst @@ -28,9 +28,9 @@ .. autoattribute:: max_message_size .. autoattribute:: min_wire_version .. autoattribute:: max_wire_version + .. autoattribute:: local_threshold_ms .. autoattribute:: codec_options .. autoattribute:: read_preference - .. autoattribute:: secondary_acceptable_latency_ms .. autoattribute:: write_concern .. autoattribute:: is_locked .. automethod:: database_names diff --git a/doc/api/pymongo/mongo_replica_set_client.rst b/doc/api/pymongo/mongo_replica_set_client.rst index 5181c75c9..824609cd3 100644 --- a/doc/api/pymongo/mongo_replica_set_client.rst +++ b/doc/api/pymongo/mongo_replica_set_client.rst @@ -27,9 +27,9 @@ .. autoattribute:: max_message_size .. autoattribute:: min_wire_version .. autoattribute:: max_wire_version + .. autoattribute:: local_threshold_ms .. autoattribute:: codec_options .. autoattribute:: read_preference - .. autoattribute:: secondary_acceptable_latency_ms .. autoattribute:: write_concern .. automethod:: database_names .. automethod:: drop_database diff --git a/pymongo/client_options.py b/pymongo/client_options.py index 0fbc39d90..737bab62f 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -50,10 +50,8 @@ def _parse_read_preference(options): return options['read_preference'] mode = options.get('readpreference', 0) - threshold = options.get('secondaryacceptablelatencyms', - options.get('localthresholdms', 15)) tags = options.get('readpreferencetags') - return make_read_preference(mode, threshold, tags) + return make_read_preference(mode, tags) def _parse_write_concern(options): @@ -121,19 +119,30 @@ class ClientOptions(object): def __init__(self, username, password, database, options): options = dict([validate(opt, val) for opt, val in iteritems(options)]) + self.__codec_options = _parse_codec_options(options) self.__credentials = _parse_credentials( username, password, database, options) - self.__codec_options = _parse_codec_options(options) + self.__local_threshold_ms = options.get('localthresholdms', 15) self.__pool_options = _parse_pool_options(options) self.__read_preference = _parse_read_preference(options) self.__replica_set_name = options.get('replicaset') self.__write_concern = _parse_write_concern(options) + @property + def codec_options(self): + """A :class:`~pymongo.codec_options.CodecOptions` instance.""" + return self.__codec_options + @property def credentials(self): """A :class:`~pymongo.auth.MongoCredentials` instance or None.""" return self.__credentials + @property + def local_threshold_ms(self): + """The local threshold for this instance.""" + return self.__local_threshold_ms + @property def pool_options(self): """A :class:`~pymongo.pool.PoolOptions` instance.""" @@ -149,11 +158,6 @@ class ClientOptions(object): """Replica set name or None.""" return self.__replica_set_name - @property - def codec_options(self): - """A :class:`~pymongo.codec_options.CodecOptions` instance.""" - return self.__codec_options - @property def write_concern(self): """A :class:`~pymongo.write_concern.WriteConcern` instance.""" diff --git a/pymongo/common.py b/pymongo/common.py index 5755d3362..1f65254ff 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -331,7 +331,6 @@ VALIDATORS = { 'readpreference': validate_read_preference_mode, 'readpreferencetags': validate_read_preference_tags, 'localthresholdms': validate_positive_float, - 'secondaryacceptablelatencyms': validate_positive_float, 'authmechanism': validate_auth_mechanism, 'authsource': validate_string, 'authmechanismproperties': validate_auth_mechanism_properties, @@ -432,28 +431,6 @@ class BaseObject(object): read_preference = property(__get_read_pref, __set_read_pref) - def __get_latency(self): - """Deprecated. Use ``client.read_preference.local_threshold_ms``. - - See :class:`~pymongo.read_preferences.ReadPreference`. - - .. note:: ``secondary_acceptable_latency_ms`` is ignored when talking - to a replica set *through* a mongos. The equivalent is the - localThreshold_ command line option. - - .. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption-mongos--localThreshold - """ - return self.__read_pref.local_threshold_ms - - def __set_latency(self, threshold): - warnings.warn("The secondary_acceptable_latency_ms attribute is " - "deprecated", DeprecationWarning, stacklevel=2) - mode = self.__read_pref.mode - tag_sets = self.__read_pref.tag_sets - self.__read_pref = make_read_preference(mode, threshold, tag_sets) - - secondary_acceptable_latency_ms = property(__get_latency, __set_latency) - def __get_tags(self): return self.__read_pref.tag_sets @@ -461,8 +438,7 @@ class BaseObject(object): warnings.warn("The tag_sets attribute is deprecated", DeprecationWarning, stacklevel=2) mode = self.__read_pref.mode - threshold = self.__read_pref.local_threshold_ms - self.__read_pref = make_read_preference(mode, threshold, tag_sets) + self.__read_pref = make_read_preference(mode, tag_sets) tag_sets = property(__get_tags, __set_tags) diff --git a/pymongo/database.py b/pymongo/database.py index 71c5d4210..08a13fcc9 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -342,16 +342,14 @@ class Database(common.BaseObject): command.update(kwargs) orig = pref = read_preference or self.read_preference - threshold = kwargs.pop('secondary_acceptable_latency_ms', None) tags = kwargs.pop('tags_sets', None) - if threshold or tags: + if tags: warnings.warn("The secondary_acceptable_latency_ms " "and tag_sets options are deprecated", DeprecationWarning, stacklevel=3) mode = orig.mode tags = tags or orig.tag_sets - threshold = threshold or orig.local_threshold_ms - orig = make_read_preference(mode, threshold, tags) + orig = make_read_preference(mode, tags) if command_name not in SECONDARY_OK_COMMANDS: pref = ReadPreference.PRIMARY @@ -436,8 +434,8 @@ class Database(common.BaseObject): be added to the command document before it is sent .. versionchanged:: 3.0 - Deprecated the `tag_sets` and `secondary_acceptable_latency_ms` - options. + Deprecated the `tag_sets` option. + Removed the `secondary_acceptable_latency_ms` option. Removed `compile_re` option: PyMongo now always represents BSON regular expressions as :class:`~bson.regex.Regex` objects. Use :meth:`~bson.regex.Regex.try_compile` to attempt to convert from a diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 310fee1ca..f713cc33e 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -315,7 +315,8 @@ class MongoClient(common.BaseObject): pool_class=pool_class, pool_options=options.pool_options, monitor_class=monitor_class, - condition_class=condition_class) + condition_class=condition_class, + local_threshold_ms=options.local_threshold_ms) self._topology = Topology(self._topology_settings) if connect: @@ -610,6 +611,11 @@ class MongoClient(common.BaseObject): return self._server_property( 'max_write_batch_size', common.MAX_WRITE_BATCH_SIZE) + @property + def local_threshold_ms(self): + """The local threshold for this instance.""" + return self.__options.local_threshold_ms + def _writable_max_wire_version(self): """Connect to a writable server and get its max wire protocol version. diff --git a/pymongo/read_preferences.py b/pymongo/read_preferences.py index 8440e6afa..1d2f0fc86 100644 --- a/pymongo/read_preferences.py +++ b/pymongo/read_preferences.py @@ -17,8 +17,8 @@ from collections import Mapping, namedtuple from pymongo.errors import ConfigurationError -from pymongo.server_selectors import (near_member_with_tags_server_selector, - near_secondary_with_tags_server_selector, +from pymongo.server_selectors import (member_with_tags_server_selector, + secondary_with_tags_server_selector, writable_server_selector) @@ -64,20 +64,15 @@ class ServerMode(object): """Base class for all read preferences. """ - __slots__ = ("__mongos_mode", "__mode", "__threshold", "__tag_sets") + __slots__ = ("__mongos_mode", "__mode", "__tag_sets") - def __init__(self, mode, local_threshold_ms=15, tag_sets=None): + def __init__(self, mode, tag_sets=None): if mode == _PRIMARY and tag_sets is not None: raise ConfigurationError("Read preference primary " "cannot be combined with tags") self.__mongos_mode = _MONGOS_MODES[mode] self.__mode = mode self.__tag_sets = _validate_tag_sets(tag_sets) - try: - self.__threshold = float(local_threshold_ms) - except (ValueError, TypeError): - raise ConfigurationError("local_threshold_ms must " - "be a positive integer or float") @property def name(self): @@ -99,22 +94,6 @@ class ServerMode(object): """ return self.__mode - @property - def local_threshold_ms(self): - """An integer. Any replica-set member whose ping time is within - ``local_threshold_ms`` of the nearest member may accept reads. - When used with mongos high availability, any mongos whose ping - time is within ``local_threshold_ms`` of the nearest mongos - may be chosen as the new mongos during a failover. Default 15 - milliseconds. - - .. note:: ``local_threshold_ms`` is ignored when talking - to a replica set through a mongos. The equivalent is the - `localThreshold `_ - command line option. - """ - return self.__threshold - @property def tag_sets(self): """Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to @@ -131,13 +110,14 @@ class ServerMode(object): return self.__tag_sets or [{}] def __repr__(self): - return "%s(local_threshold_ms=%d, tag_sets=%r)" % ( - self.name, self.__threshold, self.__tag_sets) + return "%s(tag_sets=%r)" % ( + self.name, self.__tag_sets) def __eq__(self, other): - return (self.mode == other.mode and - self.local_threshold_ms == other.local_threshold_ms and - self.tag_sets == other.tag_sets) + if isinstance(other, ServerMode): + return (self.mode == other.mode and + self.tag_sets == other.tag_sets) + raise NotImplementedError def __ne__(self, other): return not self == other @@ -151,14 +131,10 @@ class Primary(ServerMode): * When connected to a mongos queries are sent to the primary of a shard. * When connected to a replica set queries are sent to the primary of the replica set. - - :Parameters: - - `local_threshold_ms`: Used for mongos high availability. The - :attr:`~local_threshold_ms` when selecting a failover mongos. """ - def __init__(self, local_threshold_ms=15): - super(Primary, self).__init__(_PRIMARY, local_threshold_ms) + def __init__(self): + super(Primary, self).__init__(_PRIMARY) def __call__(self, server_descriptions): """Return matching ServerDescriptions from a list.""" @@ -167,6 +143,11 @@ class Primary(ServerMode): def __repr__(self): return "Primary" + def __eq__(self, other): + if isinstance(other, ServerMode): + return other.mode == _PRIMARY + raise NotImplementedError + class PrimaryPreferred(ServerMode): """PrimaryPreferred read preference. @@ -179,15 +160,12 @@ class PrimaryPreferred(ServerMode): available, otherwise a secondary. :Parameters: - - `local_threshold_ms`: The :attr:`~local_threshold_ms` when - selecting a secondary. - `tag_sets`: The :attr:`~tag_sets` to use if the primary is not available. """ - def __init__(self, local_threshold_ms=15, tag_sets=None): - super(PrimaryPreferred, self).__init__( - _PRIMARY_PREFERRED, local_threshold_ms, tag_sets) + def __init__(self, tag_sets=None): + super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED, tag_sets) def __call__(self, server_descriptions): """Return matching ServerDescriptions from a list.""" @@ -195,9 +173,8 @@ class PrimaryPreferred(ServerMode): if writable_servers: return writable_servers else: - return near_secondary_with_tags_server_selector( + return secondary_with_tags_server_selector( self.tag_sets, - self.local_threshold_ms, server_descriptions) @@ -212,20 +189,16 @@ class Secondary(ServerMode): secondaries. An error is raised if no secondaries are available. :Parameters: - - `local_threshold_ms`: The :attr:`~local_threshold_ms` when - selecting a secondary. - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference """ - def __init__(self, local_threshold_ms=15, tag_sets=None): - super(Secondary, self).__init__( - _SECONDARY, local_threshold_ms, tag_sets) + def __init__(self, tag_sets=None): + super(Secondary, self).__init__(_SECONDARY, tag_sets) def __call__(self, server_descriptions): """Return matching ServerDescriptions from a list.""" - return near_secondary_with_tags_server_selector( + return secondary_with_tags_server_selector( self.tag_sets, - self.local_threshold_ms, server_descriptions) @@ -240,20 +213,16 @@ class SecondaryPreferred(ServerMode): secondaries, or the primary if no secondary is available. :Parameters: - - `local_threshold_ms`: The :attr:`~local_threshold_ms` when - selecting a secondary. - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference """ - def __init__(self, local_threshold_ms=15, tag_sets=None): - super(SecondaryPreferred, self).__init__( - _SECONDARY_PREFERRED, local_threshold_ms, tag_sets) + def __init__(self, tag_sets=None): + super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED, tag_sets) def __call__(self, server_descriptions): """Return matching ServerDescriptions from a list.""" - secondaries = near_secondary_with_tags_server_selector( + secondaries = secondary_with_tags_server_selector( self.tag_sets, - self.local_threshold_ms, server_descriptions) if secondaries: @@ -273,33 +242,29 @@ class Nearest(ServerMode): members. :Parameters: - - `local_threshold_ms`: The :attr:`~local_threshold_ms` when - selecting a secondary. - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference """ - def __init__(self, local_threshold_ms=15, tag_sets=None): - super(Nearest, self).__init__( - _NEAREST, local_threshold_ms, tag_sets) + def __init__(self, tag_sets=None): + super(Nearest, self).__init__(_NEAREST, tag_sets) def __call__(self, server_descriptions): """Return matching ServerDescriptions from a list.""" - return near_member_with_tags_server_selector( + return member_with_tags_server_selector( self.tag_sets or [{}], - self.local_threshold_ms, server_descriptions) _ALL_READ_PREFERENCES = (Primary, PrimaryPreferred, Secondary, SecondaryPreferred, Nearest) -def make_read_preference(mode, local_threshold_ms, tag_sets): +def make_read_preference(mode, tag_sets): if mode == _PRIMARY: if tag_sets not in (None, [{}]): raise ConfigurationError("Read preference primary " "cannot be combined with tags") - return Primary(local_threshold_ms) - return _ALL_READ_PREFERENCES[mode](local_threshold_ms, tag_sets) + return Primary() + return _ALL_READ_PREFERENCES[mode](tag_sets) _MODES = ( diff --git a/pymongo/server_selectors.py b/pymongo/server_selectors.py index 24d4b7807..b8774a71c 100644 --- a/pymongo/server_selectors.py +++ b/pymongo/server_selectors.py @@ -22,8 +22,7 @@ def any_server_selector(server_descriptions): def address_server_selector(address, server_descriptions): - return [s for s in server_descriptions - if s.address == address] + return [s for s in server_descriptions if s.address == address] def writable_server_selector(server_descriptions): @@ -80,7 +79,7 @@ def tag_sets_server_selector(tag_sets, server_descriptions): return [] -def near_enough_server_selector(latency_ms, server_descriptions): +def apply_local_threshold(latency_ms, server_descriptions): """All servers with round trip times within latency_ms of the fastest one. No ServerDescription's round_trip_time can be None. @@ -99,25 +98,12 @@ def near_enough_server_selector(latency_ms, server_descriptions): if (s.round_trip_time - fastest) < latency_ms / 1000.] -def near_secondary_with_tags_server_selector( - tag_sets, - latency_ms, - server_descriptions): +def secondary_with_tags_server_selector(tag_sets, server_descriptions): """All near-enough secondaries matching the tag sets.""" - return near_enough_server_selector( - latency_ms, - tag_sets_server_selector( - tag_sets, - secondary_server_selector(server_descriptions))) + return tag_sets_server_selector( + tag_sets, secondary_server_selector(server_descriptions)) -def near_member_with_tags_server_selector( - tag_sets, - latency_ms, - server_descriptions): +def member_with_tags_server_selector(tag_sets, server_descriptions): """All near-enough members matching the tag sets.""" - return near_enough_server_selector( - latency_ms, - tag_sets_server_selector( - tag_sets, - server_descriptions)) + return tag_sets_server_selector(tag_sets, server_descriptions) diff --git a/pymongo/settings.py b/pymongo/settings.py index b0e71ad7a..4ac5c3a92 100644 --- a/pymongo/settings.py +++ b/pymongo/settings.py @@ -31,6 +31,7 @@ class TopologySettings(object): pool_options=None, monitor_class=None, condition_class=None, + local_threshold_ms=15, ): """Represent MongoClient's configuration. @@ -42,6 +43,7 @@ class TopologySettings(object): self._pool_options = pool_options or PoolOptions() self._monitor_class = monitor_class or monitor.Monitor self._condition_class = condition_class or threading.Condition + self._local_threshold_ms = local_threshold_ms self._direct = (len(self._seeds) == 1 and not replica_set_name) @property @@ -69,6 +71,10 @@ class TopologySettings(object): def condition_class(self): return self._condition_class + @property + def local_threshold_ms(self): + return self._local_threshold_ms + @property def direct(self): """Connect directly to a single server, or use a set of servers? diff --git a/pymongo/topology.py b/pymongo/topology.py index 769455ca2..85b9308df 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -28,6 +28,7 @@ from pymongo.errors import AutoReconnect from pymongo.monotonic import time as _time from pymongo.server import Server from pymongo.server_selectors import (address_server_selector, + apply_local_threshold, arbiter_server_selector, secondary_server_selector, writable_server_selector) @@ -277,7 +278,9 @@ class Topology(object): # Ignore the selector. return self._description.known_servers else: - return selector(self._description.known_servers) + sds = selector(self._description.known_servers) + return apply_local_threshold( + self._settings.local_threshold_ms, sds) def _update_servers(self): """Sync our Servers from TopologyDescription.server_descriptions. diff --git a/test/high_availability/test_ha.py b/test/high_availability/test_ha.py index cf16805f2..4333b131c 100644 --- a/test/high_availability/test_ha.py +++ b/test/high_availability/test_ha.py @@ -633,11 +633,6 @@ class TestReadPreference(HATestCase): assertReadFrom(other_secondary, NEAREST) - # High secondaryAcceptableLatencyMS, should read from all members - assertReadFromAll( - [primary, secondary, other_secondary], - NEAREST, secondary_acceptable_latency_ms=1000*1000) - self.clear_ping_times() assertReadFromAll([primary, other_secondary], NEAREST, [{'dc': 'ny'}]) @@ -747,55 +742,6 @@ class TestReadPreference(HATestCase): self.clear_ping_times() - def test_pinning(self): - raise SkipTest('Pinning not implemented in PyMongo 3') - - c = MongoClient(self.seed, replicaSet=self.name) - - # Verify that changing the mode unpins the member. We'll try it for - # every relevant change of mode. - for mode0, mode1 in itertools.permutations( - (PRIMARY, SECONDARY, SECONDARY_PREFERRED, NEAREST), 2 - ): - # Try reading and then changing modes and reading again, see if we - # read from a different host - for _ in range(1000): - # pin to this host - host = utils.read_from_which_host(c, mode0) - # unpin? - new_host = utils.read_from_which_host(c, mode1) - if host != new_host: - # Reading with a different mode unpinned, hooray! - break - else: - self.fail("Changing from mode %r to mode " - "%r never unpinned" % (mode0, mode1)) - - # Now verify changing the tag_sets unpins the member. - tags0 = [{'a': 'a'}, {}] - tags1 = [{'a': 'x'}, {}] - for _ in range(1000): - host = utils.read_from_which_host(c, NEAREST, tags0) - new_host = utils.read_from_which_host(c, NEAREST, tags1) - if host != new_host: - break - else: - self.fail( - "Changing from tags %s to tags %s never unpinned" % ( - tags0, tags1)) - - # Finally, verify changing the secondary_acceptable_latency_ms unpins - # the member. - for _ in range(1000): - host = utils.read_from_which_host(c, SECONDARY, None, 15) - new_host = utils.read_from_which_host(c, SECONDARY, None, 20) - if host != new_host: - break - else: - self.fail( - "Changing secondary_acceptable_latency_ms from 15 to 20" - " never unpinned") - def tearDown(self): self.c.close() super(TestReadPreference, self).tearDown() diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index 1c4140c47..f9bb31ad9 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -139,15 +139,15 @@ class TestReadPreferences(TestReadPreferencesBase): def test_threshold_validation(self): self.assertEqual(17, rs_client( localThresholdMS=17 - ).read_preference.local_threshold_ms) + ).local_threshold_ms) self.assertEqual(42, rs_client( localThresholdMS=42 - ).read_preference.local_threshold_ms) + ).local_threshold_ms) self.assertEqual(666, rs_client( localthresholdms=666 - ).read_preference.local_threshold_ms) + ).local_threshold_ms) def test_primary(self): self.assertReadsFrom('primary', @@ -263,7 +263,7 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase): (SecondaryPreferred, SERVER_TYPE.RSSecondary), (Nearest, 'any'), ]: - self.c.read_preference = mode(local_threshold_ms=1000*1000) + self.c.read_preference = mode() for i in range(10): if server_type == 'any': used = set() diff --git a/test/test_replica_set_client.py b/test/test_replica_set_client.py index 2eabfd8ce..2547c05f0 100644 --- a/test/test_replica_set_client.py +++ b/test/test_replica_set_client.py @@ -143,7 +143,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase): pair, replicaSet=self.name, max_pool_size=25, document_class=SON, tz_aware=True, read_preference=secondary, - secondaryacceptablelatencyms=77) + localThresholdMS=77) self.assertEqual(c.max_pool_size, 25) self.assertEqual(c.document_class, SON) diff --git a/test/test_topology.py b/test/test_topology.py index 530995e33..309abb081 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -633,12 +633,12 @@ class TestServerSelectionErrors(TopologyTest): self.assertMessage( 'No replica set members match selector' - ' "Secondary(local_threshold_ms=15, tag_sets=None)"', + ' "Secondary(tag_sets=None)"', t, ReadPreference.SECONDARY) self.assertMessage( "No replica set members match selector" - " \"Secondary(local_threshold_ms=15, tag_sets=[{'dc': 'ny'}])\"", + " \"Secondary(tag_sets=[{'dc': 'ny'}])\"", t, Secondary(tag_sets=[{'dc': 'ny'}])) def test_bad_replica_set_name(self): diff --git a/test/utils.py b/test/utils.py index 32b1e0cef..7f3bcac56 100644 --- a/test/utils.py +++ b/test/utils.py @@ -279,7 +279,6 @@ def read_from_which_host( client, pref, tag_sets=None, - secondary_acceptable_latency_ms=None ): """Read from a client with the given Read Preference. @@ -289,16 +288,14 @@ def read_from_which_host( - `client`: A MongoClient - `mode`: A ReadPreference - `tag_sets`: List of dicts of tags for data-center-aware reads - - `secondary_acceptable_latency_ms`: Size of latency window """ db = client.pymongo_test if isinstance(tag_sets, dict): tag_sets = [tag_sets] - if tag_sets or secondary_acceptable_latency_ms: - threshold = secondary_acceptable_latency_ms or pref.local_threshold_ms + if tag_sets: tags = tag_sets or pref.tag_sets - pref = pref.__class__(threshold, tags) + pref = pref.__class__(tags) db.read_preference = pref