PYTHON-812 - Make local threshold global and immutable.

This commit is contained in:
Bernie Hackett 2014-12-29 17:51:02 -08:00
parent a788b1f1aa
commit 4e13a39db9
15 changed files with 85 additions and 198 deletions

View File

@ -28,9 +28,9 @@
.. autoattribute:: max_message_size
.. autoattribute:: min_wire_version
.. autoattribute:: max_wire_version
.. autoattribute:: local_threshold_ms
.. autoattribute:: codec_options
.. autoattribute:: read_preference
.. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: write_concern
.. autoattribute:: is_locked
.. automethod:: database_names

View File

@ -27,9 +27,9 @@
.. autoattribute:: max_message_size
.. autoattribute:: min_wire_version
.. autoattribute:: max_wire_version
.. autoattribute:: local_threshold_ms
.. autoattribute:: codec_options
.. autoattribute:: read_preference
.. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: write_concern
.. automethod:: database_names
.. automethod:: drop_database

View File

@ -50,10 +50,8 @@ def _parse_read_preference(options):
return options['read_preference']
mode = options.get('readpreference', 0)
threshold = options.get('secondaryacceptablelatencyms',
options.get('localthresholdms', 15))
tags = options.get('readpreferencetags')
return make_read_preference(mode, threshold, tags)
return make_read_preference(mode, tags)
def _parse_write_concern(options):
@ -121,19 +119,30 @@ class ClientOptions(object):
def __init__(self, username, password, database, options):
options = dict([validate(opt, val) for opt, val in iteritems(options)])
self.__codec_options = _parse_codec_options(options)
self.__credentials = _parse_credentials(
username, password, database, options)
self.__codec_options = _parse_codec_options(options)
self.__local_threshold_ms = options.get('localthresholdms', 15)
self.__pool_options = _parse_pool_options(options)
self.__read_preference = _parse_read_preference(options)
self.__replica_set_name = options.get('replicaset')
self.__write_concern = _parse_write_concern(options)
@property
def codec_options(self):
"""A :class:`~pymongo.codec_options.CodecOptions` instance."""
return self.__codec_options
@property
def credentials(self):
"""A :class:`~pymongo.auth.MongoCredentials` instance or None."""
return self.__credentials
@property
def local_threshold_ms(self):
"""The local threshold for this instance."""
return self.__local_threshold_ms
@property
def pool_options(self):
"""A :class:`~pymongo.pool.PoolOptions` instance."""
@ -149,11 +158,6 @@ class ClientOptions(object):
"""Replica set name or None."""
return self.__replica_set_name
@property
def codec_options(self):
"""A :class:`~pymongo.codec_options.CodecOptions` instance."""
return self.__codec_options
@property
def write_concern(self):
"""A :class:`~pymongo.write_concern.WriteConcern` instance."""

View File

