PYTHON-658 - Support minPoolSize, maxIdleTimeMS

This commit is contained in:
aherlihy 2016-06-14 14:37:06 +02:00
parent ecab1c9432
commit 10608144d6
10 changed files with 227 additions and 55 deletions

View File

@ -28,6 +28,16 @@ all other sockets are in use and the pool has reached its maximum, the
thread pauses, waiting for a socket to be returned to the pool by another
thread.
It is possible to set the minimum number of concurrent connections to each
server with ``minPoolSize``, which defaults to 0. The connection pool will be
initialized with this number of sockets. If sockets are removed from the pool
and closed, causing the total number of sockets (both in use and idle) to drop
below the set minimum, more sockets will be added until the minimum is reached.
The maximum number of milliseconds that a connection can remain idle in the
pool before being removed and replaced can be set with ``maxIdleTime``, which
defaults to `None` (no limit).
The default configuration for a :class:`~pymongo.mongo_client.MongoClient`
works for most applications::

View File

@ -98,6 +98,10 @@ def _parse_ssl_options(options):
def _parse_pool_options(options):
"""Parse connection pool options."""
max_pool_size = options.get('maxpoolsize', common.MAX_POOL_SIZE)
min_pool_size = options.get('minpoolsize', common.MIN_POOL_SIZE)
max_idle_time_ms = options.get('maxidletimems', common.MAX_IDLE_TIME_MS)
if max_pool_size is not None and min_pool_size > max_pool_size:
raise ValueError("minPoolSize must be smaller or equal to maxPoolSize")
connect_timeout = options.get('connecttimeoutms', common.CONNECT_TIMEOUT)
socket_keepalive = options.get('socketkeepalive', False)
socket_timeout = options.get('sockettimeoutms')
@ -106,6 +110,8 @@ def _parse_pool_options(options):
event_listeners = options.get('event_listeners')
ssl_context, ssl_match_hostname = _parse_ssl_options(options)
return PoolOptions(max_pool_size,
min_pool_size,
max_idle_time_ms,
connect_timeout, socket_timeout,
wait_queue_timeout, wait_queue_multiple,
ssl_context, ssl_match_hostname, socket_keepalive,

View File

@ -64,6 +64,12 @@ CONNECT_TIMEOUT = 20.0
# Default value for maxPoolSize.
MAX_POOL_SIZE = 100
# Default value for minPoolSize.
MIN_POOL_SIZE = 0
# Default value for maxIdleTimeMS.
MAX_IDLE_TIME_MS = None
# Default value for localThresholdMS.
LOCAL_THRESHOLD_MS = 15
@ -441,6 +447,8 @@ URI_VALIDATORS = {
'tz_aware': validate_boolean_or_string,
'uuidrepresentation': validate_uuid_representation,
'connect': validate_boolean_or_string,
'event_listeners': _validate_event_listeners,
'minpoolsize': validate_non_negative_integer
}
TIMEOUT_VALIDATORS = {
@ -448,6 +456,7 @@ TIMEOUT_VALIDATORS = {
'sockettimeoutms': validate_timeout_or_none,
'waitqueuetimeoutms': validate_timeout_or_none,
'serverselectiontimeoutms': validate_timeout_or_zero,
'maxidletimems': validate_timeout_or_none,
}
KW_VALIDATORS = {

View File

@ -17,6 +17,8 @@
import collections
import datetime
import struct
import sys
import traceback
import bson
from bson.codec_options import CodecOptions
@ -337,3 +339,20 @@ def _fields_list_to_dict(fields, option_name):
raise TypeError("%s must be a mapping or "
"list of key names" % (option_name,))
def _handle_exception():
"""Print exceptions raised by subscribers to stderr."""
# Heavily influenced by logging.Handler.handleError.
# See note here:
# https://docs.python.org/3.4/library/sys.html#sys.__stderr__
if sys.stderr:
einfo = sys.exc_info()
try:
traceback.print_exception(einfo[0], einfo[1], einfo[2],
None, sys.stderr)
except IOError:
pass
finally:
del einfo

View File

@ -34,7 +34,6 @@ access:
import contextlib
import datetime
import threading
import warnings
import weakref
from collections import defaultdict
@ -124,10 +123,16 @@ class MongoClient(common.BaseObject):
| **Other optional parameters can be passed as keyword arguments:**
- `maxPoolSize` (optional): The maximum number of connections
that the pool will open simultaneously. If this is set, operations
will block if there are `maxPoolSize` outstanding connections
from the pool. Defaults to 100. Cannot be 0.
- `maxPoolSize` (optional): The maximum allowable number of
concurrent connections to each connected server. Requests to a
server will block if there are `maxPoolSize` outstanding
connections to the requested server. Defaults to 100. Cannot be 0.
- `minPoolSize` (optional): The minimum required number of concurrent
connections that the pool will maintain to each connected server.
Default is 0.
- `maxIdleTimeMS` (optional): The maximum number of milliseconds that
a connection can remain idle in the pool before being removed and
replaced. Defaults to `None` (no limit).
- `socketTimeoutMS`: (integer or None) Controls how long (in
milliseconds) the driver will wait for a response after sending an
ordinary (non-monitoring) database operation before concluding that
@ -390,7 +395,7 @@ class MongoClient(common.BaseObject):
client = self_ref()
if client is None:
return False # Stop the executor.
MongoClient._process_kill_cursors_queue(client)
MongoClient._process_periodic_tasks(client)
return True
executor = periodic_executor.PeriodicExecutor(
@ -598,15 +603,34 @@ class MongoClient(common.BaseObject):
@property
def max_pool_size(self):
"""The maximum number of sockets the pool will open concurrently.
"""The maximum allowable number of concurrent connections to each
connected server. Requests to a server will block if there are
`maxPoolSize` outstanding connections to the requested server.
Defaults to 100. Cannot be 0.
When the pool has reached `max_pool_size`, operations block waiting for
a socket to be returned to the pool. If ``waitQueueTimeoutMS`` is set,
a blocked operation will raise :exc:`~pymongo.errors.ConnectionFailure`
after a timeout. By default ``waitQueueTimeoutMS`` is not set.
When a server's pool has reached `max_pool_size`, operations for that
server block waiting for a socket to be returned to the pool. If
``waitQueueTimeoutMS`` is set, a blocked operation will raise
:exc:`~pymongo.errors.ConnectionFailure` after a timeout.
By default ``waitQueueTimeoutMS`` is not set.
"""
return self.__options.pool_options.max_pool_size
@property
def min_pool_size(self):
"""The minimum required number of concurrent connections that the pool
will maintain to each connected server. Default is 0.
"""
return self.__options.pool_options.min_pool_size
@property
def max_idle_time_ms(self):
"""The maximum number of milliseconds that a connection can remain
idle in the pool before being removed and replaced. Defaults to
`None` (no limit).
"""
return self.__options.pool_options.max_idle_time_ms
@property
def nodes(self):
"""Set of all currently connected servers.
@ -945,8 +969,9 @@ class MongoClient(common.BaseObject):
self.__kill_cursors_queue.append((address, cursor_ids))
# This method is run periodically by a background thread.
def _process_kill_cursors_queue(self):
"""Process any pending kill cursors requests."""
def _process_periodic_tasks(self):
"""Process any pending kill cursors requests and
maintain connection pool parameters."""
address_to_cursor_ids = defaultdict(list)
# Other threads or the GC may append to the queue concurrently.
@ -1018,9 +1043,12 @@ class MongoClient(common.BaseObject):
duration, reply, 'killCursors', request_id,
address)
except ConnectionFailure as exc:
warnings.warn("couldn't close cursor on %s: %s"
% (address, exc))
except Exception:
helpers._handle_exception()
try:
self._topology.update_pool()
except Exception:
helpers._handle_exception()
def server_info(self):
"""Get information about the MongoDB server we're connected to."""

View File

@ -70,6 +70,7 @@ import sys
import traceback
from collections import namedtuple, Sequence
from pymongo.helpers import _handle_exception
_Listeners = namedtuple('Listeners', ('command_listeners',))
@ -133,22 +134,6 @@ def register(listener):
_LISTENERS.command_listeners.append(listener)
def _handle_exception():
"""Print exceptions raised by subscribers to stderr."""
# Heavily influenced by logging.Handler.handleError.
# See note here:
# https://docs.python.org/3.4/library/sys.html#sys.__stderr__
if sys.stderr:
einfo = sys.exc_info()
try:
traceback.print_exception(einfo[0], einfo[1], einfo[2],
None, sys.stderr)
except IOError:
pass
finally:
del einfo
# Note - to avoid bugs from forgetting which if these is all lowercase and
# which are camelCase, and at the same time avoid having to add a test for
# every command, use all lowercase here and test against command_name.lower().

View File

@ -67,18 +67,22 @@ def _raise_connection_failure(address, error):
class PoolOptions(object):
__slots__ = ('__max_pool_size', '__connect_timeout', '__socket_timeout',
__slots__ = ('__max_pool_size', '__min_pool_size', '__max_idle_time_ms',
'__connect_timeout', '__socket_timeout',
'__wait_queue_timeout', '__wait_queue_multiple',
'__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
'__event_listeners')
def __init__(self, max_pool_size=100, connect_timeout=None,
def __init__(self, max_pool_size=100, min_pool_size=0,
max_idle_time_ms=None, connect_timeout=None,
socket_timeout=None, wait_queue_timeout=None,
wait_queue_multiple=None, ssl_context=None,
ssl_match_hostname=True, socket_keepalive=False,
event_listeners=None):
self.__max_pool_size = max_pool_size
self.__min_pool_size = min_pool_size
self.__max_idle_time_ms = max_idle_time_ms
self.__connect_timeout = connect_timeout
self.__socket_timeout = socket_timeout
self.__wait_queue_timeout = wait_queue_timeout
@ -90,12 +94,34 @@ class PoolOptions(object):
@property
def max_pool_size(self):
"""The maximum number of connections that the pool will open
simultaneously. If this is set, operations will block if there
are `max_pool_size` outstanding connections.
"""The maximum allowable number of concurrent connections to each
connected server. Requests to a server will block if there are
`maxPoolSize` outstanding connections to the requested server.
Defaults to 100. Cannot be 0.
When a server's pool has reached `max_pool_size`, operations for that
server block waiting for a socket to be returned to the pool. If
``waitQueueTimeoutMS`` is set, a blocked operation will raise
:exc:`~pymongo.errors.ConnectionFailure` after a timeout.
By default ``waitQueueTimeoutMS`` is not set.
"""
return self.__max_pool_size
@property
def min_pool_size(self):
"""The minimum required number of concurrent connections that the pool
will maintain to each connected server. Default is 0.
"""
return self.__min_pool_size
@property
def max_idle_time_ms(self):
"""The maximum number of milliseconds that a connection can remain
idle in the pool before being removed and replaced. Defaults to
`None` (no limit).
"""
return self.__max_idle_time_ms
@property
def connect_timeout(self):
"""How long a connection can take to be opened before timing out.
@ -459,6 +485,7 @@ class Pool:
self.sockets = set()
self.lock = threading.Lock()
self.active_sockets = 0
# Keep track of resets, so we notice sockets created before the most
# recent reset and close them.
@ -483,10 +510,26 @@ class Pool:
self.pool_id += 1
self.pid = os.getpid()
sockets, self.sockets = self.sockets, set()
self.active_sockets = 0
for sock_info in sockets:
sock_info.close()
def remove_stale_sockets(self):
with self.lock:
if self.opts.max_idle_time_ms is not None:
for sock_info in self.sockets.copy():
age = _time() - sock_info.last_checkout
if age > self.opts.max_idle_time_ms:
self.sockets.remove(sock_info)
sock_info.close()
while len(
self.sockets) + self.active_sockets < self.opts.min_pool_size:
sock_info = self.connect()
with self.lock:
self.sockets.add(sock_info)
def connect(self):
"""Connect to Mongo and return a new SocketInfo.
@ -560,6 +603,8 @@ class Pool:
if not self._socket_semaphore.acquire(
True, self.opts.wait_queue_timeout):
self._raise_wait_queue_timeout()
with self.lock:
self.active_sockets += 1
# We've now acquired the semaphore and must release it on error.
try:
@ -571,6 +616,12 @@ class Pool:
except KeyError:
# Can raise ConnectionFailure or CertificateError.
sock_info, from_pool = self.connect(), False
# If socket is idle, open a new one.
if self.opts.max_idle_time_ms is not None:
age = _time() - sock_info.last_checkout
if age > self.opts.max_idle_time_ms:
sock_info.close()
sock_info, from_pool = self.connect(), False
if from_pool:
# Can raise ConnectionFailure.
@ -578,6 +629,8 @@ class Pool:
except:
self._socket_semaphore.release()
with self.lock:
self.active_sockets -= 1
raise
sock_info.last_checkout = _time()
@ -595,6 +648,8 @@ class Pool:
self.sockets.add(sock_info)
self._socket_semaphore.release()
with self.lock:
self.active_sockets -= 1
def _check(self, sock_info):
"""This side-effecty function checks if this pool has been reset since

View File

@ -254,6 +254,12 @@ class Topology(object):
self._reset_server(address)
self._request_check(address)
def update_pool(self):
# Remove any stale sockets and add new sockets if pool is too small.
with self._lock:
for server in self._servers.values():
server._pool.remove_stale_sockets()
def close(self):
"""Clear pools and terminate monitors. Topology reopens on demand."""
with self._lock:

View File

@ -192,7 +192,7 @@ class MockClient(MongoClient):
return response, rtt
def _process_kill_cursors_queue(self):
def _process_periodic_tasks(self):
# Avoid the background thread causing races, e.g. a surprising
# reconnect while we're trying to test a disconnected client.
pass

View File

@ -187,6 +187,77 @@ class ClientUnitTest(unittest.TestCase):
class TestClient(IntegrationTest):
def test_max_idle_time_reaper(self):
with client_knobs(kill_cursor_frequency=0.1):
# Assert reaper doesn't remove sockets when maxIdleTimeMS not set
client = MongoClient(host, port)
server = client._get_topology().select_server(any_server_selector)
with server._pool.get_socket({}) as sock_info:
pass
time.sleep(1)
self.assertEqual(1, len(server._pool.sockets))
self.assertTrue(sock_info in server._pool.sockets)
# Assert reaper removes idle socket and replaces it with a new one
client = MongoClient(host, port, maxIdleTimeMS=.5, minPoolSize=1)
server = client._get_topology().select_server(any_server_selector)
with server._pool.get_socket({}) as sock_info:
pass
time.sleep(2)
self.assertEqual(1, len(server._pool.sockets))
self.assertFalse(sock_info in server._pool.sockets)
# Assert reaper has removed idle socket and NOT replaced it
client = MongoClient(host, port, maxIdleTimeMS=.5)
server = client._get_topology().select_server(any_server_selector)
with server._pool.get_socket({}):
pass
time.sleep(1)
self.assertEqual(0, len(server._pool.sockets))
def test_min_pool_size(self):
with client_knobs(kill_cursor_frequency=.1):
client = MongoClient(host, port)
server = client._get_topology().select_server(any_server_selector)
time.sleep(1)
self.assertEqual(0, len(server._pool.sockets))
# Assert that pool started up at minPoolSize
client = MongoClient(host, port, minPoolSize=10)
server = client._get_topology().select_server(any_server_selector)
time.sleep(1)
self.assertEqual(10, len(server._pool.sockets))
# Assert that if a socket is closed, a new one takes its place
with server._pool.get_socket({}) as sock_info:
sock_info.close()
time.sleep(1)
self.assertEqual(10, len(server._pool.sockets))
self.assertFalse(sock_info in server._pool.sockets)
def test_max_idle_time_checkout(self):
with client_knobs(kill_cursor_frequency=99999999):
client = MongoClient(host, port, maxIdleTimeMS=.5)
time.sleep(1)
server = client._get_topology().select_server(any_server_selector)
with server._pool.get_socket({}) as sock_info:
pass
time.sleep(1)
with server._pool.get_socket({}) as new_sock_info:
self.assertNotEqual(sock_info, new_sock_info)
self.assertEqual(1, len(server._pool.sockets))
self.assertFalse(sock_info in server._pool.sockets)
self.assertTrue(new_sock_info in server._pool.sockets)
client = MongoClient(host, port)
server = client._get_topology().select_server(any_server_selector)
with server._pool.get_socket({}) as sock_info:
pass
time.sleep(1)
with server._pool.get_socket({}) as new_sock_info:
self.assertEqual(sock_info, new_sock_info)
self.assertEqual(1, len(server._pool.sockets))
def test_constants(self):
# Set bad defaults.
MongoClient.HOST = "somedomainthatdoesntexist.org"
@ -798,23 +869,6 @@ class TestClient(IntegrationTest):
wait_until(raises_cursor_not_found, 'close cursor')
def test_kill_cursors_with_server_unavailable(self):
with client_knobs(kill_cursor_frequency=9999999):
client = MongoClient('doesnt exist', connect=False,
serverSelectionTimeoutMS=0)
# Wait for the first tick of the periodic kill-cursors to pass.
time.sleep(1)
# Enqueue a kill-cursors message.
client.close_cursor(1234, ('doesnt-exist', 27017))
with warnings.catch_warnings(record=True) as user_warnings:
client._process_kill_cursors_queue()
self.assertIn("couldn't close cursor on ('doesnt-exist', 27017)",
str(user_warnings[0].message))
def test_lazy_connect_w0(self):
# Ensure that connect-on-demand works when the first operation is
# an unacknowledged write. This exercises _writable_max_wire_version().