PYTHON-852 - Mongos load balancing.

The behavior for a list of mongoses is changed from "high availability" to
"load balancing". Before, the client connected to the lowest-latency mongos in
the list, and used it until a network error prompted it to re-evaluate all
mongoses' latencies and reconnect to one of them. In PyMongo 3, the client
monitors its network latency to all the mongoses continuously, and distributes
operations evenly among those with the lowest latency. See
high_availability.rst.
A. Jesse Jiryu Davis 2015-03-07 17:18:41 -05:00
parent 8ececbb1de
commit ea8ca5ec7d
8 changed files with 198 additions and 135 deletions


@ -14,6 +14,7 @@
Raises :class:`~pymongo.errors.InvalidName` if an invalid database name is used.
.. autoattribute:: address
.. autoattribute:: host
.. autoattribute:: port
.. autoattribute:: is_primary


@ -57,6 +57,14 @@ A list of multiple standalones is no longer supported; if multiple servers
are listed they must be members of the same replica set, or mongoses in the
same sharded cluster.
The behavior for a list of mongoses is changed from "high availability" to
"load balancing". Before, the client connected to the lowest-latency mongos in
the list, and used it until a network error prompted it to re-evaluate all
mongoses' latencies and reconnect to one of them. In PyMongo 3, the client
monitors its network latency to all the mongoses continuously, and distributes
operations evenly among those with the lowest latency.
See :ref:`mongos-load-balancing` for more information.
The client methods ``start_request``, ``in_request``, and ``end_request``
are removed, and so is the ``auto_start_request`` option. Requests were
designed to make read-your-writes consistency more likely with the ``w=0``
@ -666,8 +674,7 @@ Important New Features:
- Support for expanded read preferences including directing reads to tagged
servers - See :ref:`secondary-reads` for more information.
- Support for mongos failover -
See :ref:`mongos-high-availability` for more information.
- Support for mongos failover.
- A new :meth:`~pymongo.collection.Collection.aggregate` method to support
MongoDB's new `aggregation framework
<http://docs.mongodb.org/manual/applications/aggregation/>`_.


@ -256,6 +256,8 @@ from any member that matches the mode, ignoring tags."
See :mod:`~pymongo.read_preferences` for more information.
.. _distributes reads to secondaries:
**local_threshold_ms**:
If multiple members match the mode and tag sets, PyMongo reads
@ -288,64 +290,36 @@ monitor the replica set for changes in:
Replica-set monitoring ensures queries are continually routed to the proper
members as the state of the replica set changes.
.. _mongos-high-availability:
.. _mongos-load-balancing:
High Availability and mongos
----------------------------
.. warning:: The documentation below is obsolete. It awaits a new
spec for how MongoDB drivers connect to multiple mongoses.
mongos Load Balancing
---------------------
An instance of :class:`~pymongo.mongo_client.MongoClient` can be configured
to automatically connect to a different mongos if the instance it is
currently connected to fails. If a failure occurs, PyMongo will attempt
to find the nearest mongos to perform subsequent operations. As with a
replica set this can't happen completely transparently. Here we'll perform
an example failover to illustrate how everything behaves. First, we'll
connect to a sharded cluster, using a seed list, and perform a couple of
basic operations::
with a list of mongos servers:
>>> db = MongoClient('localhost:30000,localhost:30001,localhost:30002').test
>>> db.test.insert_one({'x': 1})
ObjectId('...')
>>> db.test.find_one()
{u'x': 1, u'_id': ObjectId('...')}
>>> client = MongoClient('mongodb://host1,host2,host3')
Each member of the seed list passed to MongoClient must be a mongos. By checking
the host, port, and is_mongos attributes we can see that we're connected to
*localhost:30001*, a mongos::
Each member of the list must be a mongos server. The client continuously
monitors all the mongoses' availability, and its network latency to each.
>>> db.client.host
'localhost'
>>> db.client.port
30001
>>> db.client.is_mongos
True
PyMongo distributes operations evenly among the set of mongoses within its
``localThresholdMS`` (similar to how it `distributes reads to secondaries`_
in a replica set). By default the threshold is 15 ms.
Now let's shut down that mongos instance and see what happens when we run our
query again::
The lowest-latency server, and all servers with latencies no more than
``localThresholdMS`` beyond the lowest-latency server's, receive
operations equally. For example, if we have three mongoses:
>>> db.test.find_one()
Traceback (most recent call last):
pymongo.errors.AutoReconnect: ...
- host1: 20 ms
- host2: 35 ms
- host3: 40 ms
As in the replica set example earlier in this document, we get
an :class:`~pymongo.errors.AutoReconnect` exception. This means
that the driver was not able to connect to the original mongos at port
30001 (which makes sense, since we shut it down), but that it will
attempt to connect to a new mongos on subsequent operations. When this
exception is raised our application code needs to decide whether to retry
the operation or to simply continue, accepting the fact that the operation
might have failed.
By default the ``localThresholdMS`` is 15 ms, so PyMongo uses host1 and host2
evenly. It uses host1 because its network latency to the driver is shortest. It
uses host2 because its latency is within 15 ms of the lowest-latency server's.
But it excludes host3: host3 is 20 ms beyond the lowest-latency server.
As long as one of the seed list members is still available the next
operation will succeed::
If we set ``localThresholdMS`` to 30 ms all servers are within the threshold:
>>> db.test.find_one()
{u'x': 1, u'_id': ObjectId('...')}
>>> db.client.host
'localhost'
>>> db.client.port
30002
>>> db.client.is_mongos
True
>>> client = MongoClient('mongodb://host1,host2,host3/?localThresholdMS=30')
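The latency-window rule described above can be sketched in plain Python. This is only an illustration of the selection arithmetic, not PyMongo's implementation: the helper names ``within_local_threshold`` and ``choose_host`` are hypothetical, and picking uniformly at random is an assumed way of spreading operations evenly among the eligible servers.

```python
import random


def within_local_threshold(latencies_ms, local_threshold_ms=15):
    """Return the hosts whose latency is no more than the threshold
    beyond the lowest-latency host's (hypothetical helper)."""
    if not latencies_ms:
        return []
    fastest = min(latencies_ms.values())
    return sorted(host for host, rtt in latencies_ms.items()
                  if rtt - fastest <= local_threshold_ms)


def choose_host(latencies_ms, local_threshold_ms=15):
    """Pick one eligible host uniformly at random, or None if there
    are no hosts. Random choice is an assumption of this sketch."""
    candidates = within_local_threshold(latencies_ms, local_threshold_ms)
    return random.choice(candidates) if candidates else None


# Latencies from the three-mongos example above.
latencies = {'host1': 20, 'host2': 35, 'host3': 40}

print(within_local_threshold(latencies))      # ['host1', 'host2']
print(within_local_threshold(latencies, 30))  # ['host1', 'host2', 'host3']
print(choose_host(latencies))                 # 'host1' or 'host2'
```

With the default 15 ms window only host1 and host2 are eligible; widening the window to 30 ms admits host3 as well.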


@ -14,9 +14,8 @@
"""Tools for connecting to MongoDB.
.. seealso:: :doc:`/examples/high_availability` for an example of how to
connect to a replica set, or specify a list of mongos instances for
automatic failover.
.. seealso:: :doc:`/examples/high_availability` for examples of connecting
to replica sets or sets of mongos servers.
To get a :class:`~pymongo.database.Database` instance from a
:class:`MongoClient` use either dictionary-style or attribute-style
@ -52,6 +51,7 @@ from pymongo.errors import (AutoReconnect,
ConfigurationError,
ConnectionFailure,
DuplicateKeyError,
InvalidOperation,
InvalidURI,
NetworkTimeout,
NotMasterError,
@ -238,6 +238,15 @@ class MongoClient(common.BaseObject):
servers are listed they must be members of the same replica set, or
mongoses in the same sharded cluster.
The behavior for a list of mongoses is changed from "high
availability" to "load balancing". Before, the client connected to
the lowest-latency mongos in the list, and used it until a network
error prompted it to re-evaluate all mongoses' latencies and
reconnect to one of them. In PyMongo 3, the client monitors its
network latency to all the mongoses continuously, and distributes
operations evenly among those with the lowest latency. See
:ref:`mongos-load-balancing` for more information.
The ``connect`` option is added.
The ``start_request``, ``in_request``, and ``end_request`` methods
@ -479,9 +488,19 @@ class MongoClient(common.BaseObject):
def address(self):
"""(host, port) of the current standalone, primary, or mongos, or None.
Accessing :attr:`address` raises :exc:`~.errors.InvalidOperation` if
the client is load-balancing among mongoses, since there is no single
address. Use :attr:`nodes` instead.
.. versionadded:: 3.0
"""
return self._server_property('address')
try:
return self._topology.get_direct_or_primary()
except InvalidOperation:
# Only one case where Topology throws InvalidOperation.
raise InvalidOperation(
'Cannot use "address" property when load balancing among'
' mongoses, use "nodes" instead.')
@property
def primary(self):


@ -24,7 +24,7 @@ from pymongo.pool import PoolOptions
from pymongo.topology_description import (updated_topology_description,
TOPOLOGY_TYPE,
TopologyDescription)
from pymongo.errors import AutoReconnect
from pymongo.errors import AutoReconnect, InvalidOperation
from pymongo.server import Server
from pymongo.server_selectors import (address_server_selector,
apply_local_threshold,
@ -183,6 +183,23 @@ class Topology(object):
descriptions = selector(self._description.known_servers)
return set([d.address for d in descriptions])
def get_direct_or_primary(self):
"""Return the address of a connected primary or standalone, or None.
Raise InvalidOperation for Sharded topologies.
"""
# Implemented here in Topology instead of MongoClient, so it can lock.
with self._lock:
topology_type = self._description.topology_type
if topology_type == TOPOLOGY_TYPE.Sharded:
raise InvalidOperation()
if topology_type not in (TOPOLOGY_TYPE.ReplicaSetWithPrimary,
TOPOLOGY_TYPE.Single):
return None
descriptions = writable_server_selector(
self._description.known_servers)
return descriptions[0].address if descriptions else None
def get_secondaries(self):
"""Return set of secondary addresses."""
return self._get_replica_set_members(secondary_server_selector)


@ -28,12 +28,13 @@ from pymongo.common import partition_node
from pymongo.errors import (AutoReconnect,
OperationFailure,
ConnectionFailure,
InvalidOperation,
WTimeoutError)
from pymongo.mongo_client import MongoClient
from pymongo.read_preferences import ReadPreference
from pymongo.server_description import ServerDescription
from pymongo.write_concern import WriteConcern
from test import SkipTest, unittest, utils, client_knobs
from test import unittest, utils, client_knobs
from test.utils import one, wait_until, connected
from test.version import Version
@ -814,46 +815,51 @@ class TestAlive(HATestCase):
self.assertFalse(secondary_cx.alive())
self.assertFalse(rsc.alive())
class TestMongosHighAvailability(HATestCase):
class TestMongosLoadBalancing(HATestCase):
def setUp(self):
super(TestMongosHighAvailability, self).setUp()
raise SkipTest(
'Mongos HA may be replaced with load balancing in PyMongo 3')
super(TestMongosLoadBalancing, self).setUp()
seed_list = ha_tools.create_sharded_cluster()
self.assertIsNotNone(seed_list)
self.dbname = 'pymongo_mongos_ha'
self.client = MongoClient(seed_list)
self.client.drop_database(self.dbname)
def test_mongos_ha(self):
def test_mongos_load_balancing(self):
wait_until(lambda: len(ha_tools.routers) == len(self.client.nodes),
'discover all mongoses')
# Can't access "address" when load balancing.
with self.assertRaises(InvalidOperation):
self.client.address
coll = self.client[self.dbname].test
coll.insert_one({'foo': 'bar'})
first = '%s:%d' % (self.client.host, self.client.port)
ha_tools.kill_mongos(first)
# Fail first attempt
self.assertRaises(AutoReconnect, coll.count)
# Find new mongos
self.assertEqual(1, coll.count())
live_routers = list(ha_tools.routers)
ha_tools.kill_mongos(live_routers.pop())
while live_routers:
try:
self.assertEqual(1, coll.count())
except ConnectionFailure:
# If first attempt happened to select the dead mongos.
self.assertEqual(1, coll.count())
second = '%s:%d' % (self.client.host, self.client.port)
self.assertNotEqual(first, second)
ha_tools.kill_mongos(second)
# Fail first attempt
self.assertRaises(AutoReconnect, coll.count)
# Find new mongos
self.assertEqual(1, coll.count())
wait_until(lambda: len(live_routers) == len(self.client.nodes),
'remove dead mongos',
timeout=30)
ha_tools.kill_mongos(live_routers.pop())
third = '%s:%d' % (self.client.host, self.client.port)
self.assertNotEqual(second, third)
ha_tools.kill_mongos(third)
# Fail first attempt
self.assertRaises(AutoReconnect, coll.count)
# Make sure the last one's really dead.
time.sleep(1)
# We've killed all three, restart one.
ha_tools.restart_mongos(first)
# I'm alone.
self.assertRaises(ConnectionFailure, coll.count)
wait_until(lambda: 0 == len(self.client.nodes),
'remove dead mongos',
timeout=30)
ha_tools.restart_mongos(one(ha_tools.routers))
# Find new mongos
self.assertEqual(1, coll.count())


@ -78,8 +78,8 @@ class MockMonitor(Monitor):
def _check_once(self):
address = self._server_description.address
response = self.client.mock_is_master('%s:%d' % address)
return ServerDescription(address, IsMaster(response), 0)
response, rtt = self.client.mock_is_master('%s:%d' % address)
return ServerDescription(address, IsMaster(response), rtt)
class MockClient(MongoClient):
@ -117,6 +117,9 @@ class MockClient(MongoClient):
# Hostname -> max write batch size
self.mock_max_write_batch_sizes = {}
# Hostname -> round trip time
self.mock_rtts = {}
kwargs['_pool_class'] = partial(MockPool, self)
kwargs['_monitor_class'] = partial(MockMonitor, self)
@ -137,6 +140,7 @@ class MockClient(MongoClient):
self.mock_max_write_batch_sizes[host] = size
def mock_is_master(self, host):
"""Return mock ismaster response (a dict) and round trip time."""
min_wire_version, max_wire_version = self.mock_wire_versions.get(
host,
(common.MIN_WIRE_VERSION, common.MAX_WIRE_VERSION))
@ -144,19 +148,20 @@ class MockClient(MongoClient):
max_write_batch_size = self.mock_max_write_batch_sizes.get(
host, common.MAX_WRITE_BATCH_SIZE)
rtt = self.mock_rtts.get(host, 0)
# host is like 'a:1'.
if host in self.mock_down_hosts:
raise socket.timeout('mock timeout')
if host in self.mock_standalones:
return {
elif host in self.mock_standalones:
response = {
'ok': 1,
'ismaster': True,
'minWireVersion': min_wire_version,
'maxWireVersion': max_wire_version,
'maxWriteBatchSize': max_write_batch_size}
if host in self.mock_members:
elif host in self.mock_members:
ismaster = (host == self.mock_primary)
# Simulate a replica set member.
@ -172,21 +177,20 @@ class MockClient(MongoClient):
if self.mock_primary:
response['primary'] = self.mock_primary
return response
if host in self.mock_mongoses:
return {
elif host in self.mock_mongoses:
response = {
'ok': 1,
'ismaster': True,
'minWireVersion': min_wire_version,
'maxWireVersion': max_wire_version,
'msg': 'isdbgrid',
'maxWriteBatchSize': max_write_batch_size}
else:
# In test_internal_ips(), we try to connect to a host listed
# in ismaster['hosts'] but not publicly accessible.
raise socket.error('Unknown host: %s' % host)
# In test_internal_ips(), we try to connect to a host listed
# in ismaster['hosts'] but not publicly accessible.
raise socket.error('Unknown host: %s' % host)
return response, rtt
def _process_kill_cursors_queue(self):
# Avoid the background thread causing races, e.g. a surprising


@ -12,15 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test MongoClient's mongos high-availability features using a mock."""
"""Test MongoClient's mongos load balancing using a mock."""
import sys
import threading
import time
sys.path[0:0] = [""]
from pymongo.errors import AutoReconnect
from test import unittest, client_context, SkipTest, MockClientTest
from pymongo.errors import AutoReconnect, InvalidOperation
from pymongo.server_selectors import writable_server_selector
from pymongo.topology_description import TOPOLOGY_TYPE
from test import unittest, client_context, MockClientTest
from test.pymongo_mocks import MockClient
from test.utils import connected, wait_until
@ -54,27 +57,39 @@ def do_simple_op(client, nthreads):
assert t.passed
class TestMongosHA(MockClientTest):
def writable_addresses(topology):
return set(server.description.address for server in
topology.select_servers(writable_server_selector))
def mock_client(self):
return MockClient(
class TestMongosLoadBalancing(MockClientTest):
def mock_client(self, **kwargs):
mock_client = MockClient(
standalones=[],
members=[],
mongoses=['a:1', 'b:2', 'c:3'],
host='a:1,b:2,c:3',
connect=False)
def test_lazy_connect(self):
# TODO: Reimplement Mongos HA with PyMongo 3's MongoClient.
raise SkipTest('Mongos HA must be reimplemented with 3.0 MongoClient')
connect=False,
**kwargs)
# Latencies in seconds.
mock_client.mock_rtts['a:1'] = 0.020
mock_client.mock_rtts['b:2'] = 0.025
mock_client.mock_rtts['c:3'] = 0.040
return mock_client
def test_lazy_connect(self):
# While connected() ensures we can trigger connection from the main
# thread and wait for the monitors, this test triggers connection from
# several threads at once to check for data races.
nthreads = 10
client = self.mock_client()
self.assertEqual(0, len(client.nodes))
# Trigger initial connection.
do_simple_op(client, nthreads)
self.assertEqual(3, len(client.nodes))
wait_until(lambda: len(client.nodes) == 3, 'connect to all mongoses')
def test_reconnect(self):
nthreads = 10
@ -92,35 +107,21 @@ class TestMongosHA(MockClientTest):
'reconnect to all mongoses')
def test_failover(self):
# TODO: PyMongo 3's MongoClient currently picks a new Mongos at random
# for each operation (besides getMore). Need to "pin".
raise SkipTest('Mongos HA must be reimplemented with 3.0 MongoClient')
nthreads = 1
# ['0:0', '1:1', '2:2', ...]
mock_hosts = ['%d:%d' % (i, i) for i in range(50)]
client = MockClient(
standalones=[],
members=[],
mongoses=mock_hosts,
host=','.join(mock_hosts))
self.assertEqual(len(mock_hosts), len(client.nodes))
nthreads = 10
client = connected(self.mock_client(localThresholdMS=0.001))
wait_until(lambda: len(client.nodes) == 3, 'connect to all mongoses')
# Our chosen mongos goes down.
client.kill_host('%s:%s' % (client.host, client.port))
client.kill_host('a:1')
# Trigger failover. AutoReconnect should be raised exactly once.
errors = []
# Trigger failover to higher-latency nodes. AutoReconnect should be
# raised at most once in each thread.
passed = []
def f():
try:
client.db.command('ismaster')
except AutoReconnect:
errors.append(True)
# Second attempt succeeds.
client.db.command('ismaster')
@ -133,11 +134,45 @@ class TestMongosHA(MockClientTest):
for t in threads:
t.join()
self.assertEqual(1, len(errors))
self.assertEqual(nthreads, len(passed))
# Down host is still in list.
self.assertEqual(len(mock_hosts), len(client.nodes))
# Down host removed from list.
self.assertEqual(2, len(client.nodes))
def test_local_threshold(self):
client = connected(self.mock_client(localThresholdMS=30))
wait_until(lambda: len(client.nodes) == 3, 'connect to all mongoses')
topology = client._topology
# All are within a 30-ms latency window, see self.mock_client().
self.assertEqual(set([('a', 1), ('b', 2), ('c', 3)]),
writable_addresses(topology))
def test_load_balancing(self):
# Although the server selection JSON tests already prove that
# select_servers works for sharded topologies, here we do an end-to-end
# test of discovering servers' round trip times and configuring
# localThresholdMS.
client = connected(self.mock_client())
wait_until(lambda: len(client.nodes) == 3, 'connect to all mongoses')
# Prohibited for topology type Sharded.
with self.assertRaises(InvalidOperation):
client.address
topology = client._topology
self.assertEqual(TOPOLOGY_TYPE.Sharded,
topology.description.topology_type)
# a and b are within the 15-ms latency window, see self.mock_client().
self.assertEqual(set([('a', 1), ('b', 2)]),
writable_addresses(topology))
client.mock_rtts['a:1'] = 0.040
# Discover only b is within latency window.
wait_until(lambda: set([('b', 2)]) == writable_addresses(topology),
'discover server "a" is too far')
if __name__ == "__main__":