PYTHON-2395 Consider connection pool health during server selection (#515)

Change the driver to maintain a count of in-progress operations to each
server (per client). When selecting a mongos server, the driver now picks
2 suitable servers at random and selects the server with fewer in-progress
operations. Previously, the driver selected a mongos server at random.
The new behavior is intended to route operations away from unhealthy or
slow servers in highly concurrent single client workloads.

PYTHON-2460 Only reset Pool.active_sockets to 0 after a fork()
This commit is contained in:
Shane Harvey 2020-12-07 10:41:39 -10:00 committed by GitHub
parent ac07e0f4e2
commit e95d2187b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 632 additions and 31 deletions

View File

@ -1111,15 +1111,20 @@ class Pool:
if self.enabled_for_cmap:
self.opts.event_listeners.publish_pool_created(
self.address, self.opts.non_default_options)
# Similar to active_sockets but includes threads in the wait queue.
self.operation_count = 0
def _reset(self, close):
with self.lock:
if self.closed:
return
self.generation += 1
self.pid = os.getpid()
newpid = os.getpid()
if self.pid != newpid:
self.pid = newpid
self.active_sockets = 0
self.operation_count = 0
sockets, self.sockets = self.sockets, collections.deque()
self.active_sockets = 0
if close:
self.closed = True
@ -1300,6 +1305,9 @@ class Pool:
'Attempted to check out a connection from closed connection '
'pool')
with self.lock:
self.operation_count += 1
# Get a free socket or create one.
if self.opts.wait_queue_timeout:
deadline = _time() + self.opts.wait_queue_timeout
@ -1396,6 +1404,7 @@ class Pool:
self._socket_semaphore.release()
with self.lock:
self.active_sockets -= 1
self.operation_count -= 1
def _perished(self, sock_info):
"""Return True and close the connection if it is "perished".

View File

@ -238,9 +238,15 @@ class Topology(object):
server_selection_timeout=None,
address=None):
"""Like select_servers, but choose a random server if several match."""
return random.choice(self.select_servers(selector,
server_selection_timeout,
address))
servers = self.select_servers(
selector, server_selection_timeout, address)
if len(servers) == 1:
return servers[0]
server1, server2 = random.sample(servers, 2)
if server1.pool.operation_count <= server2.pool.operation_count:
return server1
else:
return server2
def select_server_by_address(self, address,
server_selection_timeout=None):

View File

@ -0,0 +1,46 @@
{
"description": "When in equilibrium selection is evenly distributed",
"topology_description": {
"type": "Sharded",
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "b:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "c:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
}
]
},
"mocked_topology_state": [
{
"address": "a:27017",
"operation_count": 5
},
{
"address": "b:27017",
"operation_count": 5
},
{
"address": "c:27017",
"operation_count": 5
}
],
"iterations": 2000,
"outcome": {
"tolerance": 0.05,
"expected_frequencies": {
"a:27017": 0.33,
"b:27017": 0.33,
"c:27017": 0.33
}
}
}

View File

@ -0,0 +1,106 @@
{
"description": "Selections from many choices occur at correct frequencies",
"topology_description": {
"type": "Sharded",
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "b:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "c:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "d:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "e:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "f:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "g:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "h:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "i:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
}
]
},
"mocked_topology_state": [
{
"address": "a:27017",
"operation_count": 0
},
{
"address": "b:27017",
"operation_count": 5
},
{
"address": "c:27017",
"operation_count": 5
},
{
"address": "d:27017",
"operation_count": 10
},
{
"address": "e:27017",
"operation_count": 10
},
{
"address": "f:27017",
"operation_count": 20
},
{
"address": "g:27017",
"operation_count": 20
},
{
"address": "h:27017",
"operation_count": 50
},
{
"address": "i:27017",
"operation_count": 60
}
],
"iterations": 10000,
"outcome": {
"tolerance": 0.03,
"expected_frequencies": {
"a:27017": 0.22,
"b:27017": 0.18,
"c:27017": 0.18,
"d:27017": 0.125,
"e:27017": 0.125,
"f:27017": 0.074,
"g:27017": 0.074,
"h:27017": 0.0277,
"i:27017": 0
}
}
}

View File

@ -0,0 +1,46 @@
{
"description": "Least operations gets most selections, two tied share the rest",
"topology_description": {
"type": "Sharded",
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "b:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "c:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
}
]
},
"mocked_topology_state": [
{
"address": "a:27017",
"operation_count": 16
},
{
"address": "b:27017",
"operation_count": 10
},
{
"address": "c:27017",
"operation_count": 16
}
],
"iterations": 2000,
"outcome": {
"tolerance": 0.05,
"expected_frequencies": {
"a:27017": 0.165,
"b:27017": 0.66,
"c:27017": 0.165
}
}
}

View File

@ -0,0 +1,46 @@
{
"description": "When in equilibrium selection is evenly distributed (replica set)",
"topology_description": {
"type": "ReplicaSetWithPrimary",
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 35,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 35,
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 35,
"type": "RSSecondary"
}
]
},
"mocked_topology_state": [
{
"address": "a:27017",
"operation_count": 6
},
{
"address": "b:27017",
"operation_count": 6
},
{
"address": "c:27017",
"operation_count": 6
}
],
"iterations": 2000,
"outcome": {
"tolerance": 0.05,
"expected_frequencies": {
"a:27017": 0.33,
"b:27017": 0.33,
"c:27017": 0.33
}
}
}

View File

@ -0,0 +1,46 @@
{
"description": "Selections from three servers occur at proper distributions (replica set)",
"topology_description": {
"type": "ReplicaSetWithPrimary",
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 35,
"type": "RSPrimary"
},
{
"address": "b:27017",
"avg_rtt_ms": 35,
"type": "RSSecondary"
},
{
"address": "c:27017",
"avg_rtt_ms": 35,
"type": "RSSecondary"
}
]
},
"mocked_topology_state": [
{
"address": "a:27017",
"operation_count": 3
},
{
"address": "b:27017",
"operation_count": 6
},
{
"address": "c:27017",
"operation_count": 20
}
],
"iterations": 2000,
"outcome": {
"tolerance": 0.05,
"expected_frequencies": {
"a:27017": 0.66,
"b:27017": 0.33,
"c:27017": 0
}
}
}

View File

@ -0,0 +1,46 @@
{
"description": "Selections from three servers occur at proper distributions",
"topology_description": {
"type": "Sharded",
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "b:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "c:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
}
]
},
"mocked_topology_state": [
{
"address": "a:27017",
"operation_count": 3
},
{
"address": "b:27017",
"operation_count": 6
},
{
"address": "c:27017",
"operation_count": 20
}
],
"iterations": 2000,
"outcome": {
"tolerance": 0.05,
"expected_frequencies": {
"a:27017": 0.66,
"b:27017": 0.33,
"c:27017": 0
}
}
}

View File

@ -0,0 +1,36 @@
{
"description": "Better of two choices always selected",
"topology_description": {
"type": "Sharded",
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "b:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
}
]
},
"mocked_topology_state": [
{
"address": "a:27017",
"operation_count": 0
},
{
"address": "b:27017",
"operation_count": 5
}
],
"iterations": 100,
"outcome": {
"tolerance": 0,
"expected_frequencies": {
"a:27017": 1,
"b:27017": 0
}
}
}

View File

@ -0,0 +1,46 @@
{
"description": "Two tied for least operations share all selections",
"topology_description": {
"type": "Sharded",
"servers": [
{
"address": "a:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "b:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
},
{
"address": "c:27017",
"avg_rtt_ms": 35,
"type": "Mongos"
}
]
},
"mocked_topology_state": [
{
"address": "a:27017",
"operation_count": 10
},
{
"address": "b:27017",
"operation_count": 10
},
{
"address": "c:27017",
"operation_count": 16
}
],
"iterations": 2000,
"outcome": {
"tolerance": 0.05,
"expected_frequencies": {
"a:27017": 0.5,
"b:27017": 0.5,
"c:27017": 0
}
}
}

View File

@ -0,0 +1,157 @@
# Copyright 2020-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test the topology module's Server Selection Spec implementation."""
import os
import threading
from pymongo.common import clean_node
from pymongo.read_preferences import ReadPreference
from test import client_context, IntegrationTest, unittest
from test.utils_selection_tests import create_topology
from test.utils import TestCreator, rs_client, OvertCommandListener
# Directory containing the JSON specifications for the in_window tests,
# resolved relative to the real location of this module.
TEST_PATH = os.path.join(
    os.path.dirname(os.path.realpath(__file__)),
    'server_selection',
    'in_window')
class TestAllScenarios(unittest.TestCase):
    # Host for the test methods generated from the JSON spec files.

    def run_scenario(self, scenario_def):
        """Run one in_window scenario and verify selection frequencies.

        Builds a topology from `scenario_def`, overwrites each server
        pool's ``operation_count`` with the mocked value, selects a server
        ``scenario_def['iterations']`` times using ReadPreference.NEAREST
        and compares how often each address was chosen with
        ``scenario_def['outcome']['expected_frequencies']``.

        :Parameters:
          - `scenario_def`: the parsed JSON scenario. The keys read here
            are 'mocked_topology_state', 'iterations' and 'outcome'.
        """
        topology = create_topology(scenario_def)

        # Seed every pool with its mocked in-progress operation count.
        for state in scenario_def['mocked_topology_state']:
            mocked_server = topology.get_server_by_address(
                clean_node(state['address']))
            mocked_server.pool.operation_count = state['operation_count']

        read_pref = ReadPreference.NEAREST
        selections = {
            addr: 0
            for addr in topology._description.server_descriptions()}

        # Repeat server selection and tally which server was picked.
        iterations = scenario_def['iterations']
        for _ in range(iterations):
            chosen = topology.select_server(
                read_pref, server_selection_timeout=0)
            selections[chosen.description.address] += 1

        # Compare observed frequencies with the expected ones.
        outcome = scenario_def['outcome']
        tolerance = outcome['tolerance']
        expected_frequencies = outcome['expected_frequencies']
        for host_str, expected in expected_frequencies.items():
            observed = float(selections[clean_node(host_str)]) / iterations
            if expected != 0:
                # Non-zero expectations only need to match to within
                # 'tolerance'.
                self.assertAlmostEqual(observed, expected, delta=tolerance)
            else:
                # An expected frequency of 0 means "never selected", so
                # no tolerance is applied.
                self.assertEqual(observed, 0)
def create_test(scenario_def, test, name):
    """Return a test method that runs `scenario_def`.

    The returned function takes the test case instance as its only
    argument and forwards `scenario_def` to that instance's
    ``run_scenario`` method. `test` and `name` are accepted but not used
    here.
    """
    # No docstring on the inner function: the generated test keeps an
    # empty __doc__.
    def run_scenario(self):
        self.run_scenario(scenario_def)

    return run_scenario
class CustomTestCreator(TestCreator):
    # TestCreator variant for spec files that hold exactly one test each.

    def tests(self, scenario_def):
        """Return the test definitions contained in a spec file.

        in_window spec files have no 'tests' field; the file as a whole
        is the single test case, so the scenario itself is returned as
        the only element of a new list.
        """
        single_case = [scenario_def]
        return single_case
# Generate the spec tests at import time. NOTE(review): presumably this
# attaches one test method to TestAllScenarios per spec file found under
# TEST_PATH -- confirm against test.utils.TestCreator.
CustomTestCreator(create_test, TestAllScenarios, TEST_PATH).create_tests()
class FinderThread(threading.Thread):
    """Daemon thread that issues a fixed number of find_one calls.

    ``passed`` starts out False and is set to True only after every
    ``find_one`` call has returned, so it remains False if one of them
    raises.
    """

    def __init__(self, collection, iterations):
        """Store the target `collection` and the number of finds to run."""
        super(FinderThread, self).__init__()
        self.collection = collection
        self.iterations = iterations
        self.passed = False
        self.daemon = True

    def run(self):
        """Call ``collection.find_one({})`` `iterations` times."""
        find_one = self.collection.find_one
        for _attempt in range(self.iterations):
            find_one({})
        # Reached only when no find_one call raised.
        self.passed = True
class TestProse(IntegrationTest):
    # Prose (non-JSON) server selection test run through real clients.

    def frequencies(self, client, listener):
        """Run concurrent finds and return each node's share of them.

        Starts N_THREADS FinderThread workers, each issuing N_FINDS
        ``find_one`` calls on ``client.test.test``, waits for all of them
        to finish and then tallies the 'started' events recorded by
        `listener` by ``event.connection_id``.

        :Parameters:
          - `client`: the client used for the finds; its ``nodes`` must
            contain exactly two addresses.
          - `listener`: a command listener whose ``results['started']``
            list is read.

        Returns a dict mapping each address in ``client.nodes`` to the
        fraction of the started events recorded for that address.
        """
        N_FINDS = 10
        N_THREADS = 10
        coll = client.test.test
        workers = [FinderThread(coll, N_FINDS) for _ in range(N_THREADS)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        for worker in workers:
            self.assertTrue(worker.passed)

        started = listener.results['started']
        self.assertEqual(len(started), N_FINDS * N_THREADS)
        nodes = client.nodes
        self.assertEqual(len(nodes), 2)

        # Count started events per server address.
        tally = {address: 0 for address in nodes}
        for event in started:
            tally[event.connection_id] += 1

        # Convert counts to fractions; float() keeps this true division
        # regardless of the division semantics in effect.
        total = float(len(started))
        return {address: count / total for address, count in tally.items()}

    @client_context.require_failCommand_appName
    @client_context.require_multiple_mongoses
    def test_load_balancing(self):
        # Finds should be steered away from a mongos while its find
        # commands are delayed, and be spread roughly evenly again once
        # the fail_point block has exited.
        listener = OvertCommandListener()
        client = rs_client(client_context.mongos_seeds(),
                           appName='loadBalancingTest',
                           event_listeners=[listener])
        self.addCleanup(client.close)

        # Fail point that blocks find commands carrying this client's
        # appName for 500ms each.
        # NOTE(review): self.fail_point() presumably enables it through
        # client_context.client, i.e. on a single mongos -- confirm.
        fail_command_data = {
            'failCommands': ['find'],
            'blockConnection': True,
            'blockTimeMS': 500,
            'appName': 'loadBalancingTest',
        }
        delay_finds = {
            'configureFailPoint': 'failCommand',
            'mode': {'times': 10000},
            'data': fail_command_data,
        }
        with self.fail_point(delay_finds):
            # The shared test client must see exactly one node; that
            # node is treated as the delayed server.
            nodes = client_context.client.nodes
            self.assertEqual(len(nodes), 1)
            delayed_server = next(iter(nodes))
            delayed_freqs = self.frequencies(client, listener)
            self.assertLessEqual(delayed_freqs[delayed_server], 0.25)

        # Drop the events recorded so far so the second run is tallied
        # on its own.
        listener.reset()
        balanced_freqs = self.frequencies(client, listener)
        self.assertAlmostEqual(
            balanced_freqs[delayed_server], 0.50, delta=0.15)
if __name__ == "__main__":
    # Run this module's tests when it is executed directly as a script.
    unittest.main()

View File

@ -233,10 +233,11 @@ class MockSocketInfo(object):
class MockPool(object):
def __init__(self, *args, **kwargs):
def __init__(self, address, options, handshake=True):
self.generation = 0
self._lock = threading.Lock()
self.opts = PoolOptions()
self.opts = options
self.operation_count = 0
def get_socket(self, all_credentials, checkout=False):
return MockSocketInfo()

View File

@ -130,40 +130,50 @@ def get_topology_settings_dict(**kwargs):
return settings
def create_topology(scenario_def, **kwargs):
    """Build and open a Topology populated from a scenario definition.

    :Parameters:
      - `scenario_def`: the parsed scenario. Its
        ``['topology_description']['servers']`` list supplies the seeds
        and the server descriptions; an optional 'heartbeatFrequencyMS'
        key overrides HEARTBEAT_FREQUENCY.
      - `kwargs`: extra settings forwarded unchanged to
        get_topology_settings_dict().

    Returns the opened Topology after one on_change() call per server
    listed in the scenario.
    """
    # The scenario value is scaled by 1/1000 (presumably milliseconds to
    # seconds -- confirm); otherwise the module default is used as is.
    frequency = (int(scenario_def['heartbeatFrequencyMS']) / 1000.0
                 if 'heartbeatFrequencyMS' in scenario_def
                 else HEARTBEAT_FREQUENCY)

    server_defs = scenario_def['topology_description']['servers']
    seeds, hosts = get_addresses(server_defs)
    settings = get_topology_settings_dict(
        heartbeat_frequency=frequency, seeds=seeds, **kwargs)

    topology = Topology(TopologySettings(**settings))
    topology.open()

    # Feed the topology one server description per scenario server.
    for server_def in server_defs:
        topology.on_change(make_server_description(server_def, hosts))
    return topology
def create_test(scenario_def):
def run_scenario(self):
# Initialize topologies.
if 'heartbeatFrequencyMS' in scenario_def:
frequency = int(scenario_def['heartbeatFrequencyMS']) / 1000.0
else:
frequency = HEARTBEAT_FREQUENCY
seeds, hosts = get_addresses(
_, hosts = get_addresses(
scenario_def['topology_description']['servers'])
settings = get_topology_settings_dict(
heartbeat_frequency=frequency,
seeds=seeds
)
# "Eligible servers" is defined in the server selection spec as
# the set of servers matching both the ReadPreference's mode
# and tag sets.
top_latency = Topology(TopologySettings(**settings))
top_latency.open()
top_latency = create_topology(scenario_def)
# "In latency window" is defined in the server selection
# spec as the subset of suitable_servers that falls within the
# allowable latency window.
settings['local_threshold_ms'] = 1000000
top_suitable = Topology(TopologySettings(**settings))
top_suitable.open()
# Update topologies with server descriptions.
for server in scenario_def['topology_description']['servers']:
server_description = make_server_description(server, hosts)
top_suitable.on_change(server_description)
top_latency.on_change(server_description)
top_suitable = create_topology(
scenario_def, local_threshold_ms=1000000)
# Create server selector.
if scenario_def.get("operation") == "write":