diff --git a/doc/api/pymongo/mongo_client.rst b/doc/api/pymongo/mongo_client.rst index 68d43fc9d..189ac1677 100644 --- a/doc/api/pymongo/mongo_client.rst +++ b/doc/api/pymongo/mongo_client.rst @@ -14,6 +14,7 @@ Raises :class:`~pymongo.errors.InvalidName` if an invalid database name is used. + .. autoattribute:: address .. autoattribute:: host .. autoattribute:: port .. autoattribute:: is_primary diff --git a/doc/changelog.rst b/doc/changelog.rst index 930653059..57c436821 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -57,6 +57,14 @@ A list of multiple standalones is no longer supported; if multiple servers are listed they must be members of the same replica set, or mongoses in the same sharded cluster. +The behavior for a list of mongoses is changed from "high availability" to +"load balancing". Before, the client connected to the lowest-latency mongos in +the list, and used it until a network error prompted it to re-evaluate all +mongoses' latencies and reconnect to one of them. In PyMongo 3, the client +monitors its network latency to all the mongoses continuously, and distributes +operations evenly among those with the lowest latency. +See :ref:`mongos-load-balancing` for more information. + The client methods ``start_request``, ``in_request``, and ``end_request`` are removed, and so is the ``auto_start_request`` option. Requests were designed to make read-your-writes consistency more likely with the ``w=0`` @@ -666,8 +674,7 @@ Important New Features: - Support for expanded read preferences including directing reads to tagged servers - See :ref:`secondary-reads` for more information. -- Support for mongos failover - - See :ref:`mongos-high-availability` for more information. +- Support for mongos failover. - A new :meth:`~pymongo.collection.Collection.aggregate` method to support MongoDB's new `aggregation framework `_. diff --git a/doc/examples/high_availability.rst b/doc/examples/high_availability.rst index d8bcec068..1b8018120 100644 --- a/doc/examples/high_availability.rst +++ b/doc/examples/high_availability.rst @@ -256,6 +256,8 @@ from any member that matches the mode, ignoring tags." See :mod:`~pymongo.read_preferences` for more information. +.. _distributes reads to secondaries: + **local_threshold_ms**: If multiple members match the mode and tag sets, PyMongo reads @@ -288,64 +290,36 @@ monitor the replica set for changes in: Replica-set monitoring ensures queries are continually routed to the proper members as the state of the replica set changes. -.. _mongos-high-availability: +.. _mongos-load-balancing: -High Availability and mongos ----------------------------- - -.. warning:: The documentation below is obsolete. It awaits a new - spec for how MongoDB drivers connect to multiple mongoses. +mongos Load Balancing +--------------------- An instance of :class:`~pymongo.mongo_client.MongoClient` can be configured -to automatically connect to a different mongos if the instance it is -currently connected to fails. If a failure occurs, PyMongo will attempt -to find the nearest mongos to perform subsequent operations. As with a -replica set this can't happen completely transparently. Here we'll perform -an example failover to illustrate how everything behaves. First, we'll -connect to a sharded cluster, using a seed list, and perform a couple of -basic operations:: +with a list of mongos servers: - >>> db = MongoClient('localhost:30000,localhost:30001,localhost:30002').test - >>> db.test.insert_one({'x': 1}) - ObjectId('...') - >>> db.test.find_one() - {u'x': 1, u'_id': ObjectId('...')} + >>> client = MongoClient('mongodb://host1,host2,host3') -Each member of the seed list passed to MongoClient must be a mongos. By checking -the host, port, and is_mongos attributes we can see that we're connected to -*localhost:30001*, a mongos:: +Each member of the list must be a mongos server. The client continuously +monitors all the mongoses' availability, and its network latency to each. - >>> db.client.host - 'localhost' - >>> db.client.port - 30001 - >>> db.client.is_mongos - True +PyMongo distributes operations evenly among the set of mongoses within its +``localThresholdMS`` (similar to how it `distributes reads to secondaries`_ +in a replica set). By default the threshold is 15 ms. -Now let's shut down that mongos instance and see what happens when we run our -query again:: +The lowest-latency server, and all servers with latencies no more than +``localThresholdMS`` beyond the lowest-latency server's, receive +operations equally. For example, if we have three mongoses: - >>> db.test.find_one() - Traceback (most recent call last): - pymongo.errors.AutoReconnect: ... + - host1: 20 ms + - host2: 35 ms + - host3: 40 ms -As in the replica set example earlier in this document, we get -an :class:`~pymongo.errors.AutoReconnect` exception. This means -that the driver was not able to connect to the original mongos at port -30001 (which makes sense, since we shut it down), but that it will -attempt to connect to a new mongos on subsequent operations. When this -exception is raised our application code needs to decide whether to retry -the operation or to simply continue, accepting the fact that the operation -might have failed. +By default the ``localThresholdMS`` is 15 ms, so PyMongo uses host1 and host2 +evenly. It uses host1 because its network latency to the driver is shortest. It +uses host2 because its latency is within 15 ms of the lowest-latency server's. +But it excuses host3: host3 is 20ms beyond the lowest-latency server. -As long as one of the seed list members is still available the next -operation will succeed:: +If we set ``localThresholdMS`` to 30 ms all servers are within the threshold: - >>> db.test.find_one() - {u'x': 1, u'_id': ObjectId('...')} - >>> db.client.host - 'localhost' - >>> db.client.port - 30002 - >>> db.client.is_mongos - True + >>> client = MongoClient('mongodb://host1,host2,host3/?localThresholdMS=30') diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 78950ef0c..428a0d4a1 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -14,9 +14,8 @@ """Tools for connecting to MongoDB. -.. seealso:: :doc:`/examples/high_availability` for an example of how to - connect to a replica set, or specify a list of mongos instances for - automatic failover. +.. seealso:: :doc:`/examples/high_availability` for examples of connecting + to replica sets or sets of mongos servers. To get a :class:`~pymongo.database.Database` instance from a :class:`MongoClient` use either dictionary-style or attribute-style @@ -52,6 +51,7 @@ from pymongo.errors import (AutoReconnect, ConfigurationError, ConnectionFailure, DuplicateKeyError, + InvalidOperation, InvalidURI, NetworkTimeout, NotMasterError, @@ -238,6 +238,15 @@ class MongoClient(common.BaseObject): servers are listed they must be members of the same replica set, or mongoses in the same sharded cluster. + The behavior for a list of mongoses is changed from "high + availability" to "load balancing". Before, the client connected to + the lowest-latency mongos in the list, and used it until a network + error prompted it to re-evaluate all mongoses' latencies and + reconnect to one of them. In PyMongo 3, the client monitors its + network latency to all the mongoses continuously, and distributes + operations evenly among those with the lowest latency. See + :ref:`mongos-load-balancing` for more information. + The ``connect`` option is added. The ``start_request``, ``in_request``, and ``end_request`` methods @@ -479,9 +488,19 @@ class MongoClient(common.BaseObject): def address(self): """(host, port) of the current standalone, primary, or mongos, or None. + Accessing :attr:`address` raises :exc:`~.errors.InvalidOperation` if + the client is load-balancing among mongoses, since there is no single + address. Use :attr:`nodes` instead. + .. versionadded:: 3.0 """ - return self._server_property('address') + try: + return self._topology.get_direct_or_primary() + except InvalidOperation: + # Only one case where Topology throws InvalidOperation. + raise InvalidOperation( + 'Cannot use "address" property when load balancing among' + ' mongoses, use "nodes" instead.') @property def primary(self): diff --git a/pymongo/topology.py b/pymongo/topology.py index f00f70839..9b5659a4e 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -24,7 +24,7 @@ from pymongo.pool import PoolOptions from pymongo.topology_description import (updated_topology_description, TOPOLOGY_TYPE, TopologyDescription) -from pymongo.errors import AutoReconnect +from pymongo.errors import AutoReconnect, InvalidOperation from pymongo.server import Server from pymongo.server_selectors import (address_server_selector, apply_local_threshold, @@ -183,6 +183,23 @@ class Topology(object): descriptions = selector(self._description.known_servers) return set([d.address for d in descriptions]) + def get_direct_or_primary(self): + """Return the address of a connected primary or standalone, or None. + + Raise InvalidOperation for Sharded topologies. + """ + # Implemented here in Topology instead of MongoClient, so it can lock. + with self._lock: + topology_type = self._description.topology_type + if topology_type == TOPOLOGY_TYPE.Sharded: + raise InvalidOperation() + if topology_type not in (TOPOLOGY_TYPE.ReplicaSetWithPrimary, + TOPOLOGY_TYPE.Single): + return None + descriptions = writable_server_selector( + self._description.known_servers) + return descriptions[0].address if descriptions else None + def get_secondaries(self): """Return set of secondary addresses.""" return self._get_replica_set_members(secondary_server_selector) diff --git a/test/high_availability/test_ha.py b/test/high_availability/test_ha.py index 966bc7e85..e88cc8a9e 100644 --- a/test/high_availability/test_ha.py +++ b/test/high_availability/test_ha.py @@ -28,12 +28,13 @@ from pymongo.common import partition_node from pymongo.errors import (AutoReconnect, OperationFailure, ConnectionFailure, + InvalidOperation, WTimeoutError) from pymongo.mongo_client import MongoClient from pymongo.read_preferences import ReadPreference from pymongo.server_description import ServerDescription from pymongo.write_concern import WriteConcern -from test import SkipTest, unittest, utils, client_knobs +from test import unittest, utils, client_knobs from test.utils import one, wait_until, connected from test.version import Version @@ -814,46 +815,51 @@ class TestAlive(HATestCase): self.assertFalse(secondary_cx.alive()) self.assertFalse(rsc.alive()) - -class TestMongosHighAvailability(HATestCase): + +class TestMongosLoadBalancing(HATestCase): def setUp(self): - super(TestMongosHighAvailability, self).setUp() - - raise SkipTest( - 'Mongos HA may be replaced with load balancing in PyMongo 3') - + super(TestMongosLoadBalancing, self).setUp() seed_list = ha_tools.create_sharded_cluster() + self.assertIsNotNone(seed_list) self.dbname = 'pymongo_mongos_ha' self.client = MongoClient(seed_list) self.client.drop_database(self.dbname) - def test_mongos_ha(self): + def test_mongos_load_balancing(self): + wait_until(lambda: len(ha_tools.routers) == len(self.client.nodes), + 'discover all mongoses') + + # Can't access "address" when load balancing. + with self.assertRaises(InvalidOperation): + self.client.address + coll = self.client[self.dbname].test coll.insert_one({'foo': 'bar'}) - first = '%s:%d' % (self.client.host, self.client.port) - ha_tools.kill_mongos(first) - # Fail first attempt - self.assertRaises(AutoReconnect, coll.count) - # Find new mongos - self.assertEqual(1, coll.count()) + live_routers = list(ha_tools.routers) + ha_tools.kill_mongos(live_routers.pop()) + while live_routers: + try: + self.assertEqual(1, coll.count()) + except ConnectionFailure: + # If first attempt happened to select the dead mongos. + self.assertEqual(1, coll.count()) - second = '%s:%d' % (self.client.host, self.client.port) - self.assertNotEqual(first, second) - ha_tools.kill_mongos(second) - # Fail first attempt - self.assertRaises(AutoReconnect, coll.count) - # Find new mongos - self.assertEqual(1, coll.count()) + wait_until(lambda: len(live_routers) == len(self.client.nodes), + 'remove dead mongos', + timeout=30) + ha_tools.kill_mongos(live_routers.pop()) - third = '%s:%d' % (self.client.host, self.client.port) - self.assertNotEqual(second, third) - ha_tools.kill_mongos(third) - # Fail first attempt - self.assertRaises(AutoReconnect, coll.count) + # Make sure the last one's really dead. + time.sleep(1) - # We've killed all three, restart one. - ha_tools.restart_mongos(first) + # I'm alone. + self.assertRaises(ConnectionFailure, coll.count) + wait_until(lambda: 0 == len(self.client.nodes), + 'remove dead mongos', + timeout=30) + + ha_tools.restart_mongos(one(ha_tools.routers)) # Find new mongos self.assertEqual(1, coll.count()) diff --git a/test/pymongo_mocks.py b/test/pymongo_mocks.py index 5539bef81..7785d8b6f 100644 --- a/test/pymongo_mocks.py +++ b/test/pymongo_mocks.py @@ -78,8 +78,8 @@ class MockMonitor(Monitor): def _check_once(self): address = self._server_description.address - response = self.client.mock_is_master('%s:%d' % address) - return ServerDescription(address, IsMaster(response), 0) + response, rtt = self.client.mock_is_master('%s:%d' % address) + return ServerDescription(address, IsMaster(response), rtt) class MockClient(MongoClient): @@ -117,6 +117,9 @@ class MockClient(MongoClient): # Hostname -> max write batch size self.mock_max_write_batch_sizes = {} + # Hostname -> round trip time + self.mock_rtts = {} + kwargs['_pool_class'] = partial(MockPool, self) kwargs['_monitor_class'] = partial(MockMonitor, self) @@ -137,6 +140,7 @@ class MockClient(MongoClient): self.mock_max_write_batch_sizes[host] = size def mock_is_master(self, host): + """Return mock ismaster response (a dict) and round trip time.""" min_wire_version, max_wire_version = self.mock_wire_versions.get( host, (common.MIN_WIRE_VERSION, common.MAX_WIRE_VERSION)) @@ -144,19 +148,20 @@ class MockClient(MongoClient): max_write_batch_size = self.mock_max_write_batch_sizes.get( host, common.MAX_WRITE_BATCH_SIZE) + rtt = self.mock_rtts.get(host, 0) + # host is like 'a:1'. if host in self.mock_down_hosts: raise socket.timeout('mock timeout') - if host in self.mock_standalones: - return { + elif host in self.mock_standalones: + response = { 'ok': 1, 'ismaster': True, 'minWireVersion': min_wire_version, 'maxWireVersion': max_wire_version, 'maxWriteBatchSize': max_write_batch_size} - - if host in self.mock_members: + elif host in self.mock_members: ismaster = (host == self.mock_primary) # Simulate a replica set member. @@ -172,21 +177,20 @@ class MockClient(MongoClient): if self.mock_primary: response['primary'] = self.mock_primary - - return response - - if host in self.mock_mongoses: - return { + elif host in self.mock_mongoses: + response = { 'ok': 1, 'ismaster': True, 'minWireVersion': min_wire_version, 'maxWireVersion': max_wire_version, 'msg': 'isdbgrid', 'maxWriteBatchSize': max_write_batch_size} + else: + # In test_internal_ips(), we try to connect to a host listed + # in ismaster['hosts'] but not publicly accessible. + raise socket.error('Unknown host: %s' % host) - # In test_internal_ips(), we try to connect to a host listed - # in ismaster['hosts'] but not publicly accessible. - raise socket.error('Unknown host: %s' % host) + return response, rtt def _process_kill_cursors_queue(self): # Avoid the background thread causing races, e.g. a surprising diff --git a/test/test_mongos_ha.py b/test/test_mongos_ha.py index 71d8955fd..19dd1729f 100644 --- a/test/test_mongos_ha.py +++ b/test/test_mongos_ha.py @@ -12,15 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Test MongoClient's mongos high-availability features using a mock.""" +"""Test MongoClient's mongos load balancing using a mock.""" import sys import threading +import time sys.path[0:0] = [""] -from pymongo.errors import AutoReconnect -from test import unittest, client_context, SkipTest, MockClientTest +from pymongo.errors import AutoReconnect, InvalidOperation +from pymongo.server_selectors import writable_server_selector +from pymongo.topology_description import TOPOLOGY_TYPE +from test import unittest, client_context, MockClientTest from test.pymongo_mocks import MockClient from test.utils import connected, wait_until @@ -54,27 +57,39 @@ def do_simple_op(client, nthreads): assert t.passed -class TestMongosHA(MockClientTest): +def writable_addresses(topology): + return set(server.description.address for server in + topology.select_servers(writable_server_selector)) - def mock_client(self): - return MockClient( + +class TestMongosLoadBalancing(MockClientTest): + + def mock_client(self, **kwargs): + mock_client = MockClient( standalones=[], members=[], mongoses=['a:1', 'b:2', 'c:3'], host='a:1,b:2,c:3', - connect=False) - - def test_lazy_connect(self): - # TODO: Reimplement Mongos HA with PyMongo 3's MongoClient. - raise SkipTest('Mongos HA must be reimplemented with 3.0 MongoClient') + connect=False, + **kwargs) + # Latencies in seconds. + mock_client.mock_rtts['a:1'] = 0.020 + mock_client.mock_rtts['b:2'] = 0.025 + mock_client.mock_rtts['c:3'] = 0.040 + return mock_client + + def test_lazy_connect(self): + # While connected() ensures we can trigger connection from the main + # thread and wait for the monitors, this test triggers connection from + # several threads at once to check for data races. nthreads = 10 client = self.mock_client() self.assertEqual(0, len(client.nodes)) # Trigger initial connection. do_simple_op(client, nthreads) - self.assertEqual(3, len(client.nodes)) + wait_until(lambda: len(client.nodes) == 3, 'connect to all mongoses') def test_reconnect(self): nthreads = 10 @@ -92,35 +107,21 @@ class TestMongosHA(MockClientTest): 'reconnect to all mongoses') def test_failover(self): - # TODO: PyMongo 3's MongoClient currently picks a new Mongos at random - # for each operation (besides getMore). Need to "pin". - raise SkipTest('Mongos HA must be reimplemented with 3.0 MongoClient') - - nthreads = 1 - - # ['1:1', '2:2', '3:3', ...] - mock_hosts = ['%d:%d' % (i, i) for i in range(50)] - client = MockClient( - standalones=[], - members=[], - mongoses=mock_hosts, - host=','.join(mock_hosts)) - - self.assertEqual(len(mock_hosts), len(client.nodes)) + nthreads = 10 + client = connected(self.mock_client(localThresholdMS=0.001)) + wait_until(lambda: len(client.nodes) == 3, 'connect to all mongoses') # Our chosen mongos goes down. - client.kill_host('%s:%s' % (client.host, client.port)) + client.kill_host('a:1') - # Trigger failover. AutoReconnect should be raised exactly once. - errors = [] + # Trigger failover to higher-latency nodes. AutoReconnect should be + # raised at most once in each thread. passed = [] def f(): try: client.db.command('ismaster') except AutoReconnect: - errors.append(True) - # Second attempt succeeds. client.db.command('ismaster') @@ -133,11 +134,45 @@ class TestMongosHA(MockClientTest): for t in threads: t.join() - self.assertEqual(1, len(errors)) self.assertEqual(nthreads, len(passed)) - # Down host is still in list. - self.assertEqual(len(mock_hosts), len(client.nodes)) + # Down host removed from list. + self.assertEqual(2, len(client.nodes)) + + def test_local_threshold(self): + client = connected(self.mock_client(localThresholdMS=30)) + wait_until(lambda: len(client.nodes) == 3, 'connect to all mongoses') + topology = client._topology + + # All are within a 30-ms latency window, see self.mock_client(). + self.assertEqual(set([('a', 1), ('b', 2), ('c', 3)]), + writable_addresses(topology)) + + def test_load_balancing(self): + # Although the server selection JSON tests already prove that + # select_servers works for sharded topologies, here we do an end-to-end + # test of discovering servers' round trip times and configuring + # localThresholdMS. + client = connected(self.mock_client()) + wait_until(lambda: len(client.nodes) == 3, 'connect to all mongoses') + + # Prohibited for topology type Sharded. + with self.assertRaises(InvalidOperation): + client.address + + topology = client._topology + self.assertEqual(TOPOLOGY_TYPE.Sharded, + topology.description.topology_type) + + # a and b are within the 15-ms latency window, see self.mock_client(). + self.assertEqual(set([('a', 1), ('b', 2)]), + writable_addresses(topology)) + + client.mock_rtts['a:1'] = 0.040 + + # Discover only b is within latency window. + wait_until(lambda: set([('b', 2)]) == writable_addresses(topology), + 'discover server "a" is too far') if __name__ == "__main__":