diff --git a/doc/faq.rst b/doc/faq.rst index cbd8ffe0c..1d59ae09e 100644 --- a/doc/faq.rst +++ b/doc/faq.rst @@ -28,6 +28,16 @@ all other sockets are in use and the pool has reached its maximum, the thread pauses, waiting for a socket to be returned to the pool by another thread. +It is possible to set the minimum number of concurrent connections to each +server with ``minPoolSize``, which defaults to 0. The connection pool will be +initialized with this number of sockets. If sockets are removed from the pool +and closed, causing the total number of sockets (both in use and idle) to drop +below the set minimum, more sockets will be added until the minimum is reached. + +The maximum number of milliseconds that a connection can remain idle in the +pool before being removed and replaced can be set with ``maxIdleTime``, which +defaults to `None` (no limit). + The default configuration for a :class:`~pymongo.mongo_client.MongoClient` works for most applications:: diff --git a/pymongo/client_options.py b/pymongo/client_options.py index 86a77ecc5..c2afe9b3b 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -98,6 +98,10 @@ def _parse_ssl_options(options): def _parse_pool_options(options): """Parse connection pool options.""" max_pool_size = options.get('maxpoolsize', common.MAX_POOL_SIZE) + min_pool_size = options.get('minpoolsize', common.MIN_POOL_SIZE) + max_idle_time_ms = options.get('maxidletimems', common.MAX_IDLE_TIME_MS) + if max_pool_size is not None and min_pool_size > max_pool_size: + raise ValueError("minPoolSize must be smaller or equal to maxPoolSize") connect_timeout = options.get('connecttimeoutms', common.CONNECT_TIMEOUT) socket_keepalive = options.get('socketkeepalive', False) socket_timeout = options.get('sockettimeoutms') @@ -106,6 +110,8 @@ def _parse_pool_options(options): event_listeners = options.get('event_listeners') ssl_context, ssl_match_hostname = _parse_ssl_options(options) return PoolOptions(max_pool_size, + min_pool_size, + max_idle_time_ms, connect_timeout, socket_timeout, wait_queue_timeout, wait_queue_multiple, ssl_context, ssl_match_hostname, socket_keepalive, diff --git a/pymongo/common.py b/pymongo/common.py index 5384cafdf..9481879d4 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -64,6 +64,12 @@ CONNECT_TIMEOUT = 20.0 # Default value for maxPoolSize. MAX_POOL_SIZE = 100 +# Default value for minPoolSize. +MIN_POOL_SIZE = 0 + +# Default value for maxIdleTimeMS. +MAX_IDLE_TIME_MS = None + # Default value for localThresholdMS. LOCAL_THRESHOLD_MS = 15 @@ -441,6 +447,8 @@ URI_VALIDATORS = { 'tz_aware': validate_boolean_or_string, 'uuidrepresentation': validate_uuid_representation, 'connect': validate_boolean_or_string, + 'event_listeners': _validate_event_listeners, + 'minpoolsize': validate_non_negative_integer } TIMEOUT_VALIDATORS = { @@ -448,6 +456,7 @@ TIMEOUT_VALIDATORS = { 'sockettimeoutms': validate_timeout_or_none, 'waitqueuetimeoutms': validate_timeout_or_none, 'serverselectiontimeoutms': validate_timeout_or_zero, + 'maxidletimems': validate_timeout_or_none, } KW_VALIDATORS = { diff --git a/pymongo/helpers.py b/pymongo/helpers.py index 547f0f5a5..504b53b6e 100644 --- a/pymongo/helpers.py +++ b/pymongo/helpers.py @@ -17,6 +17,8 @@ import collections import datetime import struct +import sys +import traceback import bson from bson.codec_options import CodecOptions @@ -337,3 +339,20 @@ def _fields_list_to_dict(fields, option_name): raise TypeError("%s must be a mapping or " "list of key names" % (option_name,)) + + +def _handle_exception(): + """Print exceptions raised by subscribers to stderr.""" + # Heavily influenced by logging.Handler.handleError. + + # See note here: + # https://docs.python.org/3.4/library/sys.html#sys.__stderr__ + if sys.stderr: + einfo = sys.exc_info() + try: + traceback.print_exception(einfo[0], einfo[1], einfo[2], + None, sys.stderr) + except IOError: + pass + finally: + del einfo diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index b42a94495..7b1c9e292 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -34,7 +34,6 @@ access: import contextlib import datetime import threading -import warnings import weakref from collections import defaultdict @@ -124,10 +123,16 @@ class MongoClient(common.BaseObject): | **Other optional parameters can be passed as keyword arguments:** - - `maxPoolSize` (optional): The maximum number of connections - that the pool will open simultaneously. If this is set, operations - will block if there are `maxPoolSize` outstanding connections - from the pool. Defaults to 100. Cannot be 0. + - `maxPoolSize` (optional): The maximum allowable number of + concurrent connections to each connected server. Requests to a + server will block if there are `maxPoolSize` outstanding + connections to the requested server. Defaults to 100. Cannot be 0. + - `minPoolSize` (optional): The minimum required number of concurrent + connections that the pool will maintain to each connected server. + Default is 0. + - `maxIdleTimeMS` (optional): The maximum number of milliseconds that + a connection can remain idle in the pool before being removed and + replaced. Defaults to `None` (no limit). - `socketTimeoutMS`: (integer or None) Controls how long (in milliseconds) the driver will wait for a response after sending an ordinary (non-monitoring) database operation before concluding that @@ -390,7 +395,7 @@ class MongoClient(common.BaseObject): client = self_ref() if client is None: return False # Stop the executor. - MongoClient._process_kill_cursors_queue(client) + MongoClient._process_periodic_tasks(client) return True executor = periodic_executor.PeriodicExecutor( @@ -598,15 +603,34 @@ class MongoClient(common.BaseObject): @property def max_pool_size(self): - """The maximum number of sockets the pool will open concurrently. + """The maximum allowable number of concurrent connections to each + connected server. Requests to a server will block if there are + `maxPoolSize` outstanding connections to the requested server. + Defaults to 100. Cannot be 0. - When the pool has reached `max_pool_size`, operations block waiting for - a socket to be returned to the pool. If ``waitQueueTimeoutMS`` is set, - a blocked operation will raise :exc:`~pymongo.errors.ConnectionFailure` - after a timeout. By default ``waitQueueTimeoutMS`` is not set. + When a server's pool has reached `max_pool_size`, operations for that + server block waiting for a socket to be returned to the pool. If + ``waitQueueTimeoutMS`` is set, a blocked operation will raise + :exc:`~pymongo.errors.ConnectionFailure` after a timeout. + By default ``waitQueueTimeoutMS`` is not set. """ return self.__options.pool_options.max_pool_size + @property + def min_pool_size(self): + """The minimum required number of concurrent connections that the pool + will maintain to each connected server. Default is 0. + """ + return self.__options.pool_options.min_pool_size + + @property + def max_idle_time_ms(self): + """The maximum number of milliseconds that a connection can remain + idle in the pool before being removed and replaced. Defaults to + `None` (no limit). + """ + return self.__options.pool_options.max_idle_time_ms + @property def nodes(self): """Set of all currently connected servers. @@ -945,8 +969,9 @@ class MongoClient(common.BaseObject): self.__kill_cursors_queue.append((address, cursor_ids)) # This method is run periodically by a background thread. - def _process_kill_cursors_queue(self): - """Process any pending kill cursors requests.""" + def _process_periodic_tasks(self): + """Process any pending kill cursors requests and + maintain connection pool parameters.""" address_to_cursor_ids = defaultdict(list) # Other threads or the GC may append to the queue concurrently. @@ -1018,9 +1043,12 @@ class MongoClient(common.BaseObject): duration, reply, 'killCursors', request_id, address) - except ConnectionFailure as exc: - warnings.warn("couldn't close cursor on %s: %s" - % (address, exc)) + except Exception: + helpers._handle_exception() + try: + self._topology.update_pool() + except Exception: + helpers._handle_exception() def server_info(self): """Get information about the MongoDB server we're connected to.""" diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 693c5fc0c..2173e3a44 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -70,6 +70,7 @@ import sys import traceback from collections import namedtuple, Sequence +from pymongo.helpers import _handle_exception _Listeners = namedtuple('Listeners', ('command_listeners',)) @@ -133,22 +134,6 @@ def register(listener): _LISTENERS.command_listeners.append(listener) -def _handle_exception(): - """Print exceptions raised by subscribers to stderr.""" - # Heavily influenced by logging.Handler.handleError. - - # See note here: - # https://docs.python.org/3.4/library/sys.html#sys.__stderr__ - if sys.stderr: - einfo = sys.exc_info() - try: - traceback.print_exception(einfo[0], einfo[1], einfo[2], - None, sys.stderr) - except IOError: - pass - finally: - del einfo - # Note - to avoid bugs from forgetting which if these is all lowercase and # which are camelCase, and at the same time avoid having to add a test for # every command, use all lowercase here and test against command_name.lower(). diff --git a/pymongo/pool.py b/pymongo/pool.py index 904c6b19b..73f3b0ce0 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -67,18 +67,22 @@ def _raise_connection_failure(address, error): class PoolOptions(object): - __slots__ = ('__max_pool_size', '__connect_timeout', '__socket_timeout', + __slots__ = ('__max_pool_size', '__min_pool_size', '__max_idle_time_ms', + '__connect_timeout', '__socket_timeout', '__wait_queue_timeout', '__wait_queue_multiple', '__ssl_context', '__ssl_match_hostname', '__socket_keepalive', '__event_listeners') - def __init__(self, max_pool_size=100, connect_timeout=None, + def __init__(self, max_pool_size=100, min_pool_size=0, + max_idle_time_ms=None, connect_timeout=None, socket_timeout=None, wait_queue_timeout=None, wait_queue_multiple=None, ssl_context=None, ssl_match_hostname=True, socket_keepalive=False, event_listeners=None): self.__max_pool_size = max_pool_size + self.__min_pool_size = min_pool_size + self.__max_idle_time_ms = max_idle_time_ms self.__connect_timeout = connect_timeout self.__socket_timeout = socket_timeout self.__wait_queue_timeout = wait_queue_timeout @@ -90,12 +94,34 @@ class PoolOptions(object): @property def max_pool_size(self): - """The maximum number of connections that the pool will open - simultaneously. If this is set, operations will block if there - are `max_pool_size` outstanding connections. + """The maximum allowable number of concurrent connections to each + connected server. Requests to a server will block if there are + `maxPoolSize` outstanding connections to the requested server. + Defaults to 100. Cannot be 0. + + When a server's pool has reached `max_pool_size`, operations for that + server block waiting for a socket to be returned to the pool. If + ``waitQueueTimeoutMS`` is set, a blocked operation will raise + :exc:`~pymongo.errors.ConnectionFailure` after a timeout. + By default ``waitQueueTimeoutMS`` is not set. """ return self.__max_pool_size + @property + def min_pool_size(self): + """The minimum required number of concurrent connections that the pool + will maintain to each connected server. Default is 0. + """ + return self.__min_pool_size + + @property + def max_idle_time_ms(self): + """The maximum number of milliseconds that a connection can remain + idle in the pool before being removed and replaced. Defaults to + `None` (no limit). + """ + return self.__max_idle_time_ms + @property def connect_timeout(self): """How long a connection can take to be opened before timing out. @@ -459,6 +485,7 @@ class Pool: self.sockets = set() self.lock = threading.Lock() + self.active_sockets = 0 # Keep track of resets, so we notice sockets created before the most # recent reset and close them. @@ -483,10 +510,26 @@ class Pool: self.pool_id += 1 self.pid = os.getpid() sockets, self.sockets = self.sockets, set() + self.active_sockets = 0 for sock_info in sockets: sock_info.close() + def remove_stale_sockets(self): + with self.lock: + if self.opts.max_idle_time_ms is not None: + for sock_info in self.sockets.copy(): + age = _time() - sock_info.last_checkout + if age > self.opts.max_idle_time_ms: + self.sockets.remove(sock_info) + sock_info.close() + + while len( + self.sockets) + self.active_sockets < self.opts.min_pool_size: + sock_info = self.connect() + with self.lock: + self.sockets.add(sock_info) + def connect(self): """Connect to Mongo and return a new SocketInfo. @@ -560,6 +603,8 @@ class Pool: if not self._socket_semaphore.acquire( True, self.opts.wait_queue_timeout): self._raise_wait_queue_timeout() + with self.lock: + self.active_sockets += 1 # We've now acquired the semaphore and must release it on error. try: @@ -571,6 +616,12 @@ class Pool: except KeyError: # Can raise ConnectionFailure or CertificateError. sock_info, from_pool = self.connect(), False + # If socket is idle, open a new one. + if self.opts.max_idle_time_ms is not None: + age = _time() - sock_info.last_checkout + if age > self.opts.max_idle_time_ms: + sock_info.close() + sock_info, from_pool = self.connect(), False if from_pool: # Can raise ConnectionFailure. @@ -578,6 +629,8 @@ class Pool: except: self._socket_semaphore.release() + with self.lock: + self.active_sockets -= 1 raise sock_info.last_checkout = _time() @@ -595,6 +648,8 @@ class Pool: self.sockets.add(sock_info) self._socket_semaphore.release() + with self.lock: + self.active_sockets -= 1 def _check(self, sock_info): """This side-effecty function checks if this pool has been reset since diff --git a/pymongo/topology.py b/pymongo/topology.py index 20cd64d05..bec295131 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -254,6 +254,12 @@ class Topology(object): self._reset_server(address) self._request_check(address) + def update_pool(self): + # Remove any stale sockets and add new sockets if pool is too small. + with self._lock: + for server in self._servers.values(): + server._pool.remove_stale_sockets() + def close(self): """Clear pools and terminate monitors. Topology reopens on demand.""" with self._lock: diff --git a/test/pymongo_mocks.py b/test/pymongo_mocks.py index 2d77debf7..c312ef892 100644 --- a/test/pymongo_mocks.py +++ b/test/pymongo_mocks.py @@ -192,7 +192,7 @@ class MockClient(MongoClient): return response, rtt - def _process_kill_cursors_queue(self): + def _process_periodic_tasks(self): # Avoid the background thread causing races, e.g. a surprising # reconnect while we're trying to test a disconnected client. pass diff --git a/test/test_client.py b/test/test_client.py index 541536d60..7a72456e4 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -187,6 +187,77 @@ class ClientUnitTest(unittest.TestCase): class TestClient(IntegrationTest): + def test_max_idle_time_reaper(self): + with client_knobs(kill_cursor_frequency=0.1): + # Assert reaper doesn't remove sockets when maxIdleTimeMS not set + client = MongoClient(host, port) + server = client._get_topology().select_server(any_server_selector) + with server._pool.get_socket({}) as sock_info: + pass + time.sleep(1) + self.assertEqual(1, len(server._pool.sockets)) + self.assertTrue(sock_info in server._pool.sockets) + + # Assert reaper removes idle socket and replaces it with a new one + client = MongoClient(host, port, maxIdleTimeMS=.5, minPoolSize=1) + server = client._get_topology().select_server(any_server_selector) + with server._pool.get_socket({}) as sock_info: + pass + time.sleep(2) + self.assertEqual(1, len(server._pool.sockets)) + self.assertFalse(sock_info in server._pool.sockets) + + # Assert reaper has removed idle socket and NOT replaced it + client = MongoClient(host, port, maxIdleTimeMS=.5) + server = client._get_topology().select_server(any_server_selector) + with server._pool.get_socket({}): + pass + time.sleep(1) + self.assertEqual(0, len(server._pool.sockets)) + + def test_min_pool_size(self): + with client_knobs(kill_cursor_frequency=.1): + client = MongoClient(host, port) + server = client._get_topology().select_server(any_server_selector) + time.sleep(1) + self.assertEqual(0, len(server._pool.sockets)) + + # Assert that pool started up at minPoolSize + client = MongoClient(host, port, minPoolSize=10) + server = client._get_topology().select_server(any_server_selector) + time.sleep(1) + self.assertEqual(10, len(server._pool.sockets)) + + # Assert that if a socket is closed, a new one takes its place + with server._pool.get_socket({}) as sock_info: + sock_info.close() + time.sleep(1) + self.assertEqual(10, len(server._pool.sockets)) + self.assertFalse(sock_info in server._pool.sockets) + + def test_max_idle_time_checkout(self): + with client_knobs(kill_cursor_frequency=99999999): + client = MongoClient(host, port, maxIdleTimeMS=.5) + time.sleep(1) + server = client._get_topology().select_server(any_server_selector) + with server._pool.get_socket({}) as sock_info: + pass + time.sleep(1) + with server._pool.get_socket({}) as new_sock_info: + self.assertNotEqual(sock_info, new_sock_info) + self.assertEqual(1, len(server._pool.sockets)) + self.assertFalse(sock_info in server._pool.sockets) + self.assertTrue(new_sock_info in server._pool.sockets) + + client = MongoClient(host, port) + server = client._get_topology().select_server(any_server_selector) + with server._pool.get_socket({}) as sock_info: + pass + time.sleep(1) + with server._pool.get_socket({}) as new_sock_info: + self.assertEqual(sock_info, new_sock_info) + self.assertEqual(1, len(server._pool.sockets)) + def test_constants(self): # Set bad defaults. MongoClient.HOST = "somedomainthatdoesntexist.org" @@ -798,23 +869,6 @@ class TestClient(IntegrationTest): wait_until(raises_cursor_not_found, 'close cursor') - def test_kill_cursors_with_server_unavailable(self): - with client_knobs(kill_cursor_frequency=9999999): - client = MongoClient('doesnt exist', connect=False, - serverSelectionTimeoutMS=0) - - # Wait for the first tick of the periodic kill-cursors to pass. - time.sleep(1) - - # Enqueue a kill-cursors message. - client.close_cursor(1234, ('doesnt-exist', 27017)) - - with warnings.catch_warnings(record=True) as user_warnings: - client._process_kill_cursors_queue() - - self.assertIn("couldn't close cursor on ('doesnt-exist', 27017)", - str(user_warnings[0].message)) - def test_lazy_connect_w0(self): # Ensure that connect-on-demand works when the first operation is # an unacknowledged write. This exercises _writable_max_wire_version().