PYTHON-727 - Implement and use PoolOptions class
This commit is contained in:
parent
698ddd88c5
commit
c2e6471ee6
@ -22,6 +22,7 @@ from pymongo.cluster_description import (updated_cluster_description,
|
||||
CLUSTER_TYPE,
|
||||
ClusterDescription)
|
||||
from pymongo.errors import InvalidOperation, ConnectionFailure
|
||||
from pymongo.pool import PoolOptions
|
||||
from pymongo.server import Server
|
||||
|
||||
|
||||
@ -176,8 +177,7 @@ class Cluster(object):
|
||||
# TODO: Need PoolSettings, SocketSettings, and SSLContext classes.
|
||||
return self._settings.pool_class(
|
||||
address,
|
||||
max_size=100,
|
||||
net_timeout=None,
|
||||
conn_timeout=20,
|
||||
ssl_context=None,
|
||||
use_greenlets=False)
|
||||
PoolOptions(
|
||||
max_pool_size=100,
|
||||
connect_timeout=20)
|
||||
)
|
||||
|
||||
@ -287,20 +287,20 @@ class MongoClient(common.BaseObject):
|
||||
options[option] = value
|
||||
options.update(opts)
|
||||
|
||||
self.__max_pool_size = common.validate_positive_integer_or_none(
|
||||
'max_pool_size', max_pool_size)
|
||||
|
||||
self.__cursor_manager = CursorManager(self)
|
||||
|
||||
self.__repl = options.get('replicaset')
|
||||
self.__direct = len(seeds) == 1 and not self.__repl
|
||||
|
||||
self.__net_timeout = options.get('sockettimeoutms')
|
||||
self.__conn_timeout = options.get('connecttimeoutms')
|
||||
self.__wait_queue_timeout = options.get('waitqueuetimeoutms')
|
||||
self.__wait_queue_multiple = options.get('waitqueuemultiple')
|
||||
max_pool_size = common.validate_positive_integer_or_none(
|
||||
'max_pool_size', max_pool_size)
|
||||
connect_timeout = options.get('connecttimeoutms')
|
||||
socket_timeout = options.get('sockettimeoutms')
|
||||
wait_queue_timeout = options.get('waitqueuetimeoutms')
|
||||
wait_queue_multiple = options.get('waitqueuemultiple')
|
||||
use_greenlets = options.get('use_greenlets', False)
|
||||
|
||||
self.__ssl_ctx = None
|
||||
ssl_context = None
|
||||
use_ssl = options.get('ssl', None)
|
||||
keyfile = options.get('ssl_keyfile', None)
|
||||
certfile = options.get('ssl_certfile', None)
|
||||
@ -335,13 +335,21 @@ class MongoClient(common.BaseObject):
|
||||
ctx.load_verify_locations(ca_certs)
|
||||
if cert_reqs is not None:
|
||||
ctx.verify_mode = cert_reqs
|
||||
self.__ssl_ctx = ctx
|
||||
ssl_context = ctx
|
||||
|
||||
self.__pool_opts = pool.PoolOptions(
|
||||
max_pool_size=max_pool_size,
|
||||
connect_timeout=connect_timeout,
|
||||
socket_timeout=socket_timeout,
|
||||
wait_queue_timeout=wait_queue_timeout,
|
||||
wait_queue_multiple=wait_queue_multiple,
|
||||
ssl_context=ssl_context,
|
||||
use_greenlets=use_greenlets)
|
||||
|
||||
self.__use_greenlets = options.get('use_greenlets', False)
|
||||
self.__pool_class = pool_class
|
||||
|
||||
self.__connecting = False
|
||||
if self.__use_greenlets:
|
||||
if use_greenlets:
|
||||
# Greenlets don't need to lock around access to the Member;
|
||||
# they're only interrupted when they do I/O.
|
||||
self.__connecting_lock = thread_util.DummyLock()
|
||||
@ -352,7 +360,7 @@ class MongoClient(common.BaseObject):
|
||||
self.__event_class = event_class
|
||||
else:
|
||||
# Prevent a cycle; this lambda shouldn't refer to self.
|
||||
g = self.__use_greenlets
|
||||
g = use_greenlets
|
||||
event_class = lambda: thread_util.create_event(g)
|
||||
self.__event_class = event_class
|
||||
|
||||
@ -476,15 +484,7 @@ class MongoClient(common.BaseObject):
|
||||
del self.__auth_credentials[source]
|
||||
|
||||
def __create_pool(self, pair):
|
||||
return self.__pool_class(
|
||||
pair,
|
||||
self.__max_pool_size,
|
||||
self.__net_timeout,
|
||||
self.__conn_timeout,
|
||||
self.__ssl_ctx,
|
||||
use_greenlets=self.__use_greenlets,
|
||||
wait_queue_timeout=self.__wait_queue_timeout,
|
||||
wait_queue_multiple=self.__wait_queue_multiple)
|
||||
return self.__pool_class(pair, self.__pool_opts)
|
||||
|
||||
def __check_auth(self, sock_info):
|
||||
"""Authenticate using cached database credentials.
|
||||
@ -570,7 +570,7 @@ class MongoClient(common.BaseObject):
|
||||
.. versionchanged:: 2.6
|
||||
.. versionadded:: 1.11
|
||||
"""
|
||||
return self.__max_pool_size
|
||||
return self.__pool_opts.max_pool_size
|
||||
|
||||
@property
|
||||
def use_greenlets(self):
|
||||
@ -579,7 +579,7 @@ class MongoClient(common.BaseObject):
|
||||
|
||||
.. versionadded:: 2.4.2
|
||||
"""
|
||||
return self.__use_greenlets
|
||||
return self.__pool_opts.use_greenlets
|
||||
|
||||
@property
|
||||
def nodes(self):
|
||||
|
||||
@ -584,8 +584,6 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
self.__index_cache = {}
|
||||
self.__auth_credentials = {}
|
||||
|
||||
self.__max_pool_size = common.validate_positive_integer_or_none(
|
||||
'max_pool_size', max_pool_size)
|
||||
self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
|
||||
self.__document_class = document_class
|
||||
self.__monitor = None
|
||||
@ -624,6 +622,12 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
self.__opts[option] = value
|
||||
self.__opts.update(options)
|
||||
|
||||
max_pool_size = common.validate_positive_integer_or_none(
|
||||
'max_pool_size', max_pool_size)
|
||||
connect_timeout = self.__opts.get('connecttimeoutms')
|
||||
socket_timeout = self.__opts.get('sockettimeoutms')
|
||||
wait_queue_timeout = self.__opts.get('waitqueuetimeoutms')
|
||||
wait_queue_multiple = self.__opts.get('waitqueuemultiple')
|
||||
self.__use_greenlets = self.__opts.get('use_greenlets', False)
|
||||
if self.__use_greenlets and not have_gevent:
|
||||
raise ConfigurationError(
|
||||
@ -643,12 +647,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
raise ConfigurationError("the replicaSet "
|
||||
"keyword parameter is required.")
|
||||
|
||||
self.__net_timeout = self.__opts.get('sockettimeoutms')
|
||||
self.__conn_timeout = self.__opts.get('connecttimeoutms')
|
||||
self.__wait_queue_timeout = self.__opts.get('waitqueuetimeoutms')
|
||||
self.__wait_queue_multiple = self.__opts.get('waitqueuemultiple')
|
||||
|
||||
self.__ssl_ctx = None
|
||||
ssl_context = None
|
||||
use_ssl = self.__opts.get('ssl', None)
|
||||
keyfile = self.__opts.get('ssl_keyfile', None)
|
||||
certfile = self.__opts.get('ssl_certfile', None)
|
||||
@ -683,7 +682,16 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
ctx.load_verify_locations(ca_certs)
|
||||
if cert_reqs is not None:
|
||||
ctx.verify_mode = cert_reqs
|
||||
self.__ssl_ctx = ctx
|
||||
ssl_context = ctx
|
||||
|
||||
self.__pool_opts = pool.PoolOptions(
|
||||
max_pool_size=max_pool_size,
|
||||
connect_timeout=connect_timeout,
|
||||
socket_timeout=socket_timeout,
|
||||
wait_queue_timeout=wait_queue_timeout,
|
||||
wait_queue_multiple=wait_queue_multiple,
|
||||
ssl_context=ssl_context,
|
||||
use_greenlets=self.__use_greenlets)
|
||||
|
||||
super(MongoReplicaSetClient, self).__init__(**self.__opts)
|
||||
|
||||
@ -910,7 +918,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
|
||||
.. versionchanged:: 2.6
|
||||
"""
|
||||
return self.__max_pool_size
|
||||
return self.__pool_opts.max_pool_size
|
||||
|
||||
@property
|
||||
def use_greenlets(self):
|
||||
@ -919,7 +927,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
|
||||
.. versionadded:: 2.4.2
|
||||
"""
|
||||
return self.__use_greenlets
|
||||
return self.__pool_opts.use_greenlets
|
||||
|
||||
def get_document_class(self):
|
||||
"""document_class getter"""
|
||||
@ -1031,15 +1039,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
"""Directly call ismaster.
|
||||
Returns (response, connection_pool, ping_time in seconds).
|
||||
"""
|
||||
connection_pool = self.pool_class(
|
||||
host,
|
||||
self.__max_pool_size,
|
||||
self.__net_timeout,
|
||||
self.__conn_timeout,
|
||||
self.__ssl_ctx,
|
||||
use_greenlets=self.__use_greenlets,
|
||||
wait_queue_timeout=self.__wait_queue_timeout,
|
||||
wait_queue_multiple=self.__wait_queue_multiple)
|
||||
connection_pool = self.pool_class(host, self.__pool_opts)
|
||||
|
||||
if self.in_request():
|
||||
connection_pool.start_request()
|
||||
|
||||
128
pymongo/pool.py
128
pymongo/pool.py
@ -55,6 +55,72 @@ def _closed(sock):
|
||||
return len(rd) > 0
|
||||
|
||||
|
||||
class PoolOptions(object):
|
||||
|
||||
__slots__ = ('__max_pool_size', '__connect_timeout', '__socket_timeout',
|
||||
'__wait_queue_timeout', '__wait_queue_multiple',
|
||||
'__ssl_context', '__use_greenlets')
|
||||
|
||||
def __init__(self, max_pool_size=100, connect_timeout=None,
|
||||
socket_timeout=None, wait_queue_timeout=None,
|
||||
wait_queue_multiple=None, ssl_context=None,
|
||||
use_greenlets=False):
|
||||
|
||||
self.__max_pool_size = max_pool_size
|
||||
self.__connect_timeout = connect_timeout
|
||||
self.__socket_timeout = socket_timeout
|
||||
self.__wait_queue_timeout = wait_queue_timeout
|
||||
self.__wait_queue_multiple = wait_queue_multiple
|
||||
self.__ssl_context = ssl_context
|
||||
self.__use_greenlets = use_greenlets
|
||||
|
||||
@property
|
||||
def max_pool_size(self):
|
||||
"""The maximum number of connections that the pool will open
|
||||
simultaneously. If this is set, operations will block if there
|
||||
are `max_pool_size` outstanding connections.
|
||||
"""
|
||||
return self.__max_pool_size
|
||||
|
||||
@property
|
||||
def connect_timeout(self):
|
||||
"""How long a connection can take to be opened before timing out.
|
||||
"""
|
||||
return self.__connect_timeout
|
||||
|
||||
@property
|
||||
def socket_timeout(self):
|
||||
"""How long a send or receive on a socket can take before timing out.
|
||||
"""
|
||||
return self.__socket_timeout
|
||||
|
||||
@property
|
||||
def wait_queue_timeout(self):
|
||||
"""How long a thread will wait for a socket from the pool if the pool
|
||||
has no free sockets.
|
||||
"""
|
||||
return self.__wait_queue_timeout
|
||||
|
||||
@property
|
||||
def wait_queue_multiple(self):
|
||||
"""Multiplied by max_pool_size to give the number of threads allowed
|
||||
to wait for a socket at one time.
|
||||
"""
|
||||
return self.__wait_queue_multiple
|
||||
|
||||
@property
|
||||
def ssl_context(self):
|
||||
"""An SSLContext instance or None.
|
||||
"""
|
||||
return self.__ssl_context
|
||||
|
||||
@property
|
||||
def use_greenlets(self):
|
||||
"""Use greenlet ids for "thread affinity" in requests.
|
||||
"""
|
||||
return self.__use_greenlets
|
||||
|
||||
|
||||
class SocketInfo(object):
|
||||
"""Store a socket with some metadata
|
||||
"""
|
||||
@ -137,27 +203,11 @@ class SocketInfo(object):
|
||||
# Do *not* explicitly inherit from object or Jython won't call __del__
|
||||
# http://bugs.jython.org/issue1057
|
||||
class Pool:
|
||||
def __init__(self, pair, max_size, net_timeout,
|
||||
conn_timeout, ssl_context, use_greenlets,
|
||||
wait_queue_timeout=None, wait_queue_multiple=None):
|
||||
def __init__(self, pair, options):
|
||||
"""
|
||||
:Parameters:
|
||||
- `pair`: a (hostname, port) tuple
|
||||
- `max_size`: The maximum number of open sockets. Calls to
|
||||
`get_socket` will block if this is set, this pool has opened
|
||||
`max_size` sockets, and there are none idle. Set to `None` to
|
||||
disable.
|
||||
- `net_timeout`: timeout in seconds for operations on open connection
|
||||
- `conn_timeout`: timeout in seconds for establishing connection
|
||||
- `ssl_context`: an SSLContext instance or None
|
||||
- `use_greenlets`: bool, if True then start_request() assigns a
|
||||
socket to the current greenlet - otherwise it is assigned to the
|
||||
current thread
|
||||
- `wait_queue_timeout`: (integer) How long (in seconds) a
|
||||
thread will wait for a socket from the pool if the pool has no
|
||||
free sockets.
|
||||
- `wait_queue_multiple`: (integer) Multiplied by max_pool_size to give
|
||||
the number of threads allowed to wait for a socket at one time.
|
||||
- `options`: a PoolOptions instance
|
||||
"""
|
||||
# Only check a socket's health with _closed() every once in a while.
|
||||
# Can override for testing: 0 to always check, None to never check.
|
||||
@ -171,34 +221,31 @@ class Pool:
|
||||
self.pool_id = 0
|
||||
self.pid = os.getpid()
|
||||
self.pair = pair
|
||||
self.max_size = max_size
|
||||
self.net_timeout = net_timeout
|
||||
self.conn_timeout = conn_timeout
|
||||
self.wait_queue_timeout = wait_queue_timeout
|
||||
self.wait_queue_multiple = wait_queue_multiple
|
||||
self.ssl_context = ssl_context
|
||||
self.opts = options
|
||||
|
||||
# Map self._ident.get() -> request socket
|
||||
self._tid_to_sock = {}
|
||||
|
||||
if use_greenlets and not thread_util.have_gevent:
|
||||
if self.opts.use_greenlets and not thread_util.have_gevent:
|
||||
raise ConfigurationError(
|
||||
"The Gevent module is not available. "
|
||||
"Install the gevent package from PyPI."
|
||||
)
|
||||
|
||||
self._ident = thread_util.create_ident(use_greenlets)
|
||||
self._ident = thread_util.create_ident(self.opts.use_greenlets)
|
||||
|
||||
# Count the number of calls to start_request() per thread or greenlet
|
||||
self._request_counter = thread_util.Counter(use_greenlets)
|
||||
self._request_counter = thread_util.Counter(self.opts.use_greenlets)
|
||||
|
||||
if self.wait_queue_multiple is None or self.max_size is None:
|
||||
if (self.opts.wait_queue_multiple is None or
|
||||
self.opts.max_pool_size is None):
|
||||
max_waiters = None
|
||||
else:
|
||||
max_waiters = self.max_size * self.wait_queue_multiple
|
||||
max_waiters = (
|
||||
self.opts.max_pool_size * self.opts.wait_queue_multiple)
|
||||
|
||||
self._socket_semaphore = thread_util.create_semaphore(
|
||||
self.max_size, max_waiters, use_greenlets)
|
||||
self.opts.max_pool_size, max_waiters, self.opts.use_greenlets)
|
||||
|
||||
def reset(self):
|
||||
# Ignore this race condition -- if many threads are resetting at once,
|
||||
@ -255,7 +302,7 @@ class Pool:
|
||||
try:
|
||||
sock = socket.socket(af, socktype, proto)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
sock.settimeout(self.conn_timeout or 20.0)
|
||||
sock.settimeout(self.opts.connect_timeout or 20.0)
|
||||
sock.connect(sa)
|
||||
return sock
|
||||
except socket.error as e:
|
||||
@ -279,22 +326,23 @@ class Pool:
|
||||
"""
|
||||
sock = self.create_connection()
|
||||
hostname = self.pair[0]
|
||||
ssl_context = self.opts.ssl_context
|
||||
|
||||
if self.ssl_context is not None:
|
||||
if ssl_context is not None:
|
||||
try:
|
||||
sock = self.ssl_context.wrap_socket(sock)
|
||||
sock = ssl_context.wrap_socket(sock)
|
||||
except ssl.SSLError:
|
||||
sock.close()
|
||||
raise ConnectionFailure("SSL handshake failed. MongoDB may "
|
||||
"not be configured with SSL support.")
|
||||
if self.ssl_context.verify_mode != ssl.CERT_NONE:
|
||||
if ssl_context.verify_mode != ssl.CERT_NONE:
|
||||
try:
|
||||
match_hostname(sock.getpeercert(), hostname)
|
||||
except CertificateError:
|
||||
sock.close()
|
||||
raise
|
||||
|
||||
sock.settimeout(self.net_timeout)
|
||||
sock.settimeout(self.opts.socket_timeout)
|
||||
return SocketInfo(sock, self.pool_id, hostname)
|
||||
|
||||
def get_socket(self, force=False):
|
||||
@ -334,7 +382,8 @@ class Pool:
|
||||
# having acquired it for this socket.
|
||||
if not self._socket_semaphore.acquire(False):
|
||||
forced = True
|
||||
elif not self._socket_semaphore.acquire(True, self.wait_queue_timeout):
|
||||
elif not self._socket_semaphore.acquire(
|
||||
True, self.opts.wait_queue_timeout):
|
||||
self._raise_wait_queue_timeout()
|
||||
|
||||
# We've now acquired the semaphore and must release it on error.
|
||||
@ -429,8 +478,9 @@ class Pool:
|
||||
"""
|
||||
try:
|
||||
self.lock.acquire()
|
||||
too_many_sockets = (self.max_size is not None
|
||||
and len(self.sockets) >= self.max_size)
|
||||
max_size = self.opts.max_pool_size
|
||||
too_many_sockets = (max_size is not None
|
||||
and len(self.sockets) >= max_size)
|
||||
|
||||
if not too_many_sockets and sock_info.pool_id == self.pool_id:
|
||||
self.sockets.add(sock_info)
|
||||
@ -535,7 +585,7 @@ class Pool:
|
||||
raise ConnectionFailure(
|
||||
'Timed out waiting for socket from pool with max_size %r and'
|
||||
' wait_queue_timeout %r' % (
|
||||
self.max_size, self.wait_queue_timeout))
|
||||
self.opts.max_pool_size, self.opts.wait_queue_timeout))
|
||||
|
||||
def __del__(self):
|
||||
# Avoid ResourceWarnings in Python 3
|
||||
|
||||
@ -18,7 +18,7 @@ import socket
|
||||
|
||||
from pymongo import common
|
||||
from pymongo import MongoClient, MongoReplicaSetClient
|
||||
from pymongo.pool import Pool
|
||||
from pymongo.pool import Pool, PoolOptions
|
||||
|
||||
from test import host as default_host, port as default_port
|
||||
from test.utils import my_partial
|
||||
@ -31,14 +31,8 @@ class MockPool(Pool):
|
||||
self.mock_host, self.mock_port = pair
|
||||
|
||||
# Actually connect to the default server.
|
||||
Pool.__init__(
|
||||
self,
|
||||
pair=(default_host, default_port),
|
||||
max_size=None,
|
||||
net_timeout=None,
|
||||
conn_timeout=20,
|
||||
ssl_context=None,
|
||||
use_greenlets=False)
|
||||
Pool.__init__(self,
|
||||
(default_host, default_port), PoolOptions(connect_timeout=20))
|
||||
|
||||
def get_socket(self, force=False):
|
||||
client = self.client
|
||||
|
||||
@ -506,16 +506,16 @@ class TestClient(IntegrationTest, TestRequestMixin):
|
||||
|
||||
def test_timeouts(self):
|
||||
client = MongoClient(host, port, connectTimeoutMS=10500)
|
||||
self.assertEqual(10.5, get_pool(client).conn_timeout)
|
||||
self.assertEqual(10.5, get_pool(client).opts.connect_timeout)
|
||||
client = MongoClient(host, port, socketTimeoutMS=10500)
|
||||
self.assertEqual(10.5, get_pool(client).net_timeout)
|
||||
self.assertEqual(10.5, get_pool(client).opts.socket_timeout)
|
||||
|
||||
def test_socket_timeout_ms_validation(self):
|
||||
c = get_client(socketTimeoutMS=10 * 1000)
|
||||
self.assertEqual(10, c._MongoClient__net_timeout)
|
||||
self.assertEqual(10, c._MongoClient__pool_opts.socket_timeout)
|
||||
|
||||
c = get_client(socketTimeoutMS=None)
|
||||
self.assertEqual(None, c._MongoClient__net_timeout)
|
||||
self.assertEqual(None, c._MongoClient__pool_opts.socket_timeout)
|
||||
|
||||
self.assertRaises(ConfigurationError,
|
||||
get_client, socketTimeoutMS=0)
|
||||
@ -549,12 +549,12 @@ class TestClient(IntegrationTest, TestRequestMixin):
|
||||
|
||||
def test_waitQueueTimeoutMS(self):
|
||||
client = MongoClient(host, port, waitQueueTimeoutMS=2000)
|
||||
self.assertEqual(get_pool(client).wait_queue_timeout, 2)
|
||||
self.assertEqual(get_pool(client).opts.wait_queue_timeout, 2)
|
||||
|
||||
def test_waitQueueMultiple(self):
|
||||
client = MongoClient(host, port, max_pool_size=3, waitQueueMultiple=2)
|
||||
pool = get_pool(client)
|
||||
self.assertEqual(pool.wait_queue_multiple, 2)
|
||||
self.assertEqual(pool.opts.wait_queue_multiple, 2)
|
||||
self.assertEqual(pool._socket_semaphore.waiter_semaphore.counter, 6)
|
||||
|
||||
def test_tz_aware(self):
|
||||
|
||||
@ -27,7 +27,8 @@ sys.path[0:0] = [""]
|
||||
import pymongo.pool
|
||||
from bson.py3compat import thread
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.pool import Pool, NO_REQUEST, NO_SOCKET_YET, SocketInfo
|
||||
from pymongo.pool import (Pool, PoolOptions,
|
||||
NO_REQUEST, NO_SOCKET_YET, SocketInfo)
|
||||
from pymongo.errors import ConfigurationError, ConnectionFailure
|
||||
from pymongo.errors import ExceededMaxWaiters
|
||||
from test import host, port, SkipTest
|
||||
@ -346,9 +347,9 @@ class _TestPoolingBase(object):
|
||||
opts['use_greenlets'] = self.use_greenlets
|
||||
return get_client(*args, **opts)
|
||||
|
||||
def get_pool(self, *args, **kwargs):
|
||||
def get_pool(self, pair, *args, **kwargs):
|
||||
kwargs['use_greenlets'] = self.use_greenlets
|
||||
return Pool(*args, **kwargs)
|
||||
return Pool(pair, PoolOptions(*args, **kwargs))
|
||||
|
||||
def sleep(self, seconds):
|
||||
if self.use_greenlets:
|
||||
@ -438,7 +439,7 @@ class _TestPooling(_TestPoolingBase):
|
||||
def test_independent_pools(self):
|
||||
# Test for regression of very early PyMongo bug: separate pools shared
|
||||
# state.
|
||||
p = self.get_pool((host, port), 10, None, None, None)
|
||||
p = self.get_pool((host, port), 10)
|
||||
self.c.start_request()
|
||||
self.c.pymongo_test.test.find_one()
|
||||
self.assertEqual(set(), p.sockets)
|
||||
@ -506,10 +507,9 @@ class _TestPooling(_TestPoolingBase):
|
||||
# get_socket() -- doesn't automatically put us in a request any more
|
||||
cx_pool = self.get_pool(
|
||||
pair=(host,port),
|
||||
max_size=10,
|
||||
net_timeout=1000,
|
||||
conn_timeout=1000,
|
||||
ssl_context=None
|
||||
max_pool_size=10,
|
||||
connect_timeout=1000,
|
||||
socket_timeout=1000
|
||||
)
|
||||
|
||||
sock0 = cx_pool.get_socket()
|
||||
@ -547,7 +547,7 @@ class _TestPooling(_TestPoolingBase):
|
||||
# reset() is called after a fork, or after a socket error. Ensure that
|
||||
# a new request is begun if a request was in progress when the reset()
|
||||
# occurred, otherwise no request is begun.
|
||||
p = self.get_pool((host, port), 10, None, None, None)
|
||||
p = self.get_pool((host, port), 10)
|
||||
self.assertFalse(p.in_request())
|
||||
p.start_request()
|
||||
self.assertTrue(p.in_request())
|
||||
@ -560,7 +560,7 @@ class _TestPooling(_TestPoolingBase):
|
||||
|
||||
def test_pool_reuses_open_socket(self):
|
||||
# Test Pool's _check_closed() method doesn't close a healthy socket
|
||||
cx_pool = self.get_pool((host,port), 10, None, None, None)
|
||||
cx_pool = self.get_pool((host,port), 10)
|
||||
cx_pool._check_interval_seconds = 0 # Always check.
|
||||
sock_info = cx_pool.get_socket()
|
||||
cx_pool.maybe_return_socket(sock_info)
|
||||
@ -573,7 +573,7 @@ class _TestPooling(_TestPoolingBase):
|
||||
def test_pool_removes_dead_socket(self):
|
||||
# Test that Pool removes dead socket and the socket doesn't return
|
||||
# itself PYTHON-344
|
||||
cx_pool = self.get_pool((host,port), 10, None, None, None)
|
||||
cx_pool = self.get_pool((host,port), 10)
|
||||
cx_pool._check_interval_seconds = 0 # Always check.
|
||||
sock_info = cx_pool.get_socket()
|
||||
|
||||
@ -589,7 +589,7 @@ class _TestPooling(_TestPoolingBase):
|
||||
|
||||
def test_pool_removes_dead_request_socket_after_check(self):
|
||||
# Test that Pool keeps request going even if a socket dies in request
|
||||
cx_pool = self.get_pool((host,port), 10, None, None, None)
|
||||
cx_pool = self.get_pool((host,port), 10)
|
||||
cx_pool._check_interval_seconds = 0 # Always check.
|
||||
cx_pool.start_request()
|
||||
|
||||
@ -615,7 +615,7 @@ class _TestPooling(_TestPoolingBase):
|
||||
|
||||
def test_pool_removes_dead_request_socket(self):
|
||||
# Test that Pool keeps request going even if a socket dies in request
|
||||
cx_pool = self.get_pool((host,port), 10, None, None, None)
|
||||
cx_pool = self.get_pool((host,port), 10)
|
||||
cx_pool.start_request()
|
||||
|
||||
# Get the request socket
|
||||
@ -644,7 +644,7 @@ class _TestPooling(_TestPoolingBase):
|
||||
def test_pool_removes_dead_socket_after_request(self):
|
||||
# Test that Pool handles a socket dying that *used* to be the request
|
||||
# socket.
|
||||
cx_pool = self.get_pool((host,port), 10, None, None, None)
|
||||
cx_pool = self.get_pool((host,port), 10)
|
||||
cx_pool._check_interval_seconds = 0 # Always check.
|
||||
cx_pool.start_request()
|
||||
|
||||
@ -673,7 +673,7 @@ class _TestPooling(_TestPoolingBase):
|
||||
# When a pool replaces a dead request socket, the semaphore it uses
|
||||
# to enforce max_size should remain unaffected.
|
||||
cx_pool = self.get_pool(
|
||||
(host, port), 1, None, None, None, wait_queue_timeout=1)
|
||||
(host, port), 1, wait_queue_timeout=1)
|
||||
|
||||
cx_pool._check_interval_seconds = 0 # Always check.
|
||||
cx_pool.start_request()
|
||||
@ -703,10 +703,9 @@ class _TestPooling(_TestPoolingBase):
|
||||
# the request, that the socket is reclaimed into the pool.
|
||||
cx_pool = self.get_pool(
|
||||
pair=(host,port),
|
||||
max_size=10,
|
||||
net_timeout=1000,
|
||||
conn_timeout=1000,
|
||||
ssl_context=None,
|
||||
max_pool_size=10,
|
||||
connect_timeout=1000,
|
||||
socket_timeout=1000
|
||||
)
|
||||
|
||||
self.assertEqual(0, len(cx_pool.sockets))
|
||||
@ -1013,13 +1012,14 @@ class _TestMaxPoolSize(_TestPoolingBase):
|
||||
raise socket.error()
|
||||
|
||||
test_pool = TestPool(
|
||||
pair=('example.com', 27017),
|
||||
max_size=1,
|
||||
net_timeout=1,
|
||||
conn_timeout=1,
|
||||
ssl_context=None,
|
||||
wait_queue_timeout=1,
|
||||
use_greenlets=self.use_greenlets)
|
||||
('example.com', 27017),
|
||||
PoolOptions(
|
||||
max_pool_size=1,
|
||||
connect_timeout=1,
|
||||
socket_timeout=1,
|
||||
wait_queue_timeout=1,
|
||||
use_greenlets=self.use_greenlets),
|
||||
)
|
||||
|
||||
# First call to get_socket fails; if pool doesn't release its semaphore
|
||||
# then the second call raises "ConnectionFailure: Timed out waiting for
|
||||
@ -1048,7 +1048,7 @@ class _TestMaxOpenSockets(_TestPoolingBase):
|
||||
"""
|
||||
def get_pool_with_wait_queue_timeout(self, wait_queue_timeout):
|
||||
return self.get_pool((host, port),
|
||||
1, None, None, None,
|
||||
1, None, None,
|
||||
wait_queue_timeout=wait_queue_timeout,
|
||||
wait_queue_multiple=None)
|
||||
|
||||
@ -1095,7 +1095,7 @@ class _TestWaitQueueMultiple(_TestPoolingBase):
|
||||
"""
|
||||
def get_pool_with_wait_queue_multiple(self, wait_queue_multiple):
|
||||
return self.get_pool((host, port),
|
||||
2, None, None, None,
|
||||
2, None, None,
|
||||
wait_queue_timeout=None,
|
||||
wait_queue_multiple=wait_queue_multiple)
|
||||
|
||||
|
||||
@ -48,12 +48,13 @@ class TestPoolingGeventSpecial(unittest.TestCase):
|
||||
raise SkipTest('Gevent not installed')
|
||||
|
||||
cx_pool = pool.Pool(
|
||||
pair=(host, port),
|
||||
max_size=10,
|
||||
net_timeout=1000,
|
||||
conn_timeout=1000,
|
||||
ssl_context=None,
|
||||
use_greenlets=True)
|
||||
(host, port),
|
||||
pool.PoolOptions(
|
||||
max_pool_size=10,
|
||||
connect_timeout=1000,
|
||||
socket_timeout=1000,
|
||||
use_greenlets=True)
|
||||
)
|
||||
|
||||
socks = []
|
||||
|
||||
@ -81,12 +82,11 @@ class TestPoolingGeventSpecial(unittest.TestCase):
|
||||
except ImportError:
|
||||
raise SkipTest('Gevent not installed')
|
||||
|
||||
pool_args = dict(
|
||||
pair=(host,port),
|
||||
max_size=10,
|
||||
net_timeout=1000,
|
||||
conn_timeout=1000,
|
||||
ssl_context=None,
|
||||
pair = (host, port)
|
||||
pool_opts = dict(
|
||||
max_pool_size=10,
|
||||
connect_timeout=1000,
|
||||
socket_timeout=1000,
|
||||
)
|
||||
|
||||
for use_greenlets, use_request, expect_success in [
|
||||
@ -95,9 +95,9 @@ class TestPoolingGeventSpecial(unittest.TestCase):
|
||||
(False, True, False),
|
||||
(False, False, False),
|
||||
]:
|
||||
pool_args_cp = pool_args.copy()
|
||||
pool_args_cp['use_greenlets'] = use_greenlets
|
||||
cx_pool = pool.Pool(**pool_args_cp)
|
||||
pool_opts_cp = pool_opts.copy()
|
||||
pool_opts_cp['use_greenlets'] = use_greenlets
|
||||
cx_pool = pool.Pool(pair, pool.PoolOptions(**pool_opts_cp))
|
||||
|
||||
# Map: greenlet -> socket
|
||||
greenlet2socks = {}
|
||||
@ -223,12 +223,13 @@ class TestUseGreenletsWithoutGevent(unittest.TestCase):
|
||||
self.assertRaises(
|
||||
ConfigurationError,
|
||||
TestPool,
|
||||
pair=(host, port),
|
||||
max_size=10,
|
||||
net_timeout=1000,
|
||||
conn_timeout=1000,
|
||||
ssl_context=None,
|
||||
use_greenlets=True)
|
||||
(host, port),
|
||||
pool.PoolOptions(
|
||||
max_pool_size=10,
|
||||
connect_timeout=1000,
|
||||
socket_timeout=1000,
|
||||
use_greenlets=True)
|
||||
)
|
||||
|
||||
# Convince PyPy to call __del__.
|
||||
for _ in range(10):
|
||||
|
||||
@ -605,10 +605,12 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
|
||||
def test_socket_timeout_ms_validation(self):
|
||||
c = self._get_client(socketTimeoutMS=10 * 1000)
|
||||
self.assertEqual(10, c._MongoReplicaSetClient__net_timeout)
|
||||
self.assertEqual(10,
|
||||
c._MongoReplicaSetClient__pool_opts.socket_timeout)
|
||||
|
||||
c = self._get_client(socketTimeoutMS=None)
|
||||
self.assertEqual(None, c._MongoReplicaSetClient__net_timeout)
|
||||
self.assertEqual(None,
|
||||
c._MongoReplicaSetClient__pool_opts.socket_timeout)
|
||||
|
||||
self.assertRaises(ConfigurationError,
|
||||
self._get_client, socketTimeoutMS=0)
|
||||
@ -691,12 +693,12 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
def test_waitQueueTimeoutMS(self):
|
||||
client = self._get_client(waitQueueTimeoutMS=2000)
|
||||
pool = get_pool(client)
|
||||
self.assertEqual(pool.wait_queue_timeout, 2)
|
||||
self.assertEqual(pool.opts.wait_queue_timeout, 2)
|
||||
|
||||
def test_waitQueueMultiple(self):
|
||||
client = self._get_client(max_pool_size=3, waitQueueMultiple=2)
|
||||
pool = get_pool(client)
|
||||
self.assertEqual(pool.wait_queue_multiple, 2)
|
||||
self.assertEqual(pool.opts.wait_queue_multiple, 2)
|
||||
self.assertEqual(pool._socket_semaphore.waiter_semaphore.counter, 6)
|
||||
|
||||
def test_tz_aware(self):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user