diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 8b2210665..103d0d3ee 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -729,6 +729,11 @@ class ConnectionCheckOutFailedReason(object): POOL_CLOSED = 'poolClosed' """The pool was previously closed, and cannot provide new connections.""" + CONN_ERROR = 'connectionError' + """The connection check out attempt experienced an error while setting up + a new connection. + """ + class _ConnectionEvent(object): """Private base class for some connection events.""" diff --git a/pymongo/pool.py b/pymongo/pool.py index 9356a1a1b..74aa68821 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -500,6 +500,7 @@ class SocketInfo(object): # The pool's pool_id changes with each reset() so we can close sockets # created before the last reset. self.pool_id = pool.pool_id + self.ready = False def ismaster(self, metadata, cluster_time): cmd = SON([('ismaster', 1)]) @@ -710,6 +711,13 @@ class SocketInfo(object): auth.authenticate(credentials, self) self.authset.add(credentials) + # CMAP spec says to publish the ready event only after authenticating + # the connection. + if not self.ready: + self.ready = True + if self.enabled_for_cmap: + self.listeners.publish_connection_ready(self.address, self.id) + def authenticate(self, credentials): """Log in to the server and store these credentials in `authset`. @@ -1068,9 +1076,6 @@ class Pool: if self.handshake: sock_info.ismaster(self.opts.metadata, None) - if self.enabled_for_cmap: - listeners.publish_connection_ready(self.address, conn_id) - return sock_info @contextlib.contextmanager @@ -1102,15 +1107,20 @@ class Pool: # First get a socket, then attempt authentication. Simplifies # semaphore management in the face of network errors during auth. sock_info = self._get_socket_no_auth() + checked_auth = False try: sock_info.check_auth(all_credentials) + checked_auth = True if self.enabled_for_cmap: listeners.publish_connection_checked_out( self.address, sock_info.id) yield sock_info except: # Exception in caller. Decrement semaphore. - self.return_socket(sock_info) + self.return_socket(sock_info, publish_checkin=checked_auth) + if self.enabled_for_cmap and not checked_auth: + self.opts.event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR) raise else: if not checkout: @@ -1156,14 +1166,24 @@ class Pool: self._socket_semaphore.release() with self.lock: self.active_sockets -= 1 + + if self.enabled_for_cmap: + self.opts.event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR) raise return sock_info - def return_socket(self, sock_info): - """Return the socket to the pool, or if it's closed discard it.""" + def return_socket(self, sock_info, publish_checkin=True): + """Return the socket to the pool, or if it's closed discard it. + + :Parameters: + - `sock_info`: The socket to check into the pool. + - `publish_checkin`: If False, a ConnectionCheckedInEvent will not + be published. + """ listeners = self.opts.event_listeners - if self.enabled_for_cmap: + if self.enabled_for_cmap and publish_checkin: listeners.publish_connection_checked_in(self.address, sock_info.id) if self.pid != os.getpid(): self.reset() diff --git a/test/cmap/pool-checkout-error-closed.json b/test/cmap/pool-checkout-error-closed.json index 78c1ea792..3823c23a7 100644 --- a/test/cmap/pool-checkout-error-closed.json +++ b/test/cmap/pool-checkout-error-closed.json @@ -28,18 +28,28 @@ "address": 42, "options": 42 }, + { + "type": "ConnectionCheckOutStarted", + "address": 42 + }, { "type": "ConnectionCheckedOut", + "address": 42, "connectionId": 42 }, { "type": "ConnectionCheckedIn", + "address": 42, "connectionId": 42 }, { "type": "ConnectionPoolClosed", "address": 42 }, + { + "type": "ConnectionCheckOutStarted", + "address": 42 + }, { "type": "ConnectionCheckOutFailed", "address": 42, @@ -49,7 +59,6 @@ "ignore": [ "ConnectionCreated", "ConnectionReady", - "ConnectionClosed", - "ConnectionCheckOutStarted" + "ConnectionClosed" ] } diff --git a/test/test_cmap.py b/test/test_cmap.py index d690a22ef..199a9241c 100644 --- a/test/test_cmap.py +++ b/test/test_cmap.py @@ -14,6 +14,7 @@ """Execute Transactions Spec tests.""" +import functools import os import sys import time @@ -322,11 +323,13 @@ class TestCMAP(IntegrationTest): # def test_1_client_connection_pool_options(self): client = rs_or_single_client(**self.POOL_OPTIONS) + self.addCleanup(client.close) pool_opts = get_pool(client).opts self.assertEqual(pool_opts.non_default_options, self.POOL_OPTIONS) def test_2_all_client_pools_have_same_options(self): client = rs_or_single_client(**self.POOL_OPTIONS) + self.addCleanup(client.close) client.admin.command('isMaster') # Discover at least one secondary. if client_context.has_secondaries: @@ -344,12 +347,14 @@ class TestCMAP(IntegrationTest): for k, v in self.POOL_OPTIONS.items()]) uri = 'mongodb://%s/?%s' % (client_context.pair, opts) client = rs_or_single_client(uri, **self.credentials) + self.addCleanup(client.close) pool_opts = get_pool(client).opts self.assertEqual(pool_opts.non_default_options, self.POOL_OPTIONS) def test_4_subscribe_to_events(self): listener = CMAPListener() client = single_client(event_listeners=[listener]) + self.addCleanup(client.close) self.assertEqual(listener.event_count(PoolCreatedEvent), 1) # Creates a new connection. @@ -372,6 +377,66 @@ class TestCMAP(IntegrationTest): self.assertEqual(listener.event_count(PoolClearedEvent), 1) self.assertEqual(listener.event_count(ConnectionClosedEvent), 1) + def test_5_check_out_fails_connection_error(self): + listener = CMAPListener() + client = single_client(event_listeners=[listener]) + self.addCleanup(client.close) + pool = get_pool(client) + + def mock_connect(*args, **kwargs): + raise ConnectionFailure('connect failed') + pool.connect = mock_connect + + # Attempt to create a new connection. + with self.assertRaisesRegex(ConnectionFailure, 'connect failed'): + client.admin.command('isMaster') + + self.assertIsInstance(listener.events[0], PoolCreatedEvent) + self.assertIsInstance(listener.events[1], + ConnectionCheckOutStartedEvent) + self.assertIsInstance(listener.events[2], + ConnectionCheckOutFailedEvent) + self.assertIsInstance(listener.events[3], PoolClearedEvent) + + failed_event = listener.events[2] + self.assertEqual( + failed_event.reason, ConnectionCheckOutFailedReason.CONN_ERROR) + + def test_5_check_out_fails_auth_error(self): + listener = CMAPListener() + client = single_client(event_listeners=[listener]) + self.addCleanup(client.close) + pool = get_pool(client) + connect = pool.connect + + def mock_check_auth(self, *args, **kwargs): + self.close_socket(ConnectionClosedReason.ERROR) + raise ConnectionFailure('auth failed') + + def mock_connect(*args, **kwargs): + sock_info = connect(*args, **kwargs) + sock_info.check_auth = functools.partial(mock_check_auth, sock_info) + return sock_info + pool.connect = mock_connect + + # Attempt to create a new connection. + with self.assertRaisesRegex(ConnectionFailure, 'auth failed'): + client.admin.command('isMaster') + + self.assertIsInstance(listener.events[0], PoolCreatedEvent) + self.assertIsInstance(listener.events[1], + ConnectionCheckOutStartedEvent) + self.assertIsInstance(listener.events[2], ConnectionCreatedEvent) + # Error happens here. + self.assertIsInstance(listener.events[3], ConnectionClosedEvent) + self.assertIsInstance(listener.events[4], + ConnectionCheckOutFailedEvent) + self.assertIsInstance(listener.events[5], PoolClearedEvent) + + failed_event = listener.events[4] + self.assertEqual( + failed_event.reason, ConnectionCheckOutFailedReason.CONN_ERROR) + # # Extra non-spec tests # diff --git a/test/test_heartbeat_monitoring.py b/test/test_heartbeat_monitoring.py index cf1620649..c6576969c 100644 --- a/test/test_heartbeat_monitoring.py +++ b/test/test_heartbeat_monitoring.py @@ -47,7 +47,7 @@ class MockPool(object): def get_socket(self, all_credentials): return MockSocketInfo() - def return_socket(self, _): + def return_socket(self, *args, **kwargs): pass def _reset(self): diff --git a/test/test_topology.py b/test/test_topology.py index 7852f6863..8e7d53289 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -60,7 +60,7 @@ class MockPool(object): def get_socket(self, all_credentials): return MockSocketInfo() - def return_socket(self, _): + def return_socket(self, *args, **kwargs): pass def _reset(self):