PYTHON-525 Server selection implemented within ReadPreference classes.
This commit is contained in:
parent
6e2f8a2773
commit
3027853cd9
@ -19,6 +19,9 @@ import random
|
||||
from collections import Mapping, namedtuple
|
||||
|
||||
from pymongo.errors import ConfigurationError
|
||||
from pymongo.server_selectors import (near_enough_server_selector,
|
||||
near_secondary_with_tags_server_selector,
|
||||
writable_server_selector)
|
||||
|
||||
|
||||
_PRIMARY = 0
|
||||
@ -154,6 +157,10 @@ class Primary(ServerMode):
|
||||
def __init__(self, latency_threshold_ms=15):
|
||||
super(Primary, self).__init__(_PRIMARY, latency_threshold_ms)
|
||||
|
||||
def select_servers(self, server_descriptions):
|
||||
"""Return matching ServerDescriptions from a list."""
|
||||
return writable_server_selector(server_descriptions)
|
||||
|
||||
def __repr__(self):
|
||||
return "Primary(latency_threshold_ms=%d)" % self.latency_threshold_ms
|
||||
|
||||
@ -179,6 +186,17 @@ class PrimaryPreferred(ServerMode):
|
||||
super(PrimaryPreferred, self).__init__(
|
||||
_PRIMARY_PREFERRED, latency_threshold_ms, tag_sets)
|
||||
|
||||
def select_servers(self, server_descriptions):
|
||||
"""Return matching ServerDescriptions from a list."""
|
||||
writable_servers = writable_server_selector(server_descriptions)
|
||||
if writable_servers:
|
||||
return writable_servers
|
||||
else:
|
||||
return near_secondary_with_tags_server_selector(
|
||||
self.tag_sets,
|
||||
self.latency_threshold_ms,
|
||||
server_descriptions)
|
||||
|
||||
|
||||
class Secondary(ServerMode):
|
||||
"""Secondary read preference.
|
||||
@ -200,6 +218,13 @@ class Secondary(ServerMode):
|
||||
super(Secondary, self).__init__(
|
||||
_SECONDARY, latency_threshold_ms, tag_sets)
|
||||
|
||||
def select_servers(self, server_descriptions):
|
||||
"""Return matching ServerDescriptions from a list."""
|
||||
return near_secondary_with_tags_server_selector(
|
||||
self.tag_sets,
|
||||
self.latency_threshold_ms,
|
||||
server_descriptions)
|
||||
|
||||
|
||||
class SecondaryPreferred(ServerMode):
|
||||
"""SecondaryPreferred read preference.
|
||||
@ -221,6 +246,18 @@ class SecondaryPreferred(ServerMode):
|
||||
super(SecondaryPreferred, self).__init__(
|
||||
_SECONDARY_PREFERRED, latency_threshold_ms, tag_sets)
|
||||
|
||||
def select_servers(self, server_descriptions):
|
||||
"""Return matching ServerDescriptions from a list."""
|
||||
secondaries = near_secondary_with_tags_server_selector(
|
||||
self.tag_sets,
|
||||
self.latency_threshold_ms,
|
||||
server_descriptions)
|
||||
|
||||
if secondaries:
|
||||
return secondaries
|
||||
else:
|
||||
return writable_server_selector(server_descriptions)
|
||||
|
||||
|
||||
class Nearest(ServerMode):
|
||||
"""Nearest read preference.
|
||||
@ -242,6 +279,11 @@ class Nearest(ServerMode):
|
||||
super(Nearest, self).__init__(
|
||||
_NEAREST, latency_threshold_ms, tag_sets)
|
||||
|
||||
def select_servers(self, server_descriptions):
|
||||
"""Return matching ServerDescriptions from a list."""
|
||||
return near_enough_server_selector(server_descriptions,
|
||||
self.latency_threshold_ms)
|
||||
|
||||
|
||||
_ALL_READ_PREFERENCES = (Primary, PrimaryPreferred,
|
||||
Secondary, SecondaryPreferred, Nearest)
|
||||
@ -307,6 +349,7 @@ def read_pref_mode_from_name(name):
|
||||
return _MONGOS_MODES.index(name)
|
||||
|
||||
|
||||
# TODO: DELETE
|
||||
def _select_primary(members):
|
||||
"""Get the primary member.
|
||||
"""
|
||||
@ -317,6 +360,7 @@ def _select_primary(members):
|
||||
return None
|
||||
|
||||
|
||||
# TODO: DELETE
|
||||
def _select_member_with_tags(members, tags, secondary_only, latency):
|
||||
"""Get the member matching the given tags, and acceptable latency.
|
||||
"""
|
||||
@ -345,6 +389,7 @@ def _select_member_with_tags(members, tags, secondary_only, latency):
|
||||
return random.choice(near_candidates)
|
||||
|
||||
|
||||
# TODO: DELETE
|
||||
def select_member(members, mode, tag_sets=[{}], latency=15):
|
||||
"""Return a Member or None.
|
||||
"""
|
||||
|
||||
@ -28,3 +28,69 @@ def writable_server_selector(server_descriptions):
|
||||
def secondary_server_selector(server_descriptions):
|
||||
return [s for s in server_descriptions
|
||||
if s.server_type == SERVER_TYPE.RSSecondary]
|
||||
|
||||
|
||||
def single_tag_set_server_selector(tag_set, server_descriptions):
|
||||
"""All servers matching one tag set.
|
||||
|
||||
A tag set is a dict. A server matches if its tags are a superset:
|
||||
A server tagged {'a': '1', 'b': '2'} matches the tag set {'a': '1'}.
|
||||
|
||||
The empty tag set {} matches any server.
|
||||
"""
|
||||
def tags_match(server_tags):
|
||||
for key, value in tag_set.items():
|
||||
if key not in server_tags or server_tags[key] != value:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
return [s for s in server_descriptions if tags_match(s.tags)]
|
||||
|
||||
|
||||
def tag_sets_server_selector(tag_sets, server_descriptions):
|
||||
"""All servers match a list of tag sets.
|
||||
|
||||
tag_sets is a list of dicts. The empty tag set {} matches any server,
|
||||
and may be provided at the end of the list as a fallback. So
|
||||
[{'a': 'value'}, {}] expresses a preference for servers tagged
|
||||
{'a': 'value'}, but accepts any server if none matches the first
|
||||
preference.
|
||||
"""
|
||||
for tag_set in tag_sets:
|
||||
selected = single_tag_set_server_selector(server_descriptions, tag_set)
|
||||
if selected:
|
||||
return selected
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def near_enough_server_selector(latency_ms, server_descriptions):
|
||||
"""All servers with round trip times within latency_ms of the fastest one.
|
||||
|
||||
No ServerDescription's round_trip_time can be None.
|
||||
"""
|
||||
if not server_descriptions:
|
||||
# Avoid ValueError from min() with empty sequence.
|
||||
return []
|
||||
|
||||
# round_trip_time is in seconds.
|
||||
if any(s for s in server_descriptions if s.round_trip_time is None):
|
||||
raise ValueError("Not all servers' round trip times are known")
|
||||
|
||||
fastest = min(s.round_trip_time for s in server_descriptions)
|
||||
return [
|
||||
s for s in server_descriptions
|
||||
if (s.round_trip_time - fastest) < latency_ms / 1000.]
|
||||
|
||||
|
||||
def near_secondary_with_tags_server_selector(
|
||||
tag_sets,
|
||||
latency_ms,
|
||||
server_descriptions):
|
||||
"""All near-enough secondaries matching the tag sets."""
|
||||
return near_enough_server_selector(
|
||||
latency_ms,
|
||||
tag_sets_server_selector(
|
||||
tag_sets,
|
||||
secondary_server_selector(server_descriptions)))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user