PYTHON-526 secondaryAcceptableLatencyMS changes.
- Renamed to acceptableLatencyMS since it is also used in choosing a new mongos during mongos HA failover. - Moved to a global immutable setting. The expectation is that the value would be determined by a network/system admin. Changing it per operation doesn't make a lot of sense. Note - it's possible the name of this option may change again before 3.0 is released.
This commit is contained in:
parent
9404ff1f12
commit
2f86207246
@ -25,7 +25,6 @@
|
||||
.. autoattribute:: database
|
||||
.. autoattribute:: read_preference
|
||||
.. autoattribute:: tag_sets
|
||||
.. autoattribute:: secondary_acceptable_latency_ms
|
||||
.. autoattribute:: write_concern
|
||||
.. autoattribute:: uuid_subtype
|
||||
.. automethod:: insert(doc_or_docs[, manipulate=True[, check_keys=True[, continue_on_error=False[, **kwargs]]]])
|
||||
|
||||
@ -4,7 +4,7 @@
|
||||
.. automodule:: pymongo.cursor
|
||||
:synopsis: Tools for iterating over MongoDB query results
|
||||
|
||||
.. autoclass:: pymongo.cursor.Cursor(collection, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, read_preference=ReadPreference.PRIMARY, tag_sets=[{}], secondary_acceptable_latency_ms=None, exhaust=False)
|
||||
.. autoclass:: pymongo.cursor.Cursor(collection, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, as_class=None, await_data=False, partial=False, manipulate=True, read_preference=ReadPreference.PRIMARY, tag_sets=[{}], exhaust=False)
|
||||
:members:
|
||||
|
||||
.. describe:: c[index]
|
||||
|
||||
@ -25,7 +25,6 @@
|
||||
|
||||
.. autoattribute:: read_preference
|
||||
.. autoattribute:: tag_sets
|
||||
.. autoattribute:: secondary_acceptable_latency_ms
|
||||
.. autoattribute:: write_concern
|
||||
.. autoattribute:: uuid_subtype
|
||||
|
||||
|
||||
@ -31,7 +31,7 @@
|
||||
.. autoattribute:: max_wire_version
|
||||
.. autoattribute:: read_preference
|
||||
.. autoattribute:: tag_sets
|
||||
.. autoattribute:: secondary_acceptable_latency_ms
|
||||
.. autoattribute:: acceptable_latency_ms
|
||||
.. autoattribute:: write_concern
|
||||
.. autoattribute:: uuid_subtype
|
||||
.. autoattribute:: is_locked
|
||||
|
||||
@ -32,7 +32,7 @@
|
||||
.. autoattribute:: auto_start_request
|
||||
.. autoattribute:: read_preference
|
||||
.. autoattribute:: tag_sets
|
||||
.. autoattribute:: secondary_acceptable_latency_ms
|
||||
.. autoattribute:: acceptable_latency_ms
|
||||
.. autoattribute:: write_concern
|
||||
.. autoattribute:: uuid_subtype
|
||||
.. automethod:: database_names
|
||||
|
||||
@ -200,7 +200,7 @@ per-query basis, e.g.::
|
||||
>>> db.collection.find_one(read_preference=ReadPreference.PRIMARY)
|
||||
|
||||
Reads are configured using three options: **read_preference**, **tag_sets**,
|
||||
and **secondary_acceptable_latency_ms**.
|
||||
and **acceptableLatencyMS**.
|
||||
|
||||
**read_preference**:
|
||||
|
||||
@ -210,18 +210,18 @@ and **secondary_acceptable_latency_ms**.
|
||||
|
||||
- ``PRIMARY_PREFERRED``: Read from the primary if available, or if there is
|
||||
none, read from a secondary matching your choice of ``tag_sets`` and
|
||||
``secondary_acceptable_latency_ms``.
|
||||
``acceptableLatencyMS``.
|
||||
|
||||
- ``SECONDARY``: Read from a secondary matching your choice of ``tag_sets`` and
|
||||
``secondary_acceptable_latency_ms``. If no matching secondary is available,
|
||||
``acceptableLatencyMS``. If no matching secondary is available,
|
||||
raise :class:`~pymongo.errors.AutoReconnect`.
|
||||
|
||||
- ``SECONDARY_PREFERRED``: Read from a secondary matching your choice of
|
||||
``tag_sets`` and ``secondary_acceptable_latency_ms`` if available, otherwise
|
||||
``tag_sets`` and ``acceptableLatencyMS`` if available, otherwise
|
||||
from primary (regardless of the primary's tags and latency).
|
||||
|
||||
- ``NEAREST``: Read from any member matching your choice of ``tag_sets`` and
|
||||
``secondary_acceptable_latency_ms``.
|
||||
``acceptableLatencyMS``.
|
||||
|
||||
**tag_sets**:
|
||||
|
||||
@ -248,22 +248,22 @@ and raises :class:`~pymongo.errors.AutoReconnect` if none are available. As an
|
||||
additional fallback, specify a final, empty tag set, ``{}``, which means "read
|
||||
from any member that matches the mode, ignoring tags."
|
||||
|
||||
**secondary_acceptable_latency_ms**:
|
||||
**acceptableLatencyMS**:
|
||||
|
||||
If multiple members match the mode and tag sets, MongoReplicaSetClient reads
|
||||
from among the nearest members, chosen according to ping time. By default,
|
||||
only members whose ping times are within 15 milliseconds of the nearest
|
||||
are used for queries. You can choose to distribute reads among members with
|
||||
higher latencies by setting ``secondary_acceptable_latency_ms`` to a larger
|
||||
higher latencies by setting ``acceptableLatencyMS`` to a larger
|
||||
number. In that case, MongoReplicaSetClient distributes reads among matching
|
||||
members within ``secondary_acceptable_latency_ms`` of the closest member's
|
||||
members within ``acceptableLatencyMS`` of the closest member's
|
||||
ping time.
|
||||
|
||||
.. note:: ``secondary_acceptable_latency_ms`` is ignored when talking to a
|
||||
.. note:: ``acceptableLatencyMS`` is ignored when talking to a
|
||||
replica set *through* a mongos. The equivalent is the localThreshold_ command
|
||||
line option.
|
||||
|
||||
.. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption-mongos--localThreshold
|
||||
.. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption--localThreshold
|
||||
|
||||
Health Monitoring
|
||||
'''''''''''''''''
|
||||
|
||||
@ -325,11 +325,6 @@ class GridFS(object):
|
||||
- `read_preference` (optional): The read preference for
|
||||
this query.
|
||||
- `tag_sets` (optional): The tag sets for this query.
|
||||
- `secondary_acceptable_latency_ms` (optional): Any replica-set
|
||||
member whose ping time is within secondary_acceptable_latency_ms of
|
||||
the nearest member may accept reads. Default 15 milliseconds.
|
||||
**Ignored by mongos** and must be configured on the command line.
|
||||
See the localThreshold_ option for more information.
|
||||
- `compile_re` (optional): if ``False``, don't attempt to compile
|
||||
BSON regex objects into Python regexes. Return instances of
|
||||
:class:`~bson.regex.Regex` instead.
|
||||
@ -341,7 +336,6 @@ class GridFS(object):
|
||||
|
||||
.. versionadded:: 2.7
|
||||
.. mongodoc:: find
|
||||
.. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption-mongos--localThreshold
|
||||
"""
|
||||
return GridOutCursor(self.__collection, *args, **kwargs)
|
||||
|
||||
|
||||
@ -625,8 +625,7 @@ class GridOutCursor(Cursor):
|
||||
"""
|
||||
def __init__(self, collection, spec=None, skip=0, limit=0,
|
||||
timeout=True, sort=None, max_scan=None,
|
||||
read_preference=None, tag_sets=None,
|
||||
secondary_acceptable_latency_ms=None, compile_re=True):
|
||||
read_preference=None, tag_sets=None, compile_re=True):
|
||||
"""Create a new cursor, similar to the normal
|
||||
:class:`~pymongo.cursor.Cursor`.
|
||||
|
||||
@ -643,14 +642,11 @@ class GridOutCursor(Cursor):
|
||||
# Copy these settings from collection if they are not set by caller.
|
||||
read_preference = read_preference or collection.files.read_preference
|
||||
tag_sets = tag_sets or collection.files.tag_sets
|
||||
latency = (secondary_acceptable_latency_ms
|
||||
or collection.files.secondary_acceptable_latency_ms)
|
||||
|
||||
super(GridOutCursor, self).__init__(
|
||||
collection.files, spec, skip=skip, limit=limit, timeout=timeout,
|
||||
sort=sort, max_scan=max_scan, read_preference=read_preference,
|
||||
secondary_acceptable_latency_ms=latency, compile_re=compile_re,
|
||||
tag_sets=tag_sets)
|
||||
compile_re=compile_re, tag_sets=tag_sets)
|
||||
|
||||
def next(self):
|
||||
"""Get next GridOut object from cursor.
|
||||
|
||||
@ -88,8 +88,6 @@ class Collection(common.BaseObject):
|
||||
super(Collection, self).__init__(
|
||||
read_preference=database.read_preference,
|
||||
tag_sets=database.tag_sets,
|
||||
secondary_acceptable_latency_ms=(
|
||||
database.secondary_acceptable_latency_ms),
|
||||
uuidrepresentation=database.uuid_subtype,
|
||||
**database.write_concern)
|
||||
|
||||
@ -784,11 +782,6 @@ class Collection(common.BaseObject):
|
||||
- `read_preference` (optional): The read preference for
|
||||
this query.
|
||||
- `tag_sets` (optional): The tag sets for this query.
|
||||
- `secondary_acceptable_latency_ms` (optional): Any replica-set
|
||||
member whose ping time is within secondary_acceptable_latency_ms of
|
||||
the nearest member may accept reads. Default 15 milliseconds.
|
||||
**Ignored by mongos** and must be configured on the command line.
|
||||
See the localThreshold_ option for more information.
|
||||
- `compile_re` (optional): if ``False``, don't attempt to compile
|
||||
BSON regex objects into Python regexes. Return instances of
|
||||
:class:`~bson.regex.Regex` instead.
|
||||
@ -823,7 +816,8 @@ class Collection(common.BaseObject):
|
||||
version **>= 1.5.1**
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
Removed the ``network_timeout`` parameter.
|
||||
Removed the `network_timeout` and
|
||||
`secondary_acceptable_latency_ms` parameters.
|
||||
|
||||
.. versionadded:: 2.7
|
||||
The ``compile_re`` parameter.
|
||||
@ -848,15 +842,11 @@ class Collection(common.BaseObject):
|
||||
The `tailable` parameter.
|
||||
|
||||
.. mongodoc:: find
|
||||
.. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption-mongos--localThreshold
|
||||
"""
|
||||
if not 'read_preference' in kwargs:
|
||||
kwargs['read_preference'] = self.read_preference
|
||||
if not 'tag_sets' in kwargs:
|
||||
kwargs['tag_sets'] = self.tag_sets
|
||||
if not 'secondary_acceptable_latency_ms' in kwargs:
|
||||
kwargs['secondary_acceptable_latency_ms'] = (
|
||||
self.secondary_acceptable_latency_ms)
|
||||
return Cursor(self, *args, **kwargs)
|
||||
|
||||
def parallel_scan(self, num_cursors, **kwargs):
|
||||
@ -907,8 +897,6 @@ class Collection(common.BaseObject):
|
||||
'numCursors': num_cursors,
|
||||
'read_preference': self.read_preference,
|
||||
'tag_sets': self.tag_sets,
|
||||
'secondary_acceptable_latency_ms': (
|
||||
self.secondary_acceptable_latency_ms),
|
||||
}
|
||||
command_kwargs.update(kwargs)
|
||||
|
||||
@ -1288,8 +1276,6 @@ class Collection(common.BaseObject):
|
||||
'pipeline': pipeline,
|
||||
'read_preference': self.read_preference,
|
||||
'tag_sets': self.tag_sets,
|
||||
'secondary_acceptable_latency_ms': (
|
||||
self.secondary_acceptable_latency_ms),
|
||||
}
|
||||
|
||||
command_kwargs.update(kwargs)
|
||||
@ -1363,8 +1349,6 @@ class Collection(common.BaseObject):
|
||||
uuid_subtype=self.uuid_subtype,
|
||||
read_preference=self.read_preference,
|
||||
tag_sets=self.tag_sets,
|
||||
secondary_acceptable_latency_ms=(
|
||||
self.secondary_acceptable_latency_ms),
|
||||
**kwargs)["retval"]
|
||||
|
||||
def rename(self, new_name, **kwargs):
|
||||
@ -1469,8 +1453,6 @@ class Collection(common.BaseObject):
|
||||
map=map, reduce=reduce,
|
||||
read_preference=self.read_preference,
|
||||
tag_sets=self.tag_sets,
|
||||
secondary_acceptable_latency_ms=(
|
||||
self.secondary_acceptable_latency_ms),
|
||||
out=out, **kwargs)
|
||||
|
||||
if full_response or not response.get('result'):
|
||||
@ -1519,8 +1501,6 @@ class Collection(common.BaseObject):
|
||||
uuid_subtype=self.uuid_subtype,
|
||||
read_preference=self.read_preference,
|
||||
tag_sets=self.tag_sets,
|
||||
secondary_acceptable_latency_ms=(
|
||||
self.secondary_acceptable_latency_ms),
|
||||
map=map, reduce=reduce,
|
||||
out={"inline": 1}, **kwargs)
|
||||
|
||||
|
||||
@ -271,8 +271,7 @@ VALIDATORS = {
|
||||
'read_preference': validate_read_preference,
|
||||
'readpreferencetags': validate_tag_sets,
|
||||
'tag_sets': validate_tag_sets,
|
||||
'secondaryacceptablelatencyms': validate_positive_float,
|
||||
'secondary_acceptable_latency_ms': validate_positive_float,
|
||||
'acceptablelatencyms': validate_positive_float,
|
||||
'auto_start_request': validate_boolean,
|
||||
'use_greenlets': validate_boolean,
|
||||
'authmechanism': validate_auth_mechanism,
|
||||
@ -341,7 +340,6 @@ class BaseObject(object):
|
||||
|
||||
self.__read_pref = ReadPreference.PRIMARY
|
||||
self.__tag_sets = [{}]
|
||||
self.__secondary_acceptable_latency_ms = 15
|
||||
self.__uuid_subtype = OLD_UUID_SUBTYPE
|
||||
self.__write_concern = WriteConcern()
|
||||
self.__set_options(options)
|
||||
@ -368,12 +366,6 @@ class BaseObject(object):
|
||||
self.__tag_sets = validate_tag_sets(option, value)
|
||||
elif option == 'uuidrepresentation':
|
||||
self.__uuid_subtype = validate_uuid_subtype(option, value)
|
||||
elif option in (
|
||||
'secondaryacceptablelatencyms',
|
||||
'secondary_acceptable_latency_ms'
|
||||
):
|
||||
self.__secondary_acceptable_latency_ms = \
|
||||
validate_positive_float(option, value)
|
||||
elif option in WRITE_CONCERN_OPTIONS:
|
||||
if option == 'journal':
|
||||
self.__set_write_concern_option('j', value)
|
||||
@ -465,31 +457,6 @@ class BaseObject(object):
|
||||
|
||||
read_preference = property(__get_read_pref, __set_read_pref)
|
||||
|
||||
def __get_acceptable_latency(self):
|
||||
"""Any replica-set member whose ping time is within
|
||||
secondary_acceptable_latency_ms of the nearest member may accept
|
||||
reads. Defaults to 15 milliseconds.
|
||||
|
||||
See :class:`~pymongo.read_preferences.ReadPreference`.
|
||||
|
||||
.. versionadded:: 2.3
|
||||
|
||||
.. note:: ``secondary_acceptable_latency_ms`` is ignored when talking
|
||||
to a replica set *through* a mongos. The equivalent is the
|
||||
localThreshold_ command line option.
|
||||
|
||||
.. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption-mongos--localThreshold
|
||||
"""
|
||||
return self.__secondary_acceptable_latency_ms
|
||||
|
||||
def __set_acceptable_latency(self, value):
|
||||
"""Property setter for secondary_acceptable_latency_ms"""
|
||||
self.__secondary_acceptable_latency_ms = (validate_positive_float(
|
||||
'secondary_acceptable_latency_ms', value))
|
||||
|
||||
secondary_acceptable_latency_ms = property(
|
||||
__get_acceptable_latency, __set_acceptable_latency)
|
||||
|
||||
def __get_tag_sets(self):
|
||||
"""Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
|
||||
read only from members whose ``dc`` tag has the value ``"ny"``.
|
||||
|
||||
@ -69,8 +69,8 @@ class Cursor(object):
|
||||
max_scan=None, as_class=None,
|
||||
await_data=False, partial=False, manipulate=True,
|
||||
read_preference=ReadPreference.PRIMARY,
|
||||
tag_sets=[{}], secondary_acceptable_latency_ms=None,
|
||||
exhaust=False, compile_re=True, _uuid_subtype=None):
|
||||
tag_sets=[{}], exhaust=False, compile_re=True,
|
||||
_uuid_subtype=None):
|
||||
"""Create a new cursor.
|
||||
|
||||
Should not be called directly by application developers - see
|
||||
@ -148,7 +148,6 @@ class Cursor(object):
|
||||
self.__manipulate = manipulate
|
||||
self.__read_preference = read_preference
|
||||
self.__tag_sets = tag_sets
|
||||
self.__secondary_acceptable_latency_ms = secondary_acceptable_latency_ms
|
||||
self.__tz_aware = collection.database.connection.tz_aware
|
||||
self.__compile_re = compile_re
|
||||
self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype
|
||||
@ -235,7 +234,6 @@ class Cursor(object):
|
||||
"snapshot", "ordering", "explain", "hint",
|
||||
"batch_size", "max_scan", "as_class",
|
||||
"manipulate", "read_preference", "tag_sets",
|
||||
"secondary_acceptable_latency_ms",
|
||||
"uuid_subtype", "compile_re", "query_flags")
|
||||
data = dict((k, v) for k, v in self.__dict__.iteritems()
|
||||
if k.startswith('_Cursor__') and k[9:] in values_to_clone)
|
||||
@ -690,12 +688,12 @@ class Cursor(object):
|
||||
"""
|
||||
if not isinstance(with_limit_and_skip, bool):
|
||||
raise TypeError("with_limit_and_skip must be an instance of bool")
|
||||
command = {"query": self.__spec, "fields": self.__fields}
|
||||
|
||||
command['read_preference'] = self.__read_preference
|
||||
command['tag_sets'] = self.__tag_sets
|
||||
command['secondary_acceptable_latency_ms'] = (
|
||||
self.__secondary_acceptable_latency_ms)
|
||||
command = {
|
||||
"query": self.__spec,
|
||||
"fields": self.__fields,
|
||||
"read_preference": self.__read_preference,
|
||||
"tag_sets": self.__tag_sets,
|
||||
}
|
||||
if self.__max_time_ms is not None:
|
||||
command["maxTimeMS"] = self.__max_time_ms
|
||||
if self.__comment:
|
||||
@ -749,8 +747,6 @@ class Cursor(object):
|
||||
|
||||
options['read_preference'] = self.__read_preference
|
||||
options['tag_sets'] = self.__tag_sets
|
||||
options['secondary_acceptable_latency_ms'] = (
|
||||
self.__secondary_acceptable_latency_ms)
|
||||
if self.__max_time_ms is not None:
|
||||
options['maxTimeMS'] = self.__max_time_ms
|
||||
if self.__comment:
|
||||
@ -859,8 +855,6 @@ class Cursor(object):
|
||||
kwargs = {
|
||||
"read_preference": self.__read_preference,
|
||||
"tag_sets": self.__tag_sets,
|
||||
"secondary_acceptable_latency_ms":
|
||||
self.__secondary_acceptable_latency_ms,
|
||||
"exhaust": self.__exhaust,
|
||||
}
|
||||
if self.__connection_id is not None:
|
||||
|
||||
@ -63,8 +63,6 @@ class Database(common.BaseObject):
|
||||
super(Database,
|
||||
self).__init__(read_preference=connection.read_preference,
|
||||
tag_sets=connection.tag_sets,
|
||||
secondary_acceptable_latency_ms=(
|
||||
connection.secondary_acceptable_latency_ms),
|
||||
uuidrepresentation=connection.uuid_subtype,
|
||||
**connection.write_concern)
|
||||
|
||||
@ -284,8 +282,6 @@ class Database(common.BaseObject):
|
||||
|
||||
orig = mode = kwargs.pop('read_preference', self.read_preference)
|
||||
tags = kwargs.pop('tag_sets', self.tag_sets)
|
||||
latency = kwargs.pop('secondary_acceptable_latency_ms',
|
||||
self.secondary_acceptable_latency_ms)
|
||||
as_class = kwargs.pop('as_class', None)
|
||||
|
||||
if command_name not in secondary_ok_commands:
|
||||
@ -310,7 +306,6 @@ class Database(common.BaseObject):
|
||||
"and will be routed to the primary instead." %
|
||||
(command_name, modes[orig]), UserWarning)
|
||||
tags = [{}]
|
||||
latency = None
|
||||
|
||||
fields = kwargs.pop('fields', None)
|
||||
if fields is not None and not isinstance(fields, dict):
|
||||
@ -324,7 +319,6 @@ class Database(common.BaseObject):
|
||||
as_class=as_class,
|
||||
read_preference=mode,
|
||||
tag_sets=tags,
|
||||
secondary_acceptable_latency_ms=latency,
|
||||
compile_re=compile_re,
|
||||
_uuid_subtype=uuid_subtype)
|
||||
for doc in cursor:
|
||||
@ -400,14 +394,11 @@ class Database(common.BaseObject):
|
||||
ignoring tags." MongoReplicaSetClient tries each set of tags in
|
||||
turn until it finds a set of tags with at least one matching
|
||||
member.
|
||||
- `secondary_acceptable_latency_ms`: Any replica-set member whose
|
||||
ping time is within secondary_acceptable_latency_ms of the nearest
|
||||
member may accept reads. Default 15 milliseconds.
|
||||
**Ignored by mongos** and must be configured on the command line.
|
||||
See the localThreshold_ option for more information.
|
||||
- `**kwargs` (optional): additional keyword arguments will
|
||||
be added to the command document before it is sent
|
||||
|
||||
.. versionchanged:: 3.0
|
||||
Removed the `secondary_acceptable_latency_ms` option.
|
||||
.. versionchanged:: 2.7
|
||||
Added ``compile_re`` option.
|
||||
.. versionchanged:: 2.3
|
||||
@ -423,7 +414,6 @@ class Database(common.BaseObject):
|
||||
.. versionadded:: 1.4
|
||||
|
||||
.. mongodoc:: commands
|
||||
.. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption-mongos--localThreshold
|
||||
"""
|
||||
return self._command(command, value, check, allowable_errors,
|
||||
uuid_subtype, compile_re, **kwargs)[0]
|
||||
|
||||
@ -196,6 +196,10 @@ class MongoClient(common.BaseObject):
|
||||
tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
|
||||
set, ``{}``, means "read from any member that matches the mode,
|
||||
ignoring tags.
|
||||
- `acceptable_latency_ms`: (integer) When used with mongos
|
||||
high availability, any mongos whose ping time is within
|
||||
acceptable_latency_ms of the nearest member may be chosen
|
||||
as the new primary during a failover. Default 15 milliseconds.
|
||||
|
||||
| **SSL configuration:**
|
||||
|
||||
@ -288,6 +292,7 @@ class MongoClient(common.BaseObject):
|
||||
self.__repl = options.get('replicaset')
|
||||
self.__direct = len(seeds) == 1 and not self.__repl
|
||||
|
||||
self.__acceptable_latency = options.get('acceptablelatencyms', 15)
|
||||
self.__net_timeout = options.get('sockettimeoutms')
|
||||
self.__conn_timeout = options.get('connecttimeoutms')
|
||||
self.__wait_queue_timeout = options.get('waitqueuetimeoutms')
|
||||
@ -663,6 +668,22 @@ class MongoClient(common.BaseObject):
|
||||
return self.__member_property(
|
||||
'max_write_batch_size', common.MAX_WRITE_BATCH_SIZE)
|
||||
|
||||
@property
|
||||
def acceptable_latency_ms(self):
|
||||
"""When used with mongos high availability, any mongos whose ping time
|
||||
is within acceptable_latency_ms of the nearest mongos may be
|
||||
chosen as the new primary during a failover. Default 15 milliseconds.
|
||||
|
||||
.. versionadded:: 2.3
|
||||
|
||||
.. note:: ``acceptable_latency_ms`` is ignored when talking
|
||||
to a replica set *through* a mongos. The equivalent is the
|
||||
localThreshold_ command line option.
|
||||
|
||||
.. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption--localThreshold
|
||||
"""
|
||||
return self.__acceptable_latency
|
||||
|
||||
def __simple_command(self, sock_info, dbname, spec):
|
||||
"""Send a command to the server.
|
||||
"""
|
||||
@ -741,7 +762,7 @@ class MongoClient(common.BaseObject):
|
||||
|
||||
Doesn't modify state.
|
||||
"""
|
||||
latency = self.secondary_acceptable_latency_ms
|
||||
latency = self.__acceptable_latency
|
||||
# Only used for mongos high availability, ping_time is in seconds.
|
||||
fastest = min([
|
||||
member.ping_time for member in candidates])
|
||||
|
||||
@ -262,7 +262,7 @@ class RSState(object):
|
||||
"""Return a Member instance or None for the given (host, port)."""
|
||||
return self._host_to_member.get(host)
|
||||
|
||||
def pin_host(self, host, mode, tag_sets, latency):
|
||||
def pin_host(self, host, mode, tag_sets):
|
||||
"""Pin this thread / greenlet to a member.
|
||||
|
||||
`host` is a (host, port) pair. The remaining parameters are a read
|
||||
@ -272,11 +272,11 @@ class RSState(object):
|
||||
# assignment here. Assignment to a threadlocal is only unsafe if it
|
||||
# can cause other Python code to run implicitly.
|
||||
self._threadlocal.host = host
|
||||
self._threadlocal.read_preference = (mode, tag_sets, latency)
|
||||
self._threadlocal.read_preference = (mode, tag_sets)
|
||||
|
||||
def keep_pinned_host(self, mode, tag_sets, latency):
|
||||
def keep_pinned_host(self, mode, tag_sets):
|
||||
"""Does a read pref match the last used by this thread / greenlet?"""
|
||||
return self._threadlocal.read_preference == (mode, tag_sets, latency)
|
||||
return self._threadlocal.read_preference == (mode, tag_sets)
|
||||
|
||||
@property
|
||||
def pinned_host(self):
|
||||
@ -549,11 +549,9 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
ignoring tags." :class:`MongoReplicaSetClient` tries each set of
|
||||
tags in turn until it finds a set of tags with at least one matching
|
||||
member.
|
||||
- `secondary_acceptable_latency_ms`: (integer) Any replica-set member
|
||||
whose ping time is within secondary_acceptable_latency_ms of the
|
||||
- `acceptable_latency_ms`: (integer) Any replica-set member
|
||||
whose ping time is within acceptable_latency_ms of the
|
||||
nearest member may accept reads. Default 15 milliseconds.
|
||||
**Ignored by mongos** and must be configured on the command line.
|
||||
See the localThreshold_ option for more information.
|
||||
|
||||
| **SSL configuration:**
|
||||
|
||||
@ -579,8 +577,6 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
.. versionchanged:: 2.5
|
||||
Added additional ssl options
|
||||
.. versionadded:: 2.4
|
||||
|
||||
.. _localThreshold: http://docs.mongodb.org/manual/reference/mongos/#cmdoption-mongos--localThreshold
|
||||
"""
|
||||
self.__opts = {}
|
||||
self.__seeds = set()
|
||||
@ -646,6 +642,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
raise ConfigurationError("the replicaSet "
|
||||
"keyword parameter is required.")
|
||||
|
||||
self.__acceptable_latency = self.__opts.get('acceptablelatencyms', 15)
|
||||
self.__net_timeout = self.__opts.get('sockettimeoutms')
|
||||
self.__conn_timeout = self.__opts.get('connecttimeoutms')
|
||||
self.__wait_queue_timeout = self.__opts.get('waitqueuetimeoutms')
|
||||
@ -996,6 +993,18 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
return rs_state.primary_member.max_write_batch_size
|
||||
return common.MAX_WRITE_BATCH_SIZE
|
||||
|
||||
@property
|
||||
def acceptable_latency_ms(self):
|
||||
"""Any replica-set member whose ping time is within
|
||||
acceptable_latency_ms of the nearest member may accept
|
||||
reads. Defaults to 15 milliseconds.
|
||||
|
||||
See :class:`~pymongo.read_preferences.ReadPreference`.
|
||||
|
||||
.. versionadded:: 2.3
|
||||
"""
|
||||
return self.__acceptable_latency
|
||||
|
||||
@property
|
||||
def auto_start_request(self):
|
||||
"""Is auto_start_request enabled?
|
||||
@ -1593,10 +1602,6 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
self.__schedule_refresh(sync=sync)
|
||||
rs_state = self.__rs_state
|
||||
|
||||
latency = kwargs.get(
|
||||
'secondary_acceptable_latency_ms',
|
||||
self.secondary_acceptable_latency_ms)
|
||||
|
||||
try:
|
||||
if _connection_to_use is not None:
|
||||
if _connection_to_use == -1:
|
||||
@ -1629,7 +1634,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
if (pinned_member
|
||||
and pinned_member.matches_mode(mode)
|
||||
and pinned_member.matches_tag_sets(tag_sets) # TODO: REMOVE?
|
||||
and rs_state.keep_pinned_host(mode, tag_sets, latency)):
|
||||
and rs_state.keep_pinned_host(mode, tag_sets)):
|
||||
try:
|
||||
return (
|
||||
pinned_member.host,
|
||||
@ -1650,7 +1655,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
members=members,
|
||||
mode=mode,
|
||||
tag_sets=tag_sets,
|
||||
latency=latency)
|
||||
latency=self.__acceptable_latency)
|
||||
|
||||
if not member:
|
||||
# Ran out of members to try
|
||||
@ -1664,7 +1669,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
if self.in_request():
|
||||
# Keep reading from this member in this thread / greenlet
|
||||
# unless read preference changes
|
||||
rs_state.pin_host(member.host, mode, tag_sets, latency)
|
||||
rs_state.pin_host(member.host, mode, tag_sets)
|
||||
return member.host, response
|
||||
except AutoReconnect, why:
|
||||
if mode == ReadPreference.PRIMARY:
|
||||
|
||||
@ -654,11 +654,6 @@ class TestReadPreference(HATestCase):
|
||||
|
||||
assertReadFrom(other_secondary, NEAREST)
|
||||
|
||||
# High secondaryAcceptableLatencyMS, should read from all members
|
||||
assertReadFromAll(
|
||||
[primary, secondary, other_secondary],
|
||||
NEAREST, secondary_acceptable_latency_ms=1000*1000)
|
||||
|
||||
self.clear_ping_times()
|
||||
|
||||
assertReadFromAll([primary, other_secondary], NEAREST, [{'dc': 'ny'}])
|
||||
@ -822,18 +817,6 @@ class TestReadPreference(HATestCase):
|
||||
"Changing from tags %s to tags %s never unpinned" % (
|
||||
tags0, tags1))
|
||||
|
||||
# Finally, verify changing the secondary_acceptable_latency_ms unpins
|
||||
# the member.
|
||||
for _ in range(1000):
|
||||
host = utils.read_from_which_host(c, SECONDARY, None, 15)
|
||||
new_host = utils.read_from_which_host(c, SECONDARY, None, 20)
|
||||
if host != new_host:
|
||||
break
|
||||
else:
|
||||
self.fail(
|
||||
"Changing secondary_acceptable_latency_ms from 15 to 20"
|
||||
" never unpinned")
|
||||
|
||||
def tearDown(self):
|
||||
self.c.close()
|
||||
super(TestReadPreference, self).tearDown()
|
||||
|
||||
@ -600,14 +600,6 @@ with GridOut(self.db.fs, infile._id) as outfile:
|
||||
fields={"filename":1})
|
||||
|
||||
cursor = GridOutCursor(self.db.fs, {})
|
||||
min_ms = self.db.fs.files.secondary_acceptable_latency_ms
|
||||
new_ms = cursor._Cursor__secondary_acceptable_latency_ms
|
||||
self.assertEqual(min_ms, new_ms)
|
||||
cursor = GridOutCursor(self.db.fs, {},
|
||||
secondary_acceptable_latency_ms=100)
|
||||
min_ms = self.db.fs.files.secondary_acceptable_latency_ms
|
||||
new_ms = cursor._Cursor__secondary_acceptable_latency_ms
|
||||
self.assertNotEqual(min_ms, new_ms)
|
||||
cursor_clone = cursor.clone()
|
||||
self.assertEqual(cursor_clone.__dict__, cursor.__dict__)
|
||||
|
||||
|
||||
@ -118,16 +118,16 @@ class TestReadPreferences(TestReadPreferencesBase):
|
||||
|
||||
def test_latency_validation(self):
|
||||
self.assertEqual(17, self._get_client(
|
||||
secondary_acceptable_latency_ms=17
|
||||
).secondary_acceptable_latency_ms)
|
||||
acceptableLatencyMS=17
|
||||
).acceptable_latency_ms)
|
||||
|
||||
self.assertEqual(42, self._get_client(
|
||||
secondaryAcceptableLatencyMS=42
|
||||
).secondary_acceptable_latency_ms)
|
||||
acceptableLatencyMS=42
|
||||
).acceptable_latency_ms)
|
||||
|
||||
self.assertEqual(666, self._get_client(
|
||||
secondaryacceptablelatencyms=666
|
||||
).secondary_acceptable_latency_ms)
|
||||
acceptablelatencyms=666
|
||||
).acceptable_latency_ms)
|
||||
|
||||
def test_primary(self):
|
||||
self.assertReadsFrom('primary',
|
||||
@ -157,11 +157,11 @@ class TestReadPreferences(TestReadPreferencesBase):
|
||||
ReadPreference.SECONDARY, ReadPreference.SECONDARY_ONLY)
|
||||
|
||||
def test_nearest(self):
|
||||
# With high secondaryAcceptableLatencyMS, expect to read from any
|
||||
# With high acceptableLatencyMS, expect to read from any
|
||||
# member
|
||||
c = self._get_client(
|
||||
read_preference=ReadPreference.NEAREST,
|
||||
secondaryAcceptableLatencyMS=10000, # 10 seconds
|
||||
acceptableLatencyMS=10000, # 10 seconds
|
||||
auto_start_request=False)
|
||||
|
||||
data_members = set(self.hosts).difference(set(self.arbiters))
|
||||
@ -209,7 +209,7 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase):
|
||||
replicaSet=self.name, auto_start_request=False,
|
||||
# Effectively ignore members' ping times so we can test the effect
|
||||
# of ReadPreference modes only
|
||||
secondary_acceptable_latency_ms=1000*1000)
|
||||
acceptableLatencyMS=1000*1000)
|
||||
|
||||
def tearDown(self):
|
||||
# We create a lot of collections and indexes in these tests, so drop
|
||||
|
||||
@ -241,14 +241,12 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
for obj in c, c.pymongo_test, c.pymongo_test.test:
|
||||
self.assertEqual(obj.read_preference, ReadPreference.PRIMARY)
|
||||
self.assertEqual(obj.tag_sets, [{}])
|
||||
self.assertEqual(obj.secondary_acceptable_latency_ms, 15)
|
||||
self.assertEqual(obj.write_concern, {})
|
||||
|
||||
cursor = c.pymongo_test.test.find()
|
||||
self.assertEqual(
|
||||
ReadPreference.PRIMARY, cursor._Cursor__read_preference)
|
||||
self.assertEqual([{}], cursor._Cursor__tag_sets)
|
||||
self.assertEqual(15, cursor._Cursor__secondary_acceptable_latency_ms)
|
||||
c.close()
|
||||
|
||||
tag_sets = [{'dc': 'la', 'rack': '2'}, {'foo': 'bar'}]
|
||||
@ -256,7 +254,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
document_class=SON, tz_aware=True,
|
||||
read_preference=ReadPreference.SECONDARY,
|
||||
tag_sets=copy.deepcopy(tag_sets),
|
||||
secondary_acceptable_latency_ms=77)
|
||||
acceptablelatencyms=77)
|
||||
c.admin.command('ping')
|
||||
self.assertEqual(c.primary, self.primary)
|
||||
self.assertEqual(c.hosts, self.hosts)
|
||||
@ -268,23 +266,19 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
for obj in c, c.pymongo_test, c.pymongo_test.test:
|
||||
self.assertEqual(obj.read_preference, ReadPreference.SECONDARY)
|
||||
self.assertEqual(obj.tag_sets, tag_sets)
|
||||
self.assertEqual(obj.secondary_acceptable_latency_ms, 77)
|
||||
|
||||
cursor = c.pymongo_test.test.find()
|
||||
self.assertEqual(
|
||||
ReadPreference.SECONDARY, cursor._Cursor__read_preference)
|
||||
self.assertEqual(tag_sets, cursor._Cursor__tag_sets)
|
||||
self.assertEqual(77, cursor._Cursor__secondary_acceptable_latency_ms)
|
||||
|
||||
cursor = c.pymongo_test.test.find(
|
||||
read_preference=ReadPreference.NEAREST,
|
||||
tag_sets=[{'dc':'ny'}, {}],
|
||||
secondary_acceptable_latency_ms=123)
|
||||
tag_sets=[{'dc':'ny'}, {}])
|
||||
|
||||
self.assertEqual(
|
||||
ReadPreference.NEAREST, cursor._Cursor__read_preference)
|
||||
self.assertEqual([{'dc':'ny'}, {}], cursor._Cursor__tag_sets)
|
||||
self.assertEqual(123, cursor._Cursor__secondary_acceptable_latency_ms)
|
||||
|
||||
if version.at_least(c, (1, 7, 4)):
|
||||
self.assertEqual(c.max_bson_size, 16777216)
|
||||
@ -1059,7 +1053,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
|
||||
def test_pinned_member(self):
|
||||
latency = 1000 * 1000
|
||||
client = self._get_client(secondary_acceptable_latency_ms=latency)
|
||||
client = self._get_client(acceptablelatencyms=latency)
|
||||
|
||||
host = read_from_which_host(client, ReadPreference.SECONDARY)
|
||||
self.assertTrue(host in client.secondaries)
|
||||
@ -1067,18 +1061,18 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
# No pinning since we're not in a request
|
||||
assertReadFromAll(
|
||||
self, client, client.secondaries,
|
||||
ReadPreference.SECONDARY, None, latency)
|
||||
ReadPreference.SECONDARY, None)
|
||||
|
||||
assertReadFromAll(
|
||||
self, client, list(client.secondaries) + [client.primary],
|
||||
ReadPreference.NEAREST, None, latency)
|
||||
ReadPreference.NEAREST, None)
|
||||
|
||||
client.start_request()
|
||||
host = read_from_which_host(client, ReadPreference.SECONDARY)
|
||||
self.assertTrue(host in client.secondaries)
|
||||
assertReadFrom(self, client, host, ReadPreference.SECONDARY)
|
||||
|
||||
# Changing any part of read preference (mode, tag_sets, latency)
|
||||
# Changing any part of read preference (mode, tag_sets)
|
||||
# unpins the current host and pins to a new one
|
||||
primary = client.primary
|
||||
assertReadFrom(self, client, primary, ReadPreference.PRIMARY_PREFERRED)
|
||||
@ -1096,7 +1090,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
client.end_request()
|
||||
assertReadFromAll(
|
||||
self, client, list(client.secondaries) + [client.primary],
|
||||
ReadPreference.NEAREST, None, latency)
|
||||
ReadPreference.NEAREST, None)
|
||||
|
||||
def test_alive(self):
|
||||
client = self._get_client()
|
||||
|
||||
@ -278,7 +278,6 @@ def read_from_which_host(
|
||||
rsc,
|
||||
mode,
|
||||
tag_sets=None,
|
||||
secondary_acceptable_latency_ms=15
|
||||
):
|
||||
"""Read from a MongoReplicaSetClient with the given Read Preference mode,
|
||||
tags, and acceptable latency. Return the 'host:port' which was read from.
|
||||
@ -287,14 +286,12 @@ def read_from_which_host(
|
||||
- `rsc`: A MongoReplicaSetClient
|
||||
- `mode`: A ReadPreference
|
||||
- `tag_sets`: List of dicts of tags for data-center-aware reads
|
||||
- `secondary_acceptable_latency_ms`: a float
|
||||
"""
|
||||
db = rsc.pymongo_test
|
||||
db.read_preference = mode
|
||||
if isinstance(tag_sets, dict):
|
||||
tag_sets = [tag_sets]
|
||||
db.tag_sets = tag_sets or [{}]
|
||||
db.secondary_acceptable_latency_ms = secondary_acceptable_latency_ms
|
||||
|
||||
cursor = db.test.find()
|
||||
try:
|
||||
@ -309,9 +306,8 @@ def read_from_which_host(
|
||||
return None
|
||||
|
||||
def assertReadFrom(testcase, rsc, member, *args, **kwargs):
|
||||
"""Check that a query with the given mode, tag_sets, and
|
||||
secondary_acceptable_latency_ms reads from the expected replica-set
|
||||
member
|
||||
"""Check that a query with the given mode and tag_sets reads from
|
||||
the expected replica-set member.
|
||||
|
||||
:Parameters:
|
||||
- `testcase`: A unittest.TestCase
|
||||
@ -319,15 +315,13 @@ def assertReadFrom(testcase, rsc, member, *args, **kwargs):
|
||||
- `member`: A host:port expected to be used
|
||||
- `mode`: A ReadPreference
|
||||
- `tag_sets` (optional): List of dicts of tags for data-center-aware reads
|
||||
- `secondary_acceptable_latency_ms` (optional): a float
|
||||
"""
|
||||
for _ in range(10):
|
||||
testcase.assertEqual(member, read_from_which_host(rsc, *args, **kwargs))
|
||||
|
||||
def assertReadFromAll(testcase, rsc, members, *args, **kwargs):
|
||||
"""Check that a query with the given mode, tag_sets, and
|
||||
secondary_acceptable_latency_ms reads from all members in a set, and
|
||||
only members in that set.
|
||||
"""Check that a query with the given mode and tag_sets reads from all
|
||||
members in a set, and only members in that set.
|
||||
|
||||
:Parameters:
|
||||
- `testcase`: A unittest.TestCase
|
||||
@ -335,7 +329,6 @@ def assertReadFromAll(testcase, rsc, members, *args, **kwargs):
|
||||
- `members`: Sequence of host:port expected to be used
|
||||
- `mode`: A ReadPreference
|
||||
- `tag_sets` (optional): List of dicts of tags for data-center-aware reads
|
||||
- `secondary_acceptable_latency_ms` (optional): a float
|
||||
"""
|
||||
members = set(members)
|
||||
used = set()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user