PYTHON-1104 Implement maxStalenessMS.

This commit is contained in:
A. Jesse Jiryu Davis 2016-07-11 21:17:04 -04:00
parent f25df9799e
commit 5933730588
42 changed files with 2684 additions and 146 deletions

View File

@ -44,7 +44,9 @@ def _parse_read_preference(options):
mode = options.get('readpreference', 0)
tags = options.get('readpreferencetags')
return make_read_preference(mode, tags)
# common.validate() has converted from ms to seconds.
max_staleness = options.get('maxstalenessms', 0)
return make_read_preference(mode, tags, max_staleness)
def _parse_write_concern(options):

View File

@ -450,7 +450,7 @@ URI_VALIDATORS = {
'tz_aware': validate_boolean_or_string,
'uuidrepresentation': validate_uuid_representation,
'connect': validate_boolean_or_string,
'minpoolsize': validate_non_negative_integer
'minpoolsize': validate_non_negative_integer,
}
TIMEOUT_VALIDATORS = {
@ -460,6 +460,7 @@ TIMEOUT_VALIDATORS = {
'serverselectiontimeoutms': validate_timeout_or_zero,
'heartbeatfrequencyms': validate_timeout_or_none,
'maxidletimems': validate_timeout_or_none,
'maxstalenessms': validate_timeout_or_none,
}
KW_VALIDATORS = {

View File

@ -132,3 +132,7 @@ class IsMaster(object):
me = self._doc.get('me')
if me:
return common.clean_node(me)
@property
def last_write_date(self):
return self._doc.get('lastWriteDate')

View File

@ -0,0 +1,90 @@
# Copyright 2016 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Criteria to select ServerDescriptions based on maxStalenessMS.
The Max Staleness Spec says: When there is a known primary P,
a secondary S's staleness is estimated with this formula:
(S.lastUpdateTime - S.lastWriteDate) - (P.lastUpdateTime - P.lastWriteDate)
+ heartbeatFrequencyMS
When there is no known primary, a secondary S's staleness is estimated with:
SMax.lastWriteDate - S.lastWriteDate + heartbeatFrequencyMS
where "SMax" is the secondary with the greatest lastWriteDate.
"""
from pymongo.errors import ConfigurationError
from pymongo.server_type import SERVER_TYPE
def _with_primary(max_staleness, selection):
"""Apply max_staleness, in seconds, to a Selection with a known primary."""
primary = selection.primary
sds = []
for s in selection.server_descriptions:
if s.server_type == SERVER_TYPE.RSSecondary:
# See max-staleness.rst for explanation of this formula.
staleness = (
(s.last_update_time - s.last_write_date) -
(primary.last_update_time - primary.last_write_date) +
selection.heartbeat_frequency)
if staleness <= max_staleness:
sds.append(s)
else:
sds.append(s)
return selection.with_server_descriptions(sds)
def _no_primary(max_staleness, selection):
"""Apply max_staleness, in seconds, to a Selection with no known primary."""
smax = selection.secondary_with_max_last_write_date()
sds = []
for s in selection.server_descriptions:
if s.server_type == SERVER_TYPE.RSSecondary:
# See max-staleness.rst for explanation of this formula.
staleness = (smax.last_write_date -
s.last_write_date +
selection.heartbeat_frequency)
if staleness <= max_staleness:
sds.append(s)
else:
sds.append(s)
return selection.with_server_descriptions(sds)
def select(max_staleness, selection):
"""Apply max_staleness, in seconds, to a Selection."""
if not max_staleness:
return selection
# Server Selection Spec: "A driver MUST raise an error if the
# TopologyType is ReplicaSetWithPrimary or ReplicaSetNoPrimary and
# maxStalenessMS is less than twice heartbeatFrequencyMS."
if max_staleness < 2 * selection.heartbeat_frequency:
raise ConfigurationError(
"maxStalenessMS must be twice heartbeatFrequencyMS")
if selection.primary:
return _with_primary(max_staleness, selection)
else:
return _no_primary(max_staleness, selection)

View File

@ -16,10 +16,10 @@
from collections import Mapping
from pymongo import max_staleness_selectors
from pymongo.errors import ConfigurationError
from pymongo.server_selectors import (member_with_tags_server_selector,
secondary_with_tags_server_selector,
writable_server_selector)
secondary_with_tags_server_selector)
_PRIMARY = 0
@ -62,19 +62,33 @@ def _validate_tag_sets(tag_sets):
return tag_sets
def _validate_max_staleness(max_staleness):
"""Validate maxStalenessMS."""
if max_staleness is None:
return 0.0
errmsg = "maxStalenessMS must be an integer or float"
try:
max_staleness = float(max_staleness)
except ValueError:
raise ValueError(errmsg)
except TypeError:
raise TypeError(errmsg)
return max_staleness
class _ServerMode(object):
"""Base class for all read preferences.
"""
__slots__ = ("__mongos_mode", "__mode", "__tag_sets")
__slots__ = ("__mongos_mode", "__mode", "__tag_sets", "__max_staleness")
def __init__(self, mode, tag_sets=None):
if mode == _PRIMARY and tag_sets is not None:
raise ConfigurationError("Read preference primary "
"cannot be combined with tags")
def __init__(self, mode, tag_sets=None, max_staleness=None):
self.__mongos_mode = _MONGOS_MODES[mode]
self.__mode = mode
self.__tag_sets = _validate_tag_sets(tag_sets)
self.__max_staleness = _validate_max_staleness(max_staleness)
@property
def name(self):
@ -111,6 +125,23 @@ class _ServerMode(object):
"""
return list(self.__tag_sets) if self.__tag_sets else [{}]
@property
def max_staleness(self):
"""This read preference's maxStalenessMS, converted to seconds."""
return self.__max_staleness
@property
def min_wire_version(self):
"""The wire protocol version the server must support.
Some read preferences impose version requirements on all servers in the
topology. E.g., maxStalenessMS requires MongoDB 3.4 / maxWireVersion 5.
All servers' maxWireVersion must be at least this read preference's
`min_wire_version`, or the driver raises `ConfigurationError`.
"""
return 5 if self.__max_staleness else 0
def __repr__(self):
return "%s(tag_sets=%r)" % (
self.name, self.__tag_sets)
@ -118,7 +149,8 @@ class _ServerMode(object):
def __eq__(self, other):
if isinstance(other, _ServerMode):
return (self.mode == other.mode and
self.tag_sets == other.tag_sets)
self.tag_sets == other.tag_sets and
self.max_staleness == other.max_staleness)
return NotImplemented
def __ne__(self, other):
@ -129,13 +161,16 @@ class _ServerMode(object):
Needed explicitly because __slots__() defined.
"""
return {'mode': self.__mode, 'tag_sets': self.__tag_sets}
return {'mode': self.__mode,
'tag_sets': self.__tag_sets,
'max_staleness': self.__max_staleness}
def __setstate__(self, value):
"""Restore from pickling."""
self.__mode = value['mode']
self.__mongos_mode = _MONGOS_MODES[self.__mode]
self.__tag_sets = _validate_tag_sets(value['tag_sets'])
self.__max_staleness = value['max_staleness']
class Primary(_ServerMode):
@ -151,9 +186,9 @@ class Primary(_ServerMode):
def __init__(self):
super(Primary, self).__init__(_PRIMARY)
def __call__(self, td):
"""Return matching ServerDescriptions from a TopologyDescription."""
return writable_server_selector(td)
def __call__(self, selection):
"""Apply this read preference to a Selection."""
return selection.primary_selection
def __repr__(self):
return "Primary()"
@ -177,18 +212,24 @@ class PrimaryPreferred(_ServerMode):
:Parameters:
- `tag_sets`: The :attr:`~tag_sets` to use if the primary is not
available.
- `max_staleness`: The :attr:`~max_staleness` to use if the primary is
not available.
"""
def __init__(self, tag_sets=None):
super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED, tag_sets)
def __init__(self, tag_sets=None, max_staleness=None):
super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED,
tag_sets,
max_staleness)
def __call__(self, td):
"""Return matching ServerDescriptions from a TopologyDescription."""
writable_servers = writable_server_selector(td)
if writable_servers:
return writable_servers
def __call__(self, selection):
"""Apply this read preference to Selection."""
if selection.primary:
return selection.primary_selection
else:
return secondary_with_tags_server_selector(self.tag_sets, td)
return secondary_with_tags_server_selector(
self.tag_sets,
max_staleness_selectors.select(
self.max_staleness, selection))
class Secondary(_ServerMode):
@ -202,15 +243,19 @@ class Secondary(_ServerMode):
secondaries. An error is raised if no secondaries are available.
:Parameters:
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
- `tag_sets`: The :attr:`~tag_sets` for this read preference.
- `max_staleness`: The :attr:`~max_staleness` for this read preference.
"""
def __init__(self, tag_sets=None):
super(Secondary, self).__init__(_SECONDARY, tag_sets)
def __init__(self, tag_sets=None, max_staleness=None):
super(Secondary, self).__init__(_SECONDARY, tag_sets, max_staleness)
def __call__(self, td):
"""Return matching ServerDescriptions from a TopologyDescription."""
return secondary_with_tags_server_selector(self.tag_sets, td)
def __call__(self, selection):
"""Apply this read preference to Selection."""
return secondary_with_tags_server_selector(
self.tag_sets,
max_staleness_selectors.select(
self.max_staleness, selection))
class SecondaryPreferred(_ServerMode):
@ -224,20 +269,26 @@ class SecondaryPreferred(_ServerMode):
secondaries, or the primary if no secondary is available.
:Parameters:
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
- `tag_sets`: The :attr:`~tag_sets` for this read preference.
- `max_staleness`: The :attr:`~max_staleness` for this read preference.
"""
def __init__(self, tag_sets=None):
super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED, tag_sets)
def __init__(self, tag_sets=None, max_staleness=None):
super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED,
tag_sets,
max_staleness)
def __call__(self, td):
"""Return matching ServerDescriptions from a TopologyDescription."""
secondaries = secondary_with_tags_server_selector(self.tag_sets, td)
def __call__(self, selection):
"""Apply this read preference to Selection."""
secondaries = secondary_with_tags_server_selector(
self.tag_sets,
max_staleness_selectors.select(
self.max_staleness, selection))
if secondaries:
return secondaries
else:
return writable_server_selector(td)
return selection.primary_selection
class Nearest(_ServerMode):
@ -251,27 +302,35 @@ class Nearest(_ServerMode):
members.
:Parameters:
- `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
- `tag_sets`: The :attr:`~tag_sets` for this read preference.
- `max_staleness`: The :attr:`~max_staleness` for this read preference.
"""
def __init__(self, tag_sets=None):
super(Nearest, self).__init__(_NEAREST, tag_sets)
def __init__(self, tag_sets=None, max_staleness=None):
super(Nearest, self).__init__(_NEAREST, tag_sets, max_staleness)
def __call__(self, td):
"""Return matching ServerDescriptions from a TopologyDescription."""
return member_with_tags_server_selector(self.tag_sets or [{}], td)
def __call__(self, selection):
"""Apply this read preference to Selection."""
return member_with_tags_server_selector(
self.tag_sets,
max_staleness_selectors.select(
self.max_staleness, selection))
_ALL_READ_PREFERENCES = (Primary, PrimaryPreferred,
Secondary, SecondaryPreferred, Nearest)
def make_read_preference(mode, tag_sets):
def make_read_preference(mode, tag_sets, max_staleness=None):
if mode == _PRIMARY:
if tag_sets not in (None, [{}]):
raise ConfigurationError("Read preference primary "
"cannot be combined with tags")
if max_staleness:
raise ConfigurationError("Read preference primary cannot be "
"combined with maxStalenessMS")
return Primary()
return _ALL_READ_PREFERENCES[mode](tag_sets)
return _ALL_READ_PREFERENCES[mode](tag_sets, max_staleness)
_MODES = (

View File

@ -14,8 +14,20 @@
"""Represent one server in the topology."""
from bson import EPOCH_NAIVE
from pymongo.server_type import SERVER_TYPE
from pymongo.ismaster import IsMaster
from pymongo.monotonic import time as _time
def _total_seconds(delta):
"""Total seconds in the duration."""
if hasattr(delta, 'total_seconds'):
return delta.total_seconds()
# Python 2.6.
return ((delta.days * 86400 + delta.seconds) * 10 ** 6 +
delta.microseconds) / 10.0 ** 6
class ServerDescription(object):
@ -33,7 +45,7 @@ class ServerDescription(object):
'_primary', '_max_bson_size', '_max_message_size',
'_max_write_batch_size', '_min_wire_version', '_max_wire_version',
'_round_trip_time', '_me', '_is_writable', '_is_readable', '_error',
'_set_version', '_election_id')
'_set_version', '_election_id', '_last_write_date', '_last_update_time')
def __init__(
self,
@ -61,8 +73,16 @@ class ServerDescription(object):
self._is_readable = ismaster.is_readable
self._round_trip_time = round_trip_time
self._me = ismaster.me
self._last_update_time = _time()
self._error = error
if ismaster.last_write_date:
# Convert from datetime to seconds.
delta = ismaster.last_write_date - EPOCH_NAIVE
self._last_write_date = _total_seconds(delta)
else:
self._last_write_date = None
@property
def address(self):
return self._address
@ -126,6 +146,14 @@ class ServerDescription(object):
def me(self):
return self._me
@property
def last_write_date(self):
return self._last_write_date
@property
def last_update_time(self):
return self._last_update_time
@property
def round_trip_time(self):
"""The current average latency or None."""

View File

@ -1,4 +1,4 @@
# Copyright 2014-2015 MongoDB, Inc.
# Copyright 2014-2016 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
@ -17,45 +17,108 @@
from pymongo.server_type import SERVER_TYPE
def any_server_selector(td):
return td.known_servers
class Selection(object):
"""Input or output of a server selector function."""
@classmethod
def from_topology_description(cls, topology_description):
known_servers = topology_description.known_servers
primary = None
for sd in known_servers:
if sd.server_type == SERVER_TYPE.RSPrimary:
primary = sd
break
return Selection(topology_description,
topology_description.known_servers,
topology_description.common_wire_version,
primary)
def __init__(self,
topology_description,
server_descriptions,
common_wire_version,
primary):
self.topology_description = topology_description
self.server_descriptions = server_descriptions
self.primary = primary
self.common_wire_version = common_wire_version
def with_server_descriptions(self, server_descriptions):
return Selection(self.topology_description,
server_descriptions,
self.common_wire_version,
self.primary)
def secondary_with_max_last_write_date(self):
smax = None
for s in self.topology_description.known_servers:
if s.server_type == SERVER_TYPE.RSSecondary:
if not smax:
smax = s
else:
if s.last_write_date > smax.last_write_date:
smax = s
return smax
@property
def primary_selection(self):
primaries = [self.primary] if self.primary else []
return self.with_server_descriptions(primaries)
@property
def heartbeat_frequency(self):
return self.topology_description.heartbeat_frequency
def __bool__(self):
return bool(self.server_descriptions)
__nonzero__ = __bool__ # Python 2.
def __getitem__(self, item):
return self.server_descriptions[item]
def readable_server_selector(td):
return [s for s in td.known_servers if s.is_readable]
def any_server_selector(selection):
return selection
def writable_server_selector(td):
return [s for s in td.known_servers if s.is_writable]
def readable_server_selector(selection):
return selection.with_server_descriptions(
[s for s in selection.server_descriptions if s.is_readable])
def secondary_server_selector(td):
return [s for s in td.known_servers
if s.server_type == SERVER_TYPE.RSSecondary]
def writable_server_selector(selection):
return selection.with_server_descriptions(
[s for s in selection.server_descriptions if s.is_writable])
def arbiter_server_selector(td):
return [s for s in td.known_servers
if s.server_type == SERVER_TYPE.RSArbiter]
def secondary_server_selector(selection):
return selection.with_server_descriptions(
[s for s in selection.server_descriptions
if s.server_type == SERVER_TYPE.RSSecondary])
def writable_preferred_server_selector(td):
def arbiter_server_selector(selection):
return selection.with_server_descriptions(
[s for s in selection.server_descriptions
if s.server_type == SERVER_TYPE.RSArbiter])
def writable_preferred_server_selector(selection):
"""Like PrimaryPreferred but doesn't use tags or latency."""
return writable_server_selector(td) or secondary_server_selector(td)
return (writable_server_selector(selection) or
secondary_server_selector(selection))
def apply_single_tag_set(tag_set, server_descriptions):
def apply_single_tag_set(tag_set, selection):
"""All servers matching one tag set.
A tag set is a dict. A server matches if its tags are a superset:
A server tagged {'a': '1', 'b': '2'} matches the tag set {'a': '1'}.
The empty tag set {} matches any server.
The `server_descriptions` passed to this function should have
non-readable servers (e.g. RSGhost, RSArbiter, Unknown) filtered
out (e.g. by readable_server_selector or secondary_server_selector)
first.
"""
def tags_match(server_tags):
for key, value in tag_set.items():
@ -64,10 +127,11 @@ def apply_single_tag_set(tag_set, server_descriptions):
return True
return [s for s in server_descriptions if tags_match(s.tags)]
return selection.with_server_descriptions(
[s for s in selection.server_descriptions if tags_match(s.tags)])
def apply_tag_sets(tag_sets, server_descriptions):
def apply_tag_sets(tag_sets, selection):
"""All servers match a list of tag sets.
tag_sets is a list of dicts. The empty tag set {} matches any server,
@ -75,49 +139,20 @@ def apply_tag_sets(tag_sets, server_descriptions):
[{'a': 'value'}, {}] expresses a preference for servers tagged
{'a': 'value'}, but accepts any server if none matches the first
preference.
The `server_descriptions` passed to this function should have
non-readable servers (e.g. RSGhost, RSArbiter, Unknown) filtered
out (e.g. by readable_server_selector or secondary_server_selector)
first.
"""
for tag_set in tag_sets:
selected = apply_single_tag_set(tag_set, server_descriptions)
if selected:
return selected
with_tag_set = apply_single_tag_set(tag_set, selection)
if with_tag_set:
return with_tag_set
return []
return selection.with_server_descriptions([])
def apply_local_threshold(latency_ms, server_descriptions):
"""All servers with round trip times within latency_ms of the fastest one.
No ServerDescription's round_trip_time can be None.
The `server_descriptions` passed to this function should have
non-readable servers (e.g. RSGhost, RSArbiter, Unknown) filtered
out (e.g. by readable_server_selector or secondary_server_selector)
first.
"""
if not server_descriptions:
# Avoid ValueError from min() with empty sequence.
return []
# round_trip_time is in seconds.
if any(s for s in server_descriptions if s.round_trip_time is None):
raise ValueError("Not all servers' round trip times are known")
fastest = min(s.round_trip_time for s in server_descriptions)
return [
s for s in server_descriptions
if (s.round_trip_time - fastest) <= latency_ms / 1000.]
def secondary_with_tags_server_selector(tag_sets, td):
def secondary_with_tags_server_selector(tag_sets, selection):
"""All near-enough secondaries matching the tag sets."""
return apply_tag_sets(tag_sets, secondary_server_selector(td))
return apply_tag_sets(tag_sets, secondary_server_selector(selection))
def member_with_tags_server_selector(tag_sets, td):
def member_with_tags_server_selector(tag_sets, selection):
"""All near-enough members matching the tag sets."""
return apply_tag_sets(tag_sets, readable_server_selector(td))
return apply_tag_sets(tag_sets, readable_server_selector(selection))

View File

@ -41,8 +41,9 @@ class TopologySettings(object):
Take a list of (host, port) pairs and optional replica set name.
"""
if heartbeat_frequency < common.MIN_HEARTBEAT_INTERVAL:
raise ConfigurationError("%s cannot be less than %.1f" % (
'heartbeatFrequencyMS', common.MIN_HEARTBEAT_INTERVAL))
raise ConfigurationError(
"heartbeatFrequencyMS cannot be less than %d" %
common.MIN_HEARTBEAT_INTERVAL * 1000)
self._seeds = seeds or [('localhost', 27017)]
self._replica_set_name = replica_set_name

View File

@ -1,4 +1,4 @@
# Copyright 2014-2015 MongoDB, Inc.
# Copyright 2014-2016 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
@ -32,14 +32,14 @@ from pymongo.pool import PoolOptions
from pymongo.topology_description import (updated_topology_description,
TOPOLOGY_TYPE,
TopologyDescription)
from pymongo.errors import ServerSelectionTimeoutError
from pymongo.errors import ServerSelectionTimeoutError, ConfigurationError
from pymongo.monotonic import time as _time
from pymongo.server import Server
from pymongo.server_selectors import (any_server_selector,
apply_local_threshold,
arbiter_server_selector,
secondary_server_selector,
writable_server_selector)
writable_server_selector,
Selection)
def process_events_queue(queue_ref):
@ -84,15 +84,17 @@ class Topology(object):
topology_settings.get_server_descriptions(),
topology_settings.replica_set_name,
None,
None)
None,
topology_settings)
self._description = topology_description
if self._publish_tp:
initial_td = TopologyDescription(TOPOLOGY_TYPE.Unknown, {}, None,
None, None, self._settings)
self._events.put((
self._listeners.publish_topology_description_changed,
(TopologyDescription(
TOPOLOGY_TYPE.Unknown, {}, None, None, None),
self._description, self._topology_id)))
(initial_td, self._description, self._topology_id)))
for seed in topology_settings.seeds:
if self._publish_server:
self._events.put((self._listeners.publish_server_opened,
@ -284,8 +286,7 @@ class Topology(object):
if topology_type != TOPOLOGY_TYPE.ReplicaSetWithPrimary:
return None
description = writable_server_selector(self._description)[0]
return description.address
return writable_server_selector(self._new_selection())[0].address
def _get_replica_set_members(self, selector):
"""Return set of replica set member addresses."""
@ -296,8 +297,7 @@ class Topology(object):
TOPOLOGY_TYPE.ReplicaSetNoPrimary):
return set()
descriptions = selector(self._description)
return set([d.address for d in descriptions])
return set([sd.address for sd in selector(self._new_selection())])
def get_secondaries(self):
"""Return set of secondary addresses."""
@ -359,6 +359,13 @@ class Topology(object):
def description(self):
return self._description
def _new_selection(self):
"""A Selection object, initially including all known servers.
Hold the lock when calling this.
"""
return Selection.from_topology_description(self._description)
def _ensure_opened(self):
"""Start monitors, or restart after a fork.
@ -405,6 +412,15 @@ class Topology(object):
server.request_check()
def _apply_selector(self, selector, address):
if getattr(selector, 'min_wire_version', 0):
common_wv = self._description.common_wire_version
if common_wv and common_wv < selector.min_wire_version:
raise ConfigurationError(
"%s requires min wire version %d, but topology's min"
" wire version is %d" % (selector,
selector.min_wire_version,
common_wv))
if self._description.topology_type == TOPOLOGY_TYPE.Single:
# Ignore the selector.
return self._description.known_servers
@ -412,12 +428,21 @@ class Topology(object):
sd = self._description.server_descriptions().get(address)
return [sd] if sd else []
elif self._description.topology_type == TOPOLOGY_TYPE.Sharded:
return apply_local_threshold(self._settings.local_threshold_ms,
self._description.known_servers)
# Ignore the read preference, but apply localThresholdMS.
return self._apply_local_threshold(self._new_selection())
else:
sds = selector(self._description)
return apply_local_threshold(
self._settings.local_threshold_ms, sds)
return self._apply_local_threshold(selector(self._new_selection()))
def _apply_local_threshold(self, selection):
"""Return list of servers from Selection that are in latency window."""
if not selection:
return []
# Round trip time in seconds.
fastest = min(s.round_trip_time for s in selection.server_descriptions)
threshold = self._settings.local_threshold_ms / 1000.0
return [s for s in selection.server_descriptions
if (s.round_trip_time - fastest) <= threshold]
def _update_servers(self):
"""Sync our Servers from TopologyDescription.server_descriptions.

View File

@ -1,4 +1,4 @@
# Copyright 2014-2015 MongoDB, Inc.
# Copyright 2014-2016 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
@ -28,13 +28,13 @@ TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary',
class TopologyDescription(object):
def __init__(
self,
topology_type,
server_descriptions,
replica_set_name,
max_set_version,
max_election_id):
def __init__(self,
topology_type,
server_descriptions,
replica_set_name,
max_set_version,
max_election_id,
topology_settings):
"""Represent a topology of servers.
:Parameters:
@ -44,6 +44,7 @@ class TopologyDescription(object):
- `replica_set_name`: replica set name or None
- `max_set_version`: greatest setVersion seen from a primary, or None
- `max_election_id`: greatest electionId seen from a primary, or None
- `topology_settings`: a TopologySettings
"""
self._topology_type = topology_type
self._replica_set_name = replica_set_name
@ -51,6 +52,9 @@ class TopologyDescription(object):
self._max_set_version = max_set_version
self._max_election_id = max_election_id
# The heartbeat_frequency is used in staleness estimates.
self._topology_settings = topology_settings
# Is PyMongo compatible with all servers' wire protocols?
self._incompatible_err = None
@ -111,7 +115,8 @@ class TopologyDescription(object):
sds,
self._replica_set_name,
self._max_set_version,
self._max_election_id)
self._max_election_id,
self._topology_settings)
def server_descriptions(self):
"""Dict of (address, ServerDescription)."""
@ -142,6 +147,19 @@ class TopologyDescription(object):
return [s for s in self._server_descriptions.values()
if s.is_server_type_known]
@property
def common_wire_version(self):
"""Minimum of all servers' max wire versions, or None."""
servers = self.known_servers
if servers:
return min(s.max_wire_version for s in self.known_servers)
return None
@property
def heartbeat_frequency(self):
return self._topology_settings.heartbeat_frequency
# If topology type is Unknown and we receive an ismaster response, what should
# the new topology type be?
@ -188,7 +206,8 @@ def updated_topology_description(topology_description, server_description):
sds,
set_name,
max_set_version,
max_election_id)
max_election_id,
topology_description._topology_settings)
if topology_type == TOPOLOGY_TYPE.Unknown:
if server_type == SERVER_TYPE.Standalone:
@ -253,7 +272,8 @@ def updated_topology_description(topology_description, server_description):
sds,
set_name,
max_set_version,
max_election_id)
max_election_id,
topology_description._topology_settings)
def _update_rs_from_primary(

View File

@ -0,0 +1,74 @@
{
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,36 @@
{
"error": true,
"read_preference": {
"maxStalenessMS": 120000,
"mode": "Nearest"
},
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 4,
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,88 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 25002,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 25002,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 25001,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,88 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,88 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,64 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "PrimaryPreferred"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,111 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "Secondary",
"tag_sets": [
{
"data_center": "nyc"
}
]
},
"suitable_servers": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "tokyo"
},
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "d:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "tokyo"
},
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,63 @@
{
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 120000,
"mode": "SecondaryPreferred"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,111 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "SecondaryPreferred",
"tag_sets": [
{
"data_center": "nyc"
}
]
},
"suitable_servers": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "tokyo"
},
"type": "RSSecondary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "d:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "tokyo"
},
"type": "RSSecondary"
}
],
"type": "ReplicaSetNoPrimary"
}
}

View File

@ -0,0 +1,74 @@
{
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,36 @@
{
"error": true,
"read_preference": {
"maxStalenessMS": 120000,
"mode": "Nearest"
},
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 4,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,88 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 25001,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 25001,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 25001,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 25001,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,36 @@
{
"error": true,
"read_preference": {
"maxStalenessMS": 1,
"mode": "Nearest"
},
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,35 @@
{
"error": true,
"read_preference": {
"maxStalenessMS": 120000
},
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,88 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,88 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,64 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "PrimaryPreferred"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,36 @@
{
"error": true,
"read_preference": {
"maxStalenessMS": 50000,
"mode": "PrimaryPreferred"
},
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 4,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,63 @@
{
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
}
],
"read_preference": {
"maxStalenessMS": 120000,
"mode": "SecondaryPreferred"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,138 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "SecondaryPreferred",
"tag_sets": [
{
"data_center": "nyc"
}
]
},
"suitable_servers": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "d:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "e:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "tokyo"
},
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,138 @@
{
"heartbeatFrequencyMS": 25000,
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 50000,
"mode": "Secondary",
"tag_sets": [
{
"data_center": "nyc"
}
]
},
"suitable_servers": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "25002"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 1,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "d:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "nyc"
},
"type": "RSSecondary"
},
{
"address": "e:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "2"
}
},
"maxWireVersion": 5,
"tags": {
"data_center": "tokyo"
},
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,76 @@
{
"heartbeatFrequencyMS": 1000,
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 2000,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,76 @@
{
"heartbeatFrequencyMS": 1000,
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
}
],
"read_preference": {
"maxStalenessMS": 2000,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,75 @@
{
"in_latency_window": [
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"read_preference": {
"maxStalenessMS": 0,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 50,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1000001"
}
},
"maxWireVersion": 5,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "RSSecondary"
}
],
"type": "ReplicaSetWithPrimary"
}
}

View File

@ -0,0 +1,36 @@
{
"error": true,
"read_preference": {
"maxStalenessMS": 120000,
"mode": "Nearest"
},
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "Mongos"
},
{
"address": "b:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 4,
"type": "Mongos"
}
],
"type": "Sharded"
}
}

View File

@ -0,0 +1,52 @@
{
"heartbeatFrequencyMS": 10000,
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "Mongos"
}
],
"read_preference": {
"maxStalenessMS": 1,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "Mongos"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "Mongos"
}
],
"type": "Sharded"
}
}

View File

@ -0,0 +1,24 @@
{
"error": true,
"read_preference": {
"maxStalenessMS": 120000,
"mode": "Nearest"
},
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 4,
"type": "Standalone"
}
],
"type": "Single"
}
}

View File

@ -0,0 +1,52 @@
{
"heartbeatFrequencyMS": 10000,
"in_latency_window": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "Standalone"
}
],
"read_preference": {
"maxStalenessMS": 1,
"mode": "Nearest"
},
"suitable_servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "Standalone"
}
],
"topology_description": {
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 5,
"lastUpdateTime": 0,
"lastWrite": {
"lastWriteDate": {
"$numberLong": "1"
}
},
"maxWireVersion": 5,
"type": "Standalone"
}
],
"type": "Single"
}
}

View File

@ -0,0 +1,18 @@
{
"heartbeatFrequencyMS": 10000,
"in_latency_window": [],
"read_preference": {
"maxStalenessMS": 1,
"mode": "Nearest"
},
"suitable_servers": [],
"topology_description": {
"servers": [
{
"address": "a:27017",
"type": "Unknown"
}
],
"type": "Unknown"
}
}

View File

@ -186,6 +186,15 @@ class ClientUnitTest(unittest.TestCase):
c = MongoClient(uri, connect=False)
self.assertEqual(Database(c, 'foo'), c.get_default_database())
def test_primary_read_pref_with_tags(self):
# No tags allowed with "primary".
with self.assertRaises(ConfigurationError):
MongoClient('mongodb://host/?readpreferencetags=dc:east')
with self.assertRaises(ConfigurationError):
MongoClient('mongodb://host/?'
'readpreference=primary&readpreferencetags=dc:east')
class TestClient(IntegrationTest):

241
test/test_max_staleness.py Normal file
View File

@ -0,0 +1,241 @@
# Copyright 2016 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test maxStalenessMS support."""
import datetime
import os
import sys
sys.path[0:0] = [""]
from bson import json_util
from pymongo import MongoClient, read_preferences
from pymongo.common import clean_node, HEARTBEAT_FREQUENCY
from pymongo.errors import ConfigurationError, ConnectionFailure
from pymongo.ismaster import IsMaster
from pymongo.server_description import ServerDescription
from pymongo.settings import TopologySettings
from pymongo.topology import Topology
from test import unittest
# Location of JSON test specifications.
_TEST_PATH = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'max_staleness')
class MockSocketInfo(object):
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
class MockPool(object):
def __init__(self, *args, **kwargs):
pass
def reset(self):
pass
class MockMonitor(object):
def __init__(self, server_description, topology, pool, topology_settings):
pass
def open(self):
pass
def request_check(self):
pass
def close(self):
pass
def get_addresses(server_list):
seeds = []
hosts = []
for server in server_list:
seeds.append(clean_node(server['address']))
hosts.append(server['address'])
return seeds, hosts
def make_last_write_date(server):
epoch = datetime.datetime.utcfromtimestamp(0)
millis = server.get('lastWrite', {}).get('lastWriteDate')
if millis:
diff = ((millis % 1000) + 1000) % 1000
seconds = (millis - diff) / 1000
micros = diff * 1000
return epoch + datetime.timedelta(
seconds=seconds, microseconds=micros)
else:
# "Unknown" server.
return epoch
def make_server_description(server, hosts):
"""Make ServerDescription from server info from JSON file."""
server_type = server['type']
if server_type == "Unknown":
return ServerDescription(clean_node(server['address']), IsMaster({}))
ismaster_response = {'ok': True, 'hosts': hosts}
if server_type != "Standalone" and server_type != "Mongos":
ismaster_response['setName'] = "rs"
if server_type == "RSPrimary":
ismaster_response['ismaster'] = True
elif server_type == "RSSecondary":
ismaster_response['secondary'] = True
elif server_type == "Mongos":
ismaster_response['msg'] = 'isdbgrid'
ismaster_response['lastWriteDate'] = make_last_write_date(server)
if 'maxWireVersion' in server:
ismaster_response['maxWireVersion'] = server['maxWireVersion']
if 'tags' in server:
ismaster_response['tags'] = server['tags']
# Sets _last_update_time to now.
sd = ServerDescription(clean_node(server['address']),
IsMaster(ismaster_response),
round_trip_time=server['avg_rtt_ms'])
sd._last_update_time = server['lastUpdateTime'] / 1000.0 # ms to sec.
return sd
class TestAllScenarios(unittest.TestCase):
pass
def create_test(scenario_def):
def run_scenario(self):
if 'heartbeatFrequencyMS' in scenario_def:
frequency = int(scenario_def['heartbeatFrequencyMS']) / 1000.0
else:
frequency = HEARTBEAT_FREQUENCY
# Initialize topologies.
seeds, hosts = get_addresses(
scenario_def['topology_description']['servers'])
topology = Topology(
TopologySettings(seeds=seeds,
monitor_class=MockMonitor,
pool_class=MockPool,
heartbeat_frequency=frequency))
# Update topologies with server descriptions.
for server in scenario_def['topology_description']['servers']:
server_description = make_server_description(server, hosts)
topology.on_change(server_description)
# Create server selector.
# Make first letter lowercase to match read_pref's modes.
pref_def = scenario_def['read_preference']
mode_string = pref_def.get('mode', 'primary')
mode_string = mode_string[:1].lower() + mode_string[1:]
mode = read_preferences.read_pref_mode_from_name(mode_string)
max_staleness = pref_def.get('maxStalenessMS', 0) / 1000.0
tag_sets = pref_def.get('tag_sets')
if scenario_def.get('error'):
with self.assertRaises(ConfigurationError):
# Error can be raised when making Read Pref or selecting.
pref = read_preferences.make_read_preference(
mode, tag_sets=tag_sets, max_staleness=max_staleness)
topology.select_server(pref)
return
expected_addrs = set([
server['address'] for server in scenario_def['in_latency_window']])
# Select servers.
pref = read_preferences.make_read_preference(
mode, tag_sets=tag_sets, max_staleness=max_staleness)
if not expected_addrs:
with self.assertRaises(ConnectionFailure):
topology.select_servers(pref, server_selection_timeout=0)
return
servers = topology.select_servers(pref, server_selection_timeout=0)
actual_addrs = set(['%s:%d' % s.description.address for s in servers])
for unexpected in actual_addrs - expected_addrs:
self.fail("'%s' shouldn't have been selected, but was" % unexpected)
for unselected in expected_addrs - actual_addrs:
self.fail("'%s' should have been selected, but wasn't" % unselected)
return run_scenario
def create_tests():
for dirpath, _, filenames in os.walk(_TEST_PATH):
dirname = os.path.split(dirpath)
dirname = os.path.split(dirname[-2])[-1] + '_' + dirname[-1]
for filename in filenames:
if not filename.endswith('.json'):
continue
with open(os.path.join(dirpath, filename)) as scenario_stream:
scenario_def = json_util.loads(scenario_stream.read())
# Construct test from scenario.
new_test = create_test(scenario_def)
test_name = 'test_%s_%s' % (
dirname, os.path.splitext(filename)[0])
new_test.__name__ = test_name
setattr(TestAllScenarios, new_test.__name__, new_test)
create_tests()
class TestMaxStaleness(unittest.TestCase):
def test_max_staleness(self):
# These tests are specified in max-staleness-tests.rst.
with self.assertRaises(ConfigurationError):
MongoClient("mongodb://a/?maxStalenessMS=120000")
with self.assertRaises(ConfigurationError):
MongoClient("mongodb://a/?readPreference=primary&"
"maxStalenessMS=120000")
client = MongoClient("mongodb://host/?readPreference=secondary&"
"maxStalenessMS=120000")
self.assertEqual(120, client.read_preference.max_staleness)
client = MongoClient("mongodb://a/?readPreference=secondary&"
"maxStalenessMS=1")
self.assertEqual(0.001, client.read_preference.max_staleness)
if __name__ == "__main__":
unittest.main()

View File

@ -1,4 +1,4 @@
# Copyright 2011-2015 MongoDB, Inc.
# Copyright 2011-2016 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -30,9 +30,9 @@ from pymongo.mongo_client import MongoClient
from pymongo.read_preferences import (ReadPreference, MovingAverage,
Primary, PrimaryPreferred,
Secondary, SecondaryPreferred,
Nearest, _ServerMode)
Nearest)
from pymongo.server_description import ServerDescription
from pymongo.server_selectors import readable_server_selector
from pymongo.server_selectors import readable_server_selector, Selection
from pymongo.server_type import SERVER_TYPE
from pymongo.write_concern import WriteConcern
@ -48,8 +48,23 @@ from test.utils import connected, single_client, one, wait_until, rs_client
from test.version import Version
class TestSelections(unittest.TestCase):
def test_bool(self):
client = single_client()
wait_until(lambda: client.address, "discover primary")
selection = Selection.from_topology_description(
client._topology.description)
self.assertTrue(selection)
self.assertFalse(selection.with_server_descriptions([]))
class TestReadPreferenceObjects(unittest.TestCase):
prefs = [Primary(), Secondary(), Nearest(tag_sets=[{'a': 1}, {'b': 2}])]
prefs = [Primary(),
Secondary(),
Nearest(tag_sets=[{'a': 1}, {'b': 2}]),
SecondaryPreferred(max_staleness=30)]
def test_pickle(self):
for pref in self.prefs:
@ -158,14 +173,6 @@ class TestReadPreferences(TestReadPreferencesBase):
rs_client, read_preference='foo')
def test_tag_sets_validation(self):
# Can't use tags with PRIMARY
self.assertRaises(ConfigurationError, _ServerMode,
0, tag_sets=[{'k': 'v'}])
# ... but empty tag sets are ok with PRIMARY
self.assertRaises(ConfigurationError, _ServerMode,
0, tag_sets=[{}])
S = Secondary(tag_sets=[{}])
self.assertEqual(
[{}],