diff --git a/doc/changelog.rst b/doc/changelog.rst index a71b81ff2..5567089b4 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -4,6 +4,11 @@ Changelog Changes in Version 4.0 ---------------------- +Breaking Changes in 4.0 +``````````````````````` + +- Removed :mod:`~pymongo.thread_util`. + Issues Resolved ............... diff --git a/pymongo/event_loggers.py b/pymongo/event_loggers.py index 5019ea548..7d5501c37 100644 --- a/pymongo/event_loggers.py +++ b/pymongo/event_loggers.py @@ -171,6 +171,9 @@ class ConnectionPoolLogger(monitoring.ConnectionPoolListener): def pool_created(self, event): logging.info("[pool {0.address}] pool created".format(event)) + def pool_ready(self, event): + logging.info("[pool {0.address}] pool ready".format(event)) + def pool_cleared(self, event): logging.info("[pool {0.address}] pool cleared".format(event)) diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 6e1e24481..2948b3321 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -737,7 +737,7 @@ class MongoClient(common.BaseObject): executor = periodic_executor.PeriodicExecutor( interval=common.KILL_CURSOR_FREQUENCY, - min_interval=0.5, + min_interval=common.MIN_HEARTBEAT_INTERVAL, target=target, name="pymongo_kill_cursors_thread") diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 462b22b33..e5b3d88c4 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -255,6 +255,18 @@ class ConnectionPoolListener(_EventListener): """ raise NotImplementedError + def pool_ready(self, event): + """Abstract method to handle a :class:`PoolReadyEvent`. + + Emitted when a Connection Pool is marked ready. + + :Parameters: + - `event`: An instance of :class:`PoolReadyEvent`. + + .. versionadded:: 4.0 + """ + raise NotImplementedError + def pool_cleared(self, event): """Abstract method to handle a `PoolClearedEvent`. @@ -692,6 +704,18 @@ class PoolCreatedEvent(_PoolEvent): self.__class__.__name__, self.address, self.__options) +class PoolReadyEvent(_PoolEvent): + """Published when a Connection Pool is marked ready. + + :Parameters: + - `address`: The address (host, port) pair of the server this Pool is + attempting to connect to. + + .. versionadded:: 4.0 + """ + __slots__ = () + + class PoolClearedEvent(_PoolEvent): """Published when a Connection Pool is cleared. @@ -1475,6 +1499,16 @@ class _EventListeners(object): except Exception: _handle_exception() + def publish_pool_ready(self, address): + """Publish a :class:`PoolReadyEvent` to all pool listeners. + """ + event = PoolReadyEvent(address) + for subscriber in self.__cmap_listeners: + try: + subscriber.pool_ready(event) + except Exception: + _handle_exception() + def publish_pool_cleared(self, address): """Publish a :class:`PoolClearedEvent` to all pool listeners. """ diff --git a/pymongo/pool.py b/pymongo/pool.py index 20164bb8a..d39c9ed8f 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -30,7 +30,7 @@ from pymongo.ssl_support import ( from bson import DEFAULT_CODEC_OPTIONS from bson.py3compat import imap, itervalues, _unicode, PY3 from bson.son import SON -from pymongo import auth, helpers, thread_util, __version__ +from pymongo import auth, helpers, __version__ from pymongo.client_session import _validate_session_write_concern from pymongo.common import (MAX_BSON_SIZE, MAX_CONNECTING, @@ -46,6 +46,7 @@ from pymongo.errors import (AutoReconnect, CertificateError, ConnectionFailure, ConfigurationError, + ExceededMaxWaiters, InvalidOperation, DocumentTooLarge, NetworkTimeout, @@ -309,7 +310,8 @@ class PoolOptions(object): '__wait_queue_timeout', '__wait_queue_multiple', '__ssl_context', '__ssl_match_hostname', '__socket_keepalive', '__event_listeners', '__appname', '__driver', '__metadata', - '__compression_settings', '__max_connecting') + '__compression_settings', '__max_connecting', + '__pause_enabled') def __init__(self, max_pool_size=MAX_POOL_SIZE, min_pool_size=MIN_POOL_SIZE, @@ -318,7 +320,8 @@ class PoolOptions(object): wait_queue_multiple=None, ssl_context=None, ssl_match_hostname=True, socket_keepalive=True, event_listeners=None, appname=None, driver=None, - compression_settings=None, max_connecting=MAX_CONNECTING): + compression_settings=None, max_connecting=MAX_CONNECTING, + pause_enabled=True): self.__max_pool_size = max_pool_size self.__min_pool_size = min_pool_size @@ -335,6 +338,7 @@ class PoolOptions(object): self.__driver = driver self.__compression_settings = compression_settings self.__max_connecting = max_connecting + self.__pause_enabled = pause_enabled self.__metadata = copy.deepcopy(_METADATA) if appname: self.__metadata['application'] = {'name': appname} @@ -406,6 +410,10 @@ class PoolOptions(object): """ return self.__max_connecting + @property + def pause_enabled(self): + return self.__pause_enabled + @property def max_idle_time_seconds(self): """The maximum number of seconds that a connection can remain @@ -1058,6 +1066,12 @@ class _PoolClosedError(PyMongoError): pass +class PoolState(object): + PAUSED = 1 + READY = 2 + CLOSED = 3 + + # Do *not* explicitly inherit from object or Jython won't call __del__ # http://bugs.jython.org/issue1057 class Pool: @@ -1068,6 +1082,10 @@ class Pool: - `options`: a PoolOptions instance - `handshake`: whether to call ismaster for each new SocketInfo """ + if options.pause_enabled: + self.state = PoolState.PAUSED + else: + self.state = PoolState.READY # Check a socket's health with socket_closed() every once in a while. # Can override for testing: 0 to always check, None to never check. self._check_interval_seconds = 1 @@ -1079,7 +1097,6 @@ class Pool: self.active_sockets = 0 # Monotonically increasing connection ID required for CMAP Events. self.next_connection_id = 1 - self.closed = False # Track whether the sockets in this pool are writeable or not. self.is_writable = None @@ -1098,13 +1115,23 @@ class Pool: if (self.opts.wait_queue_multiple is None or self.opts.max_pool_size is None): - max_waiters = None + max_waiters = float('inf') else: max_waiters = ( self.opts.max_pool_size * self.opts.wait_queue_multiple) - - self._socket_semaphore = thread_util.create_semaphore( - self.opts.max_pool_size, max_waiters) + # The first portion of the wait queue. + # Enforces: maxPoolSize and waitQueueMultiple + # Also used for: clearing the wait queue + self.size_cond = threading.Condition(self.lock) + self.requests = 0 + self.max_pool_size = self.opts.max_pool_size + if self.max_pool_size is None: + self.max_pool_size = float('inf') + self.waiters = 0 + self.max_waiters = max_waiters + # The second portion of the wait queue. + # Enforces: maxConnecting + # Also used for: clearing the wait queue self._max_connecting_cond = threading.Condition(self.lock) self._max_connecting = self.opts.max_connecting self._pending = 0 @@ -1114,10 +1141,23 @@ class Pool: # Similar to active_sockets but includes threads in the wait queue. self.operation_count = 0 - def _reset(self, close): - with self.lock: + def ready(self): + old_state, self.state = self.state, PoolState.READY + if old_state != PoolState.READY: + if self.enabled_for_cmap: + self.opts.event_listeners.publish_pool_ready(self.address) + + @property + def closed(self): + return self.state == PoolState.CLOSED + + def _reset(self, close, pause=True): + old_state = self.state + with self.size_cond: if self.closed: return + if self.opts.pause_enabled and pause: + old_state, self.state = self.state, PoolState.PAUSED self.generation += 1 newpid = os.getpid() if self.pid != newpid: @@ -1126,7 +1166,10 @@ class Pool: self.operation_count = 0 sockets, self.sockets = self.sockets, collections.deque() if close: - self.closed = True + self.state = PoolState.CLOSED + # Clear the wait queue + self._max_connecting_cond.notify_all() + self.size_cond.notify_all() listeners = self.opts.event_listeners # CMAP spec says that close() MUST close sockets before publishing the @@ -1138,7 +1181,7 @@ class Pool: if self.enabled_for_cmap: listeners.publish_pool_closed(self.address) else: - if self.enabled_for_cmap: + if old_state != PoolState.PAUSED and self.enabled_for_cmap: listeners.publish_pool_cleared(self.address) for sock_info in sockets: sock_info.close_socket(ConnectionClosedReason.STALE) @@ -1155,6 +1198,9 @@ class Pool: def reset(self): self._reset(close=False) + def reset_without_pause(self): + self._reset(close=False, pause=False) + def close(self): self._reset(close=True) @@ -1164,6 +1210,9 @@ class Pool: `generation` at the point in time this operation was requested on the pool. """ + if self.state != PoolState.READY: + return + if self.opts.max_idle_time_seconds is not None: with self.lock: while (self.sockets and @@ -1172,15 +1221,14 @@ class Pool: sock_info.close_socket(ConnectionClosedReason.IDLE) while True: - with self.lock: + with self.size_cond: + # There are enough sockets in the pool. if (len(self.sockets) + self.active_sockets >= self.opts.min_pool_size): - # There are enough sockets in the pool. return - - # We must acquire the semaphore to respect max_pool_size. - if not self._socket_semaphore.acquire(False): - return + if self.requests >= self.opts.min_pool_size: + return + self.requests += 1 incremented = False try: with self._max_connecting_cond: @@ -1204,7 +1252,10 @@ class Pool: with self._max_connecting_cond: self._pending -= 1 self._max_connecting_cond.notify() - self._socket_semaphore.release() + + with self.size_cond: + self.requests -= 1 + self.size_cond.notify() def connect(self, all_credentials=None): """Connect to Mongo and return a new SocketInfo. @@ -1289,6 +1340,14 @@ class Pool: if not checkout: self.return_socket(sock_info) + def _raise_if_not_ready(self, emit_event): + if self.state != PoolState.READY: + if self.enabled_for_cmap and emit_event: + self.opts.event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR) + _raise_connection_failure( + self.address, AutoReconnect('connection pool paused')) + def _get_socket(self, all_credentials): """Get or create a SocketInfo. Can raise ConnectionFailure.""" # We use the pid here to avoid issues with fork / multiprocessing. @@ -1313,9 +1372,26 @@ class Pool: deadline = _time() + self.opts.wait_queue_timeout else: deadline = None - if not self._socket_semaphore.acquire( - True, self.opts.wait_queue_timeout): - self._raise_wait_queue_timeout() + + with self.size_cond: + self._raise_if_not_ready(emit_event=True) + if self.waiters >= self.max_waiters: + raise ExceededMaxWaiters( + 'exceeded max waiters: %s threads already waiting' % ( + self.waiters)) + self.waiters += 1 + try: + while not (self.requests < self.max_pool_size): + if not _cond_wait(self.size_cond, deadline): + # Timed out, notify the next thread to ensure a + # timeout doesn't consume the condition. + if self.requests < self.max_pool_size: + self.size_cond.notify() + self._raise_wait_queue_timeout() + self._raise_if_not_ready(emit_event=True) + finally: + self.waiters -= 1 + self.requests += 1 # We've now acquired the semaphore and must release it on error. sock_info = None @@ -1330,6 +1406,7 @@ class Pool: # CMAP: we MUST wait for either maxConnecting OR for a socket # to be checked back into the pool. with self._max_connecting_cond: + self._raise_if_not_ready(emit_event=False) while not (self.sockets or self._pending < self._max_connecting): if not _cond_wait(self._max_connecting_cond, deadline): @@ -1340,6 +1417,7 @@ class Pool: self._max_connecting_cond.notify() emitted_event = True self._raise_wait_queue_timeout() + self._raise_if_not_ready(emit_event=False) try: sock_info = self.sockets.popleft() @@ -1361,11 +1439,11 @@ class Pool: if sock_info: # We checked out a socket but authentication failed. sock_info.close_socket(ConnectionClosedReason.ERROR) - self._socket_semaphore.release() - - if incremented: - with self.lock: + with self.size_cond: + self.requests -= 1 + if incremented: self.active_sockets -= 1 + self.size_cond.notify() if self.enabled_for_cmap and not emitted_event: self.opts.event_listeners.publish_connection_check_out_failed( @@ -1401,10 +1479,11 @@ class Pool: # Notify any threads waiting to create a connection. self._max_connecting_cond.notify() - self._socket_semaphore.release() - with self.lock: + with self.size_cond: + self.requests -= 1 self.active_sockets -= 1 self.operation_count -= 1 + self.size_cond.notify() def _perished(self, sock_info): """Return True and close the connection if it is "perished". diff --git a/pymongo/server.py b/pymongo/server.py index eb145d409..c5ddd9bea 100644 --- a/pymongo/server.py +++ b/pymongo/server.py @@ -61,7 +61,7 @@ class Server(object): self._events.put((self._listener.publish_server_closed, (self._description.address, self._topology_id))) self._monitor.close() - self._pool.reset() + self._pool.reset_without_pause() def request_check(self): """Check the server's state soon.""" diff --git a/pymongo/thread_util.py b/pymongo/thread_util.py deleted file mode 100644 index 3dac4e25f..000000000 --- a/pymongo/thread_util.py +++ /dev/null @@ -1,129 +0,0 @@ -# Copyright 2012-2015 MongoDB, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Utilities for multi-threading support.""" - -import threading -try: - from time import monotonic as _time -except ImportError: - from time import time as _time - -from pymongo.monotonic import time as _time -from pymongo.errors import ExceededMaxWaiters - - -### Begin backport from CPython 3.2 for timeout support for Semaphore.acquire -class Semaphore: - - # After Tim Peters' semaphore class, but not quite the same (no maximum) - - def __init__(self, value=1): - if value < 0: - raise ValueError("semaphore initial value must be >= 0") - self._cond = threading.Condition(threading.Lock()) - self._value = value - - def acquire(self, blocking=True, timeout=None): - if not blocking and timeout is not None: - raise ValueError("can't specify timeout for non-blocking acquire") - rc = False - endtime = None - with self._cond: - while self._value == 0: - if not blocking: - break - if timeout is not None: - if endtime is None: - endtime = _time() + timeout - else: - timeout = endtime - _time() - if timeout <= 0: - break - self._cond.wait(timeout) - else: - self._value = self._value - 1 - rc = True - return rc - - __enter__ = acquire - - def release(self): - with self._cond: - self._value = self._value + 1 - self._cond.notify() - - def __exit__(self, t, v, tb): - self.release() - - @property - def counter(self): - return self._value - - -class BoundedSemaphore(Semaphore): - """Semaphore that checks that # releases is <= # acquires""" - def __init__(self, value=1): - Semaphore.__init__(self, value) - self._initial_value = value - - def release(self): - if self._value >= self._initial_value: - raise ValueError("Semaphore released too many times") - return Semaphore.release(self) -### End backport from CPython 3.2 - - -class DummySemaphore(object): - def __init__(self, value=None): - pass - - def acquire(self, blocking=True, timeout=None): - return True - - def release(self): - pass - - -class MaxWaitersBoundedSemaphore(object): - def __init__(self, semaphore_class, value=1, max_waiters=1): - self.waiter_semaphore = semaphore_class(max_waiters) - self.semaphore = semaphore_class(value) - - def acquire(self, blocking=True, timeout=None): - if not self.waiter_semaphore.acquire(False): - raise ExceededMaxWaiters() - try: - return self.semaphore.acquire(blocking, timeout) - finally: - self.waiter_semaphore.release() - - def __getattr__(self, name): - return getattr(self.semaphore, name) - - -class MaxWaitersBoundedSemaphoreThread(MaxWaitersBoundedSemaphore): - def __init__(self, value=1, max_waiters=1): - MaxWaitersBoundedSemaphore.__init__( - self, BoundedSemaphore, value, max_waiters) - - -def create_semaphore(max_size, max_waiters): - if max_size is None: - return DummySemaphore() - else: - if max_waiters is None: - return BoundedSemaphore(max_size) - else: - return MaxWaitersBoundedSemaphoreThread(max_size, max_waiters) diff --git a/pymongo/topology.py b/pymongo/topology.py index 20b8bbc08..db00280c5 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -39,6 +39,7 @@ from pymongo.errors import (ConnectionFailure, NetworkTimeout, NotMasterError, OperationFailure, + PyMongoError, ServerSelectionTimeoutError) from pymongo.monitor import SrvMonitor from pymongo.monotonic import time as _time @@ -282,6 +283,12 @@ class Topology(object): # This is a stale isMaster response. Ignore it. return + # CMAP: Ensure the pool is "ready" when the server is selectable. + if server_description.is_server_type_known: + server = self._servers.get(server_description.address) + if server: + server.pool.ready() + suppress_event = ((self._publish_server or self._publish_tp) and sd_old == server_description) if self._publish_server and not suppress_event: @@ -444,7 +451,13 @@ class Topology(object): servers.append((server, server._pool.generation)) for server, generation in servers: - server._pool.remove_stale_sockets(generation, all_credentials) + pool = server._pool + try: + pool.remove_stale_sockets(generation, all_credentials) + except PyMongoError as exc: + ctx = _ErrorContext(exc, 0, generation, False) + self.handle_error(pool.address, ctx) + raise def close(self): """Clear pools and terminate monitors. Topology reopens on demand.""" @@ -686,7 +699,9 @@ class Topology(object): ssl_match_hostname=options.ssl_match_hostname, event_listeners=options.event_listeners, appname=options.appname, - driver=options.driver) + driver=options.driver, + pause_enabled=False, + ) return self._settings.pool_class(address, monitor_pool_options, handshake=False) diff --git a/test/__init__.py b/test/__init__.py index d24601551..d9d362288 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -687,6 +687,21 @@ class ClientContext(object): "Sessions not supported", func=func) + def supports_retryable_writes(self): + if self.storage_engine == 'mmapv1': + return False + if not self.sessions_enabled: + return False + if self.version.at_least(3, 6): + return self.is_mongos or self.is_rs + return False + + def require_retryable_writes(self, func): + """Run a test only if the deployment supports retryable writes.""" + return self._require(self.supports_retryable_writes, + "This server does not support retryable writes", + func=func) + def supports_transactions(self): if self.storage_engine == 'mmapv1': return False diff --git a/test/cmap/connection-must-have-id.json b/test/cmap/connection-must-have-id.json index 7ed679022..f2d6fb95e 100644 --- a/test/cmap/connection-must-have-id.json +++ b/test/cmap/connection-must-have-id.json @@ -3,6 +3,9 @@ "style": "unit", "description": "must have an ID number associated with it", "operations": [ + { + "name": "ready" + }, { "name": "checkOut" }, @@ -42,6 +45,7 @@ ], "ignore": [ "ConnectionPoolCreated", + "ConnectionPoolReady", "ConnectionPoolClosed", "ConnectionReady" ] diff --git a/test/cmap/connection-must-order-ids.json b/test/cmap/connection-must-order-ids.json index 9b839e8f0..b7c2751dd 100644 --- a/test/cmap/connection-must-order-ids.json +++ b/test/cmap/connection-must-order-ids.json @@ -3,6 +3,9 @@ "style": "unit", "description": "must have IDs assigned in order of creation", "operations": [ + { + "name": "ready" + }, { "name": "checkOut" }, @@ -42,6 +45,7 @@ ], "ignore": [ "ConnectionPoolCreated", + "ConnectionPoolReady", "ConnectionPoolClosed", "ConnectionReady" ] diff --git a/test/cmap/pool-checkin-destroy-closed.json b/test/cmap/pool-checkin-destroy-closed.json index a73afbf75..55d0c0375 100644 --- a/test/cmap/pool-checkin-destroy-closed.json +++ b/test/cmap/pool-checkin-destroy-closed.json @@ -3,6 +3,9 @@ "style": "unit", "description": "must destroy checked in connection if pool has been closed", "operations": [ + { + "name": "ready" + }, { "name": "checkOut", "label": "conn" @@ -39,6 +42,7 @@ ], "ignore": [ "ConnectionPoolCreated", + "ConnectionPoolReady", "ConnectionCreated", "ConnectionReady", "ConnectionCheckOutStarted" diff --git a/test/cmap/pool-checkin-destroy-stale.json b/test/cmap/pool-checkin-destroy-stale.json index 600c05207..6ffb8f53d 100644 --- a/test/cmap/pool-checkin-destroy-stale.json +++ b/test/cmap/pool-checkin-destroy-stale.json @@ -3,6 +3,9 @@ "style": "unit", "description": "must destroy checked in connection if it is stale", "operations": [ + { + "name": "ready" + }, { "name": "checkOut", "label": "conn" @@ -39,6 +42,7 @@ ], "ignore": [ "ConnectionPoolCreated", + "ConnectionPoolReady", "ConnectionCreated", "ConnectionReady", "ConnectionCheckOutStarted" diff --git a/test/cmap/pool-checkin-make-available.json b/test/cmap/pool-checkin-make-available.json index 015928c50..41c522ae6 100644 --- a/test/cmap/pool-checkin-make-available.json +++ b/test/cmap/pool-checkin-make-available.json @@ -3,6 +3,9 @@ "style": "unit", "description": "must make valid checked in connection available", "operations": [ + { + "name": "ready" + }, { "name": "checkOut", "label": "conn" @@ -34,6 +37,7 @@ ], "ignore": [ "ConnectionPoolCreated", + "ConnectionPoolReady", "ConnectionCreated", "ConnectionReady", "ConnectionCheckOutStarted" diff --git a/test/cmap/pool-checkin.json b/test/cmap/pool-checkin.json index 7073895ad..3b40cec6f 100644 --- a/test/cmap/pool-checkin.json +++ b/test/cmap/pool-checkin.json @@ -3,6 +3,9 @@ "style": "unit", "description": "must have a method of allowing the driver to check in a connection", "operations": [ + { + "name": "ready" + }, { "name": "checkOut", "label": "conn" @@ -21,6 +24,7 @@ ], "ignore": [ "ConnectionPoolCreated", + "ConnectionPoolReady", "ConnectionCreated", "ConnectionReady", "ConnectionClosed", diff --git a/test/cmap/pool-checkout-connection.json b/test/cmap/pool-checkout-connection.json index 4d39b1568..d89b34260 100644 --- a/test/cmap/pool-checkout-connection.json +++ b/test/cmap/pool-checkout-connection.json @@ -3,6 +3,9 @@ "style": "unit", "description": "must be able to check out a connection", "operations": [ + { + "name": "ready" + }, { "name": "checkOut" } @@ -29,6 +32,7 @@ } ], "ignore": [ + "ConnectionPoolReady", "ConnectionPoolCreated" ] } diff --git a/test/cmap/pool-checkout-error-closed.json b/test/cmap/pool-checkout-error-closed.json index 3823c23a7..ee2926e1c 100644 --- a/test/cmap/pool-checkout-error-closed.json +++ b/test/cmap/pool-checkout-error-closed.json @@ -3,6 +3,9 @@ "style": "unit", "description": "must throw error if checkOut is called on a closed pool", "operations": [ + { + "name": "ready" + }, { "name": "checkOut", "label": "conn1" @@ -57,6 +60,7 @@ } ], "ignore": [ + "ConnectionPoolReady", "ConnectionCreated", "ConnectionReady", "ConnectionClosed" diff --git a/test/cmap/pool-checkout-maxConnecting-is-enforced.json b/test/cmap/pool-checkout-maxConnecting-is-enforced.json index 4b67b73ad..80797398f 100644 --- a/test/cmap/pool-checkout-maxConnecting-is-enforced.json +++ b/test/cmap/pool-checkout-maxConnecting-is-enforced.json @@ -26,6 +26,9 @@ "waitQueueTimeoutMS": 5000 }, "operations": [ + { + "name": "ready" + }, { "name": "start", "target": "thread1" @@ -98,6 +101,7 @@ "ConnectionCheckedIn", "ConnectionCheckedOut", "ConnectionClosed", - "ConnectionPoolCreated" + "ConnectionPoolCreated", + "ConnectionPoolReady" ] } diff --git a/test/cmap/pool-checkout-maxConnecting-timeout.json b/test/cmap/pool-checkout-maxConnecting-timeout.json index ef71216ef..9d97a6178 100644 --- a/test/cmap/pool-checkout-maxConnecting-timeout.json +++ b/test/cmap/pool-checkout-maxConnecting-timeout.json @@ -26,6 +26,9 @@ "waitQueueTimeoutMS": 50 }, "operations": [ + { + "name": "ready" + }, { "name": "start", "target": "thread1" @@ -93,6 +96,7 @@ "ConnectionCheckedIn", "ConnectionCheckedOut", "ConnectionClosed", - "ConnectionPoolCreated" + "ConnectionPoolCreated", + "ConnectionPoolReady" ] } diff --git a/test/cmap/pool-checkout-multiple.json b/test/cmap/pool-checkout-multiple.json index fee0d076c..07a4eda62 100644 --- a/test/cmap/pool-checkout-multiple.json +++ b/test/cmap/pool-checkout-multiple.json @@ -3,6 +3,9 @@ "style": "unit", "description": "must be able to check out multiple connections at the same time", "operations": [ + { + "name": "ready" + }, { "name": "start", "target": "thread1" @@ -59,6 +62,7 @@ ], "ignore": [ "ConnectionCreated", + "ConnectionPoolReady", "ConnectionReady", "ConnectionPoolCreated", "ConnectionCheckOutStarted" diff --git a/test/cmap/pool-checkout-no-idle.json b/test/cmap/pool-checkout-no-idle.json index 74325d655..7e6563228 100644 --- a/test/cmap/pool-checkout-no-idle.json +++ b/test/cmap/pool-checkout-no-idle.json @@ -6,6 +6,9 @@ "maxIdleTimeMS": 10 }, "operations": [ + { + "name": "ready" + }, { "name": "checkOut", "label": "conn" @@ -52,6 +55,7 @@ ], "ignore": [ "ConnectionReady", + "ConnectionPoolReady", "ConnectionCreated", "ConnectionCheckOutStarted" ] diff --git a/test/cmap/pool-checkout-no-stale.json b/test/cmap/pool-checkout-no-stale.json index 67ee507fe..fcf20621e 100644 --- a/test/cmap/pool-checkout-no-stale.json +++ b/test/cmap/pool-checkout-no-stale.json @@ -3,6 +3,9 @@ "style": "unit", "description": "must destroy and must not check out a stale connection if found while iterating available connections", "operations": [ + { + "name": "ready" + }, { "name": "checkOut", "label": "conn" @@ -14,6 +17,9 @@ { "name": "clear" }, + { + "name": "ready" + }, { "name": "checkOut" } @@ -52,6 +58,7 @@ ], "ignore": [ "ConnectionReady", + "ConnectionPoolReady", "ConnectionCreated", "ConnectionCheckOutStarted" ] diff --git a/test/cmap/pool-checkout-returned-connection-maxConnecting.json b/test/cmap/pool-checkout-returned-connection-maxConnecting.json index 308d640f0..7ff59ab39 100644 --- a/test/cmap/pool-checkout-returned-connection-maxConnecting.json +++ b/test/cmap/pool-checkout-returned-connection-maxConnecting.json @@ -26,6 +26,9 @@ "waitQueueTimeoutMS": 5000 }, "operations": [ + { + "name": "ready" + }, { "name": "checkOut", "label": "conn0" @@ -111,6 +114,7 @@ } ], "ignore": [ + "ConnectionPoolReady", "ConnectionClosed", "ConnectionReady", "ConnectionPoolCreated", diff --git a/test/cmap/pool-clear-clears-waitqueue.json b/test/cmap/pool-clear-clears-waitqueue.json new file mode 100644 index 000000000..8df1bfdfb --- /dev/null +++ b/test/cmap/pool-clear-clears-waitqueue.json @@ -0,0 +1,104 @@ +{ + "version": 1, + "style": "unit", + "description": "clearing pool clears the WaitQueue", + "poolOptions": { + "maxPoolSize": 1, + "waitQueueTimeoutMS": 30000 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "checkOut" + }, + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "start", + "target": "thread2" + }, + { + "name": "checkOut", + "thread": "thread2" + }, + { + "name": "start", + "target": "thread3" + }, + { + "name": "checkOut", + "thread": "thread3" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutStarted", + "count": 4 + }, + { + "name": "clear" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutFailed", + "count": 3, + "timeout": 1000 + } + ], + "events": [ + { + "type": "ConnectionCheckOutStarted", + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "address": 42 + }, + { + "type": "ConnectionCheckOutStarted", + "address": 42 + }, + { + "type": "ConnectionCheckOutStarted", + "address": 42 + }, + { + "type": "ConnectionCheckOutStarted", + "address": 42 + }, + { + "type": "ConnectionPoolCleared", + "address": 42 + }, + { + "type": "ConnectionCheckOutFailed", + "reason": "connectionError", + "address": 42 + }, + { + "type": "ConnectionCheckOutFailed", + "reason": "connectionError", + "address": 42 + }, + { + "type": "ConnectionCheckOutFailed", + "reason": "connectionError", + "address": 42 + } + ], + "ignore": [ + "ConnectionPoolReady", + "ConnectionPoolCreated", + "ConnectionCreated", + "ConnectionReady", + "ConnectionCheckedIn", + "ConnectionClosed" + ] +} diff --git a/test/cmap/pool-clear-min-size.json b/test/cmap/pool-clear-min-size.json new file mode 100644 index 000000000..00c477c62 --- /dev/null +++ b/test/cmap/pool-clear-min-size.json @@ -0,0 +1,67 @@ +{ + "version": 1, + "style": "unit", + "description": "pool clear halts background minPoolSize establishments", + "poolOptions": { + "minPoolSize": 1 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "waitForEvent", + "event": "ConnectionReady", + "count": 1 + }, + { + "name": "clear" + }, + { + "name": "wait", + "ms": 200 + }, + { + "name": "ready" + }, + { + "name": "waitForEvent", + "event": "ConnectionReady", + "count": 2 + } + ], + "events": [ + { + "type": "ConnectionPoolReady", + "address": 42 + }, + { + "type": "ConnectionCreated", + "address": 42 + }, + { + "type": "ConnectionReady", + "address": 42 + }, + { + "type": "ConnectionPoolCleared", + "address": 42 + }, + { + "type": "ConnectionPoolReady", + "address": 42 + }, + { + "type": "ConnectionCreated", + "address": 42 + }, + { + "type": "ConnectionReady", + "address": 42 + } + ], + "ignore": [ + "ConnectionPoolCreated", + "ConnectionClosed" + ] +} diff --git a/test/cmap/pool-clear-paused.json b/test/cmap/pool-clear-paused.json new file mode 100644 index 000000000..847f08d84 --- /dev/null +++ b/test/cmap/pool-clear-paused.json @@ -0,0 +1,32 @@ +{ + "version": 1, + "style": "unit", + "description": "clearing a paused pool emits no events", + "operations": [ + { + "name": "clear" + }, + { + "name": "ready" + }, + { + "name": "clear" + }, + { + "name": "clear" + } + ], + "events": [ + { + "type": "ConnectionPoolReady", + "address": 42 + }, + { + "type": "ConnectionPoolCleared", + "address": 42 + } + ], + "ignore": [ + "ConnectionPoolCreated" + ] +} diff --git a/test/cmap/pool-clear-ready.json b/test/cmap/pool-clear-ready.json new file mode 100644 index 000000000..800c3545a --- /dev/null +++ b/test/cmap/pool-clear-ready.json @@ -0,0 +1,69 @@ +{ + "version": 1, + "style": "unit", + "description": "after clear, cannot check out connections until pool ready", + "operations": [ + { + "name": "ready" + }, + { + "name": "checkOut" + }, + { + "name": "clear" + }, + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutFailed", + "count": 1 + }, + { + "name": "ready" + }, + { + "name": "checkOut" + } + ], + "events": [ + { + "type": "ConnectionPoolReady", + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "address": 42, + "connectionId": 42 + }, + { + "type": "ConnectionPoolCleared", + "address": 42 + }, + { + "type": "ConnectionCheckOutFailed", + "address": 42, + "reason": "connectionError" + }, + { + "type": "ConnectionPoolReady", + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "address": 42 + } + ], + "ignore": [ + "ConnectionPoolCreated", + "ConnectionReady", + "ConnectionCheckOutStarted", + "ConnectionCreated" + ] +} diff --git a/test/cmap/pool-close-destroy-conns.json b/test/cmap/pool-close-destroy-conns.json index e1fb9d078..a3d58a213 100644 --- a/test/cmap/pool-close-destroy-conns.json +++ b/test/cmap/pool-close-destroy-conns.json @@ -3,6 +3,9 @@ "style": "unit", "description": "When a pool is closed, it MUST first destroy all available connections in that pool", "operations": [ + { + "name": "ready" + }, { "name": "checkOut" }, @@ -40,6 +43,7 @@ ], "ignore": [ "ConnectionCreated", + "ConnectionPoolReady", "ConnectionReady", "ConnectionPoolCreated", "ConnectionCheckOutStarted", diff --git a/test/cmap/pool-create-max-size.json b/test/cmap/pool-create-max-size.json index b585d0dae..e3a1fa8ed 100644 --- a/test/cmap/pool-create-max-size.json +++ b/test/cmap/pool-create-max-size.json @@ -6,6 +6,9 @@ "maxPoolSize": 3 }, "operations": [ + { + "name": "ready" + }, { "name": "checkOut", "label": "conn1" @@ -124,6 +127,7 @@ } ], "ignore": [ - "ConnectionReady" + "ConnectionReady", + "ConnectionPoolReady" ] } diff --git a/test/cmap/pool-create-min-size-error.json b/test/cmap/pool-create-min-size-error.json new file mode 100644 index 000000000..4b655123d --- /dev/null +++ b/test/cmap/pool-create-min-size-error.json @@ -0,0 +1,62 @@ +{ + "version": 1, + "style": "integration", + "description": "error during minPoolSize population clears pool", + "runOn": [ + { + "minServerVersion": "4.2.0" + } + ], + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 50 + }, + "data": { + "failCommands": [ + "isMaster" + ], + "closeConnection": true + } + }, + "poolOptions": { + "minPoolSize": 1 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "waitForEvent", + "event": "ConnectionClosed", + "count": 1 + }, + { + "name": "wait", + "ms": 200 + } + ], + "events": [ + { + "type": "ConnectionPoolReady", + "address": 42 + }, + { + "type": "ConnectionCreated", + "address": 42 + }, + { + "type": "ConnectionClosed", + "address": 42, + "connectionId": 42, + "reason": "error" + }, + { + "type": "ConnectionPoolCleared", + "address": 42 + } + ], + "ignore": [ + "ConnectionPoolCreated" + ] +} diff --git a/test/cmap/pool-create-min-size.json b/test/cmap/pool-create-min-size.json index 4fdc42f4e..43118f784 100644 --- a/test/cmap/pool-create-min-size.json +++ b/test/cmap/pool-create-min-size.json @@ -6,6 +6,13 @@ "minPoolSize": 3 }, "operations": [ + { + "name": "wait", + "ms": 200 + }, + { + "name": "ready" + }, { "name": "waitForEvent", "event": "ConnectionCreated", @@ -26,6 +33,10 @@ "address": 42, "options": 42 }, + { + "type": "ConnectionPoolReady", + "address": 42 + }, { "type": "ConnectionCreated", "connectionId": 42, diff --git a/test/cmap/pool-ready-ready.json b/test/cmap/pool-ready-ready.json new file mode 100644 index 000000000..25dfa9c97 --- /dev/null +++ b/test/cmap/pool-ready-ready.json @@ -0,0 +1,39 @@ +{ + "version": 1, + "style": "unit", + "description": "readying a ready pool emits no events", + "operations": [ + { + "name": "ready" + }, + { + "name": "ready" + }, + { + "name": "ready" + }, + { + "name": "clear" + }, + { + "name": "ready" + } + ], + "events": [ + { + "type": "ConnectionPoolReady", + "address": 42 + }, + { + "type": "ConnectionPoolCleared", + "address": 42 + }, + { + "type": "ConnectionPoolReady", + "address": 42 + } + ], + "ignore": [ + "ConnectionPoolCreated" + ] +} diff --git a/test/cmap/pool-ready.json b/test/cmap/pool-ready.json new file mode 100644 index 000000000..29ce7326c --- /dev/null +++ b/test/cmap/pool-ready.json @@ -0,0 +1,57 @@ +{ + "version": 1, + "style": "unit", + "description": "pool starts as cleared and becomes ready", + "operations": [ + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutFailed", + "count": 1 + }, + { + "name": "ready" + }, + { + "name": "checkOut" + } + ], + "events": [ + { + "type": "ConnectionCheckOutStarted", + "address": 42 + }, + { + "type": "ConnectionCheckOutFailed", + "reason": "connectionError", + "address": 42 + }, + { + "type": "ConnectionPoolReady", + "address": 42 + }, + { + "type": "ConnectionCheckOutStarted", + "address": 42 + }, + { + "type": "ConnectionCreated", + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "address": 42 + } + ], + "ignore": [ + "ConnectionPoolCreated", + "ConnectionReady" + ] +} diff --git a/test/cmap/wait-queue-timeout.json b/test/cmap/wait-queue-timeout.json index ee7cf2795..993209a35 100644 --- a/test/cmap/wait-queue-timeout.json +++ b/test/cmap/wait-queue-timeout.json @@ -7,6 +7,9 @@ "waitQueueTimeoutMS": 20 }, "operations": [ + { + "name": "ready" + }, { "name": "checkOut", "label": "conn0" @@ -66,6 +69,7 @@ "ConnectionCreated", "ConnectionReady", "ConnectionClosed", - "ConnectionPoolCreated" + "ConnectionPoolCreated", + "ConnectionPoolReady" ] } diff --git a/test/discovery_and_monitoring_integration/isMaster-command-error.json b/test/discovery_and_monitoring_integration/isMaster-command-error.json index 4bdfd9adf..0a735dc33 100644 --- a/test/discovery_and_monitoring_integration/isMaster-command-error.json +++ b/test/discovery_and_monitoring_integration/isMaster-command-error.json @@ -39,14 +39,6 @@ "count": 1 } }, - { - "name": "waitForEvent", - "object": "testRunner", - "arguments": { - "event": "PoolClearedEvent", - "count": 1 - } - }, { "name": "insertMany", "object": "collection", diff --git a/test/discovery_and_monitoring_integration/isMaster-network-error.json b/test/discovery_and_monitoring_integration/isMaster-network-error.json index eb1f3eac1..2385a4164 100644 --- a/test/discovery_and_monitoring_integration/isMaster-network-error.json +++ b/test/discovery_and_monitoring_integration/isMaster-network-error.json @@ -38,14 +38,6 @@ "count": 1 } }, - { - "name": "waitForEvent", - "object": "testRunner", - "arguments": { - "event": "PoolClearedEvent", - "count": 1 - } - }, { "name": "insertMany", "object": "collection", diff --git a/test/discovery_and_monitoring_integration/isMaster-timeout.json b/test/discovery_and_monitoring_integration/isMaster-timeout.json index eeee612be..50ad48277 100644 --- a/test/discovery_and_monitoring_integration/isMaster-timeout.json +++ b/test/discovery_and_monitoring_integration/isMaster-timeout.json @@ -39,14 +39,6 @@ "count": 1 } }, - { - "name": "waitForEvent", - "object": "testRunner", - "arguments": { - "event": "PoolClearedEvent", - "count": 1 - } - }, { "name": "insertMany", "object": "collection", diff --git a/test/discovery_and_monitoring_integration/minPoolSize-error.json b/test/discovery_and_monitoring_integration/minPoolSize-error.json new file mode 100644 index 000000000..9605ee4f5 --- /dev/null +++ b/test/discovery_and_monitoring_integration/minPoolSize-error.json @@ -0,0 +1,101 @@ +{ + "runOn": [ + { + "minServerVersion": "4.9" + } + ], + "database_name": "sdam-tests", + "collection_name": "sdam-minPoolSize-error", + "data": [], + "tests": [ + { + "description": "Network error on minPoolSize background creation", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "skip": 3 + }, + "data": { + "failCommands": [ + "isMaster" + ], + "appName": "SDAMminPoolSizeError", + "closeConnection": true + } + }, + "clientOptions": { + "heartbeatFrequencyMS": 10000, + "appname": "SDAMminPoolSizeError", + "minPoolSize": 10, + "serverSelectionTimeoutMS": 1000, + "directConnection": true + }, + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolReadyEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "runCommand", + "object": "database", + "command_name": "ping", + "arguments": { + "command": { + "ping": {} + } + }, + "error": true + }, + { + "name": "configureFailPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "off" + } + } + }, + { + "name": "runCommand", + "object": "database", + "command_name": "ping", + "arguments": { + "command": { + "ping": 1 + } + }, + "error": false + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "PoolReadyEvent", + "count": 2 + } + } + ] + } + ] +} diff --git a/test/pymongo_mocks.py b/test/pymongo_mocks.py index 388f89178..7520f2bfb 100644 --- a/test/pymongo_mocks.py +++ b/test/pymongo_mocks.py @@ -57,6 +57,27 @@ class MockPool(Pool): yield sock_info +class DummyMonitor(object): + def __init__(self, server_description, topology, pool, topology_settings): + self._server_description = server_description + self.opened = False + + def cancel_check(self): + pass + + def join(self): + pass + + def open(self): + self.opened = True + + def request_check(self): + pass + + def close(self): + self.opened = False + + class MockMonitor(Monitor): def __init__( self, diff --git a/test/test_client.py b/test/test_client.py index 6f2d587ed..337602883 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -62,6 +62,7 @@ from pymongo.server_selectors import (any_server_selector, writable_server_selector) from pymongo.server_type import SERVER_TYPE from pymongo.settings import TOPOLOGY_TYPE +from pymongo.topology import _ErrorContext from pymongo.srv_resolver import _HAVE_DNSPYTHON from pymongo.write_concern import WriteConcern from test import (client_context, @@ -1090,7 +1091,8 @@ class TestClient(IntegrationTest): client = rs_or_single_client(maxPoolSize=3, waitQueueMultiple=2) pool = get_pool(client) self.assertEqual(pool.opts.wait_queue_multiple, 2) - self.assertEqual(pool._socket_semaphore.waiter_semaphore.counter, 6) + self.assertEqual(pool.max_waiters, 6) + self.assertEqual(pool.max_pool_size, 3) def test_socketKeepAlive(self): for socketKeepAlive in [True, False]: @@ -1341,7 +1343,7 @@ class TestClient(IntegrationTest): self.assertTrue(sock_info.closed) # The semaphore was decremented despite the error. - self.assertTrue(pool._socket_semaphore.acquire(blocking=False)) + self.assertEqual(0, pool.requests) @client_context.require_auth def test_auth_network_error(self): @@ -1546,7 +1548,9 @@ class TestClient(IntegrationTest): def run(self): while self.running: - self.pool.reset() + exc = AutoReconnect('mock pool error') + ctx = _ErrorContext(exc, 0, pool.generation, False) + client._topology.handle_error(pool.address, ctx) time.sleep(0.001) t = ResetPoolThread(pool) @@ -1680,7 +1684,7 @@ class TestExhaustCursor(IntegrationTest): # The socket was checked in and the semaphore was decremented. self.assertIn(sock_info, pool.sockets) - self.assertTrue(pool._socket_semaphore.acquire(blocking=False)) + self.assertEqual(0, pool.requests) def test_exhaust_getmore_server_error(self): # When doing a getmore on an exhaust cursor, the socket stays checked @@ -1739,7 +1743,7 @@ class TestExhaustCursor(IntegrationTest): # The socket was closed and the semaphore was decremented. self.assertNotIn(sock_info, pool.sockets) - self.assertTrue(pool._socket_semaphore.acquire(blocking=False)) + self.assertEqual(0, pool.requests) def test_exhaust_getmore_network_error(self): # When doing a getmore on an exhaust cursor, the socket stays checked @@ -1766,7 +1770,7 @@ class TestExhaustCursor(IntegrationTest): # The socket was closed and the semaphore was decremented. self.assertNotIn(sock_info, pool.sockets) - self.assertTrue(pool._socket_semaphore.acquire(blocking=False)) + self.assertEqual(0, pool.requests) class TestClientLazyConnect(IntegrationTest): diff --git a/test/test_cmap.py b/test/test_cmap.py index ef40d07bf..bf5328fcd 100644 --- a/test/test_cmap.py +++ b/test/test_cmap.py @@ -16,6 +16,7 @@ import os import sys +import threading import time sys.path[0:0] = [""] @@ -35,23 +36,27 @@ from pymongo.monitoring import (ConnectionCheckedInEvent, ConnectionCreatedEvent, ConnectionReadyEvent, PoolCreatedEvent, + PoolReadyEvent, PoolClearedEvent, PoolClosedEvent) from pymongo.read_preferences import ReadPreference -from pymongo.pool import _PoolClosedError +from pymongo.pool import _PoolClosedError, PoolState -from test import (IntegrationTest, +from test import (client_knobs, + IntegrationTest, unittest) from test.utils import (camel_to_snake, client_context, CMAPListener, get_pool, get_pools, + OvertCommandListener, rs_or_single_client, single_client, TestCreator, wait_until) from test.utils_spec_runner import SpecRunnerThread +from test.pymongo_mocks import DummyMonitor OBJECT_TYPES = { @@ -64,6 +69,7 @@ OBJECT_TYPES = { 'ConnectionReady': ConnectionReadyEvent, 'ConnectionCheckOutStarted': ConnectionCheckOutStartedEvent, 'ConnectionPoolCreated': PoolCreatedEvent, + 'ConnectionPoolReady': PoolReadyEvent, 'ConnectionPoolCleared': PoolClearedEvent, 'ConnectionPoolClosed': PoolClosedEvent, # Error types. @@ -98,13 +104,15 @@ class TestCMAP(IntegrationTest): thread.join() if thread.exc: raise thread.exc + self.assertFalse(thread.ops) def wait_for_event(self, op): """Run the 'waitForEvent' operation.""" event = OBJECT_TYPES[op['event']] count = op['count'] + timeout = op.get('timeout', 10000) / 1000.0 wait_until(lambda: self.listener.event_count(event) >= count, - 'find %s %s event(s)' % (count, event)) + 'find %s %s event(s)' % (count, event), timeout=timeout) def check_out(self, op): """Run the 'checkOut' operation.""" @@ -121,6 +129,10 @@ class TestCMAP(IntegrationTest): sock_info = self.labels[label] self.pool.return_socket(sock_info) + def ready(self, op): + """Run the 'ready' operation.""" + self.pool.ready() + def clear(self, op): """Run the 'clear' operation.""" self.pool.reset() @@ -213,9 +225,13 @@ class TestCMAP(IntegrationTest): opts = test['poolOptions'].copy() opts['event_listeners'] = [self.listener] - client = single_client(**opts) + opts['_monitor_class'] = DummyMonitor + with client_knobs(kill_cursor_frequency=.05, + min_heartbeat_interval=.05): + client = single_client(**opts) self.addCleanup(client.close) - self.pool = get_pool(client) + # self.pool = get_pools(client)[0] + self.pool = list(client._get_topology()._servers.values())[0].pool # Map of target names to Thread objects. self.targets = dict() @@ -342,13 +358,14 @@ class TestCMAP(IntegrationTest): client.admin.command('isMaster') self.assertIsInstance(listener.events[0], PoolCreatedEvent) - self.assertIsInstance(listener.events[1], - ConnectionCheckOutStartedEvent) + self.assertIsInstance(listener.events[1], PoolReadyEvent) self.assertIsInstance(listener.events[2], + ConnectionCheckOutStartedEvent) + self.assertIsInstance(listener.events[3], ConnectionCheckOutFailedEvent) - self.assertIsInstance(listener.events[3], PoolClearedEvent) + self.assertIsInstance(listener.events[4], PoolClearedEvent) - failed_event = listener.events[2] + failed_event = listener.events[3] self.assertEqual( failed_event.reason, ConnectionCheckOutFailedReason.CONN_ERROR) @@ -363,17 +380,16 @@ class TestCMAP(IntegrationTest): client.admin.command('isMaster') self.assertIsInstance(listener.events[0], PoolCreatedEvent) - self.assertIsInstance(listener.events[1], + self.assertIsInstance(listener.events[1], PoolReadyEvent) + self.assertIsInstance(listener.events[2], ConnectionCheckOutStartedEvent) - self.assertIsInstance(listener.events[2], ConnectionCreatedEvent) + self.assertIsInstance(listener.events[3], ConnectionCreatedEvent) # Error happens here. - self.assertIsInstance(listener.events[3], ConnectionClosedEvent) - self.assertIsInstance(listener.events[4], + self.assertIsInstance(listener.events[4], ConnectionClosedEvent) + self.assertIsInstance(listener.events[5], ConnectionCheckOutFailedEvent) - - failed_event = listener.events[4] - self.assertEqual( - failed_event.reason, ConnectionCheckOutFailedReason.CONN_ERROR) + self.assertEqual(listener.events[5].reason, + ConnectionCheckOutFailedReason.CONN_ERROR) # # Extra non-spec tests @@ -398,6 +414,73 @@ class TestCMAP(IntegrationTest): self.assertRepr(PoolClearedEvent(host)) self.assertRepr(PoolClosedEvent(host)) + def test_close_leaves_pool_unpaused(self): + # Needed until we implement PYTHON-2463. This test is related to + # test_threads.TestThreads.test_client_disconnect + listener = CMAPListener() + client = single_client(event_listeners=[listener]) + client.admin.command('ping') + pool = get_pool(client) + client.close() + self.assertEqual(1, listener.event_count(PoolClearedEvent)) + self.assertEqual(PoolState.READY, pool.state) + # Checking out a connection should succeed + with pool.get_socket({}): + pass + + @client_context.require_version_max(4, 3) # Remove after SERVER-53624. + @client_context.require_retryable_writes + @client_context.require_failCommand_fail_point + def test_pool_paused_error_is_retryable(self): + cmap_listener = CMAPListener() + cmd_listener = OvertCommandListener() + client = rs_or_single_client( + maxPoolSize=1, + heartbeatFrequencyMS=500, + event_listeners=[cmap_listener, cmd_listener]) + self.addCleanup(client.close) + threads = [InsertThread(client.pymongo_test.test) for _ in range(3)] + fail_command = { + 'mode': {'times': 1}, + 'data': { + 'failCommands': ['insert'], + 'blockConnection': True, + 'blockTimeMS': 1000, + 'errorCode': 91 + }, + } + with self.fail_point(fail_command): + for thread in threads: + thread.start() + for thread in threads: + thread.join() + for thread in threads: + self.assertTrue(thread.passed) + + # The two threads in the wait queue fail the initial connection check + # out attempt and then succeed on retry. + self.assertEqual( + 2, cmap_listener.event_count(ConnectionCheckOutFailedEvent)) + + # Connection check out failures are not reflected in command + # monitoring because we only publish command events _after_ checking + # out a connection. + self.assertEqual(4, len(cmd_listener.results['started'])) + self.assertEqual(3, len(cmd_listener.results['succeeded'])) + self.assertEqual(1, len(cmd_listener.results['failed'])) + + +class InsertThread(threading.Thread): + def __init__(self, collection): + super(InsertThread, self).__init__() + self.daemon = True + self.collection = collection + self.passed = False + + def run(self): + self.collection.insert_one({}) + self.passed = True + def create_test(scenario_def, test, name): def run_scenario(self): diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index c676647e6..601c0b515 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -39,14 +39,18 @@ from pymongo.uri_parser import parse_uri from test import unittest, IntegrationTest from test.utils import (assertion_context, cdecimal_patched, + CMAPListener, client_context, Barrier, get_pool, + HeartbeatEventListener, server_name_to_type, rs_or_single_client, + single_client, TestCreator, wait_until) from test.utils_spec_runner import SpecRunner, SpecRunnerThread +from test.pymongo_mocks import DummyMonitor # Location of JSON test specifications. @@ -54,27 +58,7 @@ _TEST_PATH = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'discovery_and_monitoring') -class MockMonitor(object): - def __init__(self, server_description, topology, pool, topology_settings): - self._server_description = server_description - - def cancel_check(self): - pass - - def open(self): - pass - - def close(self): - pass - - def join(self): - pass - - def request_check(self): - pass - - -def create_mock_topology(uri, monitor_class=MockMonitor): +def create_mock_topology(uri, monitor_class=DummyMonitor): parsed_uri = parse_uri(uri) replica_set_name = None direct_connection = None @@ -318,6 +302,46 @@ class TestIgnoreStaleErrors(IntegrationTest): client.admin.command('ping') +class CMAPHeartbeatListener(HeartbeatEventListener, CMAPListener): + pass + + +class TestPoolManagement(IntegrationTest): + @client_context.require_failCommand_appName + def test_pool_unpause(self): + # This test implements the prose test "Connection Pool Management" + listener = CMAPHeartbeatListener() + client = single_client(appName="SDAMPoolManagementTest", + heartbeatFrequencyMS=500, + event_listeners=[listener]) + self.addCleanup(client.close) + # Assert that ConnectionPoolReadyEvent occurs after the first + # ServerHeartbeatSucceededEvent. + listener.wait_for_event(monitoring.PoolReadyEvent, 1) + pool_ready = listener.events_by_type(monitoring.PoolReadyEvent)[0] + hb_succeeded = listener.events_by_type( + monitoring.ServerHeartbeatSucceededEvent)[0] + self.assertGreater( + listener.events.index(pool_ready), + listener.events.index(hb_succeeded)) + + listener.reset() + fail_ismaster = { + 'mode': {'times': 2}, + 'data': { + 'failCommands': ['isMaster'], + 'errorCode': 1234, + 'appName': 'SDAMPoolManagementTest', + }, + } + with self.fail_point(fail_ismaster): + listener.wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1) + listener.wait_for_event(monitoring.PoolClearedEvent, 1) + listener.wait_for_event( + monitoring.ServerHeartbeatSucceededEvent, 1) + listener.wait_for_event(monitoring.PoolReadyEvent, 1) + + class TestIntegration(SpecRunner): # Location of JSON test specifications. TEST_PATH = os.path.join( diff --git a/test/test_heartbeat_monitoring.py b/test/test_heartbeat_monitoring.py index 61a0afc15..3929412f3 100644 --- a/test/test_heartbeat_monitoring.py +++ b/test/test_heartbeat_monitoring.py @@ -51,12 +51,12 @@ class TestHeartbeatMonitoring(unittest.TestCase): # monitor thread may run multiple times during the execution # of this test. wait_until( - lambda: len(listener.results) >= expected_len, + lambda: len(listener.events) >= expected_len, "publish all events") try: # zip gives us len(expected_results) pairs. - for expected, actual in zip(expected_results, listener.results): + for expected, actual in zip(expected_results, listener.events): self.assertEqual(expected, actual.__class__.__name__) self.assertEqual(actual.connection_id, diff --git a/test/test_pooling.py b/test/test_pooling.py index d10399160..024996a03 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -176,7 +176,9 @@ class _TestPoolingBase(unittest.TestCase): pool_options = client_context.client._topology_settings.pool_options kwargs['ssl_context'] = pool_options.ssl_context kwargs['ssl_match_hostname'] = pool_options.ssl_match_hostname - return Pool(pair, PoolOptions(*args, **kwargs)) + pool = Pool(pair, PoolOptions(*args, **kwargs)) + pool.ready() + return pool class TestPooling(_TestPoolingBase): @@ -483,7 +485,7 @@ class TestPoolMaxSize(_TestPoolingBase): joinall(threads) self.assertEqual(nthreads, self.n_passed) self.assertTrue(len(cx_pool.sockets) > 1) - self.assertEqual(max_pool_size, cx_pool._socket_semaphore.counter) + self.assertEqual(0, cx_pool.requests) def test_max_pool_size_none(self): c = rs_or_single_client(maxPoolSize=None) @@ -529,6 +531,7 @@ class TestPoolMaxSize(_TestPoolingBase): connect_timeout=1, socket_timeout=1, wait_queue_timeout=1)) + test_pool.ready() # First call to get_socket fails; if pool doesn't release its semaphore # then the second call raises "ConnectionFailure: Timed out waiting for diff --git a/test/test_streaming_protocol.py b/test/test_streaming_protocol.py index 8b815a84e..769dc6c67 100644 --- a/test/test_streaming_protocol.py +++ b/test/test_streaming_protocol.py @@ -203,7 +203,7 @@ class TestStreamingProtocol(IntegrationTest): self.assertTrue(hb_failed_events[0].awaited) # Depending on thread scheduling, the failed heartbeat could occur on # the second or third check. - events = [type(e) for e in hb_listener.results[:4]] + events = [type(e) for e in hb_listener.events[:4]] if events == [monitoring.ServerHeartbeatStartedEvent, monitoring.ServerHeartbeatSucceededEvent, monitoring.ServerHeartbeatStartedEvent, diff --git a/test/test_topology.py b/test/test_topology.py index c593c0638..1b3bfe5ab 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -37,24 +37,7 @@ from pymongo.server_selectors import (any_server_selector, from pymongo.settings import TopologySettings from test import client_knobs, unittest from test.utils import MockPool, wait_until - - -class MockMonitor(object): - def __init__(self, server_description, topology, pool, topology_settings): - self._server_description = server_description - self.opened = False - - def cancel_check(self): - pass - - def open(self): - self.opened = True - - def request_check(self): - pass - - def close(self): - self.opened = False +from test.pymongo_mocks import DummyMonitor class SetNameDiscoverySettings(TopologySettings): @@ -68,7 +51,7 @@ address = ('a', 27017) def create_mock_topology( seeds=None, replica_set_name=None, - monitor_class=MockMonitor): + monitor_class=DummyMonitor): partitioned_seeds = list(imap(common.partition_node, seeds or ['a'])) topology_settings = TopologySettings( partitioned_seeds, @@ -501,7 +484,7 @@ class TestMultiServerTopology(TopologyTest): topology_settings = SetNameDiscoverySettings( seeds=[address], pool_class=MockPool, - monitor_class=MockMonitor) + monitor_class=DummyMonitor) t = Topology(topology_settings) self.assertEqual(t.description.replica_set_name, None) @@ -537,7 +520,7 @@ class TestMultiServerTopology(TopologyTest): topology_settings = SetNameDiscoverySettings( seeds=[address], pool_class=MockPool, - monitor_class=MockMonitor) + monitor_class=DummyMonitor) t = Topology(topology_settings) self.assertEqual(t.description.replica_set_name, None) diff --git a/test/utils.py b/test/utils.py index cf38e76e6..ff301540c 100644 --- a/test/utils.py +++ b/test/utils.py @@ -63,7 +63,7 @@ else: IMPOSSIBLE_WRITE_CONCERN = WriteConcern(w=50) -class CMAPListener(ConnectionPoolListener): +class BaseListener(object): def __init__(self): self.events = [] @@ -74,9 +74,26 @@ class CMAPListener(ConnectionPoolListener): self.events.append(event) def event_count(self, event_type): - return len([event for event in self.events[:] - if isinstance(event, event_type)]) + return len(self.events_by_type(event_type)) + def events_by_type(self, event_type): + """Return the matching events by event class. + + event_type can be a single class or a tuple of classes. + """ + return self.matching(lambda e: isinstance(e, event_type)) + + def matching(self, matcher): + """Return the matching events.""" + return [event for event in self.events[:] if matcher(event)] + + def wait_for_event(self, event, count): + """Wait for a number of events to be published, or fail.""" + wait_until(lambda: self.event_count(event) >= count, + 'find %s %s event(s)' % (count, event)) + + +class CMAPListener(BaseListener, monitoring.ConnectionPoolListener): def connection_created(self, event): self.add_event(event) @@ -101,6 +118,9 @@ class CMAPListener(ConnectionPoolListener): def pool_created(self, event): self.add_event(event) + def pool_ready(self, event): + self.add_event(event) + def pool_cleared(self, event): self.add_event(event) @@ -199,25 +219,17 @@ class ServerAndTopologyEventListener(ServerEventListener, """Listens to Server and Topology events.""" -class HeartbeatEventListener(monitoring.ServerHeartbeatListener): +class HeartbeatEventListener(BaseListener, monitoring.ServerHeartbeatListener): """Listens to only server heartbeat events.""" - def __init__(self): - self.results = [] - def started(self, event): - self.results.append(event) + self.add_event(event) def succeeded(self, event): - self.results.append(event) + self.add_event(event) def failed(self, event): - self.results.append(event) - - def matching(self, matcher): - """Return the matching events.""" - results = self.results[:] - return [event for event in results if matcher(event)] + self.add_event(event) class MockSocketInfo(object): @@ -252,9 +264,15 @@ class MockPool(object): with self._lock: self.generation += 1 + def ready(self): + pass + def reset(self): self._reset() + def reset_without_pause(self): + self._reset() + def close(self): self._reset() diff --git a/test/utils_selection_tests.py b/test/utils_selection_tests.py index 0d4edb085..21a4332f4 100644 --- a/test/utils_selection_tests.py +++ b/test/utils_selection_tests.py @@ -30,23 +30,7 @@ from pymongo.server_selectors import writable_server_selector from pymongo.topology import Topology from test import unittest from test.utils import MockPool, parse_read_preference - - -class MockMonitor(object): - def __init__(self, server_description, topology, pool, topology_settings): - pass - - def cancel_check(self): - pass - - def open(self): - pass - - def request_check(self): - pass - - def close(self): - pass +from test.pymongo_mocks import DummyMonitor def get_addresses(server_list): @@ -122,7 +106,7 @@ def get_topology_type_name(scenario_def): def get_topology_settings_dict(**kwargs): settings = dict( - monitor_class=MockMonitor, + monitor_class=DummyMonitor, heartbeat_frequency=HEARTBEAT_FREQUENCY, pool_class=MockPool )