@ -331,7 +331,6 @@ VALIDATORS = {
'readpreference': validate_read_preference_mode,
'readpreferencetags': validate_read_preference_tags,
'localthresholdms': validate_positive_float,
'secondaryacceptablelatencyms': validate_positive_float,
'authmechanism': validate_auth_mechanism,
'authsource': validate_string,
'authmechanismproperties': validate_auth_mechanism_properties,
@ -432,28 +431,6 @@ class BaseObject(object):
read_preference = property(__get_read_pref, __set_read_pref)
def __get_latency(self):
"""Deprecated. Use ``client.read_preference.local_threshold_ms``.
See :class:`~pymongo.read_preferences.ReadPreference`.
.. note:: ``secondary_acceptable_latency_ms`` is ignored when talking
to a replica set *through* a mongos. The equivalent is the
localThreshold_ command line option.
.. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption-mongos--localThreshold
"""
return self.__read_pref.local_threshold_ms
def __set_latency(self, threshold):
warnings.warn("The secondary_acceptable_latency_ms attribute is "
"deprecated", DeprecationWarning, stacklevel=2)
mode = self.__read_pref.mode
tag_sets = self.__read_pref.tag_sets
self.__read_pref = make_read_preference(mode, threshold, tag_sets)
secondary_acceptable_latency_ms = property(__get_latency, __set_latency)
def __get_tags(self):
return self.__read_pref.tag_sets
@ -461,8 +438,7 @@ class BaseObject(object):
warnings.warn("The tag_sets attribute is deprecated",
DeprecationWarning, stacklevel=2)
mode = self.__read_pref.mode
threshold = self.__read_pref.local_threshold_ms
self.__read_pref = make_read_preference(mode, threshold, tag_sets)
self.__read_pref = make_read_preference(mode, tag_sets)
tag_sets = property(__get_tags, __set_tags)

View File

@ -342,16 +342,14 @@ class Database(common.BaseObject):
command.update(kwargs)
orig = pref = read_preference or self.read_preference
threshold = kwargs.pop('secondary_acceptable_latency_ms', None)
tags = kwargs.pop('tags_sets', None)
if threshold or tags:
if tags:
warnings.warn("The secondary_acceptable_latency_ms "
"and tag_sets options are deprecated",
DeprecationWarning, stacklevel=3)
mode = orig.mode
tags = tags or orig.tag_sets
threshold = threshold or orig.local_threshold_ms
orig = make_read_preference(mode, threshold, tags)
orig = make_read_preference(mode, tags)
if command_name not in SECONDARY_OK_COMMANDS:
pref = ReadPreference.PRIMARY
@ -436,8 +434,8 @@ class Database(common.BaseObject):
be added to the command document before it is sent
.. versionchanged:: 3.0
Deprecated the `tag_sets` and `secondary_acceptable_latency_ms`
options.
Deprecated the `tag_sets` option.
Removed the `secondary_acceptable_latency_ms` option.
Removed `compile_re` option: PyMongo now always represents BSON
regular expressions as :class:`~bson.regex.Regex` objects. Use
:meth:`~bson.regex.Regex.try_compile` to attempt to convert from a

View File

@ -315,7 +315,8 @@ class MongoClient(common.BaseObject):
pool_class=pool_class,
pool_options=options.pool_options,
monitor_class=monitor_class,
condition_class=condition_class)
condition_class=condition_class,
local_threshold_ms=options.local_threshold_ms)
self._topology = Topology(self._topology_settings)
if connect:
@ -610,6 +611,11 @@ class MongoClient(common.BaseObject):
return self._server_property(
'max_write_batch_size', common.MAX_WRITE_BATCH_SIZE)
@property
def local_threshold_ms(self):
"""The local threshold for this instance."""
return self.__options.local_threshold_ms
def _writable_max_wire_version(self):
"""Connect to a writable server and get its max wire protocol version.

View File

@ -17,8 +17,8 @@
from collections import Mapping, namedtuple
from pymongo.errors import ConfigurationError
from pymongo.server_selectors import (near_member_with_tags_server_selector,
near_secondary_with_tags_server_selector,
from pymongo.server_selectors import (member_with_tags_server_selector,
secondary_with_tags_server_selector,
writable_server_selector)
@ -64,20 +64,15 @@ class ServerMode(object):
"""Base class for all read preferences.
"""
__slots__ = ("__mongos_mode", "__mode", "__threshold", "__tag_sets")
__slots__ = ("__mongos_mode", "__mode", "__tag_sets")
def __init__(self, mode, local_threshold_ms=15, tag_sets=None):
def __init__(self, mode, tag_sets=None):
if mode == _PRIMARY and tag_sets is not None:
raise ConfigurationError("Read preference primary "
"cannot be combined with tags")
self.__mongos_mode = _MONGOS_MODES[mode]
self.__mode = mode
self.__tag_sets = _validate_tag_sets(tag_sets)
try:
self.__threshold = float(local_threshold_ms)
except (ValueError, TypeError):
raise ConfigurationError("local_threshold_ms must "
"be a positive integer or float")
@property
def name(self):
@ -99,22 +94,6 @@ class ServerMode(object):
"""
return self.__mode
@property
def local_threshold_ms(self):
"""An integer. Any replica-set member whose ping time is within
``local_threshold_ms`` of the nearest member may accept reads.
When used with mongos high availability, any mongos whose ping
time is within ``local_threshold_ms`` of the nearest mongos
may be chosen as the new mongos during a failover. Default 15
milliseconds.
.. note:: ``local_threshold_ms`` is ignored when talking
to a replica set through a mongos. The equivalent is the
`localThreshold <http://docs.mongodb.org/manual/reference/mongos/#cmdoption--localThreshold>`_
command line option.
"""
return self.__threshold
@property
def tag_sets(self):
"""Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
@ -131,13 +110,14 @@ class ServerMode(object):
return self.__tag_sets or [{}]
def __repr__(self):
return "%s(local_threshold_ms=%d, tag_sets=%r)" % (
self.name, self.__threshold, self.__tag_sets)
return "%s(tag_sets=%r)" % (
self.name, self.__tag_sets)
def __eq__(self, other):
return (self.mode == other.mode and
self.local_threshold_ms == other.local_threshold_ms and
self.tag_sets == other.tag_sets)
if isinstance(other, ServerMode):
return (self.mode == other.mode and
self.tag_sets == other.tag_sets)
raise NotImplementedError
def __ne__(self, other):
return not self == other
@ -151,14 +131,10 @@ class Primary(ServerMode):
* When connected to a mongos queries are sent to the primary of a shard.
* When connected to a replica set queries are sent to the primary of
the replica set.
:Parameters:
- `local_threshold_ms`: Used for mongos high availability. The
:attr:`~local_threshold_ms` when selecting a failover mongos.
"""
def __init__(self, local_threshold_ms=15):
super(Primary, self).__init__(_PRIMARY, local_threshold_ms)
def __init__(self):
super(Primary, self).__init__(_PRIMARY)
def __call__(self, server_descriptions):
"""Return matching ServerDescriptions from a list."""
@ -167,6 +143,11 @@ class Primary(ServerMode):
def __repr__(self):
return "Primary"
def __eq__(self, other):
if isinstance(other, ServerMode):
return other.mode == _PRIMARY
raise NotImplementedError
class PrimaryPreferred(ServerMode):
"""PrimaryPreferred read preference.
@ -179,15 +160,12 @@ class PrimaryPreferred(ServerMode):
available, otherwise a secondary.
:Parameters:
- `local_threshold_ms`: The :attr:`~local_threshold_ms` when
selecting a secondary.
- `tag_sets`: The :attr:`~tag_sets` to use if the primary is not
available.
"""
def __init__(self, local_threshold_ms=15, tag_sets=None):
super(PrimaryPreferred, self).__init__(
_PRIMARY_PREFERRED, local_threshold_ms, tag_sets)
def __init__(self, tag_sets=None):
super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED, tag_sets)
def __call__(self, server_descriptions):
"""Return matching ServerDescriptions from a list."""
@ -195,9 +173,8 @@ class PrimaryPreferred(ServerMode):
if writable_servers:
return writable_servers
else:
return near_secondary_with_tags_server_selector(
return secondary_with_tags_server_selector(
self.tag_sets,
self.local_threshold_ms,
server_descriptions)
@ -212,20 +189,16 @@ class Secondary(ServerMode):
secondaries. An error is raised if no secondaries are available.
:Parameters:
- `local_threshold_ms`: The :attr:`~local_threshold_ms` when
selecting a secondary.
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
"""
def __init__(self, local_threshold_ms=15, tag_sets=None):
super(Secondary, self).__init__(
_SECONDARY, local_threshold_ms, tag_sets)
def __init__(self, tag_sets=None):
super(Secondary, self).__init__(_SECONDARY, tag_sets)
def __call__(self, server_descriptions):
"""Return matching ServerDescriptions from a list."""
return near_secondary_with_tags_server_selector(
return secondary_with_tags_server_selector(
self.tag_sets,
self.local_threshold_ms,
server_descriptions)
@ -240,20 +213,16 @@ class SecondaryPreferred(ServerMode):
secondaries, or the primary if no secondary is available.
:Parameters:
- `local_threshold_ms`: The :attr:`~local_threshold_ms` when
selecting a secondary.
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
"""
def __init__(self, local_threshold_ms=15, tag_sets=None):
super(SecondaryPreferred, self).__init__(
_SECONDARY_PREFERRED, local_threshold_ms, tag_sets)
def __init__(self, tag_sets=None):
super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED, tag_sets)
def __call__(self, server_descriptions):
"""Return matching ServerDescriptions from a list."""
secondaries = near_secondary_with_tags_server_selector(
secondaries = secondary_with_tags_server_selector(
self.tag_sets,
self.local_threshold_ms,
server_descriptions)
if secondaries:
@ -273,33 +242,29 @@ class Nearest(ServerMode):
members.
:Parameters:
- `local_threshold_ms`: The :attr:`~local_threshold_ms` when
selecting a secondary.
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
"""
def __init__(self, local_threshold_ms=15, tag_sets=None):
super(Nearest, self).__init__(
_NEAREST, local_threshold_ms, tag_sets)
def __init__(self, tag_sets=None):
super(Nearest, self).__init__(_NEAREST, tag_sets)
def __call__(self, server_descriptions):
"""Return matching ServerDescriptions from a list."""
return near_member_with_tags_server_selector(
return member_with_tags_server_selector(
self.tag_sets or [{}],
self.local_threshold_ms,
server_descriptions)
_ALL_READ_PREFERENCES = (Primary, PrimaryPreferred,
Secondary, SecondaryPreferred, Nearest)
def make_read_preference(mode, local_threshold_ms, tag_sets):
def make_read_preference(mode, tag_sets):
if mode == _PRIMARY:
if tag_sets not in (None, [{}]):
raise ConfigurationError("Read preference primary "
"cannot be combined with tags")
return Primary(local_threshold_ms)
return _ALL_READ_PREFERENCES[mode](local_threshold_ms, tag_sets)
return Primary()
return _ALL_READ_PREFERENCES[mode](tag_sets)
_MODES = (

View File

@ -22,8 +22,7 @@ def any_server_selector(server_descriptions):
def address_server_selector(address, server_descriptions):
return [s for s in server_descriptions
if s.address == address]
return [s for s in server_descriptions if s.address == address]
def writable_server_selector(server_descriptions):
@ -80,7 +79,7 @@ def tag_sets_server_selector(tag_sets, server_descriptions):
return []
def near_enough_server_selector(latency_ms, server_descriptions):
def apply_local_threshold(latency_ms, server_descriptions):
"""All servers with round trip times within latency_ms of the fastest one.
No ServerDescription's round_trip_time can be None.
@ -99,25 +98,12 @@ def near_enough_server_selector(latency_ms, server_descriptions):
if (s.round_trip_time - fastest) < latency_ms / 1000.]
def near_secondary_with_tags_server_selector(
tag_sets,
latency_ms,
server_descriptions):
def secondary_with_tags_server_selector(tag_sets, server_descriptions):
"""All near-enough secondaries matching the tag sets."""
return near_enough_server_selector(
latency_ms,
tag_sets_server_selector(
tag_sets,
secondary_server_selector(server_descriptions)))
return tag_sets_server_selector(
tag_sets, secondary_server_selector(server_descriptions))
def near_member_with_tags_server_selector(
tag_sets,
latency_ms,
server_descriptions):
def member_with_tags_server_selector(tag_sets, server_descriptions):
"""All near-enough members matching the tag sets."""
return near_enough_server_selector(
latency_ms,
tag_sets_server_selector(
tag_sets,
server_descriptions))
return tag_sets_server_selector(tag_sets, server_descriptions)

View File

@ -31,6 +31,7 @@ class TopologySettings(object):
pool_options=None,
monitor_class=None,
condition_class=None,
local_threshold_ms=15,
):
"""Represent MongoClient's configuration.
@ -42,6 +43,7 @@ class TopologySettings(object):
self._pool_options = pool_options or PoolOptions()
self._monitor_class = monitor_class or monitor.Monitor
self._condition_class = condition_class or threading.Condition
self._local_threshold_ms = local_threshold_ms
self._direct = (len(self._seeds) == 1 and not replica_set_name)
@property
@ -69,6 +71,10 @@ class TopologySettings(object):
def condition_class(self):
return self._condition_class
@property
def local_threshold_ms(self):
return self._local_threshold_ms
@property
def direct(self):
"""Connect directly to a single server, or use a set of servers?

View File

@ -28,6 +28,7 @@ from pymongo.errors import AutoReconnect
from pymongo.monotonic import time as _time
from pymongo.server import Server
from pymongo.server_selectors import (address_server_selector,
apply_local_threshold,
arbiter_server_selector,
secondary_server_selector,
writable_server_selector)
@ -277,7 +278,9 @@ class Topology(object):
# Ignore the selector.
return self._description.known_servers
else:
return selector(self._description.known_servers)
sds = selector(self._description.known_servers)
return apply_local_threshold(
self._settings.local_threshold_ms, sds)
def _update_servers(self):
"""Sync our Servers from TopologyDescription.server_descriptions.

View File

@ -633,11 +633,6 @@ class TestReadPreference(HATestCase):
assertReadFrom(other_secondary, NEAREST)
# High secondaryAcceptableLatencyMS, should read from all members
assertReadFromAll(
[primary, secondary, other_secondary],
NEAREST, secondary_acceptable_latency_ms=1000*1000)
self.clear_ping_times()
assertReadFromAll([primary, other_secondary], NEAREST, [{'dc': 'ny'}])
@ -747,55 +742,6 @@ class TestReadPreference(HATestCase):
self.clear_ping_times()
def test_pinning(self):
raise SkipTest('Pinning not implemented in PyMongo 3')
c = MongoClient(self.seed, replicaSet=self.name)
# Verify that changing the mode unpins the member. We'll try it for
# every relevant change of mode.
for mode0, mode1 in itertools.permutations(
(PRIMARY, SECONDARY, SECONDARY_PREFERRED, NEAREST), 2
):
# Try reading and then changing modes and reading again, see if we
# read from a different host
for _ in range(1000):
# pin to this host
host = utils.read_from_which_host(c, mode0)
# unpin?
new_host = utils.read_from_which_host(c, mode1)
if host != new_host:
# Reading with a different mode unpinned, hooray!
break
else:
self.fail("Changing from mode %r to mode "
"%r never unpinned" % (mode0, mode1))
# Now verify changing the tag_sets unpins the member.
tags0 = [{'a': 'a'}, {}]
tags1 = [{'a': 'x'}, {}]
for _ in range(1000):
host = utils.read_from_which_host(c, NEAREST, tags0)
new_host = utils.read_from_which_host(c, NEAREST, tags1)
if host != new_host:
break
else:
self.fail(
"Changing from tags %s to tags %s never unpinned" % (
tags0, tags1))
# Finally, verify changing the secondary_acceptable_latency_ms unpins
# the member.
for _ in range(1000):
host = utils.read_from_which_host(c, SECONDARY, None, 15)
new_host = utils.read_from_which_host(c, SECONDARY, None, 20)
if host != new_host:
break
else:
self.fail(
"Changing secondary_acceptable_latency_ms from 15 to 20"
" never unpinned")
def tearDown(self):
self.c.close()
super(TestReadPreference, self).tearDown()

View File

@ -139,15 +139,15 @@ class TestReadPreferences(TestReadPreferencesBase):
def test_threshold_validation(self):
self.assertEqual(17, rs_client(
localThresholdMS=17
).read_preference.local_threshold_ms)
).local_threshold_ms)
self.assertEqual(42, rs_client(
localThresholdMS=42
).read_preference.local_threshold_ms)
).local_threshold_ms)
self.assertEqual(666, rs_client(
localthresholdms=666
).read_preference.local_threshold_ms)
).local_threshold_ms)
def test_primary(self):
self.assertReadsFrom('primary',
@ -263,7 +263,7 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase):
(SecondaryPreferred, SERVER_TYPE.RSSecondary),
(Nearest, 'any'),
]:
self.c.read_preference = mode(local_threshold_ms=1000*1000)
self.c.read_preference = mode()
for i in range(10):
if server_type == 'any':
used = set()

View File

@ -143,7 +143,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase):
pair, replicaSet=self.name, max_pool_size=25,
document_class=SON, tz_aware=True,
read_preference=secondary,
secondaryacceptablelatencyms=77)
localThresholdMS=77)
self.assertEqual(c.max_pool_size, 25)
self.assertEqual(c.document_class, SON)

View File

@ -633,12 +633,12 @@ class TestServerSelectionErrors(TopologyTest):
self.assertMessage(
'No replica set members match selector'
' "Secondary(local_threshold_ms=15, tag_sets=None)"',
' "Secondary(tag_sets=None)"',
t, ReadPreference.SECONDARY)
self.assertMessage(
"No replica set members match selector"
" \"Secondary(local_threshold_ms=15, tag_sets=[{'dc': 'ny'}])\"",
" \"Secondary(tag_sets=[{'dc': 'ny'}])\"",
t, Secondary(tag_sets=[{'dc': 'ny'}]))
def test_bad_replica_set_name(self):

View File

@ -279,7 +279,6 @@ def read_from_which_host(
client,
pref,
tag_sets=None,
secondary_acceptable_latency_ms=None
):
"""Read from a client with the given Read Preference.
@ -289,16 +288,14 @@ def read_from_which_host(
- `client`: A MongoClient
- `mode`: A ReadPreference
- `tag_sets`: List of dicts of tags for data-center-aware reads
- `secondary_acceptable_latency_ms`: Size of latency window
"""
db = client.pymongo_test
if isinstance(tag_sets, dict):
tag_sets = [tag_sets]
if tag_sets or secondary_acceptable_latency_ms:
threshold = secondary_acceptable_latency_ms or pref.local_threshold_ms
if tag_sets:
tags = tag_sets or pref.tag_sets
pref = pref.__class__(threshold, tags)
pref = pref.__class__(tags)
db.read_preference = pref