From f588412b678009e803395ee8732b03cb720c1ffd Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 22 May 2019 15:06:11 -0700 Subject: [PATCH] PYTHON-1842 Implement Connection Monitoring and Pooling spec --- doc/api/pymongo/monitoring.rst | 44 ++ doc/changelog.rst | 12 +- pymongo/client_options.py | 8 +- pymongo/common.py | 6 + pymongo/cursor.py | 4 +- pymongo/errors.py | 2 +- pymongo/monitoring.py | 593 ++++++++++++++++++++- pymongo/pool.py | 192 +++++-- test/cmap/connection-must-have-id.json | 42 ++ test/cmap/connection-must-order-ids.json | 42 ++ test/cmap/pool-checkin-destroy-closed.json | 43 ++ test/cmap/pool-checkin-destroy-stale.json | 43 ++ test/cmap/pool-checkin-make-available.json | 38 ++ test/cmap/pool-checkin.json | 29 + test/cmap/pool-checkout-connection.json | 24 + test/cmap/pool-checkout-error-closed.json | 55 ++ test/cmap/pool-checkout-multiple.json | 63 +++ test/cmap/pool-checkout-no-idle.json | 54 ++ test/cmap/pool-checkout-no-stale.json | 54 ++ test/cmap/pool-close-destroy-conns.json | 46 ++ test/cmap/pool-close.json | 21 + test/cmap/pool-create-max-size.json | 114 ++++ test/cmap/pool-create-min-size.json | 46 ++ test/cmap/pool-create-with-options.json | 32 ++ test/cmap/pool-create.json | 19 + test/cmap/wait-queue-fairness.json | 162 ++++++ test/cmap/wait-queue-timeout.json | 66 +++ test/test_client.py | 2 +- test/test_cmap.py | 404 ++++++++++++++ test/test_collation.py | 4 - test/test_collection.py | 138 +++-- test/test_command_monitoring_spec.py | 7 - test/test_cursor.py | 150 +++--- test/test_discovery_and_monitoring.py | 8 +- test/test_heartbeat_monitoring.py | 17 +- test/test_monitoring.py | 7 - test/test_pooling.py | 14 +- test/test_read_concern.py | 8 - test/test_sdam_monitoring_spec.py | 6 - test/test_topology.py | 8 +- test/utils.py | 6 +- test/utils_selection_tests.py | 3 + 42 files changed, 2365 insertions(+), 271 deletions(-) create mode 100644 test/cmap/connection-must-have-id.json create mode 100644 test/cmap/connection-must-order-ids.json create mode 100644 test/cmap/pool-checkin-destroy-closed.json create mode 100644 test/cmap/pool-checkin-destroy-stale.json create mode 100644 test/cmap/pool-checkin-make-available.json create mode 100644 test/cmap/pool-checkin.json create mode 100644 test/cmap/pool-checkout-connection.json create mode 100644 test/cmap/pool-checkout-error-closed.json create mode 100644 test/cmap/pool-checkout-multiple.json create mode 100644 test/cmap/pool-checkout-no-idle.json create mode 100644 test/cmap/pool-checkout-no-stale.json create mode 100644 test/cmap/pool-close-destroy-conns.json create mode 100644 test/cmap/pool-close.json create mode 100644 test/cmap/pool-create-max-size.json create mode 100644 test/cmap/pool-create-min-size.json create mode 100644 test/cmap/pool-create-with-options.json create mode 100644 test/cmap/pool-create.json create mode 100644 test/cmap/wait-queue-fairness.json create mode 100644 test/cmap/wait-queue-timeout.json create mode 100644 test/test_cmap.py diff --git a/doc/api/pymongo/monitoring.rst b/doc/api/pymongo/monitoring.rst index 6216b1f1a..6ee48173a 100644 --- a/doc/api/pymongo/monitoring.rst +++ b/doc/api/pymongo/monitoring.rst @@ -17,6 +17,10 @@ .. autoclass:: TopologyListener :members: :inherited-members: + .. autoclass:: ConnectionPoolListener + :members: + :inherited-members: + .. autoclass:: CommandStartedEvent :members: :inherited-members: @@ -53,3 +57,43 @@ .. autoclass:: ServerHeartbeatFailedEvent :members: :inherited-members: + + .. autoclass:: PoolCreatedEvent + :members: + :inherited-members: + .. autoclass:: PoolClearedEvent + :members: + :inherited-members: + .. autoclass:: PoolClosedEvent + :members: + :inherited-members: + + .. autoclass:: ConnectionCreatedEvent + :members: + :inherited-members: + .. autoclass:: ConnectionReadyEvent + :members: + :inherited-members: + + .. autoclass:: ConnectionClosedReason + :members: + + .. autoclass:: ConnectionClosedEvent + :members: + :inherited-members: + .. autoclass:: ConnectionCheckOutStartedEvent + :members: + :inherited-members: + + .. autoclass:: ConnectionCheckOutFailedReason + :members: + + .. autoclass:: ConnectionCheckOutFailedEvent + :members: + :inherited-members: + .. autoclass:: ConnectionCheckedOutEvent + :members: + :inherited-members: + .. autoclass:: ConnectionCheckedInEvent + :members: + :inherited-members: diff --git a/doc/changelog.rst b/doc/changelog.rst index 1a0338c56..b0085346f 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -37,16 +37,18 @@ Version 3.9 adds support for MongoDB 4.2. Highlights include: time, with at-most-once semantics. - Support for retryable reads and the ``retryReads`` URI option which is enabled by default. See the :class:`~pymongo.mongo_client.MongoClient` - documentation for details. + documentation for details. Now that supported operations are retried + automatically and transparently, users should consider adjusting any custom + retry logic to prevent an application from inadvertently retrying for too + long. - Support zstandard for wire protocol compression. - Support for periodically polling DNS SRV records to update the mongos proxy list without having to change client configuration. - New method :meth:`pymongo.database.Database.aggregate` to support running database level aggregations. - - Now that supported operations are retried automatically and transparently, - users should consider adjusting any custom retry logic to prevent - an application from inadvertently retrying for too long. +- Support for publishing Connection Monitoring and Pooling events via the new + :class:`~pymongo.monitoring.ConnectionPoolListener` class. See + :mod:`~pymongo.monitoring` for an example. .. _URI options specification: https://github.com/mongodb/specifications/blob/master/source/uri-options/uri-options.rst diff --git a/pymongo/client_options.py b/pymongo/client_options.py index 040284895..c757c83c0 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -110,15 +110,15 @@ def _parse_pool_options(options): """Parse connection pool options.""" max_pool_size = options.get('maxpoolsize', common.MAX_POOL_SIZE) min_pool_size = options.get('minpoolsize', common.MIN_POOL_SIZE) - default_idle_seconds = common.validate_timeout_or_none( - 'maxidletimems', common.MAX_IDLE_TIME_MS) - max_idle_time_seconds = options.get('maxidletimems', default_idle_seconds) + max_idle_time_seconds = options.get( + 'maxidletimems', common.MAX_IDLE_TIME_SEC) if max_pool_size is not None and min_pool_size > max_pool_size: raise ValueError("minPoolSize must be smaller or equal to maxPoolSize") connect_timeout = options.get('connecttimeoutms', common.CONNECT_TIMEOUT) socket_keepalive = options.get('socketkeepalive', True) socket_timeout = options.get('sockettimeoutms') - wait_queue_timeout = options.get('waitqueuetimeoutms') + wait_queue_timeout = options.get( + 'waitqueuetimeoutms', common.WAIT_QUEUE_TIMEOUT) wait_queue_multiple = options.get('waitqueuemultiple') event_listeners = options.get('event_listeners') appname = options.get('appname') diff --git a/pymongo/common.py b/pymongo/common.py index 62ef6761c..df8434c06 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -88,6 +88,12 @@ MIN_POOL_SIZE = 0 # Default value for maxIdleTimeMS. MAX_IDLE_TIME_MS = None +# Default value for maxIdleTimeMS in seconds. +MAX_IDLE_TIME_SEC = None + +# Default value for waitQueueTimeoutMS in seconds. +WAIT_QUEUE_TIMEOUT = None + # Default value for localThresholdMS. LOCAL_THRESHOLD_MS = 15 diff --git a/pymongo/cursor.py b/pymongo/cursor.py index c4a1787a5..92deb9d14 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -37,6 +37,7 @@ from pymongo.message import (_CursorAddress, _RawBatchGetMore, _Query, _RawBatchQuery) +from pymongo.monitoring import ConnectionClosedReason _QUERY_OPTIONS = { @@ -303,7 +304,8 @@ class Cursor(object): # If this is an exhaust cursor and we haven't completely # exhausted the result set we *must* close the socket # to stop the server from sending more data. - self.__exhaust_mgr.sock.close() + self.__exhaust_mgr.sock.close_socket( + ConnectionClosedReason.ERROR) else: address = _CursorAddress( self.__address, self.__collection.full_name) diff --git a/pymongo/errors.py b/pymongo/errors.py index 74e646f68..e5c2fc812 100644 --- a/pymongo/errors.py +++ b/pymongo/errors.py @@ -234,7 +234,7 @@ class InvalidURI(ConfigurationError): """Raised when trying to parse an invalid mongodb URI.""" -class ExceededMaxWaiters(Exception): +class ExceededMaxWaiters(PyMongoError): """Raised when a thread tries to get a connection from a pool and ``maxPoolSize * waitQueueMultiple`` threads are already waiting. diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 92b838cbe..8b2210665 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -114,6 +114,48 @@ Server discovery and monitoring events are also available. For example:: logging.info("Topology with id {0.topology_id} " "closed".format(event)) +Connection monitoring and pooling events are also available. For example:: + + class ConnectionPoolLogger(ConnectionPoolListener): + + def pool_created(self, event): + logging.info("[pool {0.address}] pool created".format(event)) + + def pool_cleared(self, event): + logging.info("[pool {0.address}] pool cleared".format(event)) + + def pool_closed(self, event): + logging.info("[pool {0.address}] pool closed".format(event)) + + def connection_created(self, event): + logging.info("[pool {0.address}][conn #{0.connection_id}] " + "connection created".format(event)) + + def connection_ready(self, event): + logging.info("[pool {0.address}][conn #{0.connection_id}] " + "connection setup succeeded".format(event)) + + def connection_closed(self, event): + logging.info("[pool {0.address}][conn #{0.connection_id}] " + "connection closed, reason: " + "{0.reason}".format(event)) + + def connection_check_out_started(self, event): + logging.info("[pool {0.address}] connection check out " + "started".format(event)) + + def connection_check_out_failed(self, event): + logging.info("[pool {0.address}] connection check out " + "failed, reason: {0.reason}".format(event)) + + def connection_checked_out(self, event): + logging.info("[pool {0.address}][conn #{0.connection_id}] " + "connection checked out of pool".format(event)) + + def connection_checked_in(self, event): + logging.info("[pool {0.address}][conn #{0.connection_id}] " + "connection checked into pool".format(event)) + Event listeners can also be registered per instance of :class:`~pymongo.mongo_client.MongoClient`:: @@ -134,9 +176,6 @@ will not add that listener to existing client instances. handler first. """ -import sys -import traceback - from collections import namedtuple from bson.py3compat import abc @@ -144,9 +183,10 @@ from pymongo.helpers import _handle_exception _Listeners = namedtuple('Listeners', ('command_listeners', 'server_listeners', - 'server_heartbeat_listeners', 'topology_listeners')) + 'server_heartbeat_listeners', 'topology_listeners', + 'cmap_listeners')) -_LISTENERS = _Listeners([], [], [], []) +_LISTENERS = _Listeners([], [], [], [], []) class _EventListener(object): @@ -155,8 +195,10 @@ class _EventListener(object): class CommandListener(_EventListener): """Abstract base class for command listeners. + Handles `CommandStartedEvent`, `CommandSucceededEvent`, - and `CommandFailedEvent`.""" + and `CommandFailedEvent`. + """ def started(self, event): """Abstract method to handle a `CommandStartedEvent`. @@ -183,8 +225,128 @@ class CommandListener(_EventListener): raise NotImplementedError +class ConnectionPoolListener(_EventListener): + """Abstract base class for connection pool listeners. + + Handles all of the connection pool events defined in the Connection + Monitoring and Pooling Specification: + :class:`PoolCreatedEvent`, :class:`PoolClearedEvent`, + :class:`PoolClosedEvent`, :class:`ConnectionCreatedEvent`, + :class:`ConnectionReadyEvent`, :class:`ConnectionClosedEvent`, + :class:`ConnectionCheckOutStartedEvent`, + :class:`ConnectionCheckOutFailedEvent`, + :class:`ConnectionCheckedOutEvent`, + and :class:`ConnectionCheckedInEvent`. + + .. versionadded:: 3.9 + """ + + def pool_created(self, event): + """Abstract method to handle a :class:`PoolCreatedEvent`. + + Emitted when a Connection Pool is created. + + :Parameters: + - `event`: An instance of :class:`PoolCreatedEvent`. + """ + raise NotImplementedError + + def pool_cleared(self, event): + """Abstract method to handle a `PoolClearedEvent`. + + Emitted when a Connection Pool is cleared. + + :Parameters: + - `event`: An instance of :class:`PoolClearedEvent`. + """ + raise NotImplementedError + + def pool_closed(self, event): + """Abstract method to handle a `PoolClosedEvent`. + + Emitted when a Connection Pool is closed. + + :Parameters: + - `event`: An instance of :class:`PoolClosedEvent`. + """ + raise NotImplementedError + + def connection_created(self, event): + """Abstract method to handle a :class:`ConnectionCreatedEvent`. + + Emitted when a Connection Pool creates a Connection object. + + :Parameters: + - `event`: An instance of :class:`ConnectionCreatedEvent`. + """ + raise NotImplementedError + + def connection_ready(self, event): + """Abstract method to handle a :class:`ConnectionReadyEvent`. + + Emitted when a Connection has finished its setup, and is now ready to + use. + + :Parameters: + - `event`: An instance of :class:`ConnectionReadyEvent`. + """ + raise NotImplementedError + + def connection_closed(self, event): + """Abstract method to handle a :class:`ConnectionClosedEvent`. + + Emitted when a Connection Pool closes a Connection. + + :Parameters: + - `event`: An instance of :class:`ConnectionClosedEvent`. + """ + raise NotImplementedError + + def connection_check_out_started(self, event): + """Abstract method to handle a :class:`ConnectionCheckOutStartedEvent`. + + Emitted when the driver starts attempting to check out a connection. + + :Parameters: + - `event`: An instance of :class:`ConnectionCheckOutStartedEvent`. + """ + raise NotImplementedError + + def connection_check_out_failed(self, event): + """Abstract method to handle a :class:`ConnectionCheckOutFailedEvent`. + + Emitted when the driver's attempt to check out a connection fails. + + :Parameters: + - `event`: An instance of :class:`ConnectionCheckOutFailedEvent`. + """ + raise NotImplementedError + + def connection_checked_out(self, event): + """Abstract method to handle a :class:`ConnectionCheckedOutEvent`. + + Emitted when the driver successfully checks out a Connection. + + :Parameters: + - `event`: An instance of :class:`ConnectionCheckedOutEvent`. + """ + raise NotImplementedError + + def connection_checked_in(self, event): + """Abstract method to handle a :class:`ConnectionCheckedInEvent`. + + Emitted when the driver checks in a Connection back to the Connection + Pool. + + :Parameters: + - `event`: An instance of :class:`ConnectionCheckedInEvent`. + """ + raise NotImplementedError + + class ServerHeartbeatListener(_EventListener): """Abstract base class for server heartbeat listeners. + Handles `ServerHeartbeatStartedEvent`, `ServerHeartbeatSucceededEvent`, and `ServerHeartbeatFailedEvent`. @@ -295,7 +457,8 @@ def _validate_event_listeners(option, listeners): if not isinstance(listener, _EventListener): raise TypeError("Listeners for %s must be either a " "CommandListener, ServerHeartbeatListener, " - "ServerListener, or TopologyListener." % (option,)) + "ServerListener, TopologyListener, or " + "ConnectionPoolListener." % (option,)) return listeners @@ -304,13 +467,14 @@ def register(listener): :Parameters: - `listener`: A subclasses of :class:`CommandListener`, - :class:`ServerHeartbeatListener`, :class:`ServerListener`, or - :class:`TopologyListener`. + :class:`ServerHeartbeatListener`, :class:`ServerListener`, + :class:`TopologyListener`, or :class:`ConnectionPoolListener`. """ if not isinstance(listener, _EventListener): raise TypeError("Listeners for %s must be either a " "CommandListener, ServerHeartbeatListener, " - "ServerListener, or TopologyListener." % (listener,)) + "ServerListener, TopologyListener, or " + "ConnectionPoolListener." % (listener,)) if isinstance(listener, CommandListener): _LISTENERS.command_listeners.append(listener) if isinstance(listener, ServerHeartbeatListener): @@ -319,7 +483,8 @@ def register(listener): _LISTENERS.server_listeners.append(listener) if isinstance(listener, TopologyListener): _LISTENERS.topology_listeners.append(listener) - + if isinstance(listener, ConnectionPoolListener): + _LISTENERS.cmap_listeners.append(listener) # Note - to avoid bugs from forgetting which if these is all lowercase and # which are camelCase, and at the same time avoid having to add a test for @@ -462,6 +627,283 @@ class CommandFailedEvent(_CommandEvent): return self.__failure +class _PoolEvent(object): + """Base class for pool events.""" + __slots__ = ("__address",) + + def __init__(self, address): + self.__address = address + + @property + def address(self): + """The address (host, port) pair of the server the pool is attempting + to connect to. + """ + return self.__address + + def __repr__(self): + return '%s(%r)' % (self.__class__.__name__, self.__address) + + +class PoolCreatedEvent(_PoolEvent): + """Published when a Connection Pool is created. + + :Parameters: + - `address`: The address (host, port) pair of the server this Pool is + attempting to connect to. + + .. versionadded:: 3.9 + """ + __slots__ = ("__options",) + + def __init__(self, address, options): + super(PoolCreatedEvent, self).__init__(address) + self.__options = options + + @property + def options(self): + """Any non-default pool options that were set on this Connection Pool. + """ + return self.__options + + def __repr__(self): + return '%s(%r, %r)' % ( + self.__class__.__name__, self.address, self.__options) + + +class PoolClearedEvent(_PoolEvent): + """Published when a Connection Pool is cleared. + + :Parameters: + - `address`: The address (host, port) pair of the server this Pool is + attempting to connect to. + + .. versionadded:: 3.9 + """ + __slots__ = () + + +class PoolClosedEvent(_PoolEvent): + """Published when a Connection Pool is closed. + + :Parameters: + - `address`: The address (host, port) pair of the server this Pool is + attempting to connect to. + + .. versionadded:: 3.9 + """ + __slots__ = () + + +class ConnectionClosedReason(object): + """An enum that defines values for `reason` on a + :class:`ConnectionClosedEvent`. + + .. versionadded:: 3.9 + """ + + STALE = 'stale' + """The pool was cleared, making the connection no longer valid.""" + + IDLE = 'idle' + """The connection became stale by being idle for too long (maxIdleTimeMS). + """ + + ERROR = 'error' + """The connection experienced an error, making it no longer valid.""" + + POOL_CLOSED = 'poolClosed' + """The pool was closed, making the connection no longer valid.""" + + +class ConnectionCheckOutFailedReason(object): + """An enum that defines values for `reason` on a + :class:`ConnectionCheckOutFailedEvent`. + + .. versionadded:: 3.9 + """ + + TIMEOUT = 'timeout' + """The connection check out attempt exceeded the specified timeout.""" + + POOL_CLOSED = 'poolClosed' + """The pool was previously closed, and cannot provide new connections.""" + + +class _ConnectionEvent(object): + """Private base class for some connection events.""" + __slots__ = ("__address", "__connection_id") + + def __init__(self, address, connection_id): + self.__address = address + self.__connection_id = connection_id + + @property + def address(self): + """The address (host, port) pair of the server this connection is + attempting to connect to. + """ + return self.__address + + @property + def connection_id(self): + """The ID of the Connection.""" + return self.__connection_id + + def __repr__(self): + return '%s(%r, %r)' % ( + self.__class__.__name__, self.__address, self.__connection_id) + + +class ConnectionCreatedEvent(_ConnectionEvent): + """Published when a Connection Pool creates a Connection object. + + NOTE: This connection is not ready for use until the + :class:`ConnectionReadyEvent` is published. + + :Parameters: + - `address`: The address (host, port) pair of the server this + Connection is attempting to connect to. + - `connection_id`: The integer ID of the Connection in this Pool. + + .. versionadded:: 3.9 + """ + __slots__ = () + + +class ConnectionReadyEvent(_ConnectionEvent): + """Published when a Connection has finished its setup, and is ready to use. + + :Parameters: + - `address`: The address (host, port) pair of the server this + Connection is attempting to connect to. + - `connection_id`: The integer ID of the Connection in this Pool. + + .. versionadded:: 3.9 + """ + __slots__ = () + + +class ConnectionClosedEvent(_ConnectionEvent): + """Published when a Connection is closed. + + :Parameters: + - `address`: The address (host, port) pair of the server this + Connection is attempting to connect to. + - `connection_id`: The integer ID of the Connection in this Pool. + - `reason`: A reason explaining why this connection was closed. + + .. versionadded:: 3.9 + """ + __slots__ = ("__reason",) + + def __init__(self, address, connection_id, reason): + super(ConnectionClosedEvent, self).__init__(address, connection_id) + self.__reason = reason + + @property + def reason(self): + """A reason explaining why this connection was closed. + + The reason must be one of the strings from the + :class:`ConnectionClosedReason` enum. + """ + return self.__reason + + def __repr__(self): + return '%s(%r, %r, %r)' % ( + self.__class__.__name__, self.address, self.connection_id, + self.__reason) + + +class ConnectionCheckOutStartedEvent(object): + """Published when the driver starts attempting to check out a connection. + + :Parameters: + - `address`: The address (host, port) pair of the server this + Connection is attempting to connect to. + + .. versionadded:: 3.9 + """ + __slots__ = ("__address",) + + def __init__(self, address): + self.__address = address + + @property + def address(self): + """The address (host, port) pair of the server this connection is + attempting to connect to. + """ + return self.__address + + def __repr__(self): + return '%s(%r)' % (self.__class__.__name__, self.__address) + + +class ConnectionCheckOutFailedEvent(object): + """Published when the driver's attempt to check out a connection fails. + + :Parameters: + - `address`: The address (host, port) pair of the server this + Connection is attempting to connect to. + - `reason`: A reason explaining why connection check out failed. + + .. versionadded:: 3.9 + """ + __slots__ = ("__address", "__reason") + + def __init__(self, address, reason): + self.__address = address + self.__reason = reason + + @property + def address(self): + """The address (host, port) pair of the server this connection is + attempting to connect to. + """ + return self.__address + + @property + def reason(self): + """A reason explaining why connection check out failed. + + The reason must be one of the strings from the + :class:`ConnectionCheckOutFailedReason` enum. + """ + return self.__reason + + def __repr__(self): + return '%s(%r, %r)' % ( + self.__class__.__name__, self.__address, self.__reason) + + +class ConnectionCheckedOutEvent(_ConnectionEvent): + """Published when the driver successfully checks out a Connection. + + :Parameters: + - `address`: The address (host, port) pair of the server this + Connection is attempting to connect to. + - `connection_id`: The integer ID of the Connection in this Pool. + + .. versionadded:: 3.9 + """ + __slots__ = () + + +class ConnectionCheckedInEvent(_ConnectionEvent): + """Published when the driver checks in a Connection into the Pool. + + :Parameters: + - `address`: The address (host, port) pair of the server this + Connection is attempting to connect to. + - `connection_id`: The integer ID of the Connection in this Pool. + + .. versionadded:: 3.9 + """ + __slots__ = () + + class _ServerEvent(object): """Base class for server events.""" @@ -473,7 +915,7 @@ class _ServerEvent(object): @property def server_address(self): - """The address (host/port pair) of the server""" + """The address (host, port) pair of the server""" return self.__server_address @property @@ -671,6 +1113,7 @@ class _EventListeners(object): lst = _LISTENERS.server_heartbeat_listeners self.__server_heartbeat_listeners = lst[:] self.__topology_listeners = _LISTENERS.topology_listeners[:] + self.__cmap_listeners = _LISTENERS.cmap_listeners[:] if listeners is not None: for lst in listeners: if isinstance(lst, CommandListener): @@ -681,11 +1124,14 @@ class _EventListeners(object): self.__server_heartbeat_listeners.append(lst) if isinstance(lst, TopologyListener): self.__topology_listeners.append(lst) + if isinstance(lst, ConnectionPoolListener): + self.__cmap_listeners.append(lst) self.__enabled_for_commands = bool(self.__command_listeners) self.__enabled_for_server = bool(self.__server_listeners) self.__enabled_for_server_heartbeat = bool( self.__server_heartbeat_listeners) self.__enabled_for_topology = bool(self.__topology_listeners) + self.__enabled_for_cmap = bool(self.__cmap_listeners) @property def enabled_for_commands(self): @@ -707,6 +1153,11 @@ class _EventListeners(object): """Are any TopologyListener instances registered?""" return self.__enabled_for_topology + @property + def enabled_for_cmap(self): + """Are any ConnectionPoolListener instances registered?""" + return self.__enabled_for_cmap + def event_listeners(self): """List of registered event listeners.""" return (self.__command_listeners[:], @@ -789,7 +1240,7 @@ class _EventListeners(object): listeners. :Parameters: - - `connection_id`: The address (host/port pair) of the connection. + - `connection_id`: The address (host, port) pair of the connection. """ event = ServerHeartbeatStartedEvent(connection_id) for subscriber in self.__server_heartbeat_listeners: @@ -804,7 +1255,7 @@ class _EventListeners(object): listeners. :Parameters: - - `connection_id`: The address (host/port pair) of the connection. + - `connection_id`: The address (host, port) pair of the connection. - `duration`: The execution time of the event in the highest possible resolution for the platform. - `reply`: The command reply. @@ -821,7 +1272,7 @@ class _EventListeners(object): listeners. :Parameters: - - `connection_id`: The address (host/port pair) of the connection. + - `connection_id`: The address (host, port) pair of the connection. - `duration`: The execution time of the event in the highest possible resolution for the platform. - `reply`: The command reply. @@ -837,7 +1288,7 @@ class _EventListeners(object): """Publish a ServerOpeningEvent to all server listeners. :Parameters: - - `server_address`: The address (host/port pair) of the server. + - `server_address`: The address (host, port) pair of the server. - `topology_id`: A unique identifier for the topology this server is a part of. """ @@ -852,7 +1303,7 @@ class _EventListeners(object): """Publish a ServerClosedEvent to all server listeners. :Parameters: - - `server_address`: The address (host/port pair) of the server. + - `server_address`: The address (host, port) pair of the server. - `topology_id`: A unique identifier for the topology this server is a part of. """ @@ -870,7 +1321,7 @@ class _EventListeners(object): :Parameters: - `previous_description`: The previous server description. - - `server_address`: The address (host/port pair) of the server. + - `server_address`: The address (host, port) pair of the server. - `new_description`: The new server description. - `topology_id`: A unique identifier for the topology this server is a part of. @@ -929,3 +1380,109 @@ class _EventListeners(object): subscriber.description_changed(event) except Exception: _handle_exception() + + def publish_pool_created(self, address, options): + """Publish a :class:`PoolCreatedEvent` to all pool listeners. + """ + event = PoolCreatedEvent(address, options) + for subscriber in self.__cmap_listeners: + try: + subscriber.pool_created(event) + except Exception: + _handle_exception() + + def publish_pool_cleared(self, address): + """Publish a :class:`PoolClearedEvent` to all pool listeners. + """ + event = PoolClearedEvent(address) + for subscriber in self.__cmap_listeners: + try: + subscriber.pool_cleared(event) + except Exception: + _handle_exception() + + def publish_pool_closed(self, address): + """Publish a :class:`PoolClosedEvent` to all pool listeners. + """ + event = PoolClosedEvent(address) + for subscriber in self.__cmap_listeners: + try: + subscriber.pool_closed(event) + except Exception: + _handle_exception() + + def publish_connection_created(self, address, connection_id): + """Publish a :class:`ConnectionCreatedEvent` to all connection + listeners. + """ + event = ConnectionCreatedEvent(address, connection_id) + for subscriber in self.__cmap_listeners: + try: + subscriber.connection_created(event) + except Exception: + _handle_exception() + + def publish_connection_ready(self, address, connection_id): + """Publish a :class:`ConnectionReadyEvent` to all connection listeners. + """ + event = ConnectionReadyEvent(address, connection_id) + for subscriber in self.__cmap_listeners: + try: + subscriber.connection_ready(event) + except Exception: + _handle_exception() + + def publish_connection_closed(self, address, connection_id, reason): + """Publish a :class:`ConnectionClosedEvent` to all connection + listeners. + """ + event = ConnectionClosedEvent(address, connection_id, reason) + for subscriber in self.__cmap_listeners: + try: + subscriber.connection_closed(event) + except Exception: + _handle_exception() + + def publish_connection_check_out_started(self, address): + """Publish a :class:`ConnectionCheckOutStartedEvent` to all connection + listeners. + """ + event = ConnectionCheckOutStartedEvent(address) + for subscriber in self.__cmap_listeners: + try: + subscriber.connection_check_out_started(event) + except Exception: + _handle_exception() + + def publish_connection_check_out_failed(self, address, reason): + """Publish a :class:`ConnectionCheckOutFailedEvent` to all connection + listeners. + """ + event = ConnectionCheckOutFailedEvent(address, reason) + for subscriber in self.__cmap_listeners: + try: + subscriber.connection_check_out_started(event) + except Exception: + _handle_exception() + + def publish_connection_checked_out(self, address, connection_id): + """Publish a :class:`ConnectionCheckedOutEvent` to all connection + listeners. + """ + event = ConnectionCheckedOutEvent(address, connection_id) + for subscriber in self.__cmap_listeners: + try: + subscriber.connection_checked_out(event) + except Exception: + _handle_exception() + + def publish_connection_checked_in(self, address, connection_id): + """Publish a :class:`ConnectionCheckedInEvent` to all connection + listeners. + """ + event = ConnectionCheckedInEvent(address, connection_id) + for subscriber in self.__cmap_listeners: + try: + subscriber.connection_checked_in(event) + except Exception: + _handle_exception() diff --git a/pymongo/pool.py b/pymongo/pool.py index 14d9f7348..9356a1a1b 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -43,10 +43,14 @@ from bson.son import SON from pymongo import auth, helpers, thread_util, __version__ from pymongo.client_session import _validate_session_write_concern from pymongo.common import (MAX_BSON_SIZE, + MAX_IDLE_TIME_SEC, MAX_MESSAGE_SIZE, + MAX_POOL_SIZE, MAX_WIRE_VERSION, MAX_WRITE_BATCH_SIZE, - ORDERED_TYPES) + MIN_POOL_SIZE, + ORDERED_TYPES, + WAIT_QUEUE_TIMEOUT) from pymongo.errors import (AutoReconnect, ConnectionFailure, ConfigurationError, @@ -54,9 +58,12 @@ from pymongo.errors import (AutoReconnect, DocumentTooLarge, NetworkTimeout, NotMasterError, - OperationFailure) + OperationFailure, + PyMongoError) from pymongo.ismaster import IsMaster from pymongo.monotonic import time as _time +from pymongo.monitoring import (ConnectionCheckOutFailedReason, + ConnectionClosedReason) from pymongo.network import (command, receive_message, SocketChecker) @@ -293,9 +300,10 @@ class PoolOptions(object): '__event_listeners', '__appname', '__driver', '__metadata', '__compression_settings') - def __init__(self, max_pool_size=100, min_pool_size=0, - max_idle_time_seconds=None, connect_timeout=None, - socket_timeout=None, wait_queue_timeout=None, + def __init__(self, max_pool_size=MAX_POOL_SIZE, + min_pool_size=MIN_POOL_SIZE, + max_idle_time_seconds=MAX_IDLE_TIME_SEC, connect_timeout=None, + socket_timeout=None, wait_queue_timeout=WAIT_QUEUE_TIMEOUT, wait_queue_multiple=None, ssl_context=None, ssl_match_hostname=True, socket_keepalive=True, event_listeners=None, appname=None, driver=None, @@ -338,6 +346,23 @@ class PoolOptions(object): self.__metadata['platform'] = "%s|%s" % ( _METADATA['platform'], driver.platform) + @property + def non_default_options(self): + """The non-default options this pool was created with. + + Added for CMAP's :class:`PoolCreatedEvent`. + """ + opts = {} + if self.__max_pool_size != MAX_POOL_SIZE: + opts['maxPoolSize'] = self.__max_pool_size + if self.__min_pool_size != MIN_POOL_SIZE: + opts['minPoolSize'] = self.__min_pool_size + if self.__max_idle_time_seconds != MAX_IDLE_TIME_SEC: + opts['maxIdleTimeMS'] = self.__max_idle_time_seconds * 1000 + if self.__wait_queue_timeout != WAIT_QUEUE_TIMEOUT: + opts['waitQueueTimeoutMS'] = self.__wait_queue_timeout * 1000 + return opts + @property def max_pool_size(self): """The maximum allowable number of concurrent connections to each @@ -449,10 +474,12 @@ class SocketInfo(object): - `sock`: a raw socket object - `pool`: a Pool instance - `address`: the server's (host, port) + - `id`: the id of this socket in it's pool """ - def __init__(self, sock, pool, address): + def __init__(self, sock, pool, address, id): self.sock = sock self.address = address + self.id = id self.authset = set() self.closed = False self.last_checkin_time = _time() @@ -466,6 +493,7 @@ class SocketInfo(object): self.is_mongos = False self.op_msg_enabled = False self.listeners = pool.opts.event_listeners + self.enabled_for_cmap = pool.enabled_for_cmap self.compression_settings = pool.opts.compression_settings self.compression_context = None @@ -709,7 +737,10 @@ class SocketInfo(object): 'Cannot use session after authenticating with different' ' credentials') - def close(self): + def close_socket(self, reason): + """Close this connection with a reason.""" + if self.closed: + return self.closed = True # Avoid exceptions on interpreter shutdown. try: @@ -717,6 +748,10 @@ class SocketInfo(object): except Exception: pass + if reason and self.enabled_for_cmap: + self.listeners.publish_connection_closed( + self.address, self.id, reason) + def send_cluster_time(self, command, session, client): """Add cluster time for MongoDB >= 3.6.""" if self.max_wire_version >= 6 and client: @@ -743,7 +778,7 @@ class SocketInfo(object): # ...) is called in Python code, which experiences the signal as a # KeyboardInterrupt from the start, rather than as an initial # socket.error, so we catch that, close the socket, and reraise it. - self.close() + self.close_socket(ConnectionClosedReason.ERROR) if isinstance(error, socket.error): _raise_connection_failure(self.address, error) else: @@ -886,6 +921,13 @@ def _configured_socket(address, options): return sock +class _PoolClosedError(PyMongoError): + """Internal error raised when a thread tries to get a connection from a + closed pool. + """ + pass + + # Do *not* explicitly inherit from object or Jython won't call __del__ # http://bugs.jython.org/issue1057 class Pool: @@ -905,6 +947,9 @@ class Pool: self.sockets = collections.deque() self.lock = threading.Lock() self.active_sockets = 0 + # Monotonically increasing connection ID required for CMAP Events. + self.next_connection_id = 1 + self.closed = False # Keep track of resets, so we notice sockets created before the most # recent reset and close them. @@ -913,6 +958,11 @@ class Pool: self.address = address self.opts = options self.handshake = handshake + # Don't publish events in Monitor pools. + self.enabled_for_cmap = ( + self.handshake and + self.opts.event_listeners is not None and + self.opts.event_listeners.enabled_for_cmap) if (self.opts.wait_queue_multiple is None or self.opts.max_pool_size is None): @@ -924,16 +974,41 @@ class Pool: self._socket_semaphore = thread_util.create_semaphore( self.opts.max_pool_size, max_waiters) self.socket_checker = SocketChecker() + if self.enabled_for_cmap: + self.opts.event_listeners.publish_pool_created( + self.address, self.opts.non_default_options) - def reset(self): + def _reset(self, close): with self.lock: + if self.closed: + return self.pool_id += 1 self.pid = os.getpid() sockets, self.sockets = self.sockets, collections.deque() self.active_sockets = 0 + if close: + self.closed = True - for sock_info in sockets: - sock_info.close() + listeners = self.opts.event_listeners + # CMAP spec says that close() MUST close sockets before publishing the + # PoolClosedEvent but that reset() SHOULD close sockets *after* + # publishing the PoolClearedEvent. + if close: + for sock_info in sockets: + sock_info.close_socket(ConnectionClosedReason.POOL_CLOSED) + if self.enabled_for_cmap: + listeners.publish_pool_closed(self.address) + else: + if self.enabled_for_cmap: + listeners.publish_pool_cleared(self.address) + for sock_info in sockets: + sock_info.close_socket(ConnectionClosedReason.STALE) + + def reset(self): + self._reset(close=False) + + def close(self): + self._reset(close=True) def remove_stale_sockets(self): """Removes stale sockets then adds new ones if pool is too small.""" @@ -942,7 +1017,7 @@ class Pool: while (self.sockets and self.sockets[-1].idle_time_seconds() > self.opts.max_idle_time_seconds): sock_info = self.sockets.pop() - sock_info.close() + sock_info.close_socket(ConnectionClosedReason.IDLE) while True: with self.lock: if (len(self.sockets) + self.active_sockets >= @@ -968,17 +1043,34 @@ class Pool: Note that the pool does not keep a reference to the socket -- you must call return_socket() when you're done with it. """ + with self.lock: + conn_id = self.next_connection_id + self.next_connection_id += 1 + + listeners = self.opts.event_listeners + if self.enabled_for_cmap: + listeners.publish_connection_created(self.address, conn_id) + sock = None try: sock = _configured_socket(self.address, self.opts) except socket.error as error: if sock is not None: sock.close() + + if self.enabled_for_cmap: + listeners.publish_connection_closed( + self.address, conn_id, ConnectionClosedReason.ERROR) + _raise_connection_failure(self.address, error) - sock_info = SocketInfo(sock, self, self.address) + sock_info = SocketInfo(sock, self, self.address, conn_id) if self.handshake: sock_info.ismaster(self.opts.metadata, None) + + if self.enabled_for_cmap: + listeners.publish_connection_ready(self.address, conn_id) + return sock_info @contextlib.contextmanager @@ -1004,11 +1096,17 @@ class Pool: - `all_credentials`: dict, maps auth source to MongoCredential. - `checkout` (optional): keep socket checked out. """ + listeners = self.opts.event_listeners + if self.enabled_for_cmap: + listeners.publish_connection_check_out_started(self.address) # First get a socket, then attempt authentication. Simplifies # semaphore management in the face of network errors during auth. sock_info = self._get_socket_no_auth() try: sock_info.check_auth(all_credentials) + if self.enabled_for_cmap: + listeners.publish_connection_checked_out( + self.address, sock_info.id) yield sock_info except: # Exception in caller. Decrement semaphore. @@ -1026,6 +1124,14 @@ class Pool: if self.pid != os.getpid(): self.reset() + if self.closed: + if self.enabled_for_cmap: + self.opts.event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.POOL_CLOSED) + raise _PoolClosedError( + 'Attempted to check out a connection from closed connection ' + 'pool') + # Get a free socket or create one. if not self._socket_semaphore.acquire( True, self.opts.wait_queue_timeout): @@ -1035,18 +1141,17 @@ class Pool: # We've now acquired the semaphore and must release it on error. try: - try: - # set.pop() isn't atomic in Jython less than 2.7, see - # http://bugs.jython.org/issue1854 - with self.lock: - # Can raise ConnectionFailure. - sock_info = self.sockets.popleft() - except IndexError: - # Can raise ConnectionFailure or CertificateError. - sock_info = self.connect() - else: - # Can raise ConnectionFailure. - sock_info = self._check(sock_info) + sock_info = None + while sock_info is None: + try: + with self.lock: + sock_info = self.sockets.popleft() + except IndexError: + # Can raise ConnectionFailure or CertificateError. + sock_info = self.connect() + else: + if self._perished(sock_info): + sock_info = None except Exception: self._socket_semaphore.release() with self.lock: @@ -1057,11 +1162,16 @@ class Pool: def return_socket(self, sock_info): """Return the socket to the pool, or if it's closed discard it.""" + listeners = self.opts.event_listeners + if self.enabled_for_cmap: + listeners.publish_connection_checked_in(self.address, sock_info.id) if self.pid != os.getpid(): self.reset() else: - if sock_info.pool_id != self.pool_id: - sock_info.close() + if self.closed: + sock_info.close_socket(ConnectionClosedReason.POOL_CLOSED) + elif sock_info.pool_id != self.pool_id: + sock_info.close_socket(ConnectionClosedReason.STALE) elif not sock_info.closed: sock_info.update_last_checkin_time() with self.lock: @@ -1071,12 +1181,10 @@ class Pool: with self.lock: self.active_sockets -= 1 - def _check(self, sock_info): + def _perished(self, sock_info): """This side-effecty function checks if this socket has been idle for for longer than the max idle time, or if the socket has been closed by - some external network error, and if so, attempts to create a new - socket. If this connection attempt fails we raise the - ConnectionFailure. + some external network error. Checking sockets lets us avoid seeing *some* :class:`~pymongo.errors.AutoReconnect` exceptions on server @@ -1089,25 +1197,31 @@ class Pool: # If socket is idle, open a new one. if (self.opts.max_idle_time_seconds is not None and idle_time_seconds > self.opts.max_idle_time_seconds): - sock_info.close() - return self.connect() + sock_info.close_socket(ConnectionClosedReason.IDLE) + return True if (self._check_interval_seconds is not None and ( 0 == self._check_interval_seconds or idle_time_seconds > self._check_interval_seconds)): if self.socket_checker.socket_closed(sock_info.sock): - sock_info.close() - return self.connect() + sock_info.close_socket(ConnectionClosedReason.ERROR) + return True - return sock_info + return False def _raise_wait_queue_timeout(self): + listeners = self.opts.event_listeners + if self.enabled_for_cmap: + listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.TIMEOUT) raise ConnectionFailure( - 'Timed out waiting for socket from pool with max_size %r and' - ' wait_queue_timeout %r' % ( + 'Timed out while checking out a connection from connection pool ' + 'with max_size %r and wait_queue_timeout %r' % ( self.opts.max_pool_size, self.opts.wait_queue_timeout)) def __del__(self): # Avoid ResourceWarnings in Python 3 + # Close all sockets without calling reset() or close() because it is + # not safe to acquire a lock in __del__. for sock_info in self.sockets: - sock_info.close() + sock_info.close_socket(None) diff --git a/test/cmap/connection-must-have-id.json b/test/cmap/connection-must-have-id.json new file mode 100644 index 000000000..487a5979d --- /dev/null +++ b/test/cmap/connection-must-have-id.json @@ -0,0 +1,42 @@ +{ + "version": 1, + "style": "unit", + "description": "must have an ID number associated with it", + "operations": [ + { + "name": "checkOut" + }, + { + "name": "checkOut" + } + ], + "events": [ + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCreated", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCreated", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + } + ], + "ignore": [ + "ConnectionPoolCreated", + "ConnectionPoolClosed", + "ConnectionReady" + ] +} diff --git a/test/cmap/connection-must-order-ids.json b/test/cmap/connection-must-order-ids.json new file mode 100644 index 000000000..dda515c1a --- /dev/null +++ b/test/cmap/connection-must-order-ids.json @@ -0,0 +1,42 @@ +{ + "version": 1, + "style": "unit", + "description": "must have IDs assigned in order of creation", + "operations": [ + { + "name": "checkOut" + }, + { + "name": "checkOut" + } + ], + "events": [ + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCreated", + "connectionId": 1 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 1 + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCreated", + "connectionId": 2 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 2 + } + ], + "ignore": [ + "ConnectionPoolCreated", + "ConnectionPoolClosed", + "ConnectionReady" + ] +} diff --git a/test/cmap/pool-checkin-destroy-closed.json b/test/cmap/pool-checkin-destroy-closed.json new file mode 100644 index 000000000..3b6f1d248 --- /dev/null +++ b/test/cmap/pool-checkin-destroy-closed.json @@ -0,0 +1,43 @@ +{ + "version": 1, + "style": "unit", + "description": "must destroy checked in connection if pool has been closed", + "operations": [ + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "close" + }, + { + "name": "checkIn", + "connection": "conn" + } + ], + "events": [ + { + "type": "ConnectionCheckedOut", + "connectionId": 1 + }, + { + "type": "ConnectionPoolClosed", + "address": 42 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 1 + }, + { + "type": "ConnectionClosed", + "connectionId": 1, + "reason": "poolClosed" + } + ], + "ignore": [ + "ConnectionPoolCreated", + "ConnectionCreated", + "ConnectionReady", + "ConnectionCheckOutStarted" + ] +} diff --git a/test/cmap/pool-checkin-destroy-stale.json b/test/cmap/pool-checkin-destroy-stale.json new file mode 100644 index 000000000..7faa44d33 --- /dev/null +++ b/test/cmap/pool-checkin-destroy-stale.json @@ -0,0 +1,43 @@ +{ + "version": 1, + "style": "unit", + "description": "must destroy checked in connection if it is stale", + "operations": [ + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "clear" + }, + { + "name": "checkIn", + "connection": "conn" + } + ], + "events": [ + { + "type": "ConnectionCheckedOut", + "connectionId": 1 + }, + { + "type": "ConnectionPoolCleared", + "address": 42 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 1 + }, + { + "type": "ConnectionClosed", + "connectionId": 1, + "reason": "stale" + } + ], + "ignore": [ + "ConnectionPoolCreated", + "ConnectionCreated", + "ConnectionReady", + "ConnectionCheckOutStarted" + ] +} diff --git a/test/cmap/pool-checkin-make-available.json b/test/cmap/pool-checkin-make-available.json new file mode 100644 index 000000000..838194fe8 --- /dev/null +++ b/test/cmap/pool-checkin-make-available.json @@ -0,0 +1,38 @@ +{ + "version": 1, + "style": "unit", + "description": "must make valid checked in connection available", + "operations": [ + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "checkIn", + "connection": "conn" + }, + { + "name": "checkOut" + } + ], + "events": [ + { + "type": "ConnectionCheckedOut", + "connectionId": 1 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 1 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 1 + } + ], + "ignore": [ + "ConnectionPoolCreated", + "ConnectionCreated", + "ConnectionReady", + "ConnectionCheckOutStarted" + ] +} diff --git a/test/cmap/pool-checkin.json b/test/cmap/pool-checkin.json new file mode 100644 index 000000000..5e93c207a --- /dev/null +++ b/test/cmap/pool-checkin.json @@ -0,0 +1,29 @@ +{ + "version": 1, + "style": "unit", + "description": "must have a method of allowing the driver to check in a connection", + "operations": [ + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "checkIn", + "connection": "conn" + } + ], + "events": [ + { + "type": "ConnectionCheckedIn", + "connectionId": 42 + } + ], + "ignore": [ + "ConnectionPoolCreated", + "ConnectionCreated", + "ConnectionReady", + "ConnectionClosed", + "ConnectionCheckOutStarted", + "ConnectionCheckedOut" + ] +} diff --git a/test/cmap/pool-checkout-connection.json b/test/cmap/pool-checkout-connection.json new file mode 100644 index 000000000..e6e108ce5 --- /dev/null +++ b/test/cmap/pool-checkout-connection.json @@ -0,0 +1,24 @@ +{ + "version": 1, + "style": "unit", + "description": "must be able to check out a connection", + "operations": [ + { + "name": "checkOut" + } + ], + "events": [ + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 1 + } + ], + "ignore": [ + "ConnectionPoolCreated", + "ConnectionCreated", + "ConnectionReady" + ] +} diff --git a/test/cmap/pool-checkout-error-closed.json b/test/cmap/pool-checkout-error-closed.json new file mode 100644 index 000000000..78c1ea792 --- /dev/null +++ b/test/cmap/pool-checkout-error-closed.json @@ -0,0 +1,55 @@ +{ + "version": 1, + "style": "unit", + "description": "must throw error if checkOut is called on a closed pool", + "operations": [ + { + "name": "checkOut", + "label": "conn1" + }, + { + "name": "checkIn", + "connection": "conn1" + }, + { + "name": "close" + }, + { + "name": "checkOut" + } + ], + "error": { + "type": "PoolClosedError", + "message": "Attempted to check out a connection from closed connection pool" + }, + "events": [ + { + "type": "ConnectionPoolCreated", + "address": 42, + "options": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 42 + }, + { + "type": "ConnectionPoolClosed", + "address": 42 + }, + { + "type": "ConnectionCheckOutFailed", + "address": 42, + "reason": "poolClosed" + } + ], + "ignore": [ + "ConnectionCreated", + "ConnectionReady", + "ConnectionClosed", + "ConnectionCheckOutStarted" + ] +} diff --git a/test/cmap/pool-checkout-multiple.json b/test/cmap/pool-checkout-multiple.json new file mode 100644 index 000000000..f3ecdb9be --- /dev/null +++ b/test/cmap/pool-checkout-multiple.json @@ -0,0 +1,63 @@ +{ + "version": 1, + "style": "unit", + "description": "must be able to check out multiple connections at the same time", + "operations": [ + { + "name": "start", + "target": "thread1" + }, + { + "name": "start", + "target": "thread2" + }, + { + "name": "start", + "target": "thread3" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "checkOut", + "thread": "thread2" + }, + { + "name": "checkOut", + "thread": "thread3" + }, + { + "name": "waitForThread", + "target": "thread1" + }, + { + "name": "waitForThread", + "target": "thread2" + }, + { + "name": "waitForThread", + "target": "thread3" + } + ], + "events": [ + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + } + ], + "ignore": [ + "ConnectionCreated", + "ConnectionReady", + "ConnectionPoolCreated", + "ConnectionCheckOutStarted" + ] +} diff --git a/test/cmap/pool-checkout-no-idle.json b/test/cmap/pool-checkout-no-idle.json new file mode 100644 index 000000000..77ce40dea --- /dev/null +++ b/test/cmap/pool-checkout-no-idle.json @@ -0,0 +1,54 @@ +{ + "version": 1, + "style": "unit", + "description": "must destroy and must not check out an idle connection if found while iterating available connections", + "poolOptions": { + "maxIdleTimeMS": 10 + }, + "operations": [ + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "checkIn", + "connection": "conn" + }, + { + "name": "wait", + "ms": 50 + }, + { + "name": "checkOut" + } + ], + "events": [ + { + "type": "ConnectionPoolCreated", + "address": 42, + "options": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 1 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 1 + }, + { + "type": "ConnectionClosed", + "connectionId": 1, + "reason": "idle" + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 2 + } + ], + "ignore": [ + "ConnectionReady", + "ConnectionCreated", + "ConnectionCheckOutStarted" + ] +} diff --git a/test/cmap/pool-checkout-no-stale.json b/test/cmap/pool-checkout-no-stale.json new file mode 100644 index 000000000..e5ebedfbe --- /dev/null +++ b/test/cmap/pool-checkout-no-stale.json @@ -0,0 +1,54 @@ +{ + "version": 1, + "style": "unit", + "description": "must destroy and must not check out a stale connection if found while iterating available connections", + "operations": [ + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "checkIn", + "connection": "conn" + }, + { + "name": "clear" + }, + { + "name": "checkOut" + } + ], + "events": [ + { + "type": "ConnectionPoolCreated", + "address": 42, + "options": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 1 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 1 + }, + { + "type": "ConnectionPoolCleared", + "address": 42 + }, + { + "type": "ConnectionClosed", + "connectionId": 1, + "reason": "stale" + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 2 + } + ], + "ignore": [ + "ConnectionReady", + "ConnectionCreated", + "ConnectionCheckOutStarted" + ] +} diff --git a/test/cmap/pool-close-destroy-conns.json b/test/cmap/pool-close-destroy-conns.json new file mode 100644 index 000000000..2bc50419b --- /dev/null +++ b/test/cmap/pool-close-destroy-conns.json @@ -0,0 +1,46 @@ +{ + "version": 1, + "style": "unit", + "description": "When a pool is closed, it MUST first destroy all available connections in that pool", + "operations": [ + { + "name": "checkOut" + }, + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "checkOut" + }, + { + "name": "checkIn", + "connection": "conn" + }, + { + "name": "close" + } + ], + "events": [ + { + "type": "ConnectionCheckedIn", + "connectionId": 2 + }, + { + "type": "ConnectionClosed", + "connectionId": 2, + "reason": "poolClosed" + }, + { + "type": "ConnectionPoolClosed", + "address": 42 + } + ], + "ignore": [ + "ConnectionCreated", + "ConnectionReady", + "ConnectionPoolCreated", + "ConnectionCheckOutStarted", + "ConnectionCheckedOut" + ] +} diff --git a/test/cmap/pool-close.json b/test/cmap/pool-close.json new file mode 100644 index 000000000..fe083d73e --- /dev/null +++ b/test/cmap/pool-close.json @@ -0,0 +1,21 @@ +{ + "version": 1, + "style": "unit", + "description": "must be able to manually close a pool", + "operations": [ + { + "name": "close" + } + ], + "events": [ + { + "type": "ConnectionPoolCreated", + "address": 42, + "options": 42 + }, + { + "type": "ConnectionPoolClosed", + "address": 42 + } + ] +} diff --git a/test/cmap/pool-create-max-size.json b/test/cmap/pool-create-max-size.json new file mode 100644 index 000000000..2ba7bdf62 --- /dev/null +++ b/test/cmap/pool-create-max-size.json @@ -0,0 +1,114 @@ +{ + "version": 1, + "style": "unit", + "description": "must never exceed maxPoolSize total connections", + "poolOptions": { + "maxPoolSize": 3 + }, + "operations": [ + { + "name": "checkOut", + "label": "conn1" + }, + { + "name": "checkOut" + }, + { + "name": "checkOut", + "label": "conn2" + }, + { + "name": "checkIn", + "connection": "conn2" + }, + { + "name": "checkOut" + }, + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutStarted", + "count": 5 + }, + { + "name": "checkIn", + "connection": "conn1" + }, + { + "name": "waitForThread", + "target": "thread1" + } + ], + "events": [ + { + "type": "ConnectionPoolCreated", + "address": 42, + "options": 42 + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCreated", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCreated", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCreated", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 42 + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + } + ], + "ignore": [ + "ConnectionReady" + ] +} diff --git a/test/cmap/pool-create-min-size.json b/test/cmap/pool-create-min-size.json new file mode 100644 index 000000000..470988043 --- /dev/null +++ b/test/cmap/pool-create-min-size.json @@ -0,0 +1,46 @@ +{ + "version": 1, + "style": "unit", + "description": "must be able to start a pool with minPoolSize connections", + "poolOptions": { + "minPoolSize": 3 + }, + "operations": [ + { + "name": "waitForEvent", + "event": "ConnectionCreated", + "count": 3 + }, + { + "name": "checkOut" + } + ], + "events": [ + { + "type": "ConnectionPoolCreated", + "address": 42, + "options": 42 + }, + { + "type": "ConnectionCreated", + "connectionId": 42 + }, + { + "type": "ConnectionCreated", + "connectionId": 42 + }, + { + "type": "ConnectionCreated", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + } + ], + "ignore": [ + "ConnectionReady", + "ConnectionClosed", + "ConnectionCheckOutStarted" + ] +} diff --git a/test/cmap/pool-create-with-options.json b/test/cmap/pool-create-with-options.json new file mode 100644 index 000000000..4e8223f91 --- /dev/null +++ b/test/cmap/pool-create-with-options.json @@ -0,0 +1,32 @@ +{ + "version": 1, + "style": "unit", + "description": "must be able to start a pool with various options set", + "poolOptions": { + "maxPoolSize": 50, + "minPoolSize": 5, + "maxIdleTimeMS": 100 + }, + "operations": [ + { + "name": "waitForEvent", + "event": "ConnectionPoolCreated", + "count": 1 + } + ], + "events": [ + { + "type": "ConnectionPoolCreated", + "address": 42, + "options": { + "maxPoolSize": 50, + "minPoolSize": 5, + "maxIdleTimeMS": 100 + } + } + ], + "ignore": [ + "ConnectionCreated", + "ConnectionReady" + ] +} diff --git a/test/cmap/pool-create.json b/test/cmap/pool-create.json new file mode 100644 index 000000000..8c1f85537 --- /dev/null +++ b/test/cmap/pool-create.json @@ -0,0 +1,19 @@ +{ + "version": 1, + "style": "unit", + "description": "must be able to create a pool", + "operations": [ + { + "name": "waitForEvent", + "event": "ConnectionPoolCreated", + "count": 1 + } + ], + "events": [ + { + "type": "ConnectionPoolCreated", + "address": 42, + "options": 42 + } + ] +} diff --git a/test/cmap/wait-queue-fairness.json b/test/cmap/wait-queue-fairness.json new file mode 100644 index 000000000..36c8a6dc1 --- /dev/null +++ b/test/cmap/wait-queue-fairness.json @@ -0,0 +1,162 @@ +{ + "version": 1, + "style": "unit", + "description": "must issue Connections to threads in the order that the threads entered the queue", + "poolOptions": { + "maxPoolSize": 1, + "waitQueueTimeoutMS": 1000 + }, + "operations": [ + { + "name": "checkOut", + "label": "conn0" + }, + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1", + "label": "conn1" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutStarted", + "count": 2 + }, + { + "name": "start", + "target": "thread2" + }, + { + "name": "checkOut", + "thread": "thread2", + "label": "conn2" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutStarted", + "count": 3 + }, + { + "name": "start", + "target": "thread3" + }, + { + "name": "checkOut", + "thread": "thread3", + "label": "conn3" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutStarted", + "count": 4 + }, + { + "name": "start", + "target": "thread4" + }, + { + "name": "checkOut", + "thread": "thread4", + "label": "conn4" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutStarted", + "count": 5 + }, + { + "name": "checkIn", + "connection": "conn0" + }, + { + "name": "waitForThread", + "target": "thread1" + }, + { + "name": "checkIn", + "connection": "conn1" + }, + { + "name": "waitForThread", + "target": "thread2" + }, + { + "name": "checkIn", + "connection": "conn2" + }, + { + "name": "waitForThread", + "target": "thread3" + }, + { + "name": "checkIn", + "connection": "conn3" + }, + { + "name": "waitForThread", + "target": "thread4" + } + ], + "events": [ + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + } + ], + "ignore": [ + "ConnectionCreated", + "ConnectionReady", + "ConnectionClosed", + "ConnectionPoolCreated" + ] +} diff --git a/test/cmap/wait-queue-timeout.json b/test/cmap/wait-queue-timeout.json new file mode 100644 index 000000000..90ec2f62d --- /dev/null +++ b/test/cmap/wait-queue-timeout.json @@ -0,0 +1,66 @@ +{ + "version": 1, + "style": "unit", + "description": "must aggressively timeout threads enqueued longer than waitQueueTimeoutMS", + "poolOptions": { + "maxPoolSize": 1, + "waitQueueTimeoutMS": 20 + }, + "operations": [ + { + "name": "checkOut", + "label": "conn0" + }, + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutFailed", + "count": 1 + }, + { + "name": "checkIn", + "connection": "conn0" + }, + { + "name": "waitForThread", + "target": "thread1" + } + ], + "error": { + "type": "WaitQueueTimeoutError", + "message": "Timed out while checking out a connection from connection pool" + }, + "events": [ + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 42 + }, + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCheckOutFailed", + "reason": "timeout" + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 42 + } + ], + "ignore": [ + "ConnectionCreated", + "ConnectionReady", + "ConnectionClosed", + "ConnectionPoolCreated" + ] +} diff --git a/test/test_client.py b/test/test_client.py index 5bb57464f..b7f445f7a 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -453,7 +453,7 @@ class TestClient(IntegrationTest): # Assert that if a socket is closed, a new one takes its place with server._pool.get_socket({}) as sock_info: - sock_info.close() + sock_info.close_socket(None) wait_until(lambda: 10 == len(server._pool.sockets), "a closed socket gets replaced from the pool") self.assertFalse(sock_info in server._pool.sockets) diff --git a/test/test_cmap.py b/test/test_cmap.py new file mode 100644 index 000000000..a58681e1e --- /dev/null +++ b/test/test_cmap.py @@ -0,0 +1,404 @@ +# Copyright 2019-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Execute Transactions Spec tests.""" + +import os +import sys +import time +import threading + +sys.path[0:0] = [""] + +from pymongo.errors import (ConnectionFailure, + PyMongoError) +from pymongo.monitoring import (ConnectionPoolListener, + ConnectionCheckedInEvent, + ConnectionCheckedOutEvent, + ConnectionCheckOutFailedEvent, + ConnectionCheckOutFailedReason, + ConnectionCheckOutStartedEvent, + ConnectionClosedEvent, + ConnectionClosedReason, + ConnectionCreatedEvent, + ConnectionReadyEvent, + PoolCreatedEvent, + PoolClearedEvent, + PoolClosedEvent) +from pymongo.read_preferences import ReadPreference +from pymongo.pool import _PoolClosedError + +from test import (IntegrationTest, + unittest) +from test.utils import (camel_to_snake, + client_context, + get_pool, + get_pools, + rs_or_single_client, + single_client, + TestCreator, + wait_until) + + +OBJECT_TYPES = { + # Event types. + 'ConnectionCheckedIn': ConnectionCheckedInEvent, + 'ConnectionCheckedOut': ConnectionCheckedOutEvent, + 'ConnectionCheckOutFailed': ConnectionCheckOutFailedEvent, + 'ConnectionClosed': ConnectionClosedEvent, + 'ConnectionCreated': ConnectionCreatedEvent, + 'ConnectionReady': ConnectionReadyEvent, + 'ConnectionCheckOutStarted': ConnectionCheckOutStartedEvent, + 'ConnectionPoolCreated': PoolCreatedEvent, + 'ConnectionPoolCleared': PoolClearedEvent, + 'ConnectionPoolClosed': PoolClosedEvent, + # Error types. + 'PoolClosedError': _PoolClosedError, + 'WaitQueueTimeoutError': ConnectionFailure, +} + + +class CMAPListener(ConnectionPoolListener): + def __init__(self): + self.events = [] + + def add_event(self, event): + self.events.append(event) + + def event_count(self, event_type): + return len([event for event in self.events[:] + if isinstance(event, event_type)]) + + def connection_created(self, event): + self.add_event(event) + + def connection_ready(self, event): + self.add_event(event) + + def connection_closed(self, event): + self.add_event(event) + + def connection_check_out_started(self, event): + self.add_event(event) + + def connection_check_out_failed(self, event): + self.add_event(event) + + def connection_checked_out(self, event): + self.add_event(event) + + def connection_checked_in(self, event): + self.add_event(event) + + def pool_created(self, event): + self.add_event(event) + + def pool_cleared(self, event): + self.add_event(event) + + def pool_closed(self, event): + self.add_event(event) + + +class CMAPThread(threading.Thread): + def __init__(self, name): + super(CMAPThread, self).__init__() + self.name = name + self.exc = None + self.setDaemon(True) + self.cond = threading.Condition() + self.ops = [] + self.stopped = False + + def schedule(self, work): + self.ops.append(work) + with self.cond: + self.cond.notify() + + def stop(self): + self.stopped = True + with self.cond: + self.cond.notify() + + def run(self): + while not self.stopped or self.ops: + if not self. ops: + with self.cond: + self.cond.wait(10) + if self.ops: + try: + work = self.ops.pop(0) + work() + except Exception as exc: + self.exc = exc + self.stop() + + +class TestCMAP(IntegrationTest): + # Location of JSON test specifications. + TEST_PATH = os.path.join( + os.path.dirname(os.path.realpath(__file__)), 'cmap') + + # Test operations: + + def start(self, op): + """Run the 'start' thread operation.""" + target = op['target'] + thread = CMAPThread(target) + thread.start() + self.targets[target] = thread + + def wait(self, op): + """Run the 'wait' operation.""" + time.sleep(op['ms'] / 1000.0) + + def wait_for_thread(self, op): + """Run the 'waitForThread' operation.""" + target = op['target'] + thread = self.targets[target] + thread.stop() + thread.join() + if thread.exc: + raise thread.exc + + def wait_for_event(self, op): + """Run the 'waitForEvent' operation.""" + event = OBJECT_TYPES[op['event']] + count = op['count'] + wait_until(lambda: self.listener.event_count(event) >= count, + 'find %s %s event(s)' % (count, event)) + + def check_out(self, op): + """Run the 'checkOut' operation.""" + label = op['label'] + with self.pool.get_socket({}, checkout=True) as sock_info: + if label: + self.labels[label] = sock_info + else: + self.addCleanup(sock_info.close_socket, None) + + def check_in(self, op): + """Run the 'checkIn' operation.""" + label = op['connection'] + sock_info = self.labels[label] + self.pool.return_socket(sock_info) + + def clear(self, op): + """Run the 'clear' operation.""" + self.pool.reset() + + def close(self, op): + """Run the 'close' operation.""" + self.pool.close() + + def run_operation(self, op): + """Run a single operation in a test.""" + op_name = camel_to_snake(op['name']) + thread = op['thread'] + meth = getattr(self, op_name) + if thread: + self.targets[thread].schedule(lambda: meth(op)) + else: + meth(op) + + def run_operations(self, ops): + """Run a test's operations.""" + for op in ops: + self.run_operation(op) + + def check_object(self, actual, expected): + """Assert that the actual object matches the expected object.""" + self.assertEqual(type(actual), OBJECT_TYPES[expected['type']]) + for attr, expected_val in expected.items(): + if attr == 'type': + continue + c2s = camel_to_snake(attr) + actual_val = getattr(actual, c2s) + if expected_val == 42: + self.assertIsNotNone(actual_val) + else: + self.assertEqual(actual_val, expected_val) + + def check_event(self, actual, expected): + """Assert that the actual event matches the expected event.""" + self.check_object(actual, expected) + + def actual_events(self, ignore): + """Return all the non-ignored events.""" + ignore = tuple(OBJECT_TYPES[name] for name in ignore) + return [event for event in self.listener.events + if not isinstance(event, ignore)] + + def check_events(self, events, ignore): + """Check the events of a test.""" + actual_events = self.actual_events(ignore) + for actual, expected in zip(actual_events, events): + self.check_event(actual, expected) + + if len(events) > len(actual_events): + self.fail('missing events: %r' % (events[len(actual_events):],)) + elif len(events) < len(actual_events): + self.fail('extra events: %r' % (actual_events[len(events):],)) + + def check_error(self, actual, expected): + message = expected.pop('message') + self.check_object(actual, expected) + self.assertIn(message, str(actual)) + + def run_scenario(self, scenario_def, test): + """Run a CMAP spec test.""" + self.assertEqual(scenario_def['version'], 1) + self.assertEqual(scenario_def['style'], 'unit') + self.listener = CMAPListener() + + opts = test['poolOptions'].copy() + opts['event_listeners'] = [self.listener] + client = single_client(**opts) + self.addCleanup(client.close) + self.pool = get_pool(client) + + # Map of target names to Thread objects. + self.targets = dict() + # Map of label names to Connection objects + self.labels = dict() + + def cleanup(): + for t in self.targets.values(): + t.stop() + for t in self.targets.values(): + t.join(5) + for conn in self.labels.values(): + conn.close_socket(None) + + self.addCleanup(cleanup) + + if test['error']: + with self.assertRaises(PyMongoError) as ctx: + self.run_operations(test['operations']) + self.check_error(ctx.exception, test['error']) + else: + self.run_operations(test['operations']) + + self.check_events(test['events'], test['ignore']) + + POOL_OPTIONS = { + 'maxPoolSize': 50, + 'minPoolSize': 1, + 'maxIdleTimeMS': 10000, + 'waitQueueTimeoutMS': 10000 + } + + # + # Prose tests. Numbers correspond to the prose test number in the spec. + # + def test_1_client_connection_pool_options(self): + client = rs_or_single_client(**self.POOL_OPTIONS) + pool_opts = get_pool(client).opts + self.assertEqual(pool_opts.non_default_options, self.POOL_OPTIONS) + + def test_2_all_client_pools_have_same_options(self): + client = rs_or_single_client(**self.POOL_OPTIONS) + client.admin.command('isMaster') + # Discover at least one secondary. + if client_context.has_secondaries: + client.admin.command( + 'isMaster', read_preference=ReadPreference.SECONDARY) + pools = get_pools(client) + pool_opts = pools[0].opts + + self.assertEqual(pool_opts.non_default_options, self.POOL_OPTIONS) + for pool in pools[1:]: + self.assertEqual(pool.opts, pool_opts) + + def test_3_uri_connection_pool_options(self): + opts = '&'.join(['%s=%s' % (k, v) + for k, v in self.POOL_OPTIONS.items()]) + uri = 'mongodb://%s/?%s' % (client_context.pair, opts) + client = rs_or_single_client(uri, **self.credentials) + pool_opts = get_pool(client).opts + self.assertEqual(pool_opts.non_default_options, self.POOL_OPTIONS) + + def test_4_subscribe_to_events(self): + listener = CMAPListener() + client = single_client(event_listeners=[listener]) + self.assertEqual(listener.event_count(PoolCreatedEvent), 1) + + # Creates a new connection. + client.admin.command('isMaster') + self.assertEqual( + listener.event_count(ConnectionCheckOutStartedEvent), 1) + self.assertEqual(listener.event_count(ConnectionCreatedEvent), 1) + self.assertEqual(listener.event_count(ConnectionReadyEvent), 1) + self.assertEqual(listener.event_count(ConnectionCheckedOutEvent), 1) + self.assertEqual(listener.event_count(ConnectionCheckedInEvent), 1) + + # Uses the existing connection. + client.admin.command('isMaster') + self.assertEqual( + listener.event_count(ConnectionCheckOutStartedEvent), 2) + self.assertEqual(listener.event_count(ConnectionCheckedOutEvent), 2) + self.assertEqual(listener.event_count(ConnectionCheckedInEvent), 2) + + client.close() + self.assertEqual(listener.event_count(PoolClearedEvent), 1) + self.assertEqual(listener.event_count(ConnectionClosedEvent), 1) + + # + # Extra non-spec tests + # + def assertRepr(self, obj): + new_obj = eval(repr(obj)) + self.assertEqual(type(new_obj), type(obj)) + self.assertEqual(repr(new_obj), repr(obj)) + + def test_events_repr(self): + host = ('localhost', 27017) + self.assertRepr(ConnectionCheckedInEvent(host, 1)) + self.assertRepr(ConnectionCheckedOutEvent(host, 1)) + self.assertRepr(ConnectionCheckOutFailedEvent( + host, ConnectionCheckOutFailedReason.POOL_CLOSED)) + self.assertRepr(ConnectionClosedEvent( + host, 1, ConnectionClosedReason.POOL_CLOSED)) + self.assertRepr(ConnectionCreatedEvent(host, 1)) + self.assertRepr(ConnectionReadyEvent(host, 1)) + self.assertRepr(ConnectionCheckOutStartedEvent(host)) + self.assertRepr(PoolCreatedEvent(host, {})) + self.assertRepr(PoolClearedEvent(host)) + self.assertRepr(PoolClosedEvent(host)) + + +def create_test(scenario_def, test, name): + def run_scenario(self): + self.run_scenario(scenario_def, test) + + return run_scenario + + +class CMAPTestCreator(TestCreator): + + def tests(self, scenario_def): + """Extract the tests from a spec file. + + CMAP tests do not have a 'tests' field. The whole file represents + a single test case. + """ + return [scenario_def] + + +test_creator = CMAPTestCreator(create_test, TestCMAP, TestCMAP.TEST_PATH) +test_creator.create_tests() + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_collation.py b/test/test_collation.py index 25158856c..7cb4d8b5c 100644 --- a/test/test_collation.py +++ b/test/test_collation.py @@ -17,7 +17,6 @@ import functools import warnings -from pymongo import monitoring from pymongo.collation import ( Collation, CollationCaseFirst, CollationStrength, CollationAlternate, @@ -95,8 +94,6 @@ class TestCollation(unittest.TestCase): @client_context.require_connection def setUpClass(cls): cls.listener = EventListener() - cls.saved_listeners = monitoring._LISTENERS - monitoring._LISTENERS = monitoring._Listeners([], [], [], []) cls.client = rs_or_single_client(event_listeners=[cls.listener]) cls.db = cls.client.pymongo_test cls.collation = Collation('en_US') @@ -106,7 +103,6 @@ class TestCollation(unittest.TestCase): @classmethod def tearDownClass(cls): - monitoring._LISTENERS = cls.saved_listeners cls.warn_context.__exit__() cls.warn_context = None diff --git a/test/test_collection.py b/test/test_collection.py index 38a77eb84..8b9705064 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -37,7 +37,6 @@ from bson.py3compat import itervalues from bson.son import SON from pymongo import (ASCENDING, DESCENDING, GEO2D, GEOHAYSTACK, GEOSPHERE, HASHED, TEXT) -from pymongo import monitoring from pymongo.bulk import BulkWriteError from pymongo.collection import Collection, ReturnDocument from pymongo.command_cursor import CommandCursor @@ -2225,8 +2224,6 @@ class TestCollection(IntegrationTest): def test_find_one_and_write_concern(self): listener = EventListener() - saved_listeners = monitoring._LISTENERS - monitoring._LISTENERS = monitoring._Listeners([], [], [], []) db = single_client(event_listeners=[listener])[self.db.name] # non-default WriteConcern. c_w0 = db.get_collection( @@ -2237,89 +2234,86 @@ class TestCollection(IntegrationTest): # Authenticate the client and throw out auth commands from the listener. db.command('ismaster') results.clear() - try: - if client_context.version.at_least(3, 1, 9, -1): - c_w0.find_and_modify( + if client_context.version.at_least(3, 1, 9, -1): + c_w0.find_and_modify( + {'_id': 1}, {'$set': {'foo': 'bar'}}) + self.assertEqual( + {'w': 0}, results['started'][0].command['writeConcern']) + results.clear() + + c_w0.find_one_and_update( + {'_id': 1}, {'$set': {'foo': 'bar'}}) + self.assertEqual( + {'w': 0}, results['started'][0].command['writeConcern']) + results.clear() + + c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'}) + self.assertEqual( + {'w': 0}, results['started'][0].command['writeConcern']) + results.clear() + + c_w0.find_one_and_delete({'_id': 1}) + self.assertEqual( + {'w': 0}, results['started'][0].command['writeConcern']) + results.clear() + + # Test write concern errors. + if client_context.is_rs: + c_wc_error = db.get_collection( + 'test', + write_concern=WriteConcern( + w=len(client_context.nodes) + 1)) + self.assertRaises( + WriteConcernError, + c_wc_error.find_and_modify, {'_id': 1}, {'$set': {'foo': 'bar'}}) - self.assertEqual( + self.assertRaises( + WriteConcernError, + c_wc_error.find_one_and_update, + {'_id': 1}, {'$set': {'foo': 'bar'}}) + self.assertRaises( + WriteConcernError, + c_wc_error.find_one_and_replace, + {'w': 0}, results['started'][0].command['writeConcern']) + self.assertRaises( + WriteConcernError, + c_wc_error.find_one_and_delete, {'w': 0}, results['started'][0].command['writeConcern']) results.clear() - - c_w0.find_one_and_update( - {'_id': 1}, {'$set': {'foo': 'bar'}}) - self.assertEqual( - {'w': 0}, results['started'][0].command['writeConcern']) - results.clear() - - c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'}) - self.assertEqual( - {'w': 0}, results['started'][0].command['writeConcern']) - results.clear() - - c_w0.find_one_and_delete({'_id': 1}) - self.assertEqual( - {'w': 0}, results['started'][0].command['writeConcern']) - results.clear() - - # Test write concern errors. - if client_context.is_rs: - c_wc_error = db.get_collection( - 'test', - write_concern=WriteConcern( - w=len(client_context.nodes) + 1)) - self.assertRaises( - WriteConcernError, - c_wc_error.find_and_modify, - {'_id': 1}, {'$set': {'foo': 'bar'}}) - self.assertRaises( - WriteConcernError, - c_wc_error.find_one_and_update, - {'_id': 1}, {'$set': {'foo': 'bar'}}) - self.assertRaises( - WriteConcernError, - c_wc_error.find_one_and_replace, - {'w': 0}, results['started'][0].command['writeConcern']) - self.assertRaises( - WriteConcernError, - c_wc_error.find_one_and_delete, - {'w': 0}, results['started'][0].command['writeConcern']) - results.clear() - else: - c_w0.find_and_modify( - {'_id': 1}, {'$set': {'foo': 'bar'}}) - self.assertNotIn('writeConcern', results['started'][0].command) - results.clear() - - c_w0.find_one_and_update( - {'_id': 1}, {'$set': {'foo': 'bar'}}) - self.assertNotIn('writeConcern', results['started'][0].command) - results.clear() - - c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'}) - self.assertNotIn('writeConcern', results['started'][0].command) - results.clear() - - c_w0.find_one_and_delete({'_id': 1}) - self.assertNotIn('writeConcern', results['started'][0].command) - results.clear() - - c_default.find_and_modify({'_id': 1}, {'$set': {'foo': 'bar'}}) + else: + c_w0.find_and_modify( + {'_id': 1}, {'$set': {'foo': 'bar'}}) self.assertNotIn('writeConcern', results['started'][0].command) results.clear() - c_default.find_one_and_update({'_id': 1}, {'$set': {'foo': 'bar'}}) + c_w0.find_one_and_update( + {'_id': 1}, {'$set': {'foo': 'bar'}}) self.assertNotIn('writeConcern', results['started'][0].command) results.clear() - c_default.find_one_and_replace({'_id': 1}, {'foo': 'bar'}) + c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'}) self.assertNotIn('writeConcern', results['started'][0].command) results.clear() - c_default.find_one_and_delete({'_id': 1}) + c_w0.find_one_and_delete({'_id': 1}) self.assertNotIn('writeConcern', results['started'][0].command) results.clear() - finally: - monitoring._LISTENERS = saved_listeners + + c_default.find_and_modify({'_id': 1}, {'$set': {'foo': 'bar'}}) + self.assertNotIn('writeConcern', results['started'][0].command) + results.clear() + + c_default.find_one_and_update({'_id': 1}, {'$set': {'foo': 'bar'}}) + self.assertNotIn('writeConcern', results['started'][0].command) + results.clear() + + c_default.find_one_and_replace({'_id': 1}, {'foo': 'bar'}) + self.assertNotIn('writeConcern', results['started'][0].command) + results.clear() + + c_default.find_one_and_delete({'_id': 1}) + self.assertNotIn('writeConcern', results['started'][0].command) + results.clear() def test_find_with_nested(self): c = self.db.test diff --git a/test/test_command_monitoring_spec.py b/test/test_command_monitoring_spec.py index 192f07cc0..257988911 100644 --- a/test/test_command_monitoring_spec.py +++ b/test/test_command_monitoring_spec.py @@ -23,7 +23,6 @@ sys.path[0:0] = [""] import pymongo from bson import json_util -from pymongo import monitoring from pymongo.errors import OperationFailure from pymongo.write_concern import WriteConcern from test import unittest, client_context @@ -48,14 +47,8 @@ class TestAllScenarios(unittest.TestCase): @client_context.require_connection def setUpClass(cls): cls.listener = EventListener() - cls.saved_listeners = monitoring._LISTENERS - monitoring._LISTENERS = monitoring._Listeners([], [], [], []) cls.client = single_client(event_listeners=[cls.listener]) - @classmethod - def tearDownClass(cls): - monitoring._LISTENERS = cls.saved_listeners - def tearDown(self): self.listener.results.clear() diff --git a/test/test_cursor.py b/test/test_cursor.py index 81b234063..73caa9f88 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -29,8 +29,7 @@ from bson import decode_all from bson.code import Code from bson.py3compat import PY3 from bson.son import SON -from pymongo import (monitoring, - ASCENDING, +from pymongo import (ASCENDING, DESCENDING, ALL, OFF) @@ -42,9 +41,8 @@ from pymongo.errors import (ConfigurationError, OperationFailure) from pymongo.read_concern import ReadConcern from test import (client_context, - SkipTest, unittest, - IntegrationTest, Version) + IntegrationTest) from test.utils import (EventListener, ignore_deprecations, rs_or_single_client, @@ -227,91 +225,85 @@ class TestCursor(IntegrationTest): self.assertEqual(90, cursor._Cursor__max_await_time_ms) listener = WhiteListEventListener('find', 'getMore') - saved_listeners = monitoring._LISTENERS - monitoring._LISTENERS = monitoring._Listeners([], [], [], []) coll = rs_or_single_client( event_listeners=[listener])[self.db.name].pymongo_test results = listener.results - try: - # Tailable_await defaults. - list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT)) - # find - self.assertFalse('maxTimeMS' in results['started'][0].command) - # getMore - self.assertFalse('maxTimeMS' in results['started'][1].command) - results.clear() + # Tailable_await defaults. + list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT)) + # find + self.assertFalse('maxTimeMS' in results['started'][0].command) + # getMore + self.assertFalse('maxTimeMS' in results['started'][1].command) + results.clear() - # Tailable_await with max_await_time_ms set. - list(coll.find( - cursor_type=CursorType.TAILABLE_AWAIT).max_await_time_ms(99)) - # find - self.assertEqual('find', results['started'][0].command_name) - self.assertFalse('maxTimeMS' in results['started'][0].command) - # getMore - self.assertEqual('getMore', results['started'][1].command_name) - self.assertTrue('maxTimeMS' in results['started'][1].command) - self.assertEqual(99, results['started'][1].command['maxTimeMS']) - results.clear() + # Tailable_await with max_await_time_ms set. + list(coll.find( + cursor_type=CursorType.TAILABLE_AWAIT).max_await_time_ms(99)) + # find + self.assertEqual('find', results['started'][0].command_name) + self.assertFalse('maxTimeMS' in results['started'][0].command) + # getMore + self.assertEqual('getMore', results['started'][1].command_name) + self.assertTrue('maxTimeMS' in results['started'][1].command) + self.assertEqual(99, results['started'][1].command['maxTimeMS']) + results.clear() - # Tailable_await with max_time_ms - list(coll.find( - cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(99)) - # find - self.assertEqual('find', results['started'][0].command_name) - self.assertTrue('maxTimeMS' in results['started'][0].command) - self.assertEqual(99, results['started'][0].command['maxTimeMS']) - # getMore - self.assertEqual('getMore', results['started'][1].command_name) - self.assertFalse('maxTimeMS' in results['started'][1].command) - results.clear() + # Tailable_await with max_time_ms + list(coll.find( + cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(99)) + # find + self.assertEqual('find', results['started'][0].command_name) + self.assertTrue('maxTimeMS' in results['started'][0].command) + self.assertEqual(99, results['started'][0].command['maxTimeMS']) + # getMore + self.assertEqual('getMore', results['started'][1].command_name) + self.assertFalse('maxTimeMS' in results['started'][1].command) + results.clear() - # Tailable_await with both max_time_ms and max_await_time_ms - list(coll.find( - cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms( - 99).max_await_time_ms(99)) - # find - self.assertEqual('find', results['started'][0].command_name) - self.assertTrue('maxTimeMS' in results['started'][0].command) - self.assertEqual(99, results['started'][0].command['maxTimeMS']) - # getMore - self.assertEqual('getMore', results['started'][1].command_name) - self.assertTrue('maxTimeMS' in results['started'][1].command) - self.assertEqual(99, results['started'][1].command['maxTimeMS']) - results.clear() + # Tailable_await with both max_time_ms and max_await_time_ms + list(coll.find( + cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms( + 99).max_await_time_ms(99)) + # find + self.assertEqual('find', results['started'][0].command_name) + self.assertTrue('maxTimeMS' in results['started'][0].command) + self.assertEqual(99, results['started'][0].command['maxTimeMS']) + # getMore + self.assertEqual('getMore', results['started'][1].command_name) + self.assertTrue('maxTimeMS' in results['started'][1].command) + self.assertEqual(99, results['started'][1].command['maxTimeMS']) + results.clear() - # Non tailable_await with max_await_time_ms - list(coll.find(batch_size=1).max_await_time_ms(99)) - # find - self.assertEqual('find', results['started'][0].command_name) - self.assertFalse('maxTimeMS' in results['started'][0].command) - # getMore - self.assertEqual('getMore', results['started'][1].command_name) - self.assertFalse('maxTimeMS' in results['started'][1].command) - results.clear() + # Non tailable_await with max_await_time_ms + list(coll.find(batch_size=1).max_await_time_ms(99)) + # find + self.assertEqual('find', results['started'][0].command_name) + self.assertFalse('maxTimeMS' in results['started'][0].command) + # getMore + self.assertEqual('getMore', results['started'][1].command_name) + self.assertFalse('maxTimeMS' in results['started'][1].command) + results.clear() - # Non tailable_await with max_time_ms - list(coll.find(batch_size=1).max_time_ms(99)) - # find - self.assertEqual('find', results['started'][0].command_name) - self.assertTrue('maxTimeMS' in results['started'][0].command) - self.assertEqual(99, results['started'][0].command['maxTimeMS']) - # getMore - self.assertEqual('getMore', results['started'][1].command_name) - self.assertFalse('maxTimeMS' in results['started'][1].command) + # Non tailable_await with max_time_ms + list(coll.find(batch_size=1).max_time_ms(99)) + # find + self.assertEqual('find', results['started'][0].command_name) + self.assertTrue('maxTimeMS' in results['started'][0].command) + self.assertEqual(99, results['started'][0].command['maxTimeMS']) + # getMore + self.assertEqual('getMore', results['started'][1].command_name) + self.assertFalse('maxTimeMS' in results['started'][1].command) - # Non tailable_await with both max_time_ms and max_await_time_ms - list(coll.find(batch_size=1).max_time_ms(99).max_await_time_ms(88)) - # find - self.assertEqual('find', results['started'][0].command_name) - self.assertTrue('maxTimeMS' in results['started'][0].command) - self.assertEqual(99, results['started'][0].command['maxTimeMS']) - # getMore - self.assertEqual('getMore', results['started'][1].command_name) - self.assertFalse('maxTimeMS' in results['started'][1].command) - - finally: - monitoring._LISTENERS = saved_listeners + # Non tailable_await with both max_time_ms and max_await_time_ms + list(coll.find(batch_size=1).max_time_ms(99).max_await_time_ms(88)) + # find + self.assertEqual('find', results['started'][0].command_name) + self.assertTrue('maxTimeMS' in results['started'][0].command) + self.assertEqual(99, results['started'][0].command['maxTimeMS']) + # getMore + self.assertEqual('getMore', results['started'][1].command_name) + self.assertFalse('maxTimeMS' in results['started'][1].command) @client_context.require_test_commands @client_context.require_no_mongos diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index 2ae5d4bb1..9815ee308 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -53,10 +53,16 @@ class MockPool(object): self.pool_id = 0 self._lock = threading.Lock() - def reset(self): + def _reset(self): with self._lock: self.pool_id += 1 + def reset(self): + self._reset() + + def close(self): + self._reset() + class MockMonitor(object): def __init__(self, server_description, topology, pool, topology_settings): diff --git a/test/test_heartbeat_monitoring.py b/test/test_heartbeat_monitoring.py index 69d99aab5..cf1620649 100644 --- a/test/test_heartbeat_monitoring.py +++ b/test/test_heartbeat_monitoring.py @@ -19,7 +19,6 @@ import threading sys.path[0:0] = [""] -from pymongo import monitoring from pymongo.errors import ConnectionFailure from pymongo.ismaster import IsMaster from pymongo.monitor import Monitor @@ -51,23 +50,21 @@ class MockPool(object): def return_socket(self, _): pass - def reset(self): + def _reset(self): with self._lock: self.pool_id += 1 + def reset(self): + self._reset() + + def close(self): + self._reset() + def remove_stale_sockets(self): pass class TestHeartbeatMonitoring(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.saved_listeners = monitoring._LISTENERS - monitoring._LISTENERS = monitoring._Listeners([], [], [], []) - - @classmethod - def tearDownClass(cls): - monitoring._LISTENERS = cls.saved_listeners def create_mock_monitor(self, responses, uri, expected_results): listener = HeartbeatEventListener() diff --git a/test/test_monitoring.py b/test/test_monitoring.py index 48a44c452..2e16e1c9a 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -45,17 +45,10 @@ class TestCommandMonitoring(PyMongoTestCase): @client_context.require_connection def setUpClass(cls): cls.listener = EventListener() - cls.saved_listeners = monitoring._LISTENERS - # Don't use any global subscribers. - monitoring._LISTENERS = monitoring._Listeners([], [], [], []) cls.client = rs_or_single_client( event_listeners=[cls.listener], retryWrites=False) - @classmethod - def tearDownClass(cls): - monitoring._LISTENERS = cls.saved_listeners - def tearDown(self): self.listener.results.clear() diff --git a/test/test_pooling.py b/test/test_pooling.py index 4b0cc5d6e..f5945a43d 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -130,7 +130,7 @@ class SocketGetter(MongoThread): def __del__(self): if self.sock: - self.sock.close() + self.sock.close_socket(None) def run_cases(client, cases): @@ -222,7 +222,7 @@ class TestPooling(_TestPoolingBase): with cx_pool.get_socket({}) as sock_info: # Use SocketInfo's API to close the socket. - sock_info.close() + sock_info.close_socket(None) self.assertEqual(0, len(cx_pool.sockets)) @@ -260,6 +260,7 @@ class TestPooling(_TestPoolingBase): def test_socket_closed_thread_safe(self): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((client_context.host, client_context.port)) + self.addCleanup(s.close) socket_checker = SocketChecker() def check_socket(): @@ -290,6 +291,7 @@ class TestPooling(_TestPoolingBase): connect_timeout=1, wait_queue_timeout=1) cx_pool._check_interval_seconds = 0 # Always check. + self.addCleanup(cx_pool.close) with cx_pool.get_socket({}) as sock_info: # Simulate a closed socket without telling the SocketInfo it's @@ -307,12 +309,13 @@ class TestPooling(_TestPoolingBase): with cx_pool.get_socket({}, checkout=True) as sock_info: pass - sock_info.close() + sock_info.close_socket(None) def test_wait_queue_timeout(self): wait_queue_timeout = 2 # Seconds pool = self.create_pool( max_pool_size=1, wait_queue_timeout=wait_queue_timeout) + self.addCleanup(pool.close) with pool.get_socket({}) as sock_info: start = time.time() @@ -326,11 +329,11 @@ class TestPooling(_TestPoolingBase): "Waited %.2f seconds for a socket, expected %f" % ( duration, wait_queue_timeout)) - sock_info.close() def test_no_wait_queue_timeout(self): # Verify get_socket() with no wait_queue_timeout blocks forever. pool = self.create_pool(max_pool_size=1) + self.addCleanup(pool.close) # Reach max_size. with pool.get_socket({}) as s1: @@ -347,7 +350,6 @@ class TestPooling(_TestPoolingBase): self.assertEqual(t.state, 'sock') self.assertEqual(t.sock, s1) - s1.close() def test_wait_queue_multiple(self): wait_queue_multiple = 3 @@ -392,7 +394,7 @@ class TestPooling(_TestPoolingBase): self.assertEqual(t.state, 'get_socket') for socket_info in socks: - socket_info.close() + socket_info.close_socket(None) class TestPoolMaxSize(_TestPoolingBase): diff --git a/test/test_read_concern.py b/test/test_read_concern.py index 8a408b8d0..4d7fcaf94 100644 --- a/test/test_read_concern.py +++ b/test/test_read_concern.py @@ -15,7 +15,6 @@ """Test the read_concern module.""" from bson.son import SON -from pymongo import monitoring from pymongo.errors import ConfigurationError, OperationFailure from pymongo.read_concern import ReadConcern @@ -29,16 +28,9 @@ class TestReadConcern(PyMongoTestCase): @client_context.require_connection def setUpClass(cls): cls.listener = OvertCommandListener() - cls.saved_listeners = monitoring._LISTENERS - # Don't use any global subscribers. - monitoring._LISTENERS = monitoring._Listeners([], [], [], []) cls.client = single_client(event_listeners=[cls.listener]) cls.db = cls.client.pymongo_test - @classmethod - def tearDownClass(cls): - monitoring._LISTENERS = cls.saved_listeners - def tearDown(self): self.db.coll.drop() self.listener.results.clear() diff --git a/test/test_sdam_monitoring_spec.py b/test/test_sdam_monitoring_spec.py index a84f2f7f7..cb5ebcd5e 100644 --- a/test/test_sdam_monitoring_spec.py +++ b/test/test_sdam_monitoring_spec.py @@ -172,12 +172,6 @@ class TestAllScenarios(unittest.TestCase): @client_context.require_connection def setUp(cls): cls.all_listener = ServerAndTopologyEventListener() - cls.saved_listeners = monitoring._LISTENERS - monitoring._LISTENERS = monitoring._Listeners([], [], [], []) - - @classmethod - def tearDown(cls): - monitoring._LISTENERS = cls.saved_listeners def create_test(scenario_def): diff --git a/test/test_topology.py b/test/test_topology.py index 11c726324..7852f6863 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -63,10 +63,16 @@ class MockPool(object): def return_socket(self, _): pass - def reset(self): + def _reset(self): with self._lock: self.pool_id += 1 + def reset(self): + self._reset() + + def close(self): + self._reset() + def remove_stale_sockets(self): pass diff --git a/test/utils.py b/test/utils.py index fc8042a4e..afe792178 100644 --- a/test/utils.py +++ b/test/utils.py @@ -265,6 +265,10 @@ class TestCreator(object): "runOn not satisfied", method) + def tests(self, scenario_def): + """Allow CMAP spec test to override the location of test.""" + return scenario_def['tests'] + def create_tests(self): for dirpath, _, filenames in os.walk(self.test_path): dirname = os.path.split(dirpath)[-1] @@ -277,7 +281,7 @@ class TestCreator(object): test_type = os.path.splitext(filename)[0] # Construct test from scenario. - for test_def in scenario_def['tests']: + for test_def in self.tests(scenario_def): test_name = 'test_%s_%s_%s' % ( dirname, test_type.replace("-", "_").replace('.', '_'), diff --git a/test/utils_selection_tests.py b/test/utils_selection_tests.py index d9c1a5a03..ea1cda782 100644 --- a/test/utils_selection_tests.py +++ b/test/utils_selection_tests.py @@ -50,6 +50,9 @@ class MockPool(object): def reset(self): pass + def close(self): + pass + def remove_stale_sockets(self): pass