From d1fd3f7e982f5d1ae3e8b8b8edeba7e146bb02f8 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Mon, 23 Nov 2020 15:55:54 -0800 Subject: [PATCH] PYTHON-2363 Rate limit new connection creations via maxConnecting (#511) At most 2 connections can be in the pending state per connection pool. The pending state covers all the work required to setup a new connection including TCP, TLS, and MongoDB authentication. For example, if two threads are currently creating connections, a third thread will wait for either an existing connection to be checked back into the pool or for one of the two threads to finish creating a connection. The change reduces the likelihood of connection storms and improves the driver's ability to reuse existing connections. --- doc/faq.rst | 23 +++- pymongo/common.py | 3 + pymongo/pool.py | 94 ++++++++++++-- ...ol-checkout-maxConnecting-is-enforced.json | 104 +++++++++++++++ .../pool-checkout-maxConnecting-timeout.json | 98 +++++++++++++++ ...out-returned-connection-maxConnecting.json | 119 ++++++++++++++++++ test/test_cmap.py | 21 +++- test/test_pooling.py | 34 +++++ test/utils_spec_runner.py | 4 +- 9 files changed, 477 insertions(+), 23 deletions(-) create mode 100644 test/cmap/pool-checkout-maxConnecting-is-enforced.json create mode 100644 test/cmap/pool-checkout-maxConnecting-timeout.json create mode 100644 test/cmap/pool-checkout-returned-connection-maxConnecting.json diff --git a/doc/faq.rst b/doc/faq.rst index 8e820229a..dc9973e27 100644 --- a/doc/faq.rst +++ b/doc/faq.rst @@ -58,17 +58,32 @@ to 100. If there are ``maxPoolSize`` connections to a server and all are in use, the next request to that server will wait until one of the connections becomes available. -The client instance opens one additional socket per server in your MongoDB +The client instance opens two additional sockets per server in your MongoDB topology for monitoring the server's state. -For example, a client connected to a 3-node replica set opens 3 monitoring +For example, a client connected to a 3-node replica set opens 6 monitoring sockets. It also opens as many sockets as needed to support a multi-threaded application's concurrent operations on each server, up to ``maxPoolSize``. With a ``maxPoolSize`` of 100, if the application only uses the primary (the default), then only the primary connection pool grows and the total connections -is at most 103. If the application uses a +is at most 106. If the application uses a :class:`~pymongo.read_preferences.ReadPreference` to query the secondaries, -their pools also grow and the total connections can reach 303. +their pools also grow and the total connections can reach 306. + +Additionally, the pools are rate limited such that each connection pool can +only create at most 2 connections in parallel at any time. The connection +creation covers covers all the work required to setup a new connection +including DNS, TCP, SSL/TLS, MongoDB handshake, and MongoDB authentication. +For example, if three threads concurrently attempt to check out a connection +from an empty pool, the first two threads will begin creating new connections +while the third thread will wait. The third thread stops waiting when either: + +- one of the first two threads finishes creating a connection, or +- an existing connection is checked back into the pool. + +Rate limiting concurrent connection creation reduces the likelihood of +connection storms and improves the driver's ability to reuse existing +connections. It is possible to set the minimum number of concurrent connections to each server with ``minPoolSize``, which defaults to 0. The connection pool will be diff --git a/pymongo/common.py b/pymongo/common.py index cab9d526c..81555ef39 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -89,6 +89,9 @@ MAX_POOL_SIZE = 100 # Default value for minPoolSize. MIN_POOL_SIZE = 0 +# The maximum number of concurrent connection creation attempts per pool. +MAX_CONNECTING = 2 + # Default value for maxIdleTimeMS. MAX_IDLE_TIME_MS = None diff --git a/pymongo/pool.py b/pymongo/pool.py index 9aed75845..529e35934 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -28,11 +28,12 @@ from pymongo.ssl_support import ( IPADDR_SAFE as _IPADDR_SAFE) from bson import DEFAULT_CODEC_OPTIONS -from bson.py3compat import imap, itervalues, _unicode +from bson.py3compat import imap, itervalues, _unicode, PY3 from bson.son import SON from pymongo import auth, helpers, thread_util, __version__ from pymongo.client_session import _validate_session_write_concern from pymongo.common import (MAX_BSON_SIZE, + MAX_CONNECTING, MAX_IDLE_TIME_SEC, MAX_MESSAGE_SIZE, MAX_POOL_SIZE, @@ -285,6 +286,20 @@ def _raise_connection_failure(address, error, msg_prefix=None): else: raise AutoReconnect(msg) +if PY3: + def _cond_wait(condition, deadline): + timeout = deadline - _time() if deadline else None + return condition.wait(timeout) +else: + def _cond_wait(condition, deadline): + timeout = deadline - _time() if deadline else None + condition.wait(timeout) + # Python 2.7 always returns False for wait(), + # manually check for a timeout. + if timeout and _time() >= deadline: + return False + return True + class PoolOptions(object): @@ -294,7 +309,7 @@ class PoolOptions(object): '__wait_queue_timeout', '__wait_queue_multiple', '__ssl_context', '__ssl_match_hostname', '__socket_keepalive', '__event_listeners', '__appname', '__driver', '__metadata', - '__compression_settings') + '__compression_settings', '__max_connecting') def __init__(self, max_pool_size=MAX_POOL_SIZE, min_pool_size=MIN_POOL_SIZE, @@ -303,7 +318,7 @@ class PoolOptions(object): wait_queue_multiple=None, ssl_context=None, ssl_match_hostname=True, socket_keepalive=True, event_listeners=None, appname=None, driver=None, - compression_settings=None): + compression_settings=None, max_connecting=MAX_CONNECTING): self.__max_pool_size = max_pool_size self.__min_pool_size = min_pool_size @@ -319,6 +334,7 @@ class PoolOptions(object): self.__appname = appname self.__driver = driver self.__compression_settings = compression_settings + self.__max_connecting = max_connecting self.__metadata = copy.deepcopy(_METADATA) if appname: self.__metadata['application'] = {'name': appname} @@ -357,6 +373,8 @@ class PoolOptions(object): opts['maxIdleTimeMS'] = self.__max_idle_time_seconds * 1000 if self.__wait_queue_timeout != WAIT_QUEUE_TIMEOUT: opts['waitQueueTimeoutMS'] = self.__wait_queue_timeout * 1000 + if self.__max_connecting != MAX_CONNECTING: + opts['maxConnecting'] = self.__max_connecting return opts @property @@ -381,6 +399,13 @@ class PoolOptions(object): """ return self.__min_pool_size + @property + def max_connecting(self): + """The maximum number of concurrent connection creation attempts per + pool. Defaults to 2. + """ + return self.__max_connecting + @property def max_idle_time_seconds(self): """The maximum number of seconds that a connection can remain @@ -1080,6 +1105,9 @@ class Pool: self._socket_semaphore = thread_util.create_semaphore( self.opts.max_pool_size, max_waiters) + self._max_connecting_cond = threading.Condition(self.lock) + self._max_connecting = self.opts.max_connecting + self._pending = 0 if self.enabled_for_cmap: self.opts.event_listeners.publish_pool_created( self.address, self.opts.non_default_options) @@ -1143,21 +1171,34 @@ class Pool: if (len(self.sockets) + self.active_sockets >= self.opts.min_pool_size): # There are enough sockets in the pool. - break + return # We must acquire the semaphore to respect max_pool_size. if not self._socket_semaphore.acquire(False): - break + return + incremented = False try: + with self._max_connecting_cond: + # If maxConnecting connections are already being created + # by this pool then try again later instead of waiting. + if self._pending >= self._max_connecting: + return + self._pending += 1 + incremented = True sock_info = self.connect(all_credentials) with self.lock: # Close connection and return if the pool was reset during # socket creation or while acquiring the pool lock. if self.generation != reference_generation: sock_info.close_socket(ConnectionClosedReason.STALE) - break + return self.sockets.appendleft(sock_info) finally: + if incremented: + # Notify after adding the socket to the pool. + with self._max_connecting_cond: + self._pending -= 1 + self._max_connecting_cond.notify() self._socket_semaphore.release() def connect(self, all_credentials=None): @@ -1260,6 +1301,10 @@ class Pool: 'pool') # Get a free socket or create one. + if self.opts.wait_queue_timeout: + deadline = _time() + self.opts.wait_queue_timeout + else: + deadline = None if not self._socket_semaphore.acquire( True, self.opts.wait_queue_timeout): self._raise_wait_queue_timeout() @@ -1267,21 +1312,42 @@ class Pool: # We've now acquired the semaphore and must release it on error. sock_info = None incremented = False + emitted_event = False try: with self.lock: self.active_sockets += 1 incremented = True while sock_info is None: - try: - with self.lock: + # CMAP: we MUST wait for either maxConnecting OR for a socket + # to be checked back into the pool. + with self._max_connecting_cond: + while not (self.sockets or + self._pending < self._max_connecting): + if not _cond_wait(self._max_connecting_cond, deadline): + # Timed out, notify the next thread to ensure a + # timeout doesn't consume the condition. + if (self.sockets or + self._pending < self._max_connecting): + self._max_connecting_cond.notify() + emitted_event = True + self._raise_wait_queue_timeout() + + try: sock_info = self.sockets.popleft() - except IndexError: - # Can raise ConnectionFailure or CertificateError. - sock_info = self.connect(all_credentials) - else: + except IndexError: + self._pending += 1 + if sock_info: # We got a socket from the pool if self._perished(sock_info): sock_info = None + continue + else: # We need to create a new connection + try: + sock_info = self.connect(all_credentials) + finally: + with self._max_connecting_cond: + self._pending -= 1 + self._max_connecting_cond.notify() sock_info.check_auth(all_credentials) except Exception: if sock_info: @@ -1293,7 +1359,7 @@ class Pool: with self.lock: self.active_sockets -= 1 - if self.enabled_for_cmap: + if self.enabled_for_cmap and not emitted_event: self.opts.event_listeners.publish_connection_check_out_failed( self.address, ConnectionCheckOutFailedReason.CONN_ERROR) raise @@ -1324,6 +1390,8 @@ class Pool: sock_info.update_last_checkin_time() sock_info.update_is_writable(self.is_writable) self.sockets.appendleft(sock_info) + # Notify any threads waiting to create a connection. + self._max_connecting_cond.notify() self._socket_semaphore.release() with self.lock: diff --git a/test/cmap/pool-checkout-maxConnecting-is-enforced.json b/test/cmap/pool-checkout-maxConnecting-is-enforced.json new file mode 100644 index 000000000..358e0801b --- /dev/null +++ b/test/cmap/pool-checkout-maxConnecting-is-enforced.json @@ -0,0 +1,104 @@ +{ + "version": 1, + "style": "integration", + "description": "maxConnecting is enforced", + "runOn": [ + { + "minServerVersion": "4.4.0" + } + ], + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 50 + }, + "data": { + "failCommands": [ + "isMaster" + ], + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 750 + } + }, + "poolOptions": { + "maxPoolSize": 10, + "waitQueueTimeoutMS": 5000 + }, + "operations": [ + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "start", + "target": "thread2" + }, + { + "name": "wait", + "thread": "thread2", + "ms": 100 + }, + { + "name": "checkOut", + "thread": "thread2" + }, + { + "name": "start", + "target": "thread3" + }, + { + "name": "wait", + "thread": "thread3", + "ms": 100 + }, + { + "name": "checkOut", + "thread": "thread3" + }, + { + "name": "waitForEvent", + "event": "ConnectionReady", + "count": 3 + } + ], + "events": [ + { + "type": "ConnectionCreated", + "address": 42, + "connectionId": 1 + }, + { + "type": "ConnectionCreated", + "address": 42 + }, + { + "type": "ConnectionReady", + "address": 42, + "connectionId": 1 + }, + { + "type": "ConnectionCreated", + "address": 42 + }, + { + "type": "ConnectionReady", + "address": 42 + }, + { + "type": "ConnectionReady", + "address": 42 + } + ], + "ignore": [ + "ConnectionCheckOutStarted", + "ConnectionCheckedIn", + "ConnectionCheckedOut", + "ConnectionClosed", + "ConnectionPoolCreated" + ] +} diff --git a/test/cmap/pool-checkout-maxConnecting-timeout.json b/test/cmap/pool-checkout-maxConnecting-timeout.json new file mode 100644 index 000000000..ef71216ef --- /dev/null +++ b/test/cmap/pool-checkout-maxConnecting-timeout.json @@ -0,0 +1,98 @@ +{ + "version": 1, + "style": "integration", + "description": "waiting on maxConnecting is limited by WaitQueueTimeoutMS", + "runOn": [ + { + "minServerVersion": "4.4.0" + } + ], + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 50 + }, + "data": { + "failCommands": [ + "isMaster" + ], + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 750 + } + }, + "poolOptions": { + "maxPoolSize": 10, + "waitQueueTimeoutMS": 50 + }, + "operations": [ + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "start", + "target": "thread2" + }, + { + "name": "checkOut", + "thread": "thread2" + }, + { + "name": "waitForEvent", + "event": "ConnectionCreated", + "count": 2 + }, + { + "name": "start", + "target": "thread3" + }, + { + "name": "checkOut", + "thread": "thread3" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutFailed", + "count": 1 + }, + { + "name": "waitForThread", + "target": "thread3" + } + ], + "error": { + "type": "WaitQueueTimeoutError", + "message": "Timed out while checking out a connection from connection pool" + }, + "events": [ + { + "type": "ConnectionCheckOutStarted", + "address": 42 + }, + { + "type": "ConnectionCheckOutStarted", + "address": 42 + }, + { + "type": "ConnectionCheckOutStarted", + "address": 42 + }, + { + "type": "ConnectionCheckOutFailed", + "reason": "timeout", + "address": 42 + } + ], + "ignore": [ + "ConnectionCreated", + "ConnectionCheckedIn", + "ConnectionCheckedOut", + "ConnectionClosed", + "ConnectionPoolCreated" + ] +} diff --git a/test/cmap/pool-checkout-returned-connection-maxConnecting.json b/test/cmap/pool-checkout-returned-connection-maxConnecting.json new file mode 100644 index 000000000..308d640f0 --- /dev/null +++ b/test/cmap/pool-checkout-returned-connection-maxConnecting.json @@ -0,0 +1,119 @@ +{ + "version": 1, + "style": "integration", + "description": "threads blocked by maxConnecting check out returned connections", + "runOn": [ + { + "minServerVersion": "4.4.0" + } + ], + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 50 + }, + "data": { + "failCommands": [ + "isMaster" + ], + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 750 + } + }, + "poolOptions": { + "maxPoolSize": 10, + "waitQueueTimeoutMS": 5000 + }, + "operations": [ + { + "name": "checkOut", + "label": "conn0" + }, + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "start", + "target": "thread2" + }, + { + "name": "checkOut", + "thread": "thread2" + }, + { + "name": "start", + "target": "thread3" + }, + { + "name": "checkOut", + "thread": "thread3" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutStarted", + "count": 4 + }, + { + "name": "wait", + "ms": 100 + }, + { + "name": "checkIn", + "connection": "conn0" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckedOut", + "count": 4 + } + ], + "events": [ + { + "type": "ConnectionCreated", + "address": 42, + "connectionId": 1 + }, + { + "type": "ConnectionCheckedOut", + "address": 42 + }, + { + "type": "ConnectionCreated", + "address": 42 + }, + { + "type": "ConnectionCreated", + "address": 42 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 1, + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 1, + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "address": 42 + } + ], + "ignore": [ + "ConnectionClosed", + "ConnectionReady", + "ConnectionPoolCreated", + "ConnectionCheckOutStarted" + ] +} diff --git a/test/test_cmap.py b/test/test_cmap.py index bd22cdd72..b21708743 100644 --- a/test/test_cmap.py +++ b/test/test_cmap.py @@ -14,14 +14,14 @@ """Execute Transactions Spec tests.""" -import functools import os import sys import time -import threading sys.path[0:0] = [""] +from bson.son import SON + from pymongo.errors import (ConnectionFailure, OperationFailure, PyMongoError) @@ -184,13 +184,28 @@ class TestCMAP(IntegrationTest): self.check_object(actual, expected) self.assertIn(message, str(actual)) + def _set_fail_point(self, client, command_args): + cmd = SON([('configureFailPoint', 'failCommand')]) + cmd.update(command_args) + client.admin.command(cmd) + + def set_fail_point(self, command_args): + self._set_fail_point(self.client, command_args) + def run_scenario(self, scenario_def, test): """Run a CMAP spec test.""" self.assertEqual(scenario_def['version'], 1) - self.assertEqual(scenario_def['style'], 'unit') + self.assertIn(scenario_def['style'], ['unit', 'integration']) self.listener = CMAPListener() self._ops = [] + # Configure the fail point before creating the client. + if 'failPoint' in test: + fp = test['failPoint'] + self.set_fail_point(fp) + self.addCleanup(self.set_fail_point, { + 'configureFailPoint': fp['configureFailPoint'], 'mode': 'off'}) + opts = test['poolOptions'].copy() opts['event_listeners'] = [self.listener] client = single_client(**opts) diff --git a/test/test_pooling.py b/test/test_pooling.py index b1728d791..156d28103 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -379,6 +379,40 @@ class TestPooling(_TestPoolingBase): for socket_info in socks: socket_info.close_socket(None) + def test_maxConnecting(self): + client = rs_or_single_client() + self.addCleanup(client.close) + pool = get_pool(client) + docs = [] + + # Run 50 short running operations + def find_one(): + docs.append(client.test.test.find_one({'$where': delay(0.001)})) + threads = [threading.Thread(target=find_one) for _ in range(50)] + for thread in threads: + thread.start() + for thread in threads: + thread.join(10) + + self.assertEqual(len(docs), 50) + self.assertLessEqual(len(pool.sockets), 50) + # TLS and auth make connection establishment more expensive than + # the artificially delayed query which leads to more threads + # hitting maxConnecting. The end result is fewer total connections + # and better latency. + if client_context.tls and client_context.auth_enabled: + self.assertLessEqual(len(pool.sockets), 30) + else: + self.assertLessEqual(len(pool.sockets), 50) + # MongoDB 4.4.1 with auth + ssl: + # maxConnecting = 2: 6 connections in ~0.231+ seconds + # maxConnecting = unbounded: 50 connections in ~0.642+ seconds + # + # MongoDB 4.4.1 with no-auth no-ssl Python 3.8: + # maxConnecting = 2: 15-22 connections in ~0.108+ seconds + # maxConnecting = unbounded: 30+ connections in ~0.140+ seconds + print(len(pool.sockets)) + class TestPoolMaxSize(_TestPoolingBase): def test_max_pool_size(self): diff --git a/test/utils_spec_runner.py b/test/utils_spec_runner.py index 4ab4d1d10..a15537c71 100644 --- a/test/utils_spec_runner.py +++ b/test/utils_spec_runner.py @@ -121,11 +121,9 @@ class SpecRunner(IntegrationTest): client.admin.command(cmd) def set_fail_point(self, command_args): - cmd = SON([('configureFailPoint', 'failCommand')]) - cmd.update(command_args) clients = self.mongos_clients if self.mongos_clients else [self.client] for client in clients: - self._set_fail_point(client, cmd) + self._set_fail_point(client, command_args) def targeted_fail_point(self, session, fail_point): """Run the targetedFailPoint test operation.