diff --git a/pymongo/topology.py b/pymongo/topology.py index 3b33d10e4..e3164f329 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -32,7 +32,7 @@ from pymongo.pool import PoolOptions from pymongo.topology_description import (updated_topology_description, TOPOLOGY_TYPE, TopologyDescription) -from pymongo.errors import ServerSelectionTimeoutError, ConfigurationError +from pymongo.errors import ServerSelectionTimeoutError from pymongo.monotonic import time as _time from pymongo.server import Server from pymongo.server_selectors import (any_server_selector, @@ -179,7 +179,8 @@ class Topology(object): now = _time() end_time = now + server_timeout - server_descriptions = self._apply_selector(selector, address) + server_descriptions = self._description.apply_selector( + selector, address) while not server_descriptions: # No suitable servers. @@ -192,12 +193,13 @@ class Topology(object): # Release the lock and wait for the topology description to # change, or for a timeout. We won't miss any changes that - # came after our most recent _apply_selector call, since we've + # came after our most recent apply_selector call, since we've # held the lock until now. self._condition.wait(common.MIN_HEARTBEAT_INTERVAL) self._description.check_compatible() now = _time() - server_descriptions = self._apply_selector(selector, address) + server_descriptions = self._description.apply_selector( + selector, address) return [self.get_server_by_address(sd.address) for sd in server_descriptions] @@ -411,39 +413,6 @@ class Topology(object): for server in self._servers.values(): server.request_check() - def _apply_selector(self, selector, address): - if getattr(selector, 'min_wire_version', 0): - common_wv = self._description.common_wire_version - if common_wv and common_wv < selector.min_wire_version: - raise ConfigurationError( - "%s requires min wire version %d, but topology's min" - " wire version is %d" % (selector, - selector.min_wire_version, - common_wv)) - - if self._description.topology_type == TOPOLOGY_TYPE.Single: - # Ignore the selector. - return self._description.known_servers - elif address: - sd = self._description.server_descriptions().get(address) - return [sd] if sd else [] - elif self._description.topology_type == TOPOLOGY_TYPE.Sharded: - # Ignore the read preference, but apply localThresholdMS. - return self._apply_local_threshold(self._new_selection()) - else: - return self._apply_local_threshold(selector(self._new_selection())) - - def _apply_local_threshold(self, selection): - """Return list of servers from Selection that are in latency window.""" - if not selection: - return [] - - # Round trip time in seconds. - fastest = min(s.round_trip_time for s in selection.server_descriptions) - threshold = self._settings.local_threshold_ms / 1000.0 - return [s for s in selection.server_descriptions - if (s.round_trip_time - fastest) <= threshold] - def _update_servers(self): """Sync our Servers from TopologyDescription.server_descriptions. diff --git a/pymongo/topology_description.py b/pymongo/topology_description.py index cf8237563..0f93ab6ef 100644 --- a/pymongo/topology_description.py +++ b/pymongo/topology_description.py @@ -17,9 +17,11 @@ from collections import namedtuple from pymongo import common -from pymongo.server_type import SERVER_TYPE from pymongo.errors import ConfigurationError +from pymongo.read_preferences import ReadPreference from pymongo.server_description import ServerDescription +from pymongo.server_selectors import Selection +from pymongo.server_type import SERVER_TYPE TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary', @@ -160,6 +162,71 @@ class TopologyDescription(object): def heartbeat_frequency(self): return self._topology_settings.heartbeat_frequency + def apply_selector(self, selector, address): + + def apply_local_threshold(selection): + if not selection: + return [] + + settings = self._topology_settings + + # Round trip time in seconds. + fastest = min( + s.round_trip_time for s in selection.server_descriptions) + threshold = settings.local_threshold_ms / 1000.0 + return [s for s in selection.server_descriptions + if (s.round_trip_time - fastest) <= threshold] + + if getattr(selector, 'min_wire_version', 0): + common_wv = self.common_wire_version + if common_wv and common_wv < selector.min_wire_version: + raise ConfigurationError( + "%s requires min wire version %d, but topology's min" + " wire version is %d" % (selector, + selector.min_wire_version, + common_wv)) + + if self.topology_type == TOPOLOGY_TYPE.Single: + # Ignore the selector. + return self.known_servers + elif address: + description = self.server_descriptions().get(address) + return [description] if description else [] + elif self.topology_type == TOPOLOGY_TYPE.Sharded: + # Ignore the read preference, but apply localThresholdMS. + return apply_local_threshold( + Selection.from_topology_description(self)) + else: + return apply_local_threshold( + selector(Selection.from_topology_description(self))) + + def has_readable_server(self, read_preference=ReadPreference.PRIMARY): + """Does this topology have any readable servers available matching the + given read preference? + + :Parameters: + - `read_preference`: an instance of a read preference from + :mod:`~pymongo.read_preferences`. Defaults to + :attr:`~pymongo.ReadPreference.PRIMARY`. + + .. note:: When connected directly to a single server this method + always returns ``True``. + + .. versionadded:: 3.4 + """ + common.validate_read_preference("read_preference", read_preference) + return any(self.apply_selector(read_preference, None)) + + def has_writable_server(self): + """Does this topology have a writable server available? + + .. note:: When connected directly to a single server this method + always returns ``True``. + + .. versionadded:: 3.4 + """ + return self.has_readable_server(ReadPreference.PRIMARY) + # If topology type is Unknown and we receive an ismaster response, what should # the new topology type be? diff --git a/test/test_topology.py b/test/test_topology.py index 65a6e6fb1..dc7a8ff29 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -222,6 +222,14 @@ class TestSingleServerTopology(TopologyTest): s = t.select_server(writable_server_selector) self.assertEqual(server_type, s.description.server_type) + # Topology type single is always readable and writable regardless + # of server type or state. + self.assertTrue(t.description.has_writable_server()) + self.assertTrue(t.description.has_readable_server()) + self.assertTrue(t.description.has_readable_server(Secondary())) + self.assertTrue(t.description.has_readable_server( + Secondary(tag_sets=[{'tag': 'does-not-exist'}]))) + def test_reopen(self): t = create_mock_topology() @@ -290,6 +298,75 @@ class TestSingleServerTopology(TopologyTest): class TestMultiServerTopology(TopologyTest): + def test_readable_writable(self): + t = create_mock_topology(replica_set_name='rs') + got_ismaster(t, ('a', 27017), { + 'ok': 1, + 'ismaster': True, + 'setName': 'rs', + 'hosts': ['a', 'b']}) + + got_ismaster(t, ('b', 27017), { + 'ok': 1, + 'ismaster': False, + 'secondary': True, + 'setName': 'rs', + 'hosts': ['a', 'b']}) + + self.assertTrue(t.description.has_writable_server()) + self.assertTrue(t.description.has_readable_server()) + self.assertTrue( + t.description.has_readable_server(Secondary())) + self.assertFalse( + t.description.has_readable_server( + Secondary(tag_sets=[{'tag': 'exists'}]))) + + t = create_mock_topology(replica_set_name='rs') + got_ismaster(t, ('a', 27017), { + 'ok': 1, + 'ismaster': False, + 'secondary': False, + 'setName': 'rs', + 'hosts': ['a', 'b']}) + + got_ismaster(t, ('b', 27017), { + 'ok': 1, + 'ismaster': False, + 'secondary': True, + 'setName': 'rs', + 'hosts': ['a', 'b']}) + + self.assertFalse(t.description.has_writable_server()) + self.assertFalse(t.description.has_readable_server()) + self.assertTrue( + t.description.has_readable_server(Secondary())) + self.assertFalse( + t.description.has_readable_server( + Secondary(tag_sets=[{'tag': 'exists'}]))) + + t = create_mock_topology(replica_set_name='rs') + got_ismaster(t, ('a', 27017), { + 'ok': 1, + 'ismaster': True, + 'setName': 'rs', + 'hosts': ['a', 'b']}) + + got_ismaster(t, ('b', 27017), { + 'ok': 1, + 'ismaster': False, + 'secondary': True, + 'setName': 'rs', + 'hosts': ['a', 'b'], + 'tags': {'tag': 'exists'}}) + + self.assertTrue(t.description.has_writable_server()) + self.assertTrue(t.description.has_readable_server()) + self.assertTrue( + t.description.has_readable_server(Secondary())) + self.assertTrue( + t.description.has_readable_server( + Secondary(tag_sets=[{'tag': 'exists'}]))) + def test_close(self): t = create_mock_topology(replica_set_name='rs') got_ismaster(t, ('a', 27017), {