From 593373058818e8d6b2c76ee56372c78408944262 Mon Sep 17 00:00:00 2001 From: "A. Jesse Jiryu Davis" Date: Mon, 11 Jul 2016 21:17:04 -0400 Subject: [PATCH] PYTHON-1104 Implement maxStalenessMS. --- pymongo/client_options.py | 4 +- pymongo/common.py | 3 +- pymongo/ismaster.py | 4 + pymongo/max_staleness_selectors.py | 90 +++++++ pymongo/read_preferences.py | 141 +++++++--- pymongo/server_description.py | 30 ++- pymongo/server_selectors.py | 155 ++++++----- pymongo/settings.py | 5 +- pymongo/topology.py | 59 +++-- pymongo/topology_description.py | 42 ++- .../DefaultNoMaxStaleness.json | 74 ++++++ .../ReplicaSetNoPrimary/Incompatible.json | 36 +++ .../ReplicaSetNoPrimary/LastUpdateTime.json | 88 +++++++ .../ReplicaSetNoPrimary/Nearest.json | 88 +++++++ .../ReplicaSetNoPrimary/Nearest2.json | 88 +++++++ .../ReplicaSetNoPrimary/PrimaryPreferred.json | 64 +++++ .../ReplicaSetNoPrimary/Secondary.json | 111 ++++++++ .../SecondaryPreferred.json | 63 +++++ .../SecondaryPreferred_tags.json | 111 ++++++++ .../DefaultNoMaxStaleness.json | 74 ++++++ .../ReplicaSetWithPrimary/Incompatible.json | 36 +++ .../ReplicaSetWithPrimary/LastUpdateTime.json | 88 +++++++ .../MaxStalenessTooSmall.json | 36 +++ .../MaxStalenessWithModePrimary.json | 35 +++ .../ReplicaSetWithPrimary/Nearest.json | 88 +++++++ .../ReplicaSetWithPrimary/Nearest2.json | 88 +++++++ .../PrimaryPreferred.json | 64 +++++ .../PrimaryPreferred_incompatible.json | 36 +++ .../SecondaryPreferred.json | 63 +++++ .../SecondaryPreferred_tags.json | 138 ++++++++++ .../ReplicaSetWithPrimary/Secondary_tags.json | 138 ++++++++++ .../ShortHeartbeartShortMaxStaleness.json | 76 ++++++ .../ShortHeartbeartShortMaxStaleness2.json | 76 ++++++ .../ZeroMaxStaleness.json | 75 ++++++ test/max_staleness/Sharded/Incompatible.json | 36 +++ .../Sharded/SmallMaxStaleness.json | 52 ++++ test/max_staleness/Single/Incompatible.json | 24 ++ .../Single/SmallMaxStaleness.json | 52 ++++ .../Unknown/SmallMaxStaleness.json | 18 ++ test/test_client.py | 9 + test/test_max_staleness.py | 241 ++++++++++++++++++ test/test_read_preferences.py | 31 ++- 42 files changed, 2684 insertions(+), 146 deletions(-) create mode 100644 pymongo/max_staleness_selectors.py create mode 100644 test/max_staleness/ReplicaSetNoPrimary/DefaultNoMaxStaleness.json create mode 100644 test/max_staleness/ReplicaSetNoPrimary/Incompatible.json create mode 100644 test/max_staleness/ReplicaSetNoPrimary/LastUpdateTime.json create mode 100644 test/max_staleness/ReplicaSetNoPrimary/Nearest.json create mode 100644 test/max_staleness/ReplicaSetNoPrimary/Nearest2.json create mode 100644 test/max_staleness/ReplicaSetNoPrimary/PrimaryPreferred.json create mode 100644 test/max_staleness/ReplicaSetNoPrimary/Secondary.json create mode 100644 test/max_staleness/ReplicaSetNoPrimary/SecondaryPreferred.json create mode 100644 test/max_staleness/ReplicaSetNoPrimary/SecondaryPreferred_tags.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/DefaultNoMaxStaleness.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/Incompatible.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/LastUpdateTime.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/MaxStalenessTooSmall.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/MaxStalenessWithModePrimary.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/Nearest.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/Nearest2.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/PrimaryPreferred.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/PrimaryPreferred_incompatible.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/SecondaryPreferred.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/SecondaryPreferred_tags.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/Secondary_tags.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/ShortHeartbeartShortMaxStaleness.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/ShortHeartbeartShortMaxStaleness2.json create mode 100644 test/max_staleness/ReplicaSetWithPrimary/ZeroMaxStaleness.json create mode 100644 test/max_staleness/Sharded/Incompatible.json create mode 100644 test/max_staleness/Sharded/SmallMaxStaleness.json create mode 100644 test/max_staleness/Single/Incompatible.json create mode 100644 test/max_staleness/Single/SmallMaxStaleness.json create mode 100644 test/max_staleness/Unknown/SmallMaxStaleness.json create mode 100644 test/test_max_staleness.py diff --git a/pymongo/client_options.py b/pymongo/client_options.py index 0d71bdc40..a32e393e5 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -44,7 +44,9 @@ def _parse_read_preference(options): mode = options.get('readpreference', 0) tags = options.get('readpreferencetags') - return make_read_preference(mode, tags) + # common.validate() has converted from ms to seconds. + max_staleness = options.get('maxstalenessms', 0) + return make_read_preference(mode, tags, max_staleness) def _parse_write_concern(options): diff --git a/pymongo/common.py b/pymongo/common.py index 984afd951..747a91885 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -450,7 +450,7 @@ URI_VALIDATORS = { 'tz_aware': validate_boolean_or_string, 'uuidrepresentation': validate_uuid_representation, 'connect': validate_boolean_or_string, - 'minpoolsize': validate_non_negative_integer + 'minpoolsize': validate_non_negative_integer, } TIMEOUT_VALIDATORS = { @@ -460,6 +460,7 @@ TIMEOUT_VALIDATORS = { 'serverselectiontimeoutms': validate_timeout_or_zero, 'heartbeatfrequencyms': validate_timeout_or_none, 'maxidletimems': validate_timeout_or_none, + 'maxstalenessms': validate_timeout_or_none, } KW_VALIDATORS = { diff --git a/pymongo/ismaster.py b/pymongo/ismaster.py index 728d87cb3..88c9b69fa 100644 --- a/pymongo/ismaster.py +++ b/pymongo/ismaster.py @@ -132,3 +132,7 @@ class IsMaster(object): me = self._doc.get('me') if me: return common.clean_node(me) + + @property + def last_write_date(self): + return self._doc.get('lastWriteDate') diff --git a/pymongo/max_staleness_selectors.py b/pymongo/max_staleness_selectors.py new file mode 100644 index 000000000..511d0a1c6 --- /dev/null +++ b/pymongo/max_staleness_selectors.py @@ -0,0 +1,90 @@ +# Copyright 2016 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Criteria to select ServerDescriptions based on maxStalenessMS. + +The Max Staleness Spec says: When there is a known primary P, +a secondary S's staleness is estimated with this formula: + + (S.lastUpdateTime - S.lastWriteDate) - (P.lastUpdateTime - P.lastWriteDate) + + heartbeatFrequencyMS + +When there is no known primary, a secondary S's staleness is estimated with: + + SMax.lastWriteDate - S.lastWriteDate + heartbeatFrequencyMS + +where "SMax" is the secondary with the greatest lastWriteDate. +""" + +from pymongo.errors import ConfigurationError +from pymongo.server_type import SERVER_TYPE + + +def _with_primary(max_staleness, selection): + """Apply max_staleness, in seconds, to a Selection with a known primary.""" + primary = selection.primary + sds = [] + + for s in selection.server_descriptions: + if s.server_type == SERVER_TYPE.RSSecondary: + # See max-staleness.rst for explanation of this formula. + staleness = ( + (s.last_update_time - s.last_write_date) - + (primary.last_update_time - primary.last_write_date) + + selection.heartbeat_frequency) + + if staleness <= max_staleness: + sds.append(s) + else: + sds.append(s) + + return selection.with_server_descriptions(sds) + + +def _no_primary(max_staleness, selection): + """Apply max_staleness, in seconds, to a Selection with no known primary.""" + smax = selection.secondary_with_max_last_write_date() + sds = [] + + for s in selection.server_descriptions: + if s.server_type == SERVER_TYPE.RSSecondary: + # See max-staleness.rst for explanation of this formula. + staleness = (smax.last_write_date - + s.last_write_date + + selection.heartbeat_frequency) + + if staleness <= max_staleness: + sds.append(s) + else: + sds.append(s) + + return selection.with_server_descriptions(sds) + + +def select(max_staleness, selection): + """Apply max_staleness, in seconds, to a Selection.""" + if not max_staleness: + return selection + + # Server Selection Spec: "A driver MUST raise an error if the + # TopologyType is ReplicaSetWithPrimary or ReplicaSetNoPrimary and + # maxStalenessMS is less than twice heartbeatFrequencyMS." + if max_staleness < 2 * selection.heartbeat_frequency: + raise ConfigurationError( + "maxStalenessMS must be twice heartbeatFrequencyMS") + + if selection.primary: + return _with_primary(max_staleness, selection) + else: + return _no_primary(max_staleness, selection) diff --git a/pymongo/read_preferences.py b/pymongo/read_preferences.py index bec5d0311..dc16c12ca 100644 --- a/pymongo/read_preferences.py +++ b/pymongo/read_preferences.py @@ -16,10 +16,10 @@ from collections import Mapping +from pymongo import max_staleness_selectors from pymongo.errors import ConfigurationError from pymongo.server_selectors import (member_with_tags_server_selector, - secondary_with_tags_server_selector, - writable_server_selector) + secondary_with_tags_server_selector) _PRIMARY = 0 @@ -62,19 +62,33 @@ def _validate_tag_sets(tag_sets): return tag_sets +def _validate_max_staleness(max_staleness): + """Validate maxStalenessMS.""" + if max_staleness is None: + return 0.0 + + errmsg = "maxStalenessMS must be an integer or float" + try: + max_staleness = float(max_staleness) + except ValueError: + raise ValueError(errmsg) + except TypeError: + raise TypeError(errmsg) + + return max_staleness + + class _ServerMode(object): """Base class for all read preferences. """ - __slots__ = ("__mongos_mode", "__mode", "__tag_sets") + __slots__ = ("__mongos_mode", "__mode", "__tag_sets", "__max_staleness") - def __init__(self, mode, tag_sets=None): - if mode == _PRIMARY and tag_sets is not None: - raise ConfigurationError("Read preference primary " - "cannot be combined with tags") + def __init__(self, mode, tag_sets=None, max_staleness=None): self.__mongos_mode = _MONGOS_MODES[mode] self.__mode = mode self.__tag_sets = _validate_tag_sets(tag_sets) + self.__max_staleness = _validate_max_staleness(max_staleness) @property def name(self): @@ -111,6 +125,23 @@ class _ServerMode(object): """ return list(self.__tag_sets) if self.__tag_sets else [{}] + @property + def max_staleness(self): + """This read preference's maxStalenessMS, converted to seconds.""" + return self.__max_staleness + + @property + def min_wire_version(self): + """The wire protocol version the server must support. + + Some read preferences impose version requirements on all servers in the + topology. E.g., maxStalenessMS requires MongoDB 3.4 / maxWireVersion 5. + + All servers' maxWireVersion must be at least this read preference's + `min_wire_version`, or the driver raises `ConfigurationError`. + """ + return 5 if self.__max_staleness else 0 + def __repr__(self): return "%s(tag_sets=%r)" % ( self.name, self.__tag_sets) @@ -118,7 +149,8 @@ class _ServerMode(object): def __eq__(self, other): if isinstance(other, _ServerMode): return (self.mode == other.mode and - self.tag_sets == other.tag_sets) + self.tag_sets == other.tag_sets and + self.max_staleness == other.max_staleness) return NotImplemented def __ne__(self, other): @@ -129,13 +161,16 @@ class _ServerMode(object): Needed explicitly because __slots__() defined. """ - return {'mode': self.__mode, 'tag_sets': self.__tag_sets} + return {'mode': self.__mode, + 'tag_sets': self.__tag_sets, + 'max_staleness': self.__max_staleness} def __setstate__(self, value): """Restore from pickling.""" self.__mode = value['mode'] self.__mongos_mode = _MONGOS_MODES[self.__mode] self.__tag_sets = _validate_tag_sets(value['tag_sets']) + self.__max_staleness = value['max_staleness'] class Primary(_ServerMode): @@ -151,9 +186,9 @@ class Primary(_ServerMode): def __init__(self): super(Primary, self).__init__(_PRIMARY) - def __call__(self, td): - """Return matching ServerDescriptions from a TopologyDescription.""" - return writable_server_selector(td) + def __call__(self, selection): + """Apply this read preference to a Selection.""" + return selection.primary_selection def __repr__(self): return "Primary()" @@ -177,18 +212,24 @@ class PrimaryPreferred(_ServerMode): :Parameters: - `tag_sets`: The :attr:`~tag_sets` to use if the primary is not available. + - `max_staleness`: The :attr:`~max_staleness` to use if the primary is + not available. """ - def __init__(self, tag_sets=None): - super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED, tag_sets) + def __init__(self, tag_sets=None, max_staleness=None): + super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED, + tag_sets, + max_staleness) - def __call__(self, td): - """Return matching ServerDescriptions from a TopologyDescription.""" - writable_servers = writable_server_selector(td) - if writable_servers: - return writable_servers + def __call__(self, selection): + """Apply this read preference to Selection.""" + if selection.primary: + return selection.primary_selection else: - return secondary_with_tags_server_selector(self.tag_sets, td) + return secondary_with_tags_server_selector( + self.tag_sets, + max_staleness_selectors.select( + self.max_staleness, selection)) class Secondary(_ServerMode): @@ -202,15 +243,19 @@ class Secondary(_ServerMode): secondaries. An error is raised if no secondaries are available. :Parameters: - - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference + - `tag_sets`: The :attr:`~tag_sets` for this read preference. + - `max_staleness`: The :attr:`~max_staleness` for this read preference. """ - def __init__(self, tag_sets=None): - super(Secondary, self).__init__(_SECONDARY, tag_sets) + def __init__(self, tag_sets=None, max_staleness=None): + super(Secondary, self).__init__(_SECONDARY, tag_sets, max_staleness) - def __call__(self, td): - """Return matching ServerDescriptions from a TopologyDescription.""" - return secondary_with_tags_server_selector(self.tag_sets, td) + def __call__(self, selection): + """Apply this read preference to Selection.""" + return secondary_with_tags_server_selector( + self.tag_sets, + max_staleness_selectors.select( + self.max_staleness, selection)) class SecondaryPreferred(_ServerMode): @@ -224,20 +269,26 @@ class SecondaryPreferred(_ServerMode): secondaries, or the primary if no secondary is available. :Parameters: - - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference + - `tag_sets`: The :attr:`~tag_sets` for this read preference. + - `max_staleness`: The :attr:`~max_staleness` for this read preference. """ - def __init__(self, tag_sets=None): - super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED, tag_sets) + def __init__(self, tag_sets=None, max_staleness=None): + super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED, + tag_sets, + max_staleness) - def __call__(self, td): - """Return matching ServerDescriptions from a TopologyDescription.""" - secondaries = secondary_with_tags_server_selector(self.tag_sets, td) + def __call__(self, selection): + """Apply this read preference to Selection.""" + secondaries = secondary_with_tags_server_selector( + self.tag_sets, + max_staleness_selectors.select( + self.max_staleness, selection)) if secondaries: return secondaries else: - return writable_server_selector(td) + return selection.primary_selection class Nearest(_ServerMode): @@ -251,27 +302,35 @@ class Nearest(_ServerMode): members. :Parameters: - - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference + - `tag_sets`: The :attr:`~tag_sets` for this read preference. + - `max_staleness`: The :attr:`~max_staleness` for this read preference. """ - def __init__(self, tag_sets=None): - super(Nearest, self).__init__(_NEAREST, tag_sets) + def __init__(self, tag_sets=None, max_staleness=None): + super(Nearest, self).__init__(_NEAREST, tag_sets, max_staleness) - def __call__(self, td): - """Return matching ServerDescriptions from a TopologyDescription.""" - return member_with_tags_server_selector(self.tag_sets or [{}], td) + def __call__(self, selection): + """Apply this read preference to Selection.""" + return member_with_tags_server_selector( + self.tag_sets, + max_staleness_selectors.select( + self.max_staleness, selection)) _ALL_READ_PREFERENCES = (Primary, PrimaryPreferred, Secondary, SecondaryPreferred, Nearest) -def make_read_preference(mode, tag_sets): + +def make_read_preference(mode, tag_sets, max_staleness=None): if mode == _PRIMARY: if tag_sets not in (None, [{}]): raise ConfigurationError("Read preference primary " "cannot be combined with tags") + if max_staleness: + raise ConfigurationError("Read preference primary cannot be " + "combined with maxStalenessMS") return Primary() - return _ALL_READ_PREFERENCES[mode](tag_sets) + return _ALL_READ_PREFERENCES[mode](tag_sets, max_staleness) _MODES = ( diff --git a/pymongo/server_description.py b/pymongo/server_description.py index 76a1c165d..1ed645450 100644 --- a/pymongo/server_description.py +++ b/pymongo/server_description.py @@ -14,8 +14,20 @@ """Represent one server in the topology.""" +from bson import EPOCH_NAIVE from pymongo.server_type import SERVER_TYPE from pymongo.ismaster import IsMaster +from pymongo.monotonic import time as _time + + +def _total_seconds(delta): + """Total seconds in the duration.""" + if hasattr(delta, 'total_seconds'): + return delta.total_seconds() + + # Python 2.6. + return ((delta.days * 86400 + delta.seconds) * 10 ** 6 + + delta.microseconds) / 10.0 ** 6 class ServerDescription(object): @@ -33,7 +45,7 @@ class ServerDescription(object): '_primary', '_max_bson_size', '_max_message_size', '_max_write_batch_size', '_min_wire_version', '_max_wire_version', '_round_trip_time', '_me', '_is_writable', '_is_readable', '_error', - '_set_version', '_election_id') + '_set_version', '_election_id', '_last_write_date', '_last_update_time') def __init__( self, @@ -61,8 +73,16 @@ class ServerDescription(object): self._is_readable = ismaster.is_readable self._round_trip_time = round_trip_time self._me = ismaster.me + self._last_update_time = _time() self._error = error + if ismaster.last_write_date: + # Convert from datetime to seconds. + delta = ismaster.last_write_date - EPOCH_NAIVE + self._last_write_date = _total_seconds(delta) + else: + self._last_write_date = None + @property def address(self): return self._address @@ -126,6 +146,14 @@ class ServerDescription(object): def me(self): return self._me + @property + def last_write_date(self): + return self._last_write_date + + @property + def last_update_time(self): + return self._last_update_time + @property def round_trip_time(self): """The current average latency or None.""" diff --git a/pymongo/server_selectors.py b/pymongo/server_selectors.py index cee9ae007..34da77729 100644 --- a/pymongo/server_selectors.py +++ b/pymongo/server_selectors.py @@ -1,4 +1,4 @@ -# Copyright 2014-2015 MongoDB, Inc. +# Copyright 2014-2016 MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You @@ -17,45 +17,108 @@ from pymongo.server_type import SERVER_TYPE -def any_server_selector(td): - return td.known_servers +class Selection(object): + """Input or output of a server selector function.""" + + @classmethod + def from_topology_description(cls, topology_description): + known_servers = topology_description.known_servers + primary = None + for sd in known_servers: + if sd.server_type == SERVER_TYPE.RSPrimary: + primary = sd + break + + return Selection(topology_description, + topology_description.known_servers, + topology_description.common_wire_version, + primary) + + def __init__(self, + topology_description, + server_descriptions, + common_wire_version, + primary): + self.topology_description = topology_description + self.server_descriptions = server_descriptions + self.primary = primary + self.common_wire_version = common_wire_version + + def with_server_descriptions(self, server_descriptions): + return Selection(self.topology_description, + server_descriptions, + self.common_wire_version, + self.primary) + + def secondary_with_max_last_write_date(self): + smax = None + for s in self.topology_description.known_servers: + if s.server_type == SERVER_TYPE.RSSecondary: + if not smax: + smax = s + else: + if s.last_write_date > smax.last_write_date: + smax = s + + return smax + + @property + def primary_selection(self): + primaries = [self.primary] if self.primary else [] + return self.with_server_descriptions(primaries) + + @property + def heartbeat_frequency(self): + return self.topology_description.heartbeat_frequency + + def __bool__(self): + return bool(self.server_descriptions) + + __nonzero__ = __bool__ # Python 2. + + def __getitem__(self, item): + return self.server_descriptions[item] -def readable_server_selector(td): - return [s for s in td.known_servers if s.is_readable] +def any_server_selector(selection): + return selection -def writable_server_selector(td): - return [s for s in td.known_servers if s.is_writable] +def readable_server_selector(selection): + return selection.with_server_descriptions( + [s for s in selection.server_descriptions if s.is_readable]) -def secondary_server_selector(td): - return [s for s in td.known_servers - if s.server_type == SERVER_TYPE.RSSecondary] +def writable_server_selector(selection): + return selection.with_server_descriptions( + [s for s in selection.server_descriptions if s.is_writable]) -def arbiter_server_selector(td): - return [s for s in td.known_servers - if s.server_type == SERVER_TYPE.RSArbiter] +def secondary_server_selector(selection): + return selection.with_server_descriptions( + [s for s in selection.server_descriptions + if s.server_type == SERVER_TYPE.RSSecondary]) -def writable_preferred_server_selector(td): +def arbiter_server_selector(selection): + return selection.with_server_descriptions( + [s for s in selection.server_descriptions + if s.server_type == SERVER_TYPE.RSArbiter]) + + +def writable_preferred_server_selector(selection): """Like PrimaryPreferred but doesn't use tags or latency.""" - return writable_server_selector(td) or secondary_server_selector(td) + return (writable_server_selector(selection) or + secondary_server_selector(selection)) -def apply_single_tag_set(tag_set, server_descriptions): +def apply_single_tag_set(tag_set, selection): """All servers matching one tag set. A tag set is a dict. A server matches if its tags are a superset: A server tagged {'a': '1', 'b': '2'} matches the tag set {'a': '1'}. The empty tag set {} matches any server. - - The `server_descriptions` passed to this function should have - non-readable servers (e.g. RSGhost, RSArbiter, Unknown) filtered - out (e.g. by readable_server_selector or secondary_server_selector) - first. """ def tags_match(server_tags): for key, value in tag_set.items(): @@ -64,10 +127,11 @@ def apply_single_tag_set(tag_set, server_descriptions): return True - return [s for s in server_descriptions if tags_match(s.tags)] + return selection.with_server_descriptions( + [s for s in selection.server_descriptions if tags_match(s.tags)]) -def apply_tag_sets(tag_sets, server_descriptions): +def apply_tag_sets(tag_sets, selection): """All servers match a list of tag sets. tag_sets is a list of dicts. The empty tag set {} matches any server, @@ -75,49 +139,20 @@ def apply_tag_sets(tag_sets, server_descriptions): [{'a': 'value'}, {}] expresses a preference for servers tagged {'a': 'value'}, but accepts any server if none matches the first preference. - - The `server_descriptions` passed to this function should have - non-readable servers (e.g. RSGhost, RSArbiter, Unknown) filtered - out (e.g. by readable_server_selector or secondary_server_selector) - first. """ for tag_set in tag_sets: - selected = apply_single_tag_set(tag_set, server_descriptions) - if selected: - return selected + with_tag_set = apply_single_tag_set(tag_set, selection) + if with_tag_set: + return with_tag_set - return [] + return selection.with_server_descriptions([]) -def apply_local_threshold(latency_ms, server_descriptions): - """All servers with round trip times within latency_ms of the fastest one. - - No ServerDescription's round_trip_time can be None. - - The `server_descriptions` passed to this function should have - non-readable servers (e.g. RSGhost, RSArbiter, Unknown) filtered - out (e.g. by readable_server_selector or secondary_server_selector) - first. - """ - if not server_descriptions: - # Avoid ValueError from min() with empty sequence. - return [] - - # round_trip_time is in seconds. - if any(s for s in server_descriptions if s.round_trip_time is None): - raise ValueError("Not all servers' round trip times are known") - - fastest = min(s.round_trip_time for s in server_descriptions) - return [ - s for s in server_descriptions - if (s.round_trip_time - fastest) <= latency_ms / 1000.] - - -def secondary_with_tags_server_selector(tag_sets, td): +def secondary_with_tags_server_selector(tag_sets, selection): """All near-enough secondaries matching the tag sets.""" - return apply_tag_sets(tag_sets, secondary_server_selector(td)) + return apply_tag_sets(tag_sets, secondary_server_selector(selection)) -def member_with_tags_server_selector(tag_sets, td): +def member_with_tags_server_selector(tag_sets, selection): """All near-enough members matching the tag sets.""" - return apply_tag_sets(tag_sets, readable_server_selector(td)) + return apply_tag_sets(tag_sets, readable_server_selector(selection)) diff --git a/pymongo/settings.py b/pymongo/settings.py index 900a9275f..1c39a7d4a 100644 --- a/pymongo/settings.py +++ b/pymongo/settings.py @@ -41,8 +41,9 @@ class TopologySettings(object): Take a list of (host, port) pairs and optional replica set name. """ if heartbeat_frequency < common.MIN_HEARTBEAT_INTERVAL: - raise ConfigurationError("%s cannot be less than %.1f" % ( - 'heartbeatFrequencyMS', common.MIN_HEARTBEAT_INTERVAL)) + raise ConfigurationError( + "heartbeatFrequencyMS cannot be less than %d" % + common.MIN_HEARTBEAT_INTERVAL * 1000) self._seeds = seeds or [('localhost', 27017)] self._replica_set_name = replica_set_name diff --git a/pymongo/topology.py b/pymongo/topology.py index 8dedec889..ffad5badc 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -1,4 +1,4 @@ -# Copyright 2014-2015 MongoDB, Inc. +# Copyright 2014-2016 MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You @@ -32,14 +32,14 @@ from pymongo.pool import PoolOptions from pymongo.topology_description import (updated_topology_description, TOPOLOGY_TYPE, TopologyDescription) -from pymongo.errors import ServerSelectionTimeoutError +from pymongo.errors import ServerSelectionTimeoutError, ConfigurationError from pymongo.monotonic import time as _time from pymongo.server import Server from pymongo.server_selectors import (any_server_selector, - apply_local_threshold, arbiter_server_selector, secondary_server_selector, - writable_server_selector) + writable_server_selector, + Selection) def process_events_queue(queue_ref): @@ -84,15 +84,17 @@ class Topology(object): topology_settings.get_server_descriptions(), topology_settings.replica_set_name, None, - None) + None, + topology_settings) self._description = topology_description if self._publish_tp: + initial_td = TopologyDescription(TOPOLOGY_TYPE.Unknown, {}, None, + None, None, self._settings) self._events.put(( self._listeners.publish_topology_description_changed, - (TopologyDescription( - TOPOLOGY_TYPE.Unknown, {}, None, None, None), - self._description, self._topology_id))) + (initial_td, self._description, self._topology_id))) + for seed in topology_settings.seeds: if self._publish_server: self._events.put((self._listeners.publish_server_opened, @@ -284,8 +286,7 @@ class Topology(object): if topology_type != TOPOLOGY_TYPE.ReplicaSetWithPrimary: return None - description = writable_server_selector(self._description)[0] - return description.address + return writable_server_selector(self._new_selection())[0].address def _get_replica_set_members(self, selector): """Return set of replica set member addresses.""" @@ -296,8 +297,7 @@ class Topology(object): TOPOLOGY_TYPE.ReplicaSetNoPrimary): return set() - descriptions = selector(self._description) - return set([d.address for d in descriptions]) + return set([sd.address for sd in selector(self._new_selection())]) def get_secondaries(self): """Return set of secondary addresses.""" @@ -359,6 +359,13 @@ class Topology(object): def description(self): return self._description + def _new_selection(self): + """A Selection object, initially including all known servers. + + Hold the lock when calling this. + """ + return Selection.from_topology_description(self._description) + def _ensure_opened(self): """Start monitors, or restart after a fork. @@ -405,6 +412,15 @@ class Topology(object): server.request_check() def _apply_selector(self, selector, address): + if getattr(selector, 'min_wire_version', 0): + common_wv = self._description.common_wire_version + if common_wv and common_wv < selector.min_wire_version: + raise ConfigurationError( + "%s requires min wire version %d, but topology's min" + " wire version is %d" % (selector, + selector.min_wire_version, + common_wv)) + if self._description.topology_type == TOPOLOGY_TYPE.Single: # Ignore the selector. return self._description.known_servers @@ -412,12 +428,21 @@ class Topology(object): sd = self._description.server_descriptions().get(address) return [sd] if sd else [] elif self._description.topology_type == TOPOLOGY_TYPE.Sharded: - return apply_local_threshold(self._settings.local_threshold_ms, - self._description.known_servers) + # Ignore the read preference, but apply localThresholdMS. + return self._apply_local_threshold(self._new_selection()) else: - sds = selector(self._description) - return apply_local_threshold( - self._settings.local_threshold_ms, sds) + return self._apply_local_threshold(selector(self._new_selection())) + + def _apply_local_threshold(self, selection): + """Return list of servers from Selection that are in latency window.""" + if not selection: + return [] + + # Round trip time in seconds. + fastest = min(s.round_trip_time for s in selection.server_descriptions) + threshold = self._settings.local_threshold_ms / 1000.0 + return [s for s in selection.server_descriptions + if (s.round_trip_time - fastest) <= threshold] def _update_servers(self): """Sync our Servers from TopologyDescription.server_descriptions. diff --git a/pymongo/topology_description.py b/pymongo/topology_description.py index c9341c0cd..cf8237563 100644 --- a/pymongo/topology_description.py +++ b/pymongo/topology_description.py @@ -1,4 +1,4 @@ -# Copyright 2014-2015 MongoDB, Inc. +# Copyright 2014-2016 MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You @@ -28,13 +28,13 @@ TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary', class TopologyDescription(object): - def __init__( - self, - topology_type, - server_descriptions, - replica_set_name, - max_set_version, - max_election_id): + def __init__(self, + topology_type, + server_descriptions, + replica_set_name, + max_set_version, + max_election_id, + topology_settings): """Represent a topology of servers. :Parameters: @@ -44,6 +44,7 @@ class TopologyDescription(object): - `replica_set_name`: replica set name or None - `max_set_version`: greatest setVersion seen from a primary, or None - `max_election_id`: greatest electionId seen from a primary, or None + - `topology_settings`: a TopologySettings """ self._topology_type = topology_type self._replica_set_name = replica_set_name @@ -51,6 +52,9 @@ class TopologyDescription(object): self._max_set_version = max_set_version self._max_election_id = max_election_id + # The heartbeat_frequency is used in staleness estimates. + self._topology_settings = topology_settings + # Is PyMongo compatible with all servers' wire protocols? self._incompatible_err = None @@ -111,7 +115,8 @@ class TopologyDescription(object): sds, self._replica_set_name, self._max_set_version, - self._max_election_id) + self._max_election_id, + self._topology_settings) def server_descriptions(self): """Dict of (address, ServerDescription).""" @@ -142,6 +147,19 @@ class TopologyDescription(object): return [s for s in self._server_descriptions.values() if s.is_server_type_known] + @property + def common_wire_version(self): + """Minimum of all servers' max wire versions, or None.""" + servers = self.known_servers + if servers: + return min(s.max_wire_version for s in self.known_servers) + + return None + + @property + def heartbeat_frequency(self): + return self._topology_settings.heartbeat_frequency + # If topology type is Unknown and we receive an ismaster response, what should # the new topology type be? @@ -188,7 +206,8 @@ def updated_topology_description(topology_description, server_description): sds, set_name, max_set_version, - max_election_id) + max_election_id, + topology_description._topology_settings) if topology_type == TOPOLOGY_TYPE.Unknown: if server_type == SERVER_TYPE.Standalone: @@ -253,7 +272,8 @@ def updated_topology_description(topology_description, server_description): sds, set_name, max_set_version, - max_election_id) + max_election_id, + topology_description._topology_settings) def _update_rs_from_primary( diff --git a/test/max_staleness/ReplicaSetNoPrimary/DefaultNoMaxStaleness.json b/test/max_staleness/ReplicaSetNoPrimary/DefaultNoMaxStaleness.json new file mode 100644 index 000000000..bf15fe734 --- /dev/null +++ b/test/max_staleness/ReplicaSetNoPrimary/DefaultNoMaxStaleness.json @@ -0,0 +1,74 @@ +{ + "in_latency_window": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "read_preference": { + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetNoPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetNoPrimary/Incompatible.json b/test/max_staleness/ReplicaSetNoPrimary/Incompatible.json new file mode 100644 index 000000000..e41ea79b3 --- /dev/null +++ b/test/max_staleness/ReplicaSetNoPrimary/Incompatible.json @@ -0,0 +1,36 @@ +{ + "error": true, + "read_preference": { + "maxStalenessMS": 120000, + "mode": "Nearest" + }, + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 4, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetNoPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetNoPrimary/LastUpdateTime.json b/test/max_staleness/ReplicaSetNoPrimary/LastUpdateTime.json new file mode 100644 index 000000000..58e0ac4f8 --- /dev/null +++ b/test/max_staleness/ReplicaSetNoPrimary/LastUpdateTime.json @@ -0,0 +1,88 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 1, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 1, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 25002, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 1, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 25002, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 25001, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetNoPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetNoPrimary/Nearest.json b/test/max_staleness/ReplicaSetNoPrimary/Nearest.json new file mode 100644 index 000000000..97f471b11 --- /dev/null +++ b/test/max_staleness/ReplicaSetNoPrimary/Nearest.json @@ -0,0 +1,88 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetNoPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetNoPrimary/Nearest2.json b/test/max_staleness/ReplicaSetNoPrimary/Nearest2.json new file mode 100644 index 000000000..0438db264 --- /dev/null +++ b/test/max_staleness/ReplicaSetNoPrimary/Nearest2.json @@ -0,0 +1,88 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetNoPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetNoPrimary/PrimaryPreferred.json b/test/max_staleness/ReplicaSetNoPrimary/PrimaryPreferred.json new file mode 100644 index 000000000..f287087b6 --- /dev/null +++ b/test/max_staleness/ReplicaSetNoPrimary/PrimaryPreferred.json @@ -0,0 +1,64 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "PrimaryPreferred" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetNoPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetNoPrimary/Secondary.json b/test/max_staleness/ReplicaSetNoPrimary/Secondary.json new file mode 100644 index 000000000..db99b636c --- /dev/null +++ b/test/max_staleness/ReplicaSetNoPrimary/Secondary.json @@ -0,0 +1,111 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "Secondary", + "tag_sets": [ + { + "data_center": "nyc" + } + ] + }, + "suitable_servers": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "tokyo" + }, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "d:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "tokyo" + }, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetNoPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetNoPrimary/SecondaryPreferred.json b/test/max_staleness/ReplicaSetNoPrimary/SecondaryPreferred.json new file mode 100644 index 000000000..66b9c1ffd --- /dev/null +++ b/test/max_staleness/ReplicaSetNoPrimary/SecondaryPreferred.json @@ -0,0 +1,63 @@ +{ + "in_latency_window": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 120000, + "mode": "SecondaryPreferred" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetNoPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetNoPrimary/SecondaryPreferred_tags.json b/test/max_staleness/ReplicaSetNoPrimary/SecondaryPreferred_tags.json new file mode 100644 index 000000000..f41250d0d --- /dev/null +++ b/test/max_staleness/ReplicaSetNoPrimary/SecondaryPreferred_tags.json @@ -0,0 +1,111 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "SecondaryPreferred", + "tag_sets": [ + { + "data_center": "nyc" + } + ] + }, + "suitable_servers": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "tokyo" + }, + "type": "RSSecondary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "d:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "tokyo" + }, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetNoPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/DefaultNoMaxStaleness.json b/test/max_staleness/ReplicaSetWithPrimary/DefaultNoMaxStaleness.json new file mode 100644 index 000000000..d8418c139 --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/DefaultNoMaxStaleness.json @@ -0,0 +1,74 @@ +{ + "in_latency_window": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "read_preference": { + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/Incompatible.json b/test/max_staleness/ReplicaSetWithPrimary/Incompatible.json new file mode 100644 index 000000000..0e99ba96a --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/Incompatible.json @@ -0,0 +1,36 @@ +{ + "error": true, + "read_preference": { + "maxStalenessMS": 120000, + "mode": "Nearest" + }, + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 4, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/LastUpdateTime.json b/test/max_staleness/ReplicaSetWithPrimary/LastUpdateTime.json new file mode 100644 index 000000000..7dc33872b --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/LastUpdateTime.json @@ -0,0 +1,88 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 25001, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 1, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 25001, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 1, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 25001, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 25001, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/MaxStalenessTooSmall.json b/test/max_staleness/ReplicaSetWithPrimary/MaxStalenessTooSmall.json new file mode 100644 index 000000000..add052fcb --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/MaxStalenessTooSmall.json @@ -0,0 +1,36 @@ +{ + "error": true, + "read_preference": { + "maxStalenessMS": 1, + "mode": "Nearest" + }, + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/MaxStalenessWithModePrimary.json b/test/max_staleness/ReplicaSetWithPrimary/MaxStalenessWithModePrimary.json new file mode 100644 index 000000000..b1cced7a3 --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/MaxStalenessWithModePrimary.json @@ -0,0 +1,35 @@ +{ + "error": true, + "read_preference": { + "maxStalenessMS": 120000 + }, + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/Nearest.json b/test/max_staleness/ReplicaSetWithPrimary/Nearest.json new file mode 100644 index 000000000..1fa266e0c --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/Nearest.json @@ -0,0 +1,88 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/Nearest2.json b/test/max_staleness/ReplicaSetWithPrimary/Nearest2.json new file mode 100644 index 000000000..790bad947 --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/Nearest2.json @@ -0,0 +1,88 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/PrimaryPreferred.json b/test/max_staleness/ReplicaSetWithPrimary/PrimaryPreferred.json new file mode 100644 index 000000000..c45ecea1d --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/PrimaryPreferred.json @@ -0,0 +1,64 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "PrimaryPreferred" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/PrimaryPreferred_incompatible.json b/test/max_staleness/ReplicaSetWithPrimary/PrimaryPreferred_incompatible.json new file mode 100644 index 000000000..7805a2384 --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/PrimaryPreferred_incompatible.json @@ -0,0 +1,36 @@ +{ + "error": true, + "read_preference": { + "maxStalenessMS": 50000, + "mode": "PrimaryPreferred" + }, + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 4, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/SecondaryPreferred.json b/test/max_staleness/ReplicaSetWithPrimary/SecondaryPreferred.json new file mode 100644 index 000000000..d5a410f6b --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/SecondaryPreferred.json @@ -0,0 +1,63 @@ +{ + "in_latency_window": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + } + ], + "read_preference": { + "maxStalenessMS": 120000, + "mode": "SecondaryPreferred" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/SecondaryPreferred_tags.json b/test/max_staleness/ReplicaSetWithPrimary/SecondaryPreferred_tags.json new file mode 100644 index 000000000..b41855b7c --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/SecondaryPreferred_tags.json @@ -0,0 +1,138 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "SecondaryPreferred", + "tag_sets": [ + { + "data_center": "nyc" + } + ] + }, + "suitable_servers": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 1, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 1, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "d:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "e:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "tokyo" + }, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/Secondary_tags.json b/test/max_staleness/ReplicaSetWithPrimary/Secondary_tags.json new file mode 100644 index 000000000..f167bc3ef --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/Secondary_tags.json @@ -0,0 +1,138 @@ +{ + "heartbeatFrequencyMS": 25000, + "in_latency_window": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 50000, + "mode": "Secondary", + "tag_sets": [ + { + "data_center": "nyc" + } + ] + }, + "suitable_servers": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 1, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "25002" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 1, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "d:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "nyc" + }, + "type": "RSSecondary" + }, + { + "address": "e:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "2" + } + }, + "maxWireVersion": 5, + "tags": { + "data_center": "tokyo" + }, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/ShortHeartbeartShortMaxStaleness.json b/test/max_staleness/ReplicaSetWithPrimary/ShortHeartbeartShortMaxStaleness.json new file mode 100644 index 000000000..4a6c92708 --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/ShortHeartbeartShortMaxStaleness.json @@ -0,0 +1,76 @@ +{ + "heartbeatFrequencyMS": 1000, + "in_latency_window": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 2000, + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/ShortHeartbeartShortMaxStaleness2.json b/test/max_staleness/ReplicaSetWithPrimary/ShortHeartbeartShortMaxStaleness2.json new file mode 100644 index 000000000..e3c4242e5 --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/ShortHeartbeartShortMaxStaleness2.json @@ -0,0 +1,76 @@ +{ + "heartbeatFrequencyMS": 1000, + "in_latency_window": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + } + ], + "read_preference": { + "maxStalenessMS": 2000, + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/ReplicaSetWithPrimary/ZeroMaxStaleness.json b/test/max_staleness/ReplicaSetWithPrimary/ZeroMaxStaleness.json new file mode 100644 index 000000000..310912753 --- /dev/null +++ b/test/max_staleness/ReplicaSetWithPrimary/ZeroMaxStaleness.json @@ -0,0 +1,75 @@ +{ + "in_latency_window": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "read_preference": { + "maxStalenessMS": 0, + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 50, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1000001" + } + }, + "maxWireVersion": 5, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "RSSecondary" + } + ], + "type": "ReplicaSetWithPrimary" + } +} diff --git a/test/max_staleness/Sharded/Incompatible.json b/test/max_staleness/Sharded/Incompatible.json new file mode 100644 index 000000000..58b6d577d --- /dev/null +++ b/test/max_staleness/Sharded/Incompatible.json @@ -0,0 +1,36 @@ +{ + "error": true, + "read_preference": { + "maxStalenessMS": 120000, + "mode": "Nearest" + }, + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "Mongos" + }, + { + "address": "b:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 4, + "type": "Mongos" + } + ], + "type": "Sharded" + } +} diff --git a/test/max_staleness/Sharded/SmallMaxStaleness.json b/test/max_staleness/Sharded/SmallMaxStaleness.json new file mode 100644 index 000000000..89a7b357c --- /dev/null +++ b/test/max_staleness/Sharded/SmallMaxStaleness.json @@ -0,0 +1,52 @@ +{ + "heartbeatFrequencyMS": 10000, + "in_latency_window": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "Mongos" + } + ], + "read_preference": { + "maxStalenessMS": 1, + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "Mongos" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "Mongos" + } + ], + "type": "Sharded" + } +} diff --git a/test/max_staleness/Single/Incompatible.json b/test/max_staleness/Single/Incompatible.json new file mode 100644 index 000000000..1a0357564 --- /dev/null +++ b/test/max_staleness/Single/Incompatible.json @@ -0,0 +1,24 @@ +{ + "error": true, + "read_preference": { + "maxStalenessMS": 120000, + "mode": "Nearest" + }, + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 4, + "type": "Standalone" + } + ], + "type": "Single" + } +} diff --git a/test/max_staleness/Single/SmallMaxStaleness.json b/test/max_staleness/Single/SmallMaxStaleness.json new file mode 100644 index 000000000..67f27cdff --- /dev/null +++ b/test/max_staleness/Single/SmallMaxStaleness.json @@ -0,0 +1,52 @@ +{ + "heartbeatFrequencyMS": 10000, + "in_latency_window": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "Standalone" + } + ], + "read_preference": { + "maxStalenessMS": 1, + "mode": "Nearest" + }, + "suitable_servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "Standalone" + } + ], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 5, + "lastUpdateTime": 0, + "lastWrite": { + "lastWriteDate": { + "$numberLong": "1" + } + }, + "maxWireVersion": 5, + "type": "Standalone" + } + ], + "type": "Single" + } +} diff --git a/test/max_staleness/Unknown/SmallMaxStaleness.json b/test/max_staleness/Unknown/SmallMaxStaleness.json new file mode 100644 index 000000000..6cad0873e --- /dev/null +++ b/test/max_staleness/Unknown/SmallMaxStaleness.json @@ -0,0 +1,18 @@ +{ + "heartbeatFrequencyMS": 10000, + "in_latency_window": [], + "read_preference": { + "maxStalenessMS": 1, + "mode": "Nearest" + }, + "suitable_servers": [], + "topology_description": { + "servers": [ + { + "address": "a:27017", + "type": "Unknown" + } + ], + "type": "Unknown" + } +} diff --git a/test/test_client.py b/test/test_client.py index 83bfa9789..922684bf0 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -186,6 +186,15 @@ class ClientUnitTest(unittest.TestCase): c = MongoClient(uri, connect=False) self.assertEqual(Database(c, 'foo'), c.get_default_database()) + def test_primary_read_pref_with_tags(self): + # No tags allowed with "primary". + with self.assertRaises(ConfigurationError): + MongoClient('mongodb://host/?readpreferencetags=dc:east') + + with self.assertRaises(ConfigurationError): + MongoClient('mongodb://host/?' + 'readpreference=primary&readpreferencetags=dc:east') + class TestClient(IntegrationTest): diff --git a/test/test_max_staleness.py b/test/test_max_staleness.py new file mode 100644 index 000000000..2e307829e --- /dev/null +++ b/test/test_max_staleness.py @@ -0,0 +1,241 @@ +# Copyright 2016 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test maxStalenessMS support.""" + +import datetime +import os +import sys + +sys.path[0:0] = [""] + +from bson import json_util +from pymongo import MongoClient, read_preferences +from pymongo.common import clean_node, HEARTBEAT_FREQUENCY +from pymongo.errors import ConfigurationError, ConnectionFailure +from pymongo.ismaster import IsMaster +from pymongo.server_description import ServerDescription +from pymongo.settings import TopologySettings +from pymongo.topology import Topology +from test import unittest + + +# Location of JSON test specifications. +_TEST_PATH = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + 'max_staleness') + + +class MockSocketInfo(object): + def close(self): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +class MockPool(object): + def __init__(self, *args, **kwargs): + pass + + def reset(self): + pass + + +class MockMonitor(object): + def __init__(self, server_description, topology, pool, topology_settings): + pass + + def open(self): + pass + + def request_check(self): + pass + + def close(self): + pass + + +def get_addresses(server_list): + seeds = [] + hosts = [] + for server in server_list: + seeds.append(clean_node(server['address'])) + hosts.append(server['address']) + return seeds, hosts + + +def make_last_write_date(server): + epoch = datetime.datetime.utcfromtimestamp(0) + millis = server.get('lastWrite', {}).get('lastWriteDate') + if millis: + diff = ((millis % 1000) + 1000) % 1000 + seconds = (millis - diff) / 1000 + micros = diff * 1000 + return epoch + datetime.timedelta( + seconds=seconds, microseconds=micros) + else: + # "Unknown" server. + return epoch + + +def make_server_description(server, hosts): + """Make ServerDescription from server info from JSON file.""" + server_type = server['type'] + if server_type == "Unknown": + return ServerDescription(clean_node(server['address']), IsMaster({})) + + ismaster_response = {'ok': True, 'hosts': hosts} + if server_type != "Standalone" and server_type != "Mongos": + ismaster_response['setName'] = "rs" + + if server_type == "RSPrimary": + ismaster_response['ismaster'] = True + elif server_type == "RSSecondary": + ismaster_response['secondary'] = True + elif server_type == "Mongos": + ismaster_response['msg'] = 'isdbgrid' + + ismaster_response['lastWriteDate'] = make_last_write_date(server) + if 'maxWireVersion' in server: + ismaster_response['maxWireVersion'] = server['maxWireVersion'] + + if 'tags' in server: + ismaster_response['tags'] = server['tags'] + + # Sets _last_update_time to now. + sd = ServerDescription(clean_node(server['address']), + IsMaster(ismaster_response), + round_trip_time=server['avg_rtt_ms']) + + sd._last_update_time = server['lastUpdateTime'] / 1000.0 # ms to sec. + return sd + + +class TestAllScenarios(unittest.TestCase): + pass + + +def create_test(scenario_def): + def run_scenario(self): + if 'heartbeatFrequencyMS' in scenario_def: + frequency = int(scenario_def['heartbeatFrequencyMS']) / 1000.0 + else: + frequency = HEARTBEAT_FREQUENCY + + # Initialize topologies. + seeds, hosts = get_addresses( + scenario_def['topology_description']['servers']) + + topology = Topology( + TopologySettings(seeds=seeds, + monitor_class=MockMonitor, + pool_class=MockPool, + heartbeat_frequency=frequency)) + + # Update topologies with server descriptions. + for server in scenario_def['topology_description']['servers']: + server_description = make_server_description(server, hosts) + topology.on_change(server_description) + + # Create server selector. + # Make first letter lowercase to match read_pref's modes. + pref_def = scenario_def['read_preference'] + mode_string = pref_def.get('mode', 'primary') + mode_string = mode_string[:1].lower() + mode_string[1:] + mode = read_preferences.read_pref_mode_from_name(mode_string) + max_staleness = pref_def.get('maxStalenessMS', 0) / 1000.0 + tag_sets = pref_def.get('tag_sets') + + if scenario_def.get('error'): + with self.assertRaises(ConfigurationError): + # Error can be raised when making Read Pref or selecting. + pref = read_preferences.make_read_preference( + mode, tag_sets=tag_sets, max_staleness=max_staleness) + + topology.select_server(pref) + return + + expected_addrs = set([ + server['address'] for server in scenario_def['in_latency_window']]) + + # Select servers. + pref = read_preferences.make_read_preference( + mode, tag_sets=tag_sets, max_staleness=max_staleness) + + if not expected_addrs: + with self.assertRaises(ConnectionFailure): + topology.select_servers(pref, server_selection_timeout=0) + return + + servers = topology.select_servers(pref, server_selection_timeout=0) + actual_addrs = set(['%s:%d' % s.description.address for s in servers]) + + for unexpected in actual_addrs - expected_addrs: + self.fail("'%s' shouldn't have been selected, but was" % unexpected) + + for unselected in expected_addrs - actual_addrs: + self.fail("'%s' should have been selected, but wasn't" % unselected) + + return run_scenario + + +def create_tests(): + for dirpath, _, filenames in os.walk(_TEST_PATH): + dirname = os.path.split(dirpath) + dirname = os.path.split(dirname[-2])[-1] + '_' + dirname[-1] + + for filename in filenames: + if not filename.endswith('.json'): + continue + + with open(os.path.join(dirpath, filename)) as scenario_stream: + scenario_def = json_util.loads(scenario_stream.read()) + + # Construct test from scenario. + new_test = create_test(scenario_def) + test_name = 'test_%s_%s' % ( + dirname, os.path.splitext(filename)[0]) + + new_test.__name__ = test_name + setattr(TestAllScenarios, new_test.__name__, new_test) + + +create_tests() + + +class TestMaxStaleness(unittest.TestCase): + def test_max_staleness(self): + # These tests are specified in max-staleness-tests.rst. + with self.assertRaises(ConfigurationError): + MongoClient("mongodb://a/?maxStalenessMS=120000") + + with self.assertRaises(ConfigurationError): + MongoClient("mongodb://a/?readPreference=primary&" + "maxStalenessMS=120000") + + client = MongoClient("mongodb://host/?readPreference=secondary&" + "maxStalenessMS=120000") + self.assertEqual(120, client.read_preference.max_staleness) + + client = MongoClient("mongodb://a/?readPreference=secondary&" + "maxStalenessMS=1") + self.assertEqual(0.001, client.read_preference.max_staleness) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index ba9685027..a8488eb6f 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -1,4 +1,4 @@ -# Copyright 2011-2015 MongoDB, Inc. +# Copyright 2011-2016 MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -30,9 +30,9 @@ from pymongo.mongo_client import MongoClient from pymongo.read_preferences import (ReadPreference, MovingAverage, Primary, PrimaryPreferred, Secondary, SecondaryPreferred, - Nearest, _ServerMode) + Nearest) from pymongo.server_description import ServerDescription -from pymongo.server_selectors import readable_server_selector +from pymongo.server_selectors import readable_server_selector, Selection from pymongo.server_type import SERVER_TYPE from pymongo.write_concern import WriteConcern @@ -48,8 +48,23 @@ from test.utils import connected, single_client, one, wait_until, rs_client from test.version import Version +class TestSelections(unittest.TestCase): + def test_bool(self): + client = single_client() + + wait_until(lambda: client.address, "discover primary") + selection = Selection.from_topology_description( + client._topology.description) + + self.assertTrue(selection) + self.assertFalse(selection.with_server_descriptions([])) + + class TestReadPreferenceObjects(unittest.TestCase): - prefs = [Primary(), Secondary(), Nearest(tag_sets=[{'a': 1}, {'b': 2}])] + prefs = [Primary(), + Secondary(), + Nearest(tag_sets=[{'a': 1}, {'b': 2}]), + SecondaryPreferred(max_staleness=30)] def test_pickle(self): for pref in self.prefs: @@ -158,14 +173,6 @@ class TestReadPreferences(TestReadPreferencesBase): rs_client, read_preference='foo') def test_tag_sets_validation(self): - # Can't use tags with PRIMARY - self.assertRaises(ConfigurationError, _ServerMode, - 0, tag_sets=[{'k': 'v'}]) - - # ... but empty tag sets are ok with PRIMARY - self.assertRaises(ConfigurationError, _ServerMode, - 0, tag_sets=[{}]) - S = Secondary(tag_sets=[{}]) self.assertEqual( [{}],