PYTHON-525 Remove "force" param for Pool.get_socket().
MongoReplicaSetClient's old monitor used "force" to get a socket even when the pool had reached its max size. This was so complicated and bug-prone that I prohibited it in the Server Discovery And Monitoring Spec: monitors use dedicated sockets now instead of sharing a pool with the application.
This commit is contained in:
parent
081ca08163
commit
647ed74dcf
@ -148,7 +148,6 @@ class SocketInfo(object):
|
||||
self.authset = set()
|
||||
self.closed = False
|
||||
self.last_checkout = time.time()
|
||||
self.forced = False
|
||||
self.pool_ref = weakref.ref(pool)
|
||||
|
||||
# The pool's pool_id changes with each reset() so we can close sockets
|
||||
@ -434,7 +433,7 @@ class Pool:
|
||||
sock.settimeout(self.opts.socket_timeout)
|
||||
return SocketInfo(sock, self, hostname)
|
||||
|
||||
def get_socket(self, all_credentials, force=False):
|
||||
def get_socket(self, all_credentials):
|
||||
"""Get a socket from the pool.
|
||||
|
||||
Returns a :class:`SocketInfo` object wrapping a connected
|
||||
@ -443,12 +442,10 @@ class Pool:
|
||||
|
||||
:Parameters:
|
||||
- `all_credentials`: dict, maps auth source to MongoCredential.
|
||||
- `force`: optional boolean, forces a connection to be returned
|
||||
without blocking, even if `max_size` has been reached.
|
||||
"""
|
||||
# First get a socket, then attempt authentication. Simplifies
|
||||
# semaphore management in the face of network errors during auth.
|
||||
sock_info = self._get_socket_no_auth(force)
|
||||
sock_info = self._get_socket_no_auth()
|
||||
try:
|
||||
sock_info.check_auth(all_credentials)
|
||||
return sock_info
|
||||
@ -456,7 +453,7 @@ class Pool:
|
||||
self.maybe_return_socket(sock_info)
|
||||
raise
|
||||
|
||||
def _get_socket_no_auth(self, force):
|
||||
def _get_socket_no_auth(self):
|
||||
# We use the pid here to avoid issues with fork / multiprocessing.
|
||||
# See test.test_client:TestClient.test_fork for an example of
|
||||
# what could go wrong otherwise
|
||||
@ -474,16 +471,8 @@ class Pool:
|
||||
checked_sock.last_checkout = time.time()
|
||||
return checked_sock
|
||||
|
||||
forced = False
|
||||
# We're not in a request, just get any free socket or create one
|
||||
if force:
|
||||
# If we're doing an internal operation, attempt to play nicely with
|
||||
# max_size, but if there is no open "slot" force the connection
|
||||
# and mark it as forced so we don't release the semaphore without
|
||||
# having acquired it for this socket.
|
||||
if not self._socket_semaphore.acquire(False):
|
||||
forced = True
|
||||
elif not self._socket_semaphore.acquire(
|
||||
# We're not in a request, just get any free socket or create one.
|
||||
if not self._socket_semaphore.acquire(
|
||||
True, self.opts.wait_queue_timeout):
|
||||
self._raise_wait_queue_timeout()
|
||||
|
||||
@ -504,16 +493,13 @@ class Pool:
|
||||
if from_pool:
|
||||
sock_info = self._check(sock_info)
|
||||
|
||||
sock_info.forced = forced
|
||||
|
||||
if req_state == NO_SOCKET_YET:
|
||||
# start_request has been called but we haven't assigned a
|
||||
# socket to the request yet. Let's use this socket for this
|
||||
# request until end_request.
|
||||
self._set_request_state(sock_info)
|
||||
except:
|
||||
if not forced:
|
||||
self._socket_semaphore.release()
|
||||
self._socket_semaphore.release()
|
||||
raise
|
||||
|
||||
sock_info.last_checkout = time.time()
|
||||
@ -552,14 +538,11 @@ class Pool:
|
||||
return
|
||||
|
||||
if self.pid != os.getpid():
|
||||
if not sock_info.forced:
|
||||
self._socket_semaphore.release()
|
||||
self._socket_semaphore.release()
|
||||
self.reset()
|
||||
else:
|
||||
if sock_info.closed:
|
||||
if sock_info.forced:
|
||||
sock_info.forced = False
|
||||
elif sock_info != self._get_request_state():
|
||||
if sock_info != self._get_request_state():
|
||||
self._socket_semaphore.release()
|
||||
return
|
||||
|
||||
@ -582,10 +565,7 @@ class Pool:
|
||||
finally:
|
||||
self.lock.release()
|
||||
|
||||
if sock_info.forced:
|
||||
sock_info.forced = False
|
||||
else:
|
||||
self._socket_semaphore.release()
|
||||
self._socket_semaphore.release()
|
||||
|
||||
def _check(self, sock_info):
|
||||
"""This side-effecty function checks if this pool has been reset since
|
||||
|
||||
@ -38,7 +38,7 @@ class MockPool(Pool):
|
||||
Pool.__init__(self,
|
||||
(default_host, default_port), PoolOptions(connect_timeout=20))
|
||||
|
||||
def get_socket(self, all_credentials, force=False):
|
||||
def get_socket(self, all_credentials):
|
||||
client = self.client
|
||||
host_and_port = '%s:%s' % (self.mock_host, self.mock_port)
|
||||
if host_and_port in client.mock_down_hosts:
|
||||
@ -49,7 +49,7 @@ class MockPool(Pool):
|
||||
+ client.mock_members
|
||||
+ client.mock_mongoses), "bad host: %s" % host_and_port
|
||||
|
||||
sock_info = Pool.get_socket(self, all_credentials, force)
|
||||
sock_info = Pool.get_socket(self, all_credentials)
|
||||
sock_info.mock_host = self.mock_host
|
||||
sock_info.mock_port = self.mock_port
|
||||
return sock_info
|
||||
|
||||
Loading…
Reference in New Issue
Block a user