PYTHON-719 Read preference backward compatibility
This commit does a few things: - Adds tag_sets back (deprecated) - Adds secondary_acceptable_latency_ms back (deprecated) - Makes acceptable latency a per read preference setting - Cleans up read preference instance generation - Adds latencyThresholdMS as an alias for secondaryAcceptableLatencyMS. The name may change before 3.0 is released.
This commit is contained in:
parent
665440be62
commit
be35ff71a2
@ -4,7 +4,7 @@
|
||||
.. automodule:: pymongo.cursor
|
||||
:synopsis: Tools for iterating over MongoDB query results
|
||||
|
||||
.. autoclass:: pymongo.cursor.Cursor(collection, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, read_preference=None, exhaust=False, compile_re=True)
|
||||
.. autoclass:: pymongo.cursor.Cursor(collection, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, read_preference=None, tag_sets=None, secondary_acceptable_latency_ms=None, exhaust=False, compile_re=True)
|
||||
:members:
|
||||
|
||||
.. describe:: c[index]
|
||||
|
||||
@ -200,7 +200,7 @@ per-query basis, e.g.::
|
||||
>>> db.collection.find_one(read_preference=ReadPreference.PRIMARY)
|
||||
|
||||
Reads are configured using three options: **read_preference**, **tag_sets**,
|
||||
and **acceptableLatencyMS**.
|
||||
and *latency_threshold_ms**.
|
||||
|
||||
**read_preference**:
|
||||
|
||||
@ -210,18 +210,18 @@ and **acceptableLatencyMS**.
|
||||
|
||||
- ``PRIMARY_PREFERRED``: Read from the primary if available, or if there is
|
||||
none, read from a secondary matching your choice of ``tag_sets`` and
|
||||
``acceptableLatencyMS``.
|
||||
``latency_threshold_ms``.
|
||||
|
||||
- ``SECONDARY``: Read from a secondary matching your choice of ``tag_sets`` and
|
||||
``acceptableLatencyMS``. If no matching secondary is available,
|
||||
``latency_threshold_ms``. If no matching secondary is available,
|
||||
raise :class:`~pymongo.errors.AutoReconnect`.
|
||||
|
||||
- ``SECONDARY_PREFERRED``: Read from a secondary matching your choice of
|
||||
``tag_sets`` and ``acceptableLatencyMS`` if available, otherwise
|
||||
``tag_sets`` and ``latency_threshold_ms`` if available, otherwise
|
||||
from primary (regardless of the primary's tags and latency).
|
||||
|
||||
- ``NEAREST``: Read from any member matching your choice of ``tag_sets`` and
|
||||
``acceptableLatencyMS``.
|
||||
``latency_threshold_ms``.
|
||||
|
||||
**tag_sets**:
|
||||
|
||||
@ -251,18 +251,18 @@ from any member that matches the mode, ignoring tags."
|
||||
|
||||
See :mod:`~pymongo.read_preferences` for more information.
|
||||
|
||||
**acceptableLatencyMS**:
|
||||
**latency_threshold_ms**:
|
||||
|
||||
If multiple members match the mode and tag sets, MongoReplicaSetClient reads
|
||||
from among the nearest members, chosen according to ping time. By default,
|
||||
only members whose ping times are within 15 milliseconds of the nearest
|
||||
are used for queries. You can choose to distribute reads among members with
|
||||
higher latencies by setting ``acceptableLatencyMS`` to a larger
|
||||
higher latencies by setting ``latency_threshold_ms`` to a larger
|
||||
number. In that case, MongoReplicaSetClient distributes reads among matching
|
||||
members within ``acceptableLatencyMS`` of the closest member's
|
||||
members within ``latency_threshold_ms`` of the closest member's
|
||||
ping time.
|
||||
|
||||
.. note:: ``acceptableLatencyMS`` is ignored when talking to a
|
||||
.. note:: ``latency_threshold_ms`` is ignored when talking to a
|
||||
replica set *through* a mongos. The equivalent is the localThreshold_ command
|
||||
line option.
|
||||
|
||||
|
||||
@ -324,6 +324,8 @@ class GridFS(object):
|
||||
examined when performing the query
|
||||
- `read_preference` (optional): The read preference for
|
||||
this query.
|
||||
- `tag_sets` **DEPRECATED**
|
||||
- `secondary_acceptable_latency_ms` **DEPRECATED**
|
||||
- `compile_re` (optional): if ``False``, don't attempt to compile
|
||||
BSON regex objects into Python regexes. Return instances of
|
||||
:class:`~bson.regex.Regex` instead.
|
||||
|
||||
@ -628,7 +628,8 @@ class GridOutCursor(Cursor):
|
||||
"""
|
||||
def __init__(self, collection, spec=None, skip=0, limit=0,
|
||||
timeout=True, sort=None, max_scan=None,
|
||||
read_preference=None, compile_re=True):
|
||||
read_preference=None, tag_sets=None,
|
||||
secondary_acceptable_latency_ms=None, compile_re=True):
|
||||
"""Create a new cursor, similar to the normal
|
||||
:class:`~pymongo.cursor.Cursor`.
|
||||
|
||||
@ -648,7 +649,8 @@ class GridOutCursor(Cursor):
|
||||
super(GridOutCursor, self).__init__(
|
||||
collection.files, spec, skip=skip, limit=limit, timeout=timeout,
|
||||
sort=sort, max_scan=max_scan, read_preference=read_preference,
|
||||
compile_re=compile_re)
|
||||
secondary_acceptable_latency_ms=secondary_acceptable_latency_ms,
|
||||
tag_sets=tag_sets, compile_re=compile_re)
|
||||
|
||||
def next(self):
|
||||
"""Get next GridOut object from cursor.
|
||||
|
||||
@ -823,6 +823,8 @@ class Collection(common.BaseObject):
|
||||
outgoing SON manipulators before returning.
|
||||
- `read_preference` (optional): The read preference for
|
||||
this query.
|
||||
- `tag_sets` **DEPRECATED**
|
||||
- `secondary_acceptable_latency_ms` **DEPRECATED**
|
||||
- `compile_re` (optional): if ``False``, don't attempt to compile
|
||||
BSON regex objects into Python regexes. Return instances of
|
||||
:class:`~bson.regex.Regex` instead.
|
||||
@ -857,7 +859,8 @@ class Collection(common.BaseObject):
|
||||
version **>= 1.5.1**
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
Removed the `network_timeout`, `tag_sets`, and
|
||||
Removed the `network_timeout` parameter.
|
||||
Deprecated the `tag_sets`, and
|
||||
`secondary_acceptable_latency_ms` parameters.
|
||||
|
||||
.. versionadded:: 2.7
|
||||
|
||||
@ -14,11 +14,16 @@
|
||||
|
||||
|
||||
"""Functions and classes common to multiple pymongo modules."""
|
||||
|
||||
import sys
|
||||
from pymongo import read_preferences
|
||||
import warnings
|
||||
|
||||
from pymongo.auth import MECHANISMS
|
||||
from pymongo.errors import ConfigurationError
|
||||
from pymongo.read_preferences import (make_read_preference,
|
||||
read_pref_mode_from_name,
|
||||
ReadPreference,
|
||||
ServerMode)
|
||||
from pymongo.write_concern import WriteConcern
|
||||
from bson.binary import (OLD_UUID_SUBTYPE, UUID_SUBTYPE,
|
||||
JAVA_LEGACY, CSHARP_LEGACY)
|
||||
@ -194,7 +199,7 @@ def validate_timeout_or_none(option, value):
|
||||
def validate_read_preference(dummy, value):
|
||||
"""Validate a read preference.
|
||||
"""
|
||||
if not isinstance(value, read_preferences.ServerMode):
|
||||
if not isinstance(value, ServerMode):
|
||||
raise ConfigurationError("%r is not a "
|
||||
"valid read preference." % (value,))
|
||||
return value
|
||||
@ -204,7 +209,7 @@ def validate_read_preference_mode(dummy, name):
|
||||
"""Validate read preference mode for a MongoReplicaSetClient.
|
||||
"""
|
||||
try:
|
||||
return read_preferences.read_pref_mode_from_name(name)
|
||||
return read_pref_mode_from_name(name)
|
||||
except ValueError:
|
||||
raise ConfigurationError("Not a valid read preference")
|
||||
|
||||
@ -278,7 +283,8 @@ VALIDATORS = {
|
||||
'read_preference': validate_read_preference,
|
||||
'readpreference': validate_read_preference_mode,
|
||||
'readpreferencetags': validate_read_preference_tags,
|
||||
'acceptablelatencyms': validate_positive_float,
|
||||
'latencythresholdms': validate_positive_float,
|
||||
'secondaryacceptablelatencyms': validate_positive_float,
|
||||
'auto_start_request': validate_boolean,
|
||||
'use_greenlets': validate_boolean,
|
||||
'authmechanism': validate_auth_mechanism,
|
||||
@ -329,25 +335,24 @@ class BaseObject(object):
|
||||
|
||||
def __init__(self, **options):
|
||||
|
||||
self.__read_pref = read_preferences.ReadPreference.PRIMARY
|
||||
self.__read_pref = None
|
||||
self.__uuid_subtype = OLD_UUID_SUBTYPE
|
||||
self.__write_concern = None
|
||||
self.__set_options(options)
|
||||
|
||||
if 'read_preference' not in options:
|
||||
mode = options.get('readpreference', 0)
|
||||
latency = options.get('secondaryacceptablelatencyms',
|
||||
options.get('latencythresholdms', 15))
|
||||
tags = options.get('readpreferencetags')
|
||||
self.__read_pref = make_read_preference(mode, latency, tags)
|
||||
|
||||
def __set_options(self, options):
|
||||
"""Validates and sets all options passed to this object."""
|
||||
wc_opts = {}
|
||||
for option, value in iteritems(options):
|
||||
if option == 'read_preference':
|
||||
self.__read_pref = validate_read_preference(option, value)
|
||||
elif option == 'readpreference':
|
||||
klass = read_preferences.read_pref_class_from_mode(value)
|
||||
if value == 0:
|
||||
# Primary, no tags
|
||||
self.__read_pref = klass()
|
||||
continue
|
||||
tags = options.get('readpreferencetags', None)
|
||||
self.__read_pref = klass(tags)
|
||||
elif option == 'uuidrepresentation':
|
||||
self.__uuid_subtype = validate_uuid_subtype(option, value)
|
||||
elif option in WRITE_CONCERN_OPTIONS:
|
||||
@ -436,6 +441,30 @@ class BaseObject(object):
|
||||
|
||||
read_preference = property(__get_read_pref, __set_read_pref)
|
||||
|
||||
def __get_latency(self):
|
||||
return self.__read_pref.latency_threshold_ms
|
||||
|
||||
def __set_latency(self, latency):
|
||||
warnings.warn("The secondary_acceptable_latency_ms attribute is "
|
||||
"deprecated", DeprecationWarning, stacklevel=2)
|
||||
mode = self.__read_pref.mode
|
||||
tag_sets = self.__read_pref.tag_sets
|
||||
self.__read_pref = make_read_preference(mode, latency, tag_sets)
|
||||
|
||||
secondary_acceptable_latency_ms = property(__get_latency, __set_latency)
|
||||
|
||||
def __get_tags(self):
|
||||
return self.__read_pref.tag_sets
|
||||
|
||||
def __set_tags(self, tag_sets):
|
||||
warnings.warn("The tag_sets attribute is deprecated",
|
||||
DeprecationWarning, stacklevel=2)
|
||||
mode = self.__read_pref.mode
|
||||
latency = self.__read_pref.latency_threshold_ms
|
||||
self.__read_pref = make_read_preference(mode, latency, tag_sets)
|
||||
|
||||
tag_sets = property(__get_tags, __set_tags)
|
||||
|
||||
def __get_uuid_subtype(self):
|
||||
"""This attribute specifies which BSON Binary subtype is used when
|
||||
storing UUIDs. Historically UUIDs have been stored as BSON Binary
|
||||
|
||||
@ -14,6 +14,8 @@
|
||||
|
||||
"""Cursor class to iterate over Mongo query results."""
|
||||
import copy
|
||||
import warnings
|
||||
|
||||
from collections import deque
|
||||
|
||||
from bson import RE_TYPE
|
||||
@ -71,8 +73,9 @@ class Cursor(object):
|
||||
timeout=True, snapshot=False, tailable=False, sort=None,
|
||||
max_scan=None, as_class=None,
|
||||
await_data=False, partial=False, manipulate=True,
|
||||
read_preference=None, exhaust=False, compile_re=True,
|
||||
_uuid_subtype=None):
|
||||
read_preference=None, tag_sets=None,
|
||||
secondary_acceptable_latency_ms=None,
|
||||
exhaust=False, compile_re=True, _uuid_subtype=None):
|
||||
"""Create a new cursor.
|
||||
|
||||
Should not be called directly by application developers - see
|
||||
@ -148,7 +151,6 @@ class Cursor(object):
|
||||
self.__comment = None
|
||||
self.__as_class = as_class
|
||||
self.__manipulate = manipulate
|
||||
self.__read_preference = read_preference or collection.read_preference
|
||||
self.__tz_aware = collection.database.connection.tz_aware
|
||||
self.__compile_re = compile_re
|
||||
self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype
|
||||
@ -158,10 +160,21 @@ class Cursor(object):
|
||||
self.__retrieved = 0
|
||||
self.__killed = False
|
||||
|
||||
self.__read_preference = read_preference or collection.read_preference
|
||||
if secondary_acceptable_latency_ms or tag_sets:
|
||||
warnings.warn("The secondary_acceptable_latency_ms "
|
||||
"and tag_sets options are deprecated",
|
||||
DeprecationWarning, stacklevel=3)
|
||||
mode = self.__read_preference.mode
|
||||
tags = tag_sets or self.__read_preference.tag_sets
|
||||
latency = (secondary_acceptable_latency_ms or
|
||||
self.__read_preference.latency_threshold_ms)
|
||||
self.__read_preference = make_read_preference(mode, latency, tags)
|
||||
|
||||
self.__query_flags = 0
|
||||
if tailable:
|
||||
self.__query_flags |= _QUERY_OPTIONS["tailable_cursor"]
|
||||
if self.__read_preference != ReadPreference.PRIMARY:
|
||||
if self.__read_preference.mode != ReadPreference.PRIMARY.mode:
|
||||
self.__query_flags |= _QUERY_OPTIONS["slave_okay"]
|
||||
if not timeout:
|
||||
self.__query_flags |= _QUERY_OPTIONS["no_timeout"]
|
||||
@ -300,7 +313,7 @@ class Cursor(object):
|
||||
# PRIMARY to avoid problems with mongos versions that
|
||||
# don't support read preferences.
|
||||
if (self.__collection.database.connection.is_mongos and
|
||||
self.__read_preference != ReadPreference.PRIMARY):
|
||||
self.__read_preference.mode != ReadPreference.PRIMARY.mode):
|
||||
|
||||
# For maximum backwards compatibility, don't set $readPreference
|
||||
# for SECONDARY_PREFERRED unless tags are in use. Just rely on
|
||||
|
||||
@ -295,25 +295,36 @@ class Database(common.BaseObject):
|
||||
fields = helpers._fields_list_to_dict(fields)
|
||||
command.update(kwargs)
|
||||
|
||||
orig = mode = read_preference or self.read_preference
|
||||
orig = pref = read_preference or self.read_preference
|
||||
latency = kwargs.pop('secondary_acceptable_latency_ms', None)
|
||||
tags = kwargs.pop('tags_sets', None)
|
||||
if latency or tags:
|
||||
warnings.warn("The secondary_acceptable_latency_ms "
|
||||
"and tag_sets options are deprecated",
|
||||
DeprecationWarning, stacklevel=3)
|
||||
mode = orig.mode
|
||||
tags = tags or orig.tag_sets
|
||||
latency = latency or orig.latency_threshold_ms
|
||||
orig = make_read_preference(mode, latency, tags)
|
||||
|
||||
if command_name not in SECONDARY_OK_COMMANDS:
|
||||
mode = ReadPreference.PRIMARY
|
||||
pref = ReadPreference.PRIMARY
|
||||
|
||||
# Special-case: mapreduce can go to secondaries only if inline
|
||||
elif command_name == 'mapreduce':
|
||||
out = command.get('out')
|
||||
if not isinstance(out, dict) or not out.get('inline'):
|
||||
mode = ReadPreference.PRIMARY
|
||||
pref = ReadPreference.PRIMARY
|
||||
|
||||
# Special-case: aggregate with $out cannot go to secondaries.
|
||||
elif command_name == 'aggregate':
|
||||
for stage in command.get('pipeline', []):
|
||||
if '$out' in stage:
|
||||
mode = ReadPreference.PRIMARY
|
||||
pref = ReadPreference.PRIMARY
|
||||
break
|
||||
|
||||
# Warn if mode will override read_preference.
|
||||
if mode != orig:
|
||||
if pref.mode != orig.mode:
|
||||
warnings.warn("%s does not support %s read preference "
|
||||
"and will be routed to the primary instead." %
|
||||
(command_name, orig.name), UserWarning, stacklevel=3)
|
||||
@ -323,7 +334,7 @@ class Database(common.BaseObject):
|
||||
fields=fields,
|
||||
limit=-1,
|
||||
as_class=as_class,
|
||||
read_preference=mode,
|
||||
read_preference=pref,
|
||||
compile_re=compile_re,
|
||||
_uuid_subtype=uuid_subtype)
|
||||
for doc in cursor:
|
||||
@ -391,11 +402,13 @@ class Database(common.BaseObject):
|
||||
Python-incompatible regular expressions, for example from
|
||||
``currentOp``
|
||||
- `read_preference`: The read preference for this operation.
|
||||
- `tag_sets` **DEPRECATED**
|
||||
- `secondary_acceptable_latency_ms` **DEPRECATED**
|
||||
- `**kwargs` (optional): additional keyword arguments will
|
||||
be added to the command document before it is sent
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
Removed the `tag_sets` and `secondary_acceptable_latency_ms`
|
||||
Deprecated the `tag_sets` and `secondary_acceptable_latency_ms`
|
||||
options.
|
||||
.. versionchanged:: 2.7
|
||||
Added ``compile_re`` option.
|
||||
|
||||
@ -203,10 +203,6 @@ class MongoClient(common.BaseObject):
|
||||
:class:`~pymongo.errors.AutoReconnect` "not master".
|
||||
See :class:`~pymongo.read_preferences.ReadPreference` for all
|
||||
available read preference options.
|
||||
- `acceptableLatencyMS`: (integer) When used with mongos
|
||||
high availability, any mongos whose ping time is within
|
||||
acceptable_latency_ms of the nearest member may be chosen
|
||||
as the new primary during a failover. Default 15 milliseconds.
|
||||
|
||||
| **SSL configuration:**
|
||||
|
||||
@ -299,7 +295,6 @@ class MongoClient(common.BaseObject):
|
||||
self.__repl = options.get('replicaset')
|
||||
self.__direct = len(seeds) == 1 and not self.__repl
|
||||
|
||||
self.__acceptable_latency = options.get('acceptablelatencyms', 15)
|
||||
self.__net_timeout = options.get('sockettimeoutms')
|
||||
self.__conn_timeout = options.get('connecttimeoutms')
|
||||
self.__wait_queue_timeout = options.get('waitqueuetimeoutms')
|
||||
@ -679,22 +674,6 @@ class MongoClient(common.BaseObject):
|
||||
return self.__member_property(
|
||||
'max_write_batch_size', common.MAX_WRITE_BATCH_SIZE)
|
||||
|
||||
@property
|
||||
def acceptable_latency_ms(self):
|
||||
"""When used with mongos high availability, any mongos whose ping time
|
||||
is within acceptable_latency_ms of the nearest mongos may be
|
||||
chosen as the new primary during a failover. Default 15 milliseconds.
|
||||
|
||||
.. versionadded:: 2.3
|
||||
|
||||
.. note:: ``acceptable_latency_ms`` is ignored when talking
|
||||
to a replica set *through* a mongos. The equivalent is the
|
||||
localThreshold_ command line option.
|
||||
|
||||
.. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption--localThreshold
|
||||
"""
|
||||
return self.__acceptable_latency
|
||||
|
||||
def __simple_command(self, sock_info, dbname, spec):
|
||||
"""Send a command to the server.
|
||||
"""
|
||||
@ -773,7 +752,7 @@ class MongoClient(common.BaseObject):
|
||||
|
||||
Doesn't modify state.
|
||||
"""
|
||||
latency = self.__acceptable_latency
|
||||
latency = self.read_preference.latency_threshold_ms
|
||||
# Only used for mongos high availability, ping_time is in seconds.
|
||||
fastest = min([
|
||||
member.ping_time for member in candidates])
|
||||
|
||||
@ -553,9 +553,6 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
- `read_preference`: The read preference for this client.
|
||||
See :class:`~pymongo.read_preferences.ReadPreference` for available
|
||||
options.
|
||||
- `acceptableLatencyMS`: (integer) Any replica-set member
|
||||
whose ping time is within acceptable_latency_ms of the
|
||||
nearest member may accept reads. Default 15 milliseconds.
|
||||
|
||||
| **SSL configuration:**
|
||||
|
||||
@ -646,7 +643,6 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
raise ConfigurationError("the replicaSet "
|
||||
"keyword parameter is required.")
|
||||
|
||||
self.__acceptable_latency = self.__opts.get('acceptablelatencyms', 15)
|
||||
self.__net_timeout = self.__opts.get('sockettimeoutms')
|
||||
self.__conn_timeout = self.__opts.get('connecttimeoutms')
|
||||
self.__wait_queue_timeout = self.__opts.get('waitqueuetimeoutms')
|
||||
@ -1006,18 +1002,6 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
return rs_state.primary_member.max_write_batch_size
|
||||
return common.MAX_WRITE_BATCH_SIZE
|
||||
|
||||
@property
|
||||
def acceptable_latency_ms(self):
|
||||
"""Any replica-set member whose ping time is within
|
||||
acceptable_latency_ms of the nearest member may accept
|
||||
reads. Defaults to 15 milliseconds.
|
||||
|
||||
See :class:`~pymongo.read_preferences.ReadPreference`.
|
||||
|
||||
.. versionadded:: 2.3
|
||||
"""
|
||||
return self.__acceptable_latency
|
||||
|
||||
@property
|
||||
def auto_start_request(self):
|
||||
"""Is auto_start_request enabled?
|
||||
@ -1662,7 +1646,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
members=members,
|
||||
mode=pref.mode,
|
||||
tag_sets=pref.tag_sets,
|
||||
latency=self.__acceptable_latency)
|
||||
latency=pref.latency_threshold_ms)
|
||||
|
||||
if not member:
|
||||
# Ran out of members to try
|
||||
|
||||
@ -63,14 +63,20 @@ class ServerMode(object):
|
||||
"""Base class for all read preferences.
|
||||
"""
|
||||
|
||||
__slots__ = ("__mode", "__mongos_mode", "__tag_sets")
|
||||
__slots__ = ("__mongos_mode", "__mode", "__latency", "__tag_sets")
|
||||
|
||||
def __init__(self, mode, tag_sets=None):
|
||||
def __init__(self, mode, latency_threshold_ms=15, tag_sets=None):
|
||||
if mode == _PRIMARY and tag_sets is not None:
|
||||
raise ConfigurationError("PRIMARY cannot be combined with tags")
|
||||
self.__mode = mode
|
||||
raise ConfigurationError("Read preference primary "
|
||||
"cannot be combined with tags")
|
||||
self.__mongos_mode = _MONGOS_MODES[mode]
|
||||
self.__mode = mode
|
||||
self.__tag_sets = _validate_tag_sets(tag_sets)
|
||||
try:
|
||||
self.__latency = float(latency_threshold_ms)
|
||||
except (ValueError, TypeError):
|
||||
raise ConfigurationError("latency_threshold_ms must "
|
||||
"be a positive integer or float")
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
@ -92,6 +98,17 @@ class ServerMode(object):
|
||||
"""
|
||||
return self.__mode
|
||||
|
||||
@property
|
||||
def latency_threshold_ms(self):
|
||||
"""int - Any replica-set member whose ping time is within
|
||||
`~latency_threshold_ms` of the nearest member may accept reads.
|
||||
When used with mongos high availability, any mongos whose ping
|
||||
time is within `~latency_threshold_ms` of the nearest mongos
|
||||
may be chosen as the new mongos during a failover. Default 15
|
||||
milliseconds.
|
||||
"""
|
||||
return self.__latency
|
||||
|
||||
@property
|
||||
def tag_sets(self):
|
||||
"""Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
|
||||
@ -108,10 +125,13 @@ class ServerMode(object):
|
||||
return self.__tag_sets or [{}]
|
||||
|
||||
def __repr__(self):
|
||||
return "%s(tag_sets=%r)" % (self.name, self.__tag_sets)
|
||||
return "%s(latency_threshold_ms=%d, tag_sets=%r)" % (
|
||||
self.name, self.__latency, self.__tag_sets)
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.mode == other.mode and self.tag_sets == other.tag_sets
|
||||
return (self.mode == other.mode and
|
||||
self.latency_threshold_ms == other.latency_threshold_ms and
|
||||
self.tag_sets == other.tag_sets)
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self == other
|
||||
@ -125,19 +145,17 @@ class Primary(ServerMode):
|
||||
* When connected to a mongos queries are sent to the primary of a shard.
|
||||
* When connected to a replica set queries are sent to the primary of
|
||||
the replica set.
|
||||
|
||||
:Parameters:
|
||||
- `latency_threshold_ms`: Used for mongos high availability. The
|
||||
:attr:`~latency_threshold_ms` when selecting a failover mongos.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super(Primary, self).__init__(_PRIMARY)
|
||||
def __init__(self, latency_threshold_ms=15):
|
||||
super(Primary, self).__init__(_PRIMARY, latency_threshold_ms)
|
||||
|
||||
def __repr__(self):
|
||||
return "Primary()"
|
||||
|
||||
def __eq__(self, other):
|
||||
return other.mode == _PRIMARY
|
||||
|
||||
def __ne__(self, other):
|
||||
return other.mode != _PRIMARY
|
||||
return "Primary(latency_threshold_ms=%d)" % self.latency_threshold_ms
|
||||
|
||||
|
||||
class PrimaryPreferred(ServerMode):
|
||||
@ -151,12 +169,15 @@ class PrimaryPreferred(ServerMode):
|
||||
available, otherwise a secondary.
|
||||
|
||||
:Parameters:
|
||||
- `latency_threshold_ms`: The :attr:`~latency_threshold_ms` when
|
||||
selecting a secondary.
|
||||
- `tag_sets`: The :attr:`~tag_sets` to use if the primary is not
|
||||
available.
|
||||
"""
|
||||
|
||||
def __init__(self, tag_sets=None):
|
||||
super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED, tag_sets)
|
||||
def __init__(self, latency_threshold_ms=15, tag_sets=None):
|
||||
super(PrimaryPreferred, self).__init__(
|
||||
_PRIMARY_PREFERRED, latency_threshold_ms, tag_sets)
|
||||
|
||||
|
||||
class Secondary(ServerMode):
|
||||
@ -170,11 +191,14 @@ class Secondary(ServerMode):
|
||||
secondaries. An error is raised if no secondaries are available.
|
||||
|
||||
:Parameters:
|
||||
- `latency_threshold_ms`: The :attr:`~latency_threshold_ms` when
|
||||
selecting a secondary.
|
||||
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
|
||||
"""
|
||||
|
||||
def __init__(self, tag_sets=None):
|
||||
super(Secondary, self).__init__(_SECONDARY, tag_sets)
|
||||
def __init__(self, latency_threshold_ms=15, tag_sets=None):
|
||||
super(Secondary, self).__init__(
|
||||
_SECONDARY, latency_threshold_ms, tag_sets)
|
||||
|
||||
|
||||
class SecondaryPreferred(ServerMode):
|
||||
@ -188,11 +212,14 @@ class SecondaryPreferred(ServerMode):
|
||||
secondaries, or the primary if no secondary is available.
|
||||
|
||||
:Parameters:
|
||||
- `latency_threshold_ms`: The :attr:`~latency_threshold_ms` when
|
||||
selecting a secondary.
|
||||
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
|
||||
"""
|
||||
|
||||
def __init__(self, tag_sets=None):
|
||||
super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED, tag_sets)
|
||||
def __init__(self, latency_threshold_ms=15, tag_sets=None):
|
||||
super(SecondaryPreferred, self).__init__(
|
||||
_SECONDARY_PREFERRED, latency_threshold_ms, tag_sets)
|
||||
|
||||
|
||||
class Nearest(ServerMode):
|
||||
@ -206,21 +233,26 @@ class Nearest(ServerMode):
|
||||
members.
|
||||
|
||||
:Parameters:
|
||||
- `latency_threshold_ms`: The :attr:`~latency_threshold_ms` when
|
||||
selecting a secondary.
|
||||
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
|
||||
"""
|
||||
|
||||
def __init__(self, tag_sets=None):
|
||||
super(Nearest, self).__init__(_NEAREST, tag_sets)
|
||||
def __init__(self, latency_threshold_ms=15, tag_sets=None):
|
||||
super(Nearest, self).__init__(
|
||||
_NEAREST, latency_threshold_ms, tag_sets)
|
||||
|
||||
|
||||
_ALL_READ_PREFERENCES = (Primary, PrimaryPreferred,
|
||||
Secondary, SecondaryPreferred, Nearest)
|
||||
|
||||
|
||||
def read_pref_class_from_mode(mode):
|
||||
"""Get the read preference class for a specific mode.
|
||||
"""
|
||||
return _ALL_READ_PREFERENCES[mode]
|
||||
def make_read_preference(mode, latency_threshold_ms, tag_sets):
|
||||
if mode == _PRIMARY:
|
||||
if tag_sets not in (None, [{}]):
|
||||
raise ConfigurationError("Read preference primary "
|
||||
"cannot be combined with tags")
|
||||
return Primary(latency_threshold_ms)
|
||||
return _ALL_READ_PREFERENCES[mode](latency_threshold_ms, tag_sets)
|
||||
|
||||
|
||||
_MODES = (
|
||||
|
||||
@ -653,6 +653,11 @@ class TestReadPreference(HATestCase):
|
||||
|
||||
assertReadFrom(other_secondary, NEAREST)
|
||||
|
||||
# High secondaryAcceptableLatencyMS, should read from all members
|
||||
assertReadFromAll(
|
||||
[primary, secondary, other_secondary],
|
||||
NEAREST, secondary_acceptable_latency_ms=1000*1000)
|
||||
|
||||
self.clear_ping_times()
|
||||
|
||||
assertReadFromAll([primary, other_secondary], NEAREST, [{'dc': 'ny'}])
|
||||
@ -815,6 +820,18 @@ class TestReadPreference(HATestCase):
|
||||
"Changing from tags %s to tags %s never unpinned" % (
|
||||
tags0, tags1))
|
||||
|
||||
# Finally, verify changing the secondary_acceptable_latency_ms unpins
|
||||
# the member.
|
||||
for _ in range(1000):
|
||||
host = utils.read_from_which_host(c, SECONDARY, None, 15)
|
||||
new_host = utils.read_from_which_host(c, SECONDARY, None, 20)
|
||||
if host != new_host:
|
||||
break
|
||||
else:
|
||||
self.fail(
|
||||
"Changing secondary_acceptable_latency_ms from 15 to 20"
|
||||
" never unpinned")
|
||||
|
||||
def tearDown(self):
|
||||
self.c.close()
|
||||
super(TestReadPreference, self).tearDown()
|
||||
|
||||
@ -95,15 +95,15 @@ class TestReadPreferences(TestReadPreferencesBase):
|
||||
self.assertRaises(ConfigurationError, ServerMode,
|
||||
0, tag_sets=[{}])
|
||||
|
||||
S = Secondary([{}])
|
||||
S = Secondary(tag_sets=[{}])
|
||||
self.assertEqual([{}],
|
||||
self._get_client(read_preference=S).read_preference.tag_sets)
|
||||
|
||||
S = Secondary([{'k': 'v'}])
|
||||
S = Secondary(tag_sets=[{'k': 'v'}])
|
||||
self.assertEqual([{'k': 'v'}],
|
||||
self._get_client(read_preference=S).read_preference.tag_sets)
|
||||
|
||||
S = Secondary([{'k': 'v'}, {}])
|
||||
S = Secondary(tag_sets=[{'k': 'v'}, {}])
|
||||
self.assertEqual([{'k': 'v'}, {}],
|
||||
self._get_client(read_preference=S).read_preference.tag_sets)
|
||||
|
||||
@ -118,16 +118,16 @@ class TestReadPreferences(TestReadPreferencesBase):
|
||||
|
||||
def test_latency_validation(self):
|
||||
self.assertEqual(17, self._get_client(
|
||||
acceptableLatencyMS=17
|
||||
).acceptable_latency_ms)
|
||||
latencyThresholdMS=17
|
||||
).read_preference.latency_threshold_ms)
|
||||
|
||||
self.assertEqual(42, self._get_client(
|
||||
acceptableLatencyMS=42
|
||||
).acceptable_latency_ms)
|
||||
latencyThresholdMS=42
|
||||
).read_preference.latency_threshold_ms)
|
||||
|
||||
self.assertEqual(666, self._get_client(
|
||||
acceptablelatencyms=666
|
||||
).acceptable_latency_ms)
|
||||
latencythresholdms=666
|
||||
).read_preference.latency_threshold_ms)
|
||||
|
||||
def test_primary(self):
|
||||
self.assertReadsFrom('primary',
|
||||
@ -151,11 +151,11 @@ class TestReadPreferences(TestReadPreferencesBase):
|
||||
read_preference=ReadPreference.SECONDARY_PREFERRED)
|
||||
|
||||
def test_nearest(self):
|
||||
# With high acceptableLatencyMS, expect to read from any
|
||||
# With high latencyThresholdMS, expect to read from any
|
||||
# member
|
||||
c = self._get_client(
|
||||
read_preference=ReadPreference.NEAREST,
|
||||
acceptableLatencyMS=10000, # 10 seconds
|
||||
latencyThresholdMS=10000, # 10 seconds
|
||||
auto_start_request=False)
|
||||
|
||||
data_members = set(self.hosts).difference(set(self.arbiters))
|
||||
@ -203,7 +203,7 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase):
|
||||
replicaSet=self.name, auto_start_request=False,
|
||||
# Effectively ignore members' ping times so we can test the effect
|
||||
# of ReadPreference modes only
|
||||
acceptableLatencyMS=1000*1000)
|
||||
latencyThresholdMS=1000*1000)
|
||||
self.client_version = Version.from_client(self.c)
|
||||
|
||||
def tearDown(self):
|
||||
@ -538,7 +538,7 @@ class TestMongosConnection(unittest.TestCase):
|
||||
None, [{}]
|
||||
):
|
||||
# Create a client e.g. with read_preference=NEAREST
|
||||
c = get_client(read_preference=mode(tag_sets))
|
||||
c = get_client(read_preference=mode(tag_sets=tag_sets))
|
||||
|
||||
self.assertEqual(is_mongos, c.is_mongos)
|
||||
cursor = c.pymongo_test.test.find()
|
||||
@ -577,7 +577,7 @@ class TestMongosConnection(unittest.TestCase):
|
||||
[{'dc': 'la'}, {'dc': 'sf'}],
|
||||
[{'dc': 'la'}, {'dc': 'sf'}, {}],
|
||||
):
|
||||
c = get_client(read_preference=mode(tag_sets))
|
||||
c = get_client(read_preference=mode(tag_sets=tag_sets))
|
||||
|
||||
self.assertEqual(is_mongos, c.is_mongos)
|
||||
cursor = c.pymongo_test.test.find()
|
||||
|
||||
@ -238,11 +238,11 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
ReadPreference.PRIMARY, cursor._Cursor__read_preference)
|
||||
|
||||
tag_sets = [{'dc': 'la', 'rack': '2'}, {'foo': 'bar'}]
|
||||
secondary = Secondary(tag_sets)
|
||||
secondary = Secondary(tag_sets=tag_sets)
|
||||
c = MongoReplicaSetClient(pair, replicaSet=self.name, max_pool_size=25,
|
||||
document_class=SON, tz_aware=True,
|
||||
read_preference=secondary,
|
||||
acceptablelatencyms=77)
|
||||
secondaryacceptablelatencyms=77)
|
||||
self.assertEqual(c.primary, self.primary)
|
||||
self.assertEqual(c.hosts, self.hosts)
|
||||
self.assertEqual(c.arbiters, self.arbiters)
|
||||
@ -257,7 +257,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
self.assertEqual(
|
||||
secondary, cursor._Cursor__read_preference)
|
||||
|
||||
nearest = Nearest([{'dc': 'ny'}, {}])
|
||||
nearest = Nearest(tag_sets=[{'dc': 'ny'}, {}])
|
||||
cursor = c.pymongo_test.test.find(read_preference=nearest)
|
||||
|
||||
self.assertEqual(
|
||||
@ -1023,7 +1023,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
|
||||
def test_pinned_member(self):
|
||||
latency = 1000 * 1000
|
||||
client = self._get_client(acceptablelatencyms=latency)
|
||||
client = self._get_client(secondaryacceptablelatencyms=latency)
|
||||
|
||||
host = read_from_which_host(client, ReadPreference.SECONDARY)
|
||||
self.assertTrue(host in client.secondaries)
|
||||
|
||||
@ -276,8 +276,9 @@ class RendezvousThread(threading.Thread):
|
||||
|
||||
def read_from_which_host(
|
||||
rsc,
|
||||
mode,
|
||||
pref,
|
||||
tag_sets=None,
|
||||
secondary_acceptable_latency_ms=None
|
||||
):
|
||||
"""Read from a MongoReplicaSetClient with the given Read Preference mode,
|
||||
tags, and acceptable latency. Return the 'host:port' which was read from.
|
||||
@ -288,10 +289,16 @@ def read_from_which_host(
|
||||
- `tag_sets`: List of dicts of tags for data-center-aware reads
|
||||
"""
|
||||
db = rsc.pymongo_test
|
||||
db.read_preference = mode
|
||||
|
||||
if isinstance(tag_sets, dict):
|
||||
tag_sets = [tag_sets]
|
||||
db.tag_sets = tag_sets or [{}]
|
||||
if tag_sets or secondary_acceptable_latency_ms:
|
||||
mode = pref.mode
|
||||
latency = secondary_acceptable_latency_ms or pref.latency_threshold_ms
|
||||
tags = tag_sets or pref.tag_sets
|
||||
pref = pref.__class__(mode, latency, tags)
|
||||
|
||||
db.read_preference = pref
|
||||
|
||||
cursor = db.test.find()
|
||||
try:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user