From c2e6471ee642196ad655c78ef1cd18f88dda93cf Mon Sep 17 00:00:00 2001 From: Bernie Hackett Date: Thu, 3 Jul 2014 14:10:37 -0700 Subject: [PATCH] PYTHON-727 - Implement and use PoolOptions class --- pymongo/cluster.py | 10 +-- pymongo/mongo_client.py | 46 +++++----- pymongo/mongo_replica_set_client.py | 40 ++++----- pymongo/pool.py | 128 +++++++++++++++++++--------- test/pymongo_mocks.py | 12 +-- test/test_client.py | 12 +-- test/test_pooling_base.py | 56 ++++++------ test/test_pooling_gevent.py | 43 +++++----- test/test_replica_set_client.py | 10 ++- 9 files changed, 202 insertions(+), 155 deletions(-) diff --git a/pymongo/cluster.py b/pymongo/cluster.py index a4846e275..e6c4b09a5 100644 --- a/pymongo/cluster.py +++ b/pymongo/cluster.py @@ -22,6 +22,7 @@ from pymongo.cluster_description import (updated_cluster_description, CLUSTER_TYPE, ClusterDescription) from pymongo.errors import InvalidOperation, ConnectionFailure +from pymongo.pool import PoolOptions from pymongo.server import Server @@ -176,8 +177,7 @@ class Cluster(object): # TODO: Need PoolSettings, SocketSettings, and SSLContext classes. return self._settings.pool_class( address, - max_size=100, - net_timeout=None, - conn_timeout=20, - ssl_context=None, - use_greenlets=False) + PoolOptions( + max_pool_size=100, + connect_timeout=20) + ) diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index f33d333ba..013b34239 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -287,20 +287,20 @@ class MongoClient(common.BaseObject): options[option] = value options.update(opts) - self.__max_pool_size = common.validate_positive_integer_or_none( - 'max_pool_size', max_pool_size) - self.__cursor_manager = CursorManager(self) self.__repl = options.get('replicaset') self.__direct = len(seeds) == 1 and not self.__repl - self.__net_timeout = options.get('sockettimeoutms') - self.__conn_timeout = options.get('connecttimeoutms') - self.__wait_queue_timeout = options.get('waitqueuetimeoutms') - self.__wait_queue_multiple = options.get('waitqueuemultiple') + max_pool_size = common.validate_positive_integer_or_none( + 'max_pool_size', max_pool_size) + connect_timeout = options.get('connecttimeoutms') + socket_timeout = options.get('sockettimeoutms') + wait_queue_timeout = options.get('waitqueuetimeoutms') + wait_queue_multiple = options.get('waitqueuemultiple') + use_greenlets = options.get('use_greenlets', False) - self.__ssl_ctx = None + ssl_context = None use_ssl = options.get('ssl', None) keyfile = options.get('ssl_keyfile', None) certfile = options.get('ssl_certfile', None) @@ -335,13 +335,21 @@ class MongoClient(common.BaseObject): ctx.load_verify_locations(ca_certs) if cert_reqs is not None: ctx.verify_mode = cert_reqs - self.__ssl_ctx = ctx + ssl_context = ctx + + self.__pool_opts = pool.PoolOptions( + max_pool_size=max_pool_size, + connect_timeout=connect_timeout, + socket_timeout=socket_timeout, + wait_queue_timeout=wait_queue_timeout, + wait_queue_multiple=wait_queue_multiple, + ssl_context=ssl_context, + use_greenlets=use_greenlets) - self.__use_greenlets = options.get('use_greenlets', False) self.__pool_class = pool_class self.__connecting = False - if self.__use_greenlets: + if use_greenlets: # Greenlets don't need to lock around access to the Member; # they're only interrupted when they do I/O. self.__connecting_lock = thread_util.DummyLock() @@ -352,7 +360,7 @@ class MongoClient(common.BaseObject): self.__event_class = event_class else: # Prevent a cycle; this lambda shouldn't refer to self. - g = self.__use_greenlets + g = use_greenlets event_class = lambda: thread_util.create_event(g) self.__event_class = event_class @@ -476,15 +484,7 @@ class MongoClient(common.BaseObject): del self.__auth_credentials[source] def __create_pool(self, pair): - return self.__pool_class( - pair, - self.__max_pool_size, - self.__net_timeout, - self.__conn_timeout, - self.__ssl_ctx, - use_greenlets=self.__use_greenlets, - wait_queue_timeout=self.__wait_queue_timeout, - wait_queue_multiple=self.__wait_queue_multiple) + return self.__pool_class(pair, self.__pool_opts) def __check_auth(self, sock_info): """Authenticate using cached database credentials. @@ -570,7 +570,7 @@ class MongoClient(common.BaseObject): .. versionchanged:: 2.6 .. versionadded:: 1.11 """ - return self.__max_pool_size + return self.__pool_opts.max_pool_size @property def use_greenlets(self): @@ -579,7 +579,7 @@ class MongoClient(common.BaseObject): .. versionadded:: 2.4.2 """ - return self.__use_greenlets + return self.__pool_opts.use_greenlets @property def nodes(self): diff --git a/pymongo/mongo_replica_set_client.py b/pymongo/mongo_replica_set_client.py index 5ec00509f..d5d3b6c8c 100644 --- a/pymongo/mongo_replica_set_client.py +++ b/pymongo/mongo_replica_set_client.py @@ -584,8 +584,6 @@ class MongoReplicaSetClient(common.BaseObject): self.__index_cache = {} self.__auth_credentials = {} - self.__max_pool_size = common.validate_positive_integer_or_none( - 'max_pool_size', max_pool_size) self.__tz_aware = common.validate_boolean('tz_aware', tz_aware) self.__document_class = document_class self.__monitor = None @@ -624,6 +622,12 @@ class MongoReplicaSetClient(common.BaseObject): self.__opts[option] = value self.__opts.update(options) + max_pool_size = common.validate_positive_integer_or_none( + 'max_pool_size', max_pool_size) + connect_timeout = self.__opts.get('connecttimeoutms') + socket_timeout = self.__opts.get('sockettimeoutms') + wait_queue_timeout = self.__opts.get('waitqueuetimeoutms') + wait_queue_multiple = self.__opts.get('waitqueuemultiple') self.__use_greenlets = self.__opts.get('use_greenlets', False) if self.__use_greenlets and not have_gevent: raise ConfigurationError( @@ -643,12 +647,7 @@ class MongoReplicaSetClient(common.BaseObject): raise ConfigurationError("the replicaSet " "keyword parameter is required.") - self.__net_timeout = self.__opts.get('sockettimeoutms') - self.__conn_timeout = self.__opts.get('connecttimeoutms') - self.__wait_queue_timeout = self.__opts.get('waitqueuetimeoutms') - self.__wait_queue_multiple = self.__opts.get('waitqueuemultiple') - - self.__ssl_ctx = None + ssl_context = None use_ssl = self.__opts.get('ssl', None) keyfile = self.__opts.get('ssl_keyfile', None) certfile = self.__opts.get('ssl_certfile', None) @@ -683,7 +682,16 @@ class MongoReplicaSetClient(common.BaseObject): ctx.load_verify_locations(ca_certs) if cert_reqs is not None: ctx.verify_mode = cert_reqs - self.__ssl_ctx = ctx + ssl_context = ctx + + self.__pool_opts = pool.PoolOptions( + max_pool_size=max_pool_size, + connect_timeout=connect_timeout, + socket_timeout=socket_timeout, + wait_queue_timeout=wait_queue_timeout, + wait_queue_multiple=wait_queue_multiple, + ssl_context=ssl_context, + use_greenlets=self.__use_greenlets) super(MongoReplicaSetClient, self).__init__(**self.__opts) @@ -910,7 +918,7 @@ class MongoReplicaSetClient(common.BaseObject): .. versionchanged:: 2.6 """ - return self.__max_pool_size + return self.__pool_opts.max_pool_size @property def use_greenlets(self): @@ -919,7 +927,7 @@ class MongoReplicaSetClient(common.BaseObject): .. versionadded:: 2.4.2 """ - return self.__use_greenlets + return self.__pool_opts.use_greenlets def get_document_class(self): """document_class getter""" @@ -1031,15 +1039,7 @@ class MongoReplicaSetClient(common.BaseObject): """Directly call ismaster. Returns (response, connection_pool, ping_time in seconds). """ - connection_pool = self.pool_class( - host, - self.__max_pool_size, - self.__net_timeout, - self.__conn_timeout, - self.__ssl_ctx, - use_greenlets=self.__use_greenlets, - wait_queue_timeout=self.__wait_queue_timeout, - wait_queue_multiple=self.__wait_queue_multiple) + connection_pool = self.pool_class(host, self.__pool_opts) if self.in_request(): connection_pool.start_request() diff --git a/pymongo/pool.py b/pymongo/pool.py index aea4689d9..9fbff8853 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -55,6 +55,72 @@ def _closed(sock): return len(rd) > 0 +class PoolOptions(object): + + __slots__ = ('__max_pool_size', '__connect_timeout', '__socket_timeout', + '__wait_queue_timeout', '__wait_queue_multiple', + '__ssl_context', '__use_greenlets') + + def __init__(self, max_pool_size=100, connect_timeout=None, + socket_timeout=None, wait_queue_timeout=None, + wait_queue_multiple=None, ssl_context=None, + use_greenlets=False): + + self.__max_pool_size = max_pool_size + self.__connect_timeout = connect_timeout + self.__socket_timeout = socket_timeout + self.__wait_queue_timeout = wait_queue_timeout + self.__wait_queue_multiple = wait_queue_multiple + self.__ssl_context = ssl_context + self.__use_greenlets = use_greenlets + + @property + def max_pool_size(self): + """The maximum number of connections that the pool will open + simultaneously. If this is set, operations will block if there + are `max_pool_size` outstanding connections. + """ + return self.__max_pool_size + + @property + def connect_timeout(self): + """How long a connection can take to be opened before timing out. + """ + return self.__connect_timeout + + @property + def socket_timeout(self): + """How long a send or receive on a socket can take before timing out. + """ + return self.__socket_timeout + + @property + def wait_queue_timeout(self): + """How long a thread will wait for a socket from the pool if the pool + has no free sockets. + """ + return self.__wait_queue_timeout + + @property + def wait_queue_multiple(self): + """Multiplied by max_pool_size to give the number of threads allowed + to wait for a socket at one time. + """ + return self.__wait_queue_multiple + + @property + def ssl_context(self): + """An SSLContext instance or None. + """ + return self.__ssl_context + + @property + def use_greenlets(self): + """Use greenlet ids for "thread affinity" in requests. + """ + return self.__use_greenlets + + class SocketInfo(object): """Store a socket with some metadata """ @@ -137,27 +203,11 @@ class SocketInfo(object): # Do *not* explicitly inherit from object or Jython won't call __del__ # http://bugs.jython.org/issue1057 class Pool: - def __init__(self, pair, max_size, net_timeout, - conn_timeout, ssl_context, use_greenlets, - wait_queue_timeout=None, wait_queue_multiple=None): + def __init__(self, pair, options): """ :Parameters: - `pair`: a (hostname, port) tuple - - `max_size`: The maximum number of open sockets. Calls to - `get_socket` will block if this is set, this pool has opened - `max_size` sockets, and there are none idle. Set to `None` to - disable. - - `net_timeout`: timeout in seconds for operations on open connection - - `conn_timeout`: timeout in seconds for establishing connection - - `ssl_context`: an SSLContext instance or None - - `use_greenlets`: bool, if True then start_request() assigns a - socket to the current greenlet - otherwise it is assigned to the - current thread - - `wait_queue_timeout`: (integer) How long (in seconds) a - thread will wait for a socket from the pool if the pool has no - free sockets. - - `wait_queue_multiple`: (integer) Multiplied by max_pool_size to give - the number of threads allowed to wait for a socket at one time. + - `options`: a PoolOptions instance """ # Only check a socket's health with _closed() every once in a while. # Can override for testing: 0 to always check, None to never check. @@ -171,34 +221,31 @@ class Pool: self.pool_id = 0 self.pid = os.getpid() self.pair = pair - self.max_size = max_size - self.net_timeout = net_timeout - self.conn_timeout = conn_timeout - self.wait_queue_timeout = wait_queue_timeout - self.wait_queue_multiple = wait_queue_multiple - self.ssl_context = ssl_context + self.opts = options # Map self._ident.get() -> request socket self._tid_to_sock = {} - if use_greenlets and not thread_util.have_gevent: + if self.opts.use_greenlets and not thread_util.have_gevent: raise ConfigurationError( "The Gevent module is not available. " "Install the gevent package from PyPI." ) - self._ident = thread_util.create_ident(use_greenlets) + self._ident = thread_util.create_ident(self.opts.use_greenlets) # Count the number of calls to start_request() per thread or greenlet - self._request_counter = thread_util.Counter(use_greenlets) + self._request_counter = thread_util.Counter(self.opts.use_greenlets) - if self.wait_queue_multiple is None or self.max_size is None: + if (self.opts.wait_queue_multiple is None or + self.opts.max_pool_size is None): max_waiters = None else: - max_waiters = self.max_size * self.wait_queue_multiple + max_waiters = ( + self.opts.max_pool_size * self.opts.wait_queue_multiple) self._socket_semaphore = thread_util.create_semaphore( - self.max_size, max_waiters, use_greenlets) + self.opts.max_pool_size, max_waiters, self.opts.use_greenlets) def reset(self): # Ignore this race condition -- if many threads are resetting at once, @@ -255,7 +302,7 @@ class Pool: try: sock = socket.socket(af, socktype, proto) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - sock.settimeout(self.conn_timeout or 20.0) + sock.settimeout(self.opts.connect_timeout or 20.0) sock.connect(sa) return sock except socket.error as e: @@ -279,22 +326,23 @@ class Pool: """ sock = self.create_connection() hostname = self.pair[0] + ssl_context = self.opts.ssl_context - if self.ssl_context is not None: + if ssl_context is not None: try: - sock = self.ssl_context.wrap_socket(sock) + sock = ssl_context.wrap_socket(sock) except ssl.SSLError: sock.close() raise ConnectionFailure("SSL handshake failed. MongoDB may " "not be configured with SSL support.") - if self.ssl_context.verify_mode != ssl.CERT_NONE: + if ssl_context.verify_mode != ssl.CERT_NONE: try: match_hostname(sock.getpeercert(), hostname) except CertificateError: sock.close() raise - sock.settimeout(self.net_timeout) + sock.settimeout(self.opts.socket_timeout) return SocketInfo(sock, self.pool_id, hostname) def get_socket(self, force=False): @@ -334,7 +382,8 @@ class Pool: # having acquired it for this socket. if not self._socket_semaphore.acquire(False): forced = True - elif not self._socket_semaphore.acquire(True, self.wait_queue_timeout): + elif not self._socket_semaphore.acquire( + True, self.opts.wait_queue_timeout): self._raise_wait_queue_timeout() # We've now acquired the semaphore and must release it on error. @@ -429,8 +478,9 @@ class Pool: """ try: self.lock.acquire() - too_many_sockets = (self.max_size is not None - and len(self.sockets) >= self.max_size) + max_size = self.opts.max_pool_size + too_many_sockets = (max_size is not None + and len(self.sockets) >= max_size) if not too_many_sockets and sock_info.pool_id == self.pool_id: self.sockets.add(sock_info) @@ -535,7 +585,7 @@ class Pool: raise ConnectionFailure( 'Timed out waiting for socket from pool with max_size %r and' ' wait_queue_timeout %r' % ( - self.max_size, self.wait_queue_timeout)) + self.opts.max_pool_size, self.opts.wait_queue_timeout)) def __del__(self): # Avoid ResourceWarnings in Python 3 diff --git a/test/pymongo_mocks.py b/test/pymongo_mocks.py index 1086aabb5..69d4e02b9 100644 --- a/test/pymongo_mocks.py +++ b/test/pymongo_mocks.py @@ -18,7 +18,7 @@ import socket from pymongo import common from pymongo import MongoClient, MongoReplicaSetClient -from pymongo.pool import Pool +from pymongo.pool import Pool, PoolOptions from test import host as default_host, port as default_port from test.utils import my_partial @@ -31,14 +31,8 @@ class MockPool(Pool): self.mock_host, self.mock_port = pair # Actually connect to the default server. - Pool.__init__( - self, - pair=(default_host, default_port), - max_size=None, - net_timeout=None, - conn_timeout=20, - ssl_context=None, - use_greenlets=False) + Pool.__init__(self, + (default_host, default_port), PoolOptions(connect_timeout=20)) def get_socket(self, force=False): client = self.client diff --git a/test/test_client.py b/test/test_client.py index 9d31832e4..a4dd990bb 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -506,16 +506,16 @@ class TestClient(IntegrationTest, TestRequestMixin): def test_timeouts(self): client = MongoClient(host, port, connectTimeoutMS=10500) - self.assertEqual(10.5, get_pool(client).conn_timeout) + self.assertEqual(10.5, get_pool(client).opts.connect_timeout) client = MongoClient(host, port, socketTimeoutMS=10500) - self.assertEqual(10.5, get_pool(client).net_timeout) + self.assertEqual(10.5, get_pool(client).opts.socket_timeout) def test_socket_timeout_ms_validation(self): c = get_client(socketTimeoutMS=10 * 1000) - self.assertEqual(10, c._MongoClient__net_timeout) + self.assertEqual(10, c._MongoClient__pool_opts.socket_timeout) c = get_client(socketTimeoutMS=None) - self.assertEqual(None, c._MongoClient__net_timeout) + self.assertEqual(None, c._MongoClient__pool_opts.socket_timeout) self.assertRaises(ConfigurationError, get_client, socketTimeoutMS=0) @@ -549,12 +549,12 @@ class TestClient(IntegrationTest, TestRequestMixin): def test_waitQueueTimeoutMS(self): client = MongoClient(host, port, waitQueueTimeoutMS=2000) - self.assertEqual(get_pool(client).wait_queue_timeout, 2) + self.assertEqual(get_pool(client).opts.wait_queue_timeout, 2) def test_waitQueueMultiple(self): client = MongoClient(host, port, max_pool_size=3, waitQueueMultiple=2) pool = get_pool(client) - self.assertEqual(pool.wait_queue_multiple, 2) + self.assertEqual(pool.opts.wait_queue_multiple, 2) self.assertEqual(pool._socket_semaphore.waiter_semaphore.counter, 6) def test_tz_aware(self): diff --git a/test/test_pooling_base.py b/test/test_pooling_base.py index 6db0bea54..c2f5d9b4f 100644 --- a/test/test_pooling_base.py +++ b/test/test_pooling_base.py @@ -27,7 +27,8 @@ sys.path[0:0] = [""] import pymongo.pool from bson.py3compat import thread from pymongo.mongo_client import MongoClient -from pymongo.pool import Pool, NO_REQUEST, NO_SOCKET_YET, SocketInfo +from pymongo.pool import (Pool, PoolOptions, + NO_REQUEST, NO_SOCKET_YET, SocketInfo) from pymongo.errors import ConfigurationError, ConnectionFailure from pymongo.errors import ExceededMaxWaiters from test import host, port, SkipTest @@ -346,9 +347,9 @@ class _TestPoolingBase(object): opts['use_greenlets'] = self.use_greenlets return get_client(*args, **opts) - def get_pool(self, *args, **kwargs): + def get_pool(self, pair, *args, **kwargs): kwargs['use_greenlets'] = self.use_greenlets - return Pool(*args, **kwargs) + return Pool(pair, PoolOptions(*args, **kwargs)) def sleep(self, seconds): if self.use_greenlets: @@ -438,7 +439,7 @@ class _TestPooling(_TestPoolingBase): def test_independent_pools(self): # Test for regression of very early PyMongo bug: separate pools shared # state. - p = self.get_pool((host, port), 10, None, None, None) + p = self.get_pool((host, port), 10) self.c.start_request() self.c.pymongo_test.test.find_one() self.assertEqual(set(), p.sockets) @@ -506,10 +507,9 @@ class _TestPooling(_TestPoolingBase): # get_socket() -- doesn't automatically put us in a request any more cx_pool = self.get_pool( pair=(host,port), - max_size=10, - net_timeout=1000, - conn_timeout=1000, - ssl_context=None + max_pool_size=10, + connect_timeout=1000, + socket_timeout=1000 ) sock0 = cx_pool.get_socket() @@ -547,7 +547,7 @@ class _TestPooling(_TestPoolingBase): # reset() is called after a fork, or after a socket error. Ensure that # a new request is begun if a request was in progress when the reset() # occurred, otherwise no request is begun. - p = self.get_pool((host, port), 10, None, None, None) + p = self.get_pool((host, port), 10) self.assertFalse(p.in_request()) p.start_request() self.assertTrue(p.in_request()) @@ -560,7 +560,7 @@ class _TestPooling(_TestPoolingBase): def test_pool_reuses_open_socket(self): # Test Pool's _check_closed() method doesn't close a healthy socket - cx_pool = self.get_pool((host,port), 10, None, None, None) + cx_pool = self.get_pool((host,port), 10) cx_pool._check_interval_seconds = 0 # Always check. sock_info = cx_pool.get_socket() cx_pool.maybe_return_socket(sock_info) @@ -573,7 +573,7 @@ class _TestPooling(_TestPoolingBase): def test_pool_removes_dead_socket(self): # Test that Pool removes dead socket and the socket doesn't return # itself PYTHON-344 - cx_pool = self.get_pool((host,port), 10, None, None, None) + cx_pool = self.get_pool((host,port), 10) cx_pool._check_interval_seconds = 0 # Always check. sock_info = cx_pool.get_socket() @@ -589,7 +589,7 @@ class _TestPooling(_TestPoolingBase): def test_pool_removes_dead_request_socket_after_check(self): # Test that Pool keeps request going even if a socket dies in request - cx_pool = self.get_pool((host,port), 10, None, None, None) + cx_pool = self.get_pool((host,port), 10) cx_pool._check_interval_seconds = 0 # Always check. cx_pool.start_request() @@ -615,7 +615,7 @@ class _TestPooling(_TestPoolingBase): def test_pool_removes_dead_request_socket(self): # Test that Pool keeps request going even if a socket dies in request - cx_pool = self.get_pool((host,port), 10, None, None, None) + cx_pool = self.get_pool((host,port), 10) cx_pool.start_request() # Get the request socket @@ -644,7 +644,7 @@ class _TestPooling(_TestPoolingBase): def test_pool_removes_dead_socket_after_request(self): # Test that Pool handles a socket dying that *used* to be the request # socket. - cx_pool = self.get_pool((host,port), 10, None, None, None) + cx_pool = self.get_pool((host,port), 10) cx_pool._check_interval_seconds = 0 # Always check. cx_pool.start_request() @@ -673,7 +673,7 @@ class _TestPooling(_TestPoolingBase): # When a pool replaces a dead request socket, the semaphore it uses # to enforce max_size should remain unaffected. cx_pool = self.get_pool( - (host, port), 1, None, None, None, wait_queue_timeout=1) + (host, port), 1, wait_queue_timeout=1) cx_pool._check_interval_seconds = 0 # Always check. cx_pool.start_request() @@ -703,10 +703,9 @@ class _TestPooling(_TestPoolingBase): # the request, that the socket is reclaimed into the pool. cx_pool = self.get_pool( pair=(host,port), - max_size=10, - net_timeout=1000, - conn_timeout=1000, - ssl_context=None, + max_pool_size=10, + connect_timeout=1000, + socket_timeout=1000 ) self.assertEqual(0, len(cx_pool.sockets)) @@ -1013,13 +1012,14 @@ class _TestMaxPoolSize(_TestPoolingBase): raise socket.error() test_pool = TestPool( - pair=('example.com', 27017), - max_size=1, - net_timeout=1, - conn_timeout=1, - ssl_context=None, - wait_queue_timeout=1, - use_greenlets=self.use_greenlets) + ('example.com', 27017), + PoolOptions( + max_pool_size=1, + connect_timeout=1, + socket_timeout=1, + wait_queue_timeout=1, + use_greenlets=self.use_greenlets), + ) # First call to get_socket fails; if pool doesn't release its semaphore # then the second call raises "ConnectionFailure: Timed out waiting for @@ -1048,7 +1048,7 @@ class _TestMaxOpenSockets(_TestPoolingBase): """ def get_pool_with_wait_queue_timeout(self, wait_queue_timeout): return self.get_pool((host, port), - 1, None, None, None, + 1, None, None, wait_queue_timeout=wait_queue_timeout, wait_queue_multiple=None) @@ -1095,7 +1095,7 @@ class _TestWaitQueueMultiple(_TestPoolingBase): """ def get_pool_with_wait_queue_multiple(self, wait_queue_multiple): return self.get_pool((host, port), - 2, None, None, None, + 2, None, None, wait_queue_timeout=None, wait_queue_multiple=wait_queue_multiple) diff --git a/test/test_pooling_gevent.py b/test/test_pooling_gevent.py index dce514c7c..5a52757ea 100644 --- a/test/test_pooling_gevent.py +++ b/test/test_pooling_gevent.py @@ -48,12 +48,13 @@ class TestPoolingGeventSpecial(unittest.TestCase): raise SkipTest('Gevent not installed') cx_pool = pool.Pool( - pair=(host, port), - max_size=10, - net_timeout=1000, - conn_timeout=1000, - ssl_context=None, - use_greenlets=True) + (host, port), + pool.PoolOptions( + max_pool_size=10, + connect_timeout=1000, + socket_timeout=1000, + use_greenlets=True) + ) socks = [] @@ -81,12 +82,11 @@ class TestPoolingGeventSpecial(unittest.TestCase): except ImportError: raise SkipTest('Gevent not installed') - pool_args = dict( - pair=(host,port), - max_size=10, - net_timeout=1000, - conn_timeout=1000, - ssl_context=None, + pair = (host, port) + pool_opts = dict( + max_pool_size=10, + connect_timeout=1000, + socket_timeout=1000, ) for use_greenlets, use_request, expect_success in [ @@ -95,9 +95,9 @@ class TestPoolingGeventSpecial(unittest.TestCase): (False, True, False), (False, False, False), ]: - pool_args_cp = pool_args.copy() - pool_args_cp['use_greenlets'] = use_greenlets - cx_pool = pool.Pool(**pool_args_cp) + pool_opts_cp = pool_opts.copy() + pool_opts_cp['use_greenlets'] = use_greenlets + cx_pool = pool.Pool(pair, pool.PoolOptions(**pool_opts_cp)) # Map: greenlet -> socket greenlet2socks = {} @@ -223,12 +223,13 @@ class TestUseGreenletsWithoutGevent(unittest.TestCase): self.assertRaises( ConfigurationError, TestPool, - pair=(host, port), - max_size=10, - net_timeout=1000, - conn_timeout=1000, - ssl_context=None, - use_greenlets=True) + (host, port), + pool.PoolOptions( + max_pool_size=10, + connect_timeout=1000, + socket_timeout=1000, + use_greenlets=True) + ) # Convince PyPy to call __del__. for _ in range(10): diff --git a/test/test_replica_set_client.py b/test/test_replica_set_client.py index 570814e3e..d126b899f 100644 --- a/test/test_replica_set_client.py +++ b/test/test_replica_set_client.py @@ -605,10 +605,12 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): def test_socket_timeout_ms_validation(self): c = self._get_client(socketTimeoutMS=10 * 1000) - self.assertEqual(10, c._MongoReplicaSetClient__net_timeout) + self.assertEqual(10, + c._MongoReplicaSetClient__pool_opts.socket_timeout) c = self._get_client(socketTimeoutMS=None) - self.assertEqual(None, c._MongoReplicaSetClient__net_timeout) + self.assertEqual(None, + c._MongoReplicaSetClient__pool_opts.socket_timeout) self.assertRaises(ConfigurationError, self._get_client, socketTimeoutMS=0) @@ -691,12 +693,12 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): def test_waitQueueTimeoutMS(self): client = self._get_client(waitQueueTimeoutMS=2000) pool = get_pool(client) - self.assertEqual(pool.wait_queue_timeout, 2) + self.assertEqual(pool.opts.wait_queue_timeout, 2) def test_waitQueueMultiple(self): client = self._get_client(max_pool_size=3, waitQueueMultiple=2) pool = get_pool(client) - self.assertEqual(pool.wait_queue_multiple, 2) + self.assertEqual(pool.opts.wait_queue_multiple, 2) self.assertEqual(pool._socket_semaphore.waiter_semaphore.counter, 6) def test_tz_aware(self):