diff --git a/pymongo/read_preferences.py b/pymongo/read_preferences.py index 4f1e48c3b..01d2ebeb2 100644 --- a/pymongo/read_preferences.py +++ b/pymongo/read_preferences.py @@ -19,6 +19,9 @@ import random from collections import Mapping, namedtuple from pymongo.errors import ConfigurationError +from pymongo.server_selectors import (near_enough_server_selector, + near_secondary_with_tags_server_selector, + writable_server_selector) _PRIMARY = 0 @@ -154,6 +157,10 @@ class Primary(ServerMode): def __init__(self, latency_threshold_ms=15): super(Primary, self).__init__(_PRIMARY, latency_threshold_ms) + def select_servers(self, server_descriptions): + """Return matching ServerDescriptions from a list.""" + return writable_server_selector(server_descriptions) + def __repr__(self): return "Primary(latency_threshold_ms=%d)" % self.latency_threshold_ms @@ -179,6 +186,17 @@ class PrimaryPreferred(ServerMode): super(PrimaryPreferred, self).__init__( _PRIMARY_PREFERRED, latency_threshold_ms, tag_sets) + def select_servers(self, server_descriptions): + """Return matching ServerDescriptions from a list.""" + writable_servers = writable_server_selector(server_descriptions) + if writable_servers: + return writable_servers + else: + return near_secondary_with_tags_server_selector( + self.tag_sets, + self.latency_threshold_ms, + server_descriptions) + class Secondary(ServerMode): """Secondary read preference. @@ -200,6 +218,13 @@ class Secondary(ServerMode): super(Secondary, self).__init__( _SECONDARY, latency_threshold_ms, tag_sets) + def select_servers(self, server_descriptions): + """Return matching ServerDescriptions from a list.""" + return near_secondary_with_tags_server_selector( + self.tag_sets, + self.latency_threshold_ms, + server_descriptions) + class SecondaryPreferred(ServerMode): """SecondaryPreferred read preference. @@ -221,6 +246,18 @@ class SecondaryPreferred(ServerMode): super(SecondaryPreferred, self).__init__( _SECONDARY_PREFERRED, latency_threshold_ms, tag_sets) + def select_servers(self, server_descriptions): + """Return matching ServerDescriptions from a list.""" + secondaries = near_secondary_with_tags_server_selector( + self.tag_sets, + self.latency_threshold_ms, + server_descriptions) + + if secondaries: + return secondaries + else: + return writable_server_selector(server_descriptions) + class Nearest(ServerMode): """Nearest read preference. @@ -242,6 +279,11 @@ class Nearest(ServerMode): super(Nearest, self).__init__( _NEAREST, latency_threshold_ms, tag_sets) + def select_servers(self, server_descriptions): + """Return matching ServerDescriptions from a list.""" + return near_enough_server_selector(server_descriptions, + self.latency_threshold_ms) + _ALL_READ_PREFERENCES = (Primary, PrimaryPreferred, Secondary, SecondaryPreferred, Nearest) @@ -307,6 +349,7 @@ def read_pref_mode_from_name(name): return _MONGOS_MODES.index(name) +# TODO: DELETE def _select_primary(members): """Get the primary member. """ @@ -317,6 +360,7 @@ def _select_primary(members): return None +# TODO: DELETE def _select_member_with_tags(members, tags, secondary_only, latency): """Get the member matching the given tags, and acceptable latency. """ @@ -345,6 +389,7 @@ def _select_member_with_tags(members, tags, secondary_only, latency): return random.choice(near_candidates) +# TODO: DELETE def select_member(members, mode, tag_sets=[{}], latency=15): """Return a Member or None. """ diff --git a/pymongo/server_selectors.py b/pymongo/server_selectors.py index 9b0a04ffb..b3d7ee5e8 100644 --- a/pymongo/server_selectors.py +++ b/pymongo/server_selectors.py @@ -28,3 +28,69 @@ def writable_server_selector(server_descriptions): def secondary_server_selector(server_descriptions): return [s for s in server_descriptions if s.server_type == SERVER_TYPE.RSSecondary] + + +def single_tag_set_server_selector(tag_set, server_descriptions): + """All servers matching one tag set. + + A tag set is a dict. A server matches if its tags are a superset: + A server tagged {'a': '1', 'b': '2'} matches the tag set {'a': '1'}. + + The empty tag set {} matches any server. + """ + def tags_match(server_tags): + for key, value in tag_set.items(): + if key not in server_tags or server_tags[key] != value: + return False + + return True + + return [s for s in server_descriptions if tags_match(s.tags)] + + +def tag_sets_server_selector(tag_sets, server_descriptions): + """All servers match a list of tag sets. + + tag_sets is a list of dicts. The empty tag set {} matches any server, + and may be provided at the end of the list as a fallback. So + [{'a': 'value'}, {}] expresses a preference for servers tagged + {'a': 'value'}, but accepts any server if none matches the first + preference. + """ + for tag_set in tag_sets: + selected = single_tag_set_server_selector(server_descriptions, tag_set) + if selected: + return selected + + return [] + + +def near_enough_server_selector(latency_ms, server_descriptions): + """All servers with round trip times within latency_ms of the fastest one. + + No ServerDescription's round_trip_time can be None. + """ + if not server_descriptions: + # Avoid ValueError from min() with empty sequence. + return [] + + # round_trip_time is in seconds. + if any(s for s in server_descriptions if s.round_trip_time is None): + raise ValueError("Not all servers' round trip times are known") + + fastest = min(s.round_trip_time for s in server_descriptions) + return [ + s for s in server_descriptions + if (s.round_trip_time - fastest) < latency_ms / 1000.] + + +def near_secondary_with_tags_server_selector( + tag_sets, + latency_ms, + server_descriptions): + """All near-enough secondaries matching the tag sets.""" + return near_enough_server_selector( + latency_ms, + tag_sets_server_selector( + tag_sets, + secondary_server_selector(server_descriptions)))