PYTHON-1117 - Add TopologyDescription.has_readable/writable_server
This commit is contained in:
parent
1355c5af1d
commit
f6e84fdff8
@ -32,7 +32,7 @@ from pymongo.pool import PoolOptions
|
||||
from pymongo.topology_description import (updated_topology_description,
|
||||
TOPOLOGY_TYPE,
|
||||
TopologyDescription)
|
||||
from pymongo.errors import ServerSelectionTimeoutError, ConfigurationError
|
||||
from pymongo.errors import ServerSelectionTimeoutError
|
||||
from pymongo.monotonic import time as _time
|
||||
from pymongo.server import Server
|
||||
from pymongo.server_selectors import (any_server_selector,
|
||||
@ -179,7 +179,8 @@ class Topology(object):
|
||||
|
||||
now = _time()
|
||||
end_time = now + server_timeout
|
||||
server_descriptions = self._apply_selector(selector, address)
|
||||
server_descriptions = self._description.apply_selector(
|
||||
selector, address)
|
||||
|
||||
while not server_descriptions:
|
||||
# No suitable servers.
|
||||
@ -192,12 +193,13 @@ class Topology(object):
|
||||
|
||||
# Release the lock and wait for the topology description to
|
||||
# change, or for a timeout. We won't miss any changes that
|
||||
# came after our most recent _apply_selector call, since we've
|
||||
# came after our most recent apply_selector call, since we've
|
||||
# held the lock until now.
|
||||
self._condition.wait(common.MIN_HEARTBEAT_INTERVAL)
|
||||
self._description.check_compatible()
|
||||
now = _time()
|
||||
server_descriptions = self._apply_selector(selector, address)
|
||||
server_descriptions = self._description.apply_selector(
|
||||
selector, address)
|
||||
|
||||
return [self.get_server_by_address(sd.address)
|
||||
for sd in server_descriptions]
|
||||
@ -411,39 +413,6 @@ class Topology(object):
|
||||
for server in self._servers.values():
|
||||
server.request_check()
|
||||
|
||||
def _apply_selector(self, selector, address):
|
||||
if getattr(selector, 'min_wire_version', 0):
|
||||
common_wv = self._description.common_wire_version
|
||||
if common_wv and common_wv < selector.min_wire_version:
|
||||
raise ConfigurationError(
|
||||
"%s requires min wire version %d, but topology's min"
|
||||
" wire version is %d" % (selector,
|
||||
selector.min_wire_version,
|
||||
common_wv))
|
||||
|
||||
if self._description.topology_type == TOPOLOGY_TYPE.Single:
|
||||
# Ignore the selector.
|
||||
return self._description.known_servers
|
||||
elif address:
|
||||
sd = self._description.server_descriptions().get(address)
|
||||
return [sd] if sd else []
|
||||
elif self._description.topology_type == TOPOLOGY_TYPE.Sharded:
|
||||
# Ignore the read preference, but apply localThresholdMS.
|
||||
return self._apply_local_threshold(self._new_selection())
|
||||
else:
|
||||
return self._apply_local_threshold(selector(self._new_selection()))
|
||||
|
||||
def _apply_local_threshold(self, selection):
|
||||
"""Return list of servers from Selection that are in latency window."""
|
||||
if not selection:
|
||||
return []
|
||||
|
||||
# Round trip time in seconds.
|
||||
fastest = min(s.round_trip_time for s in selection.server_descriptions)
|
||||
threshold = self._settings.local_threshold_ms / 1000.0
|
||||
return [s for s in selection.server_descriptions
|
||||
if (s.round_trip_time - fastest) <= threshold]
|
||||
|
||||
def _update_servers(self):
|
||||
"""Sync our Servers from TopologyDescription.server_descriptions.
|
||||
|
||||
|
||||
@ -17,9 +17,11 @@
|
||||
from collections import namedtuple
|
||||
|
||||
from pymongo import common
|
||||
from pymongo.server_type import SERVER_TYPE
|
||||
from pymongo.errors import ConfigurationError
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
from pymongo.server_description import ServerDescription
|
||||
from pymongo.server_selectors import Selection
|
||||
from pymongo.server_type import SERVER_TYPE
|
||||
|
||||
|
||||
TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary',
|
||||
@ -160,6 +162,71 @@ class TopologyDescription(object):
|
||||
def heartbeat_frequency(self):
|
||||
return self._topology_settings.heartbeat_frequency
|
||||
|
||||
def apply_selector(self, selector, address):
|
||||
|
||||
def apply_local_threshold(selection):
|
||||
if not selection:
|
||||
return []
|
||||
|
||||
settings = self._topology_settings
|
||||
|
||||
# Round trip time in seconds.
|
||||
fastest = min(
|
||||
s.round_trip_time for s in selection.server_descriptions)
|
||||
threshold = settings.local_threshold_ms / 1000.0
|
||||
return [s for s in selection.server_descriptions
|
||||
if (s.round_trip_time - fastest) <= threshold]
|
||||
|
||||
if getattr(selector, 'min_wire_version', 0):
|
||||
common_wv = self.common_wire_version
|
||||
if common_wv and common_wv < selector.min_wire_version:
|
||||
raise ConfigurationError(
|
||||
"%s requires min wire version %d, but topology's min"
|
||||
" wire version is %d" % (selector,
|
||||
selector.min_wire_version,
|
||||
common_wv))
|
||||
|
||||
if self.topology_type == TOPOLOGY_TYPE.Single:
|
||||
# Ignore the selector.
|
||||
return self.known_servers
|
||||
elif address:
|
||||
description = self.server_descriptions().get(address)
|
||||
return [description] if description else []
|
||||
elif self.topology_type == TOPOLOGY_TYPE.Sharded:
|
||||
# Ignore the read preference, but apply localThresholdMS.
|
||||
return apply_local_threshold(
|
||||
Selection.from_topology_description(self))
|
||||
else:
|
||||
return apply_local_threshold(
|
||||
selector(Selection.from_topology_description(self)))
|
||||
|
||||
def has_readable_server(self, read_preference=ReadPreference.PRIMARY):
|
||||
"""Does this topology have any readable servers available matching the
|
||||
given read preference?
|
||||
|
||||
:Parameters:
|
||||
- `read_preference`: an instance of a read preference from
|
||||
:mod:`~pymongo.read_preferences`. Defaults to
|
||||
:attr:`~pymongo.ReadPreference.PRIMARY`.
|
||||
|
||||
.. note:: When connected directly to a single server this method
|
||||
always returns ``True``.
|
||||
|
||||
.. versionadded:: 3.4
|
||||
"""
|
||||
common.validate_read_preference("read_preference", read_preference)
|
||||
return any(self.apply_selector(read_preference, None))
|
||||
|
||||
def has_writable_server(self):
|
||||
"""Does this topology have a writable server available?
|
||||
|
||||
.. note:: When connected directly to a single server this method
|
||||
always returns ``True``.
|
||||
|
||||
.. versionadded:: 3.4
|
||||
"""
|
||||
return self.has_readable_server(ReadPreference.PRIMARY)
|
||||
|
||||
|
||||
# If topology type is Unknown and we receive an ismaster response, what should
|
||||
# the new topology type be?
|
||||
|
||||
@ -222,6 +222,14 @@ class TestSingleServerTopology(TopologyTest):
|
||||
s = t.select_server(writable_server_selector)
|
||||
self.assertEqual(server_type, s.description.server_type)
|
||||
|
||||
# Topology type single is always readable and writable regardless
|
||||
# of server type or state.
|
||||
self.assertTrue(t.description.has_writable_server())
|
||||
self.assertTrue(t.description.has_readable_server())
|
||||
self.assertTrue(t.description.has_readable_server(Secondary()))
|
||||
self.assertTrue(t.description.has_readable_server(
|
||||
Secondary(tag_sets=[{'tag': 'does-not-exist'}])))
|
||||
|
||||
def test_reopen(self):
|
||||
t = create_mock_topology()
|
||||
|
||||
@ -290,6 +298,75 @@ class TestSingleServerTopology(TopologyTest):
|
||||
|
||||
|
||||
class TestMultiServerTopology(TopologyTest):
|
||||
def test_readable_writable(self):
|
||||
t = create_mock_topology(replica_set_name='rs')
|
||||
got_ismaster(t, ('a', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
'setName': 'rs',
|
||||
'hosts': ['a', 'b']})
|
||||
|
||||
got_ismaster(t, ('b', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': False,
|
||||
'secondary': True,
|
||||
'setName': 'rs',
|
||||
'hosts': ['a', 'b']})
|
||||
|
||||
self.assertTrue(t.description.has_writable_server())
|
||||
self.assertTrue(t.description.has_readable_server())
|
||||
self.assertTrue(
|
||||
t.description.has_readable_server(Secondary()))
|
||||
self.assertFalse(
|
||||
t.description.has_readable_server(
|
||||
Secondary(tag_sets=[{'tag': 'exists'}])))
|
||||
|
||||
t = create_mock_topology(replica_set_name='rs')
|
||||
got_ismaster(t, ('a', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': False,
|
||||
'secondary': False,
|
||||
'setName': 'rs',
|
||||
'hosts': ['a', 'b']})
|
||||
|
||||
got_ismaster(t, ('b', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': False,
|
||||
'secondary': True,
|
||||
'setName': 'rs',
|
||||
'hosts': ['a', 'b']})
|
||||
|
||||
self.assertFalse(t.description.has_writable_server())
|
||||
self.assertFalse(t.description.has_readable_server())
|
||||
self.assertTrue(
|
||||
t.description.has_readable_server(Secondary()))
|
||||
self.assertFalse(
|
||||
t.description.has_readable_server(
|
||||
Secondary(tag_sets=[{'tag': 'exists'}])))
|
||||
|
||||
t = create_mock_topology(replica_set_name='rs')
|
||||
got_ismaster(t, ('a', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
'setName': 'rs',
|
||||
'hosts': ['a', 'b']})
|
||||
|
||||
got_ismaster(t, ('b', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': False,
|
||||
'secondary': True,
|
||||
'setName': 'rs',
|
||||
'hosts': ['a', 'b'],
|
||||
'tags': {'tag': 'exists'}})
|
||||
|
||||
self.assertTrue(t.description.has_writable_server())
|
||||
self.assertTrue(t.description.has_readable_server())
|
||||
self.assertTrue(
|
||||
t.description.has_readable_server(Secondary()))
|
||||
self.assertTrue(
|
||||
t.description.has_readable_server(
|
||||
Secondary(tag_sets=[{'tag': 'exists'}])))
|
||||
|
||||
def test_close(self):
|
||||
t = create_mock_topology(replica_set_name='rs')
|
||||
got_ismaster(t, ('a', 27017), {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user