From e95d2187b64d37973f62ad1afd223cebfc485a9b Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Mon, 7 Dec 2020 10:41:39 -1000 Subject: [PATCH] PYTHON-2395 Consider connection pool health during server selection (#515) Change the driver to maintain a count of in-progress operations to each server (per client). When selecting a mongos server, the driver now picks 2 suitable servers at random and selects the server with fewer in-progress operations. Previously, the driver selected a mongos server at random. The new behavior is intended to route operations away from unhealthy or slow servers in highly concurrent single client workloads. PYTHON-2460 Only reset Pool.active_sockets to 0 after a fork() --- pymongo/pool.py | 13 +- pymongo/topology.py | 12 +- .../in_window/equilibrium.json | 46 +++++ .../in_window/many-choices.json | 106 ++++++++++++ .../in_window/one-least-two-tied.json | 46 +++++ .../in_window/rs-equilibrium.json | 46 +++++ .../in_window/rs-three-choices.json | 46 +++++ .../in_window/three-choices.json | 46 +++++ .../in_window/two-choices.json | 36 ++++ .../server_selection/in_window/two-least.json | 46 +++++ test/test_server_selection_in_window.py | 157 ++++++++++++++++++ test/utils.py | 5 +- test/utils_selection_tests.py | 58 ++++--- 13 files changed, 632 insertions(+), 31 deletions(-) create mode 100644 test/server_selection/in_window/equilibrium.json create mode 100644 test/server_selection/in_window/many-choices.json create mode 100644 test/server_selection/in_window/one-least-two-tied.json create mode 100644 test/server_selection/in_window/rs-equilibrium.json create mode 100644 test/server_selection/in_window/rs-three-choices.json create mode 100644 test/server_selection/in_window/three-choices.json create mode 100644 test/server_selection/in_window/two-choices.json create mode 100644 test/server_selection/in_window/two-least.json create mode 100644 test/test_server_selection_in_window.py diff --git a/pymongo/pool.py b/pymongo/pool.py index 529e35934..20164bb8a 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -1111,15 +1111,20 @@ class Pool: if self.enabled_for_cmap: self.opts.event_listeners.publish_pool_created( self.address, self.opts.non_default_options) + # Similar to active_sockets but includes threads in the wait queue. + self.operation_count = 0 def _reset(self, close): with self.lock: if self.closed: return self.generation += 1 - self.pid = os.getpid() + newpid = os.getpid() + if self.pid != newpid: + self.pid = newpid + self.active_sockets = 0 + self.operation_count = 0 sockets, self.sockets = self.sockets, collections.deque() - self.active_sockets = 0 if close: self.closed = True @@ -1300,6 +1305,9 @@ class Pool: 'Attempted to check out a connection from closed connection ' 'pool') + with self.lock: + self.operation_count += 1 + # Get a free socket or create one. if self.opts.wait_queue_timeout: deadline = _time() + self.opts.wait_queue_timeout @@ -1396,6 +1404,7 @@ class Pool: self._socket_semaphore.release() with self.lock: self.active_sockets -= 1 + self.operation_count -= 1 def _perished(self, sock_info): """Return True and close the connection if it is "perished". diff --git a/pymongo/topology.py b/pymongo/topology.py index eb84a344e..20b8bbc08 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -238,9 +238,15 @@ class Topology(object): server_selection_timeout=None, address=None): """Like select_servers, but choose a random server if several match.""" - return random.choice(self.select_servers(selector, - server_selection_timeout, - address)) + servers = self.select_servers( + selector, server_selection_timeout, address) + if len(servers) == 1: + return servers[0] + server1, server2 = random.sample(servers, 2) + if server1.pool.operation_count <= server2.pool.operation_count: + return server1 + else: + return server2 def select_server_by_address(self, address, server_selection_timeout=None): diff --git a/test/server_selection/in_window/equilibrium.json b/test/server_selection/in_window/equilibrium.json new file mode 100644 index 000000000..c5f177d49 --- /dev/null +++ b/test/server_selection/in_window/equilibrium.json @@ -0,0 +1,46 @@ +{ + "description": "When in equilibrium selection is evenly distributed", + "topology_description": { + "type": "Sharded", + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "b:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "c:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + } + ] + }, + "mocked_topology_state": [ + { + "address": "a:27017", + "operation_count": 5 + }, + { + "address": "b:27017", + "operation_count": 5 + }, + { + "address": "c:27017", + "operation_count": 5 + } + ], + "iterations": 2000, + "outcome": { + "tolerance": 0.05, + "expected_frequencies": { + "a:27017": 0.33, + "b:27017": 0.33, + "c:27017": 0.33 + } + } +} diff --git a/test/server_selection/in_window/many-choices.json b/test/server_selection/in_window/many-choices.json new file mode 100644 index 000000000..7e940513e --- /dev/null +++ b/test/server_selection/in_window/many-choices.json @@ -0,0 +1,106 @@ +{ + "description": "Selections from many choices occur at correct frequencies", + "topology_description": { + "type": "Sharded", + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "b:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "c:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "d:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "e:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "f:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "g:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "h:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "i:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + } + ] + }, + "mocked_topology_state": [ + { + "address": "a:27017", + "operation_count": 0 + }, + { + "address": "b:27017", + "operation_count": 5 + }, + { + "address": "c:27017", + "operation_count": 5 + }, + { + "address": "d:27017", + "operation_count": 10 + }, + { + "address": "e:27017", + "operation_count": 10 + }, + { + "address": "f:27017", + "operation_count": 20 + }, + { + "address": "g:27017", + "operation_count": 20 + }, + { + "address": "h:27017", + "operation_count": 50 + }, + { + "address": "i:27017", + "operation_count": 60 + } + ], + "iterations": 10000, + "outcome": { + "tolerance": 0.03, + "expected_frequencies": { + "a:27017": 0.22, + "b:27017": 0.18, + "c:27017": 0.18, + "d:27017": 0.125, + "e:27017": 0.125, + "f:27017": 0.074, + "g:27017": 0.074, + "h:27017": 0.0277, + "i:27017": 0 + } + } +} diff --git a/test/server_selection/in_window/one-least-two-tied.json b/test/server_selection/in_window/one-least-two-tied.json new file mode 100644 index 000000000..ed7526e71 --- /dev/null +++ b/test/server_selection/in_window/one-least-two-tied.json @@ -0,0 +1,46 @@ +{ + "description": "Least operations gets most selections, two tied share the rest", + "topology_description": { + "type": "Sharded", + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "b:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "c:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + } + ] + }, + "mocked_topology_state": [ + { + "address": "a:27017", + "operation_count": 16 + }, + { + "address": "b:27017", + "operation_count": 10 + }, + { + "address": "c:27017", + "operation_count": 16 + } + ], + "iterations": 2000, + "outcome": { + "tolerance": 0.05, + "expected_frequencies": { + "a:27017": 0.165, + "b:27017": 0.66, + "c:27017": 0.165 + } + } +} diff --git a/test/server_selection/in_window/rs-equilibrium.json b/test/server_selection/in_window/rs-equilibrium.json new file mode 100644 index 000000000..61c6687e5 --- /dev/null +++ b/test/server_selection/in_window/rs-equilibrium.json @@ -0,0 +1,46 @@ +{ + "description": "When in equilibrium selection is evenly distributed (replica set)", + "topology_description": { + "type": "ReplicaSetWithPrimary", + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 35, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 35, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 35, + "type": "RSSecondary" + } + ] + }, + "mocked_topology_state": [ + { + "address": "a:27017", + "operation_count": 6 + }, + { + "address": "b:27017", + "operation_count": 6 + }, + { + "address": "c:27017", + "operation_count": 6 + } + ], + "iterations": 2000, + "outcome": { + "tolerance": 0.05, + "expected_frequencies": { + "a:27017": 0.33, + "b:27017": 0.33, + "c:27017": 0.33 + } + } +} diff --git a/test/server_selection/in_window/rs-three-choices.json b/test/server_selection/in_window/rs-three-choices.json new file mode 100644 index 000000000..3fdc15205 --- /dev/null +++ b/test/server_selection/in_window/rs-three-choices.json @@ -0,0 +1,46 @@ +{ + "description": "Selections from three servers occur at proper distributions (replica set)", + "topology_description": { + "type": "ReplicaSetWithPrimary", + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 35, + "type": "RSPrimary" + }, + { + "address": "b:27017", + "avg_rtt_ms": 35, + "type": "RSSecondary" + }, + { + "address": "c:27017", + "avg_rtt_ms": 35, + "type": "RSSecondary" + } + ] + }, + "mocked_topology_state": [ + { + "address": "a:27017", + "operation_count": 3 + }, + { + "address": "b:27017", + "operation_count": 6 + }, + { + "address": "c:27017", + "operation_count": 20 + } + ], + "iterations": 2000, + "outcome": { + "tolerance": 0.05, + "expected_frequencies": { + "a:27017": 0.66, + "b:27017": 0.33, + "c:27017": 0 + } + } +} diff --git a/test/server_selection/in_window/three-choices.json b/test/server_selection/in_window/three-choices.json new file mode 100644 index 000000000..7b5b41454 --- /dev/null +++ b/test/server_selection/in_window/three-choices.json @@ -0,0 +1,46 @@ +{ + "description": "Selections from three servers occur at proper distributions", + "topology_description": { + "type": "Sharded", + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "b:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "c:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + } + ] + }, + "mocked_topology_state": [ + { + "address": "a:27017", + "operation_count": 3 + }, + { + "address": "b:27017", + "operation_count": 6 + }, + { + "address": "c:27017", + "operation_count": 20 + } + ], + "iterations": 2000, + "outcome": { + "tolerance": 0.05, + "expected_frequencies": { + "a:27017": 0.66, + "b:27017": 0.33, + "c:27017": 0 + } + } +} diff --git a/test/server_selection/in_window/two-choices.json b/test/server_selection/in_window/two-choices.json new file mode 100644 index 000000000..2c7a605d8 --- /dev/null +++ b/test/server_selection/in_window/two-choices.json @@ -0,0 +1,36 @@ +{ + "description": "Better of two choices always selected", + "topology_description": { + "type": "Sharded", + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "b:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + } + ] + }, + "mocked_topology_state": [ + { + "address": "a:27017", + "operation_count": 0 + }, + { + "address": "b:27017", + "operation_count": 5 + } + ], + "iterations": 100, + "outcome": { + "tolerance": 0, + "expected_frequencies": { + "a:27017": 1, + "b:27017": 0 + } + } +} diff --git a/test/server_selection/in_window/two-least.json b/test/server_selection/in_window/two-least.json new file mode 100644 index 000000000..73214fc64 --- /dev/null +++ b/test/server_selection/in_window/two-least.json @@ -0,0 +1,46 @@ +{ + "description": "Two tied for least operations share all selections", + "topology_description": { + "type": "Sharded", + "servers": [ + { + "address": "a:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "b:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + }, + { + "address": "c:27017", + "avg_rtt_ms": 35, + "type": "Mongos" + } + ] + }, + "mocked_topology_state": [ + { + "address": "a:27017", + "operation_count": 10 + }, + { + "address": "b:27017", + "operation_count": 10 + }, + { + "address": "c:27017", + "operation_count": 16 + } + ], + "iterations": 2000, + "outcome": { + "tolerance": 0.05, + "expected_frequencies": { + "a:27017": 0.5, + "b:27017": 0.5, + "c:27017": 0 + } + } +} diff --git a/test/test_server_selection_in_window.py b/test/test_server_selection_in_window.py new file mode 100644 index 000000000..bdd778e81 --- /dev/null +++ b/test/test_server_selection_in_window.py @@ -0,0 +1,157 @@ +# Copyright 2020-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the topology module's Server Selection Spec implementation.""" + +import os +import threading + +from pymongo.common import clean_node +from pymongo.read_preferences import ReadPreference +from test import client_context, IntegrationTest, unittest +from test.utils_selection_tests import create_topology +from test.utils import TestCreator, rs_client, OvertCommandListener + + +# Location of JSON test specifications. +TEST_PATH = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + os.path.join('server_selection', 'in_window')) + + +class TestAllScenarios(unittest.TestCase): + def run_scenario(self, scenario_def): + topology = create_topology(scenario_def) + + # Update mock operation_count state: + for mock in scenario_def['mocked_topology_state']: + address = clean_node(mock['address']) + server = topology.get_server_by_address(address) + server.pool.operation_count = mock['operation_count'] + + pref = ReadPreference.NEAREST + counts = dict((address, 0) for address in + topology._description.server_descriptions()) + + # Number of times to repeat server selection + iterations = scenario_def['iterations'] + for _ in range(iterations): + server = topology.select_server(pref, server_selection_timeout=0) + counts[server.description.address] += 1 + + # Verify expected_frequencies + outcome = scenario_def['outcome'] + tolerance = outcome['tolerance'] + expected_frequencies = outcome['expected_frequencies'] + for host_str, freq in expected_frequencies.items(): + address = clean_node(host_str) + actual_freq = float(counts[address])/iterations + if freq == 0: + # Should be exactly 0. + self.assertEqual(actual_freq, 0) + else: + # Should be within 'tolerance'. + self.assertAlmostEqual(actual_freq, freq, delta=tolerance) + + +def create_test(scenario_def, test, name): + def run_scenario(self): + self.run_scenario(scenario_def) + + return run_scenario + + +class CustomTestCreator(TestCreator): + def tests(self, scenario_def): + """Extract the tests from a spec file. + + Server selection in_window tests do not have a 'tests' field. + The whole file represents a single test case. + """ + return [scenario_def] + + +CustomTestCreator(create_test, TestAllScenarios, TEST_PATH).create_tests() + + +class FinderThread(threading.Thread): + def __init__(self, collection, iterations): + super(FinderThread, self).__init__() + self.daemon = True + self.collection = collection + self.iterations = iterations + self.passed = False + + def run(self): + for _ in range(self.iterations): + self.collection.find_one({}) + self.passed = True + + +class TestProse(IntegrationTest): + def frequencies(self, client, listener): + coll = client.test.test + N_FINDS = 10 + N_THREADS = 10 + threads = [FinderThread(coll, N_FINDS) for _ in range(N_THREADS)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + for thread in threads: + self.assertTrue(thread.passed) + + events = listener.results['started'] + self.assertEqual(len(events), N_FINDS * N_THREADS) + nodes = client.nodes + self.assertEqual(len(nodes), 2) + freqs = {address: 0 for address in nodes} + for event in events: + freqs[event.connection_id] += 1 + for address in freqs: + freqs[address] = freqs[address]/float(len(events)) + return freqs + + @client_context.require_failCommand_appName + @client_context.require_multiple_mongoses + def test_load_balancing(self): + listener = OvertCommandListener() + client = rs_client(client_context.mongos_seeds(), + appName='loadBalancingTest', + event_listeners=[listener]) + self.addCleanup(client.close) + # Delay find commands on + delay_finds = { + 'configureFailPoint': 'failCommand', + 'mode': {'times': 10000}, + 'data': { + 'failCommands': ['find'], + 'blockConnection': True, + 'blockTimeMS': 500, + 'appName': 'loadBalancingTest', + }, + } + with self.fail_point(delay_finds): + nodes = client_context.client.nodes + self.assertEqual(len(nodes), 1) + delayed_server = next(iter(nodes)) + freqs = self.frequencies(client, listener) + self.assertLessEqual(freqs[delayed_server], 0.25) + listener.reset() + freqs = self.frequencies(client, listener) + self.assertAlmostEqual(freqs[delayed_server], 0.50, delta=0.15) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/utils.py b/test/utils.py index 2f5b84554..3e24c684f 100644 --- a/test/utils.py +++ b/test/utils.py @@ -233,10 +233,11 @@ class MockSocketInfo(object): class MockPool(object): - def __init__(self, *args, **kwargs): + def __init__(self, address, options, handshake=True): self.generation = 0 self._lock = threading.Lock() - self.opts = PoolOptions() + self.opts = options + self.operation_count = 0 def get_socket(self, all_credentials, checkout=False): return MockSocketInfo() diff --git a/test/utils_selection_tests.py b/test/utils_selection_tests.py index 8bea70ae3..0d4edb085 100644 --- a/test/utils_selection_tests.py +++ b/test/utils_selection_tests.py @@ -130,40 +130,50 @@ def get_topology_settings_dict(**kwargs): return settings +def create_topology(scenario_def, **kwargs): + # Initialize topologies. + if 'heartbeatFrequencyMS' in scenario_def: + frequency = int(scenario_def['heartbeatFrequencyMS']) / 1000.0 + else: + frequency = HEARTBEAT_FREQUENCY + + seeds, hosts = get_addresses( + scenario_def['topology_description']['servers']) + + settings = get_topology_settings_dict( + heartbeat_frequency=frequency, + seeds=seeds, + **kwargs + ) + + # "Eligible servers" is defined in the server selection spec as + # the set of servers matching both the ReadPreference's mode + # and tag sets. + topology = Topology(TopologySettings(**settings)) + topology.open() + + # Update topologies with server descriptions. + for server in scenario_def['topology_description']['servers']: + server_description = make_server_description(server, hosts) + topology.on_change(server_description) + + return topology + + def create_test(scenario_def): def run_scenario(self): - # Initialize topologies. - if 'heartbeatFrequencyMS' in scenario_def: - frequency = int(scenario_def['heartbeatFrequencyMS']) / 1000.0 - else: - frequency = HEARTBEAT_FREQUENCY - - seeds, hosts = get_addresses( + _, hosts = get_addresses( scenario_def['topology_description']['servers']) - - settings = get_topology_settings_dict( - heartbeat_frequency=frequency, - seeds=seeds - ) - # "Eligible servers" is defined in the server selection spec as # the set of servers matching both the ReadPreference's mode # and tag sets. - top_latency = Topology(TopologySettings(**settings)) - top_latency.open() + top_latency = create_topology(scenario_def) # "In latency window" is defined in the server selection # spec as the subset of suitable_servers that falls within the # allowable latency window. - settings['local_threshold_ms'] = 1000000 - top_suitable = Topology(TopologySettings(**settings)) - top_suitable.open() - - # Update topologies with server descriptions. - for server in scenario_def['topology_description']['servers']: - server_description = make_server_description(server, hosts) - top_suitable.on_change(server_description) - top_latency.on_change(server_description) + top_suitable = create_topology( + scenario_def, local_threshold_ms=1000000) # Create server selector. if scenario_def.get("operation") == "write":