PYTHON-1900 Add "connectionError" as a valid ConnectionCheckOutFailedEvent reason

This commit is contained in:
Shane Harvey 2019-07-16 18:07:54 -07:00
parent 3dfd03c9eb
commit 77913c7d36
6 changed files with 110 additions and 11 deletions

View File

@ -729,6 +729,11 @@ class ConnectionCheckOutFailedReason(object):
POOL_CLOSED = 'poolClosed'
"""The pool was previously closed, and cannot provide new connections."""
CONN_ERROR = 'connectionError'
"""The connection check out attempt experienced an error while setting up
a new connection.
"""
class _ConnectionEvent(object):
"""Private base class for some connection events."""

View File

@ -500,6 +500,7 @@ class SocketInfo(object):
# The pool's pool_id changes with each reset() so we can close sockets
# created before the last reset.
self.pool_id = pool.pool_id
self.ready = False
def ismaster(self, metadata, cluster_time):
cmd = SON([('ismaster', 1)])
@ -710,6 +711,13 @@ class SocketInfo(object):
auth.authenticate(credentials, self)
self.authset.add(credentials)
# CMAP spec says to publish the ready event only after authenticating
# the connection.
if not self.ready:
self.ready = True
if self.enabled_for_cmap:
self.listeners.publish_connection_ready(self.address, self.id)
def authenticate(self, credentials):
"""Log in to the server and store these credentials in `authset`.
@ -1068,9 +1076,6 @@ class Pool:
if self.handshake:
sock_info.ismaster(self.opts.metadata, None)
if self.enabled_for_cmap:
listeners.publish_connection_ready(self.address, conn_id)
return sock_info
@contextlib.contextmanager
@ -1102,15 +1107,20 @@ class Pool:
# First get a socket, then attempt authentication. Simplifies
# semaphore management in the face of network errors during auth.
sock_info = self._get_socket_no_auth()
checked_auth = False
try:
sock_info.check_auth(all_credentials)
checked_auth = True
if self.enabled_for_cmap:
listeners.publish_connection_checked_out(
self.address, sock_info.id)
yield sock_info
except:
# Exception in caller. Decrement semaphore.
self.return_socket(sock_info)
self.return_socket(sock_info, publish_checkin=checked_auth)
if self.enabled_for_cmap and not checked_auth:
self.opts.event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR)
raise
else:
if not checkout:
@ -1156,14 +1166,24 @@ class Pool:
self._socket_semaphore.release()
with self.lock:
self.active_sockets -= 1
if self.enabled_for_cmap:
self.opts.event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR)
raise
return sock_info
def return_socket(self, sock_info):
"""Return the socket to the pool, or if it's closed discard it."""
def return_socket(self, sock_info, publish_checkin=True):
"""Return the socket to the pool, or if it's closed discard it.
:Parameters:
- `sock_info`: The socket to check into the pool.
- `publish_checkin`: If False, a ConnectionCheckedInEvent will not
be published.
"""
listeners = self.opts.event_listeners
if self.enabled_for_cmap:
if self.enabled_for_cmap and publish_checkin:
listeners.publish_connection_checked_in(self.address, sock_info.id)
if self.pid != os.getpid():
self.reset()

View File

@ -28,18 +28,28 @@
"address": 42,
"options": 42
},
{
"type": "ConnectionCheckOutStarted",
"address": 42
},
{
"type": "ConnectionCheckedOut",
"address": 42,
"connectionId": 42
},
{
"type": "ConnectionCheckedIn",
"address": 42,
"connectionId": 42
},
{
"type": "ConnectionPoolClosed",
"address": 42
},
{
"type": "ConnectionCheckOutStarted",
"address": 42
},
{
"type": "ConnectionCheckOutFailed",
"address": 42,
@ -49,7 +59,6 @@
"ignore": [
"ConnectionCreated",
"ConnectionReady",
"ConnectionClosed",
"ConnectionCheckOutStarted"
"ConnectionClosed"
]
}

View File

@ -14,6 +14,7 @@
"""Execute Transactions Spec tests."""
import functools
import os
import sys
import time
@ -322,11 +323,13 @@ class TestCMAP(IntegrationTest):
#
def test_1_client_connection_pool_options(self):
client = rs_or_single_client(**self.POOL_OPTIONS)
self.addCleanup(client.close)
pool_opts = get_pool(client).opts
self.assertEqual(pool_opts.non_default_options, self.POOL_OPTIONS)
def test_2_all_client_pools_have_same_options(self):
client = rs_or_single_client(**self.POOL_OPTIONS)
self.addCleanup(client.close)
client.admin.command('isMaster')
# Discover at least one secondary.
if client_context.has_secondaries:
@ -344,12 +347,14 @@ class TestCMAP(IntegrationTest):
for k, v in self.POOL_OPTIONS.items()])
uri = 'mongodb://%s/?%s' % (client_context.pair, opts)
client = rs_or_single_client(uri, **self.credentials)
self.addCleanup(client.close)
pool_opts = get_pool(client).opts
self.assertEqual(pool_opts.non_default_options, self.POOL_OPTIONS)
def test_4_subscribe_to_events(self):
listener = CMAPListener()
client = single_client(event_listeners=[listener])
self.addCleanup(client.close)
self.assertEqual(listener.event_count(PoolCreatedEvent), 1)
# Creates a new connection.
@ -372,6 +377,66 @@ class TestCMAP(IntegrationTest):
self.assertEqual(listener.event_count(PoolClearedEvent), 1)
self.assertEqual(listener.event_count(ConnectionClosedEvent), 1)
def test_5_check_out_fails_connection_error(self):
listener = CMAPListener()
client = single_client(event_listeners=[listener])
self.addCleanup(client.close)
pool = get_pool(client)
def mock_connect(*args, **kwargs):
raise ConnectionFailure('connect failed')
pool.connect = mock_connect
# Attempt to create a new connection.
with self.assertRaisesRegex(ConnectionFailure, 'connect failed'):
client.admin.command('isMaster')
self.assertIsInstance(listener.events[0], PoolCreatedEvent)
self.assertIsInstance(listener.events[1],
ConnectionCheckOutStartedEvent)
self.assertIsInstance(listener.events[2],
ConnectionCheckOutFailedEvent)
self.assertIsInstance(listener.events[3], PoolClearedEvent)
failed_event = listener.events[2]
self.assertEqual(
failed_event.reason, ConnectionCheckOutFailedReason.CONN_ERROR)
def test_5_check_out_fails_auth_error(self):
listener = CMAPListener()
client = single_client(event_listeners=[listener])
self.addCleanup(client.close)
pool = get_pool(client)
connect = pool.connect
def mock_check_auth(self, *args, **kwargs):
self.close_socket(ConnectionClosedReason.ERROR)
raise ConnectionFailure('auth failed')
def mock_connect(*args, **kwargs):
sock_info = connect(*args, **kwargs)
sock_info.check_auth = functools.partial(mock_check_auth, sock_info)
return sock_info
pool.connect = mock_connect
# Attempt to create a new connection.
with self.assertRaisesRegex(ConnectionFailure, 'auth failed'):
client.admin.command('isMaster')
self.assertIsInstance(listener.events[0], PoolCreatedEvent)
self.assertIsInstance(listener.events[1],
ConnectionCheckOutStartedEvent)
self.assertIsInstance(listener.events[2], ConnectionCreatedEvent)
# Error happens here.
self.assertIsInstance(listener.events[3], ConnectionClosedEvent)
self.assertIsInstance(listener.events[4],
ConnectionCheckOutFailedEvent)
self.assertIsInstance(listener.events[5], PoolClearedEvent)
failed_event = listener.events[4]
self.assertEqual(
failed_event.reason, ConnectionCheckOutFailedReason.CONN_ERROR)
#
# Extra non-spec tests
#

View File

@ -47,7 +47,7 @@ class MockPool(object):
def get_socket(self, all_credentials):
return MockSocketInfo()
def return_socket(self, _):
def return_socket(self, *args, **kwargs):
pass
def _reset(self):

View File

@ -60,7 +60,7 @@ class MockPool(object):
def get_socket(self, all_credentials):
return MockSocketInfo()
def return_socket(self, _):
def return_socket(self, *args, **kwargs):
pass
def _reset(self):