PYTHON-1842 Implement Connection Monitoring and Pooling spec

This commit is contained in:
Shane Harvey 2019-05-22 15:06:11 -07:00
parent 876db9939e
commit f588412b67
42 changed files with 2365 additions and 271 deletions

View File

@ -17,6 +17,10 @@
.. autoclass:: TopologyListener
:members:
:inherited-members:
.. autoclass:: ConnectionPoolListener
:members:
:inherited-members:
.. autoclass:: CommandStartedEvent
:members:
:inherited-members:
@ -53,3 +57,43 @@
.. autoclass:: ServerHeartbeatFailedEvent
:members:
:inherited-members:
.. autoclass:: PoolCreatedEvent
:members:
:inherited-members:
.. autoclass:: PoolClearedEvent
:members:
:inherited-members:
.. autoclass:: PoolClosedEvent
:members:
:inherited-members:
.. autoclass:: ConnectionCreatedEvent
:members:
:inherited-members:
.. autoclass:: ConnectionReadyEvent
:members:
:inherited-members:
.. autoclass:: ConnectionClosedReason
:members:
.. autoclass:: ConnectionClosedEvent
:members:
:inherited-members:
.. autoclass:: ConnectionCheckOutStartedEvent
:members:
:inherited-members:
.. autoclass:: ConnectionCheckOutFailedReason
:members:
.. autoclass:: ConnectionCheckOutFailedEvent
:members:
:inherited-members:
.. autoclass:: ConnectionCheckedOutEvent
:members:
:inherited-members:
.. autoclass:: ConnectionCheckedInEvent
:members:
:inherited-members:

View File

@ -37,16 +37,18 @@ Version 3.9 adds support for MongoDB 4.2. Highlights include:
time, with at-most-once semantics.
- Support for retryable reads and the ``retryReads`` URI option which is
enabled by default. See the :class:`~pymongo.mongo_client.MongoClient`
documentation for details.
documentation for details. Now that supported operations are retried
automatically and transparently, users should consider adjusting any custom
retry logic to prevent an application from inadvertently retrying for too
long.
- Support zstandard for wire protocol compression.
- Support for periodically polling DNS SRV records to update the mongos proxy
list without having to change client configuration.
- New method :meth:`pymongo.database.Database.aggregate` to support running
database level aggregations.
Now that supported operations are retried automatically and transparently,
users should consider adjusting any custom retry logic to prevent
an application from inadvertently retrying for too long.
- Support for publishing Connection Monitoring and Pooling events via the new
:class:`~pymongo.monitoring.ConnectionPoolListener` class. See
:mod:`~pymongo.monitoring` for an example.
.. _URI options specification: https://github.com/mongodb/specifications/blob/master/source/uri-options/uri-options.rst

View File

@ -110,15 +110,15 @@ def _parse_pool_options(options):
"""Parse connection pool options."""
max_pool_size = options.get('maxpoolsize', common.MAX_POOL_SIZE)
min_pool_size = options.get('minpoolsize', common.MIN_POOL_SIZE)
default_idle_seconds = common.validate_timeout_or_none(
'maxidletimems', common.MAX_IDLE_TIME_MS)
max_idle_time_seconds = options.get('maxidletimems', default_idle_seconds)
max_idle_time_seconds = options.get(
'maxidletimems', common.MAX_IDLE_TIME_SEC)
if max_pool_size is not None and min_pool_size > max_pool_size:
raise ValueError("minPoolSize must be smaller or equal to maxPoolSize")
connect_timeout = options.get('connecttimeoutms', common.CONNECT_TIMEOUT)
socket_keepalive = options.get('socketkeepalive', True)
socket_timeout = options.get('sockettimeoutms')
wait_queue_timeout = options.get('waitqueuetimeoutms')
wait_queue_timeout = options.get(
'waitqueuetimeoutms', common.WAIT_QUEUE_TIMEOUT)
wait_queue_multiple = options.get('waitqueuemultiple')
event_listeners = options.get('event_listeners')
appname = options.get('appname')

View File

@ -88,6 +88,12 @@ MIN_POOL_SIZE = 0
# Default value for maxIdleTimeMS.
MAX_IDLE_TIME_MS = None
# Default value for maxIdleTimeMS in seconds.
MAX_IDLE_TIME_SEC = None
# Default value for waitQueueTimeoutMS in seconds.
WAIT_QUEUE_TIMEOUT = None
# Default value for localThresholdMS.
LOCAL_THRESHOLD_MS = 15

View File

@ -37,6 +37,7 @@ from pymongo.message import (_CursorAddress,
_RawBatchGetMore,
_Query,
_RawBatchQuery)
from pymongo.monitoring import ConnectionClosedReason
_QUERY_OPTIONS = {
@ -303,7 +304,8 @@ class Cursor(object):
# If this is an exhaust cursor and we haven't completely
# exhausted the result set we *must* close the socket
# to stop the server from sending more data.
self.__exhaust_mgr.sock.close()
self.__exhaust_mgr.sock.close_socket(
ConnectionClosedReason.ERROR)
else:
address = _CursorAddress(
self.__address, self.__collection.full_name)

View File

@ -234,7 +234,7 @@ class InvalidURI(ConfigurationError):
"""Raised when trying to parse an invalid mongodb URI."""
class ExceededMaxWaiters(Exception):
class ExceededMaxWaiters(PyMongoError):
"""Raised when a thread tries to get a connection from a pool and
``maxPoolSize * waitQueueMultiple`` threads are already waiting.

View File

@ -114,6 +114,48 @@ Server discovery and monitoring events are also available. For example::
logging.info("Topology with id {0.topology_id} "
"closed".format(event))
Connection monitoring and pooling events are also available. For example::
class ConnectionPoolLogger(ConnectionPoolListener):
def pool_created(self, event):
logging.info("[pool {0.address}] pool created".format(event))
def pool_cleared(self, event):
logging.info("[pool {0.address}] pool cleared".format(event))
def pool_closed(self, event):
logging.info("[pool {0.address}] pool closed".format(event))
def connection_created(self, event):
logging.info("[pool {0.address}][conn #{0.connection_id}] "
"connection created".format(event))
def connection_ready(self, event):
logging.info("[pool {0.address}][conn #{0.connection_id}] "
"connection setup succeeded".format(event))
def connection_closed(self, event):
logging.info("[pool {0.address}][conn #{0.connection_id}] "
"connection closed, reason: "
"{0.reason}".format(event))
def connection_check_out_started(self, event):
logging.info("[pool {0.address}] connection check out "
"started".format(event))
def connection_check_out_failed(self, event):
logging.info("[pool {0.address}] connection check out "
"failed, reason: {0.reason}".format(event))
def connection_checked_out(self, event):
logging.info("[pool {0.address}][conn #{0.connection_id}] "
"connection checked out of pool".format(event))
def connection_checked_in(self, event):
logging.info("[pool {0.address}][conn #{0.connection_id}] "
"connection checked into pool".format(event))
Event listeners can also be registered per instance of
:class:`~pymongo.mongo_client.MongoClient`::
@ -134,9 +176,6 @@ will not add that listener to existing client instances.
handler first.
"""
import sys
import traceback
from collections import namedtuple
from bson.py3compat import abc
@ -144,9 +183,10 @@ from pymongo.helpers import _handle_exception
_Listeners = namedtuple('Listeners',
('command_listeners', 'server_listeners',
'server_heartbeat_listeners', 'topology_listeners'))
'server_heartbeat_listeners', 'topology_listeners',
'cmap_listeners'))
_LISTENERS = _Listeners([], [], [], [])
_LISTENERS = _Listeners([], [], [], [], [])
class _EventListener(object):
@ -155,8 +195,10 @@ class _EventListener(object):
class CommandListener(_EventListener):
"""Abstract base class for command listeners.
Handles `CommandStartedEvent`, `CommandSucceededEvent`,
and `CommandFailedEvent`."""
and `CommandFailedEvent`.
"""
def started(self, event):
"""Abstract method to handle a `CommandStartedEvent`.
@ -183,8 +225,128 @@ class CommandListener(_EventListener):
raise NotImplementedError
class ConnectionPoolListener(_EventListener):
"""Abstract base class for connection pool listeners.
Handles all of the connection pool events defined in the Connection
Monitoring and Pooling Specification:
:class:`PoolCreatedEvent`, :class:`PoolClearedEvent`,
:class:`PoolClosedEvent`, :class:`ConnectionCreatedEvent`,
:class:`ConnectionReadyEvent`, :class:`ConnectionClosedEvent`,
:class:`ConnectionCheckOutStartedEvent`,
:class:`ConnectionCheckOutFailedEvent`,
:class:`ConnectionCheckedOutEvent`,
and :class:`ConnectionCheckedInEvent`.
.. versionadded:: 3.9
"""
def pool_created(self, event):
"""Abstract method to handle a :class:`PoolCreatedEvent`.
Emitted when a Connection Pool is created.
:Parameters:
- `event`: An instance of :class:`PoolCreatedEvent`.
"""
raise NotImplementedError
def pool_cleared(self, event):
"""Abstract method to handle a `PoolClearedEvent`.
Emitted when a Connection Pool is cleared.
:Parameters:
- `event`: An instance of :class:`PoolClearedEvent`.
"""
raise NotImplementedError
def pool_closed(self, event):
"""Abstract method to handle a `PoolClosedEvent`.
Emitted when a Connection Pool is closed.
:Parameters:
- `event`: An instance of :class:`PoolClosedEvent`.
"""
raise NotImplementedError
def connection_created(self, event):
"""Abstract method to handle a :class:`ConnectionCreatedEvent`.
Emitted when a Connection Pool creates a Connection object.
:Parameters:
- `event`: An instance of :class:`ConnectionCreatedEvent`.
"""
raise NotImplementedError
def connection_ready(self, event):
"""Abstract method to handle a :class:`ConnectionReadyEvent`.
Emitted when a Connection has finished its setup, and is now ready to
use.
:Parameters:
- `event`: An instance of :class:`ConnectionReadyEvent`.
"""
raise NotImplementedError
def connection_closed(self, event):
"""Abstract method to handle a :class:`ConnectionClosedEvent`.
Emitted when a Connection Pool closes a Connection.
:Parameters:
- `event`: An instance of :class:`ConnectionClosedEvent`.
"""
raise NotImplementedError
def connection_check_out_started(self, event):
"""Abstract method to handle a :class:`ConnectionCheckOutStartedEvent`.
Emitted when the driver starts attempting to check out a connection.
:Parameters:
- `event`: An instance of :class:`ConnectionCheckOutStartedEvent`.
"""
raise NotImplementedError
def connection_check_out_failed(self, event):
"""Abstract method to handle a :class:`ConnectionCheckOutFailedEvent`.
Emitted when the driver's attempt to check out a connection fails.
:Parameters:
- `event`: An instance of :class:`ConnectionCheckOutFailedEvent`.
"""
raise NotImplementedError
def connection_checked_out(self, event):
"""Abstract method to handle a :class:`ConnectionCheckedOutEvent`.
Emitted when the driver successfully checks out a Connection.
:Parameters:
- `event`: An instance of :class:`ConnectionCheckedOutEvent`.
"""
raise NotImplementedError
def connection_checked_in(self, event):
"""Abstract method to handle a :class:`ConnectionCheckedInEvent`.
Emitted when the driver checks in a Connection back to the Connection
Pool.
:Parameters:
- `event`: An instance of :class:`ConnectionCheckedInEvent`.
"""
raise NotImplementedError
class ServerHeartbeatListener(_EventListener):
"""Abstract base class for server heartbeat listeners.
Handles `ServerHeartbeatStartedEvent`, `ServerHeartbeatSucceededEvent`,
and `ServerHeartbeatFailedEvent`.
@ -295,7 +457,8 @@ def _validate_event_listeners(option, listeners):
if not isinstance(listener, _EventListener):
raise TypeError("Listeners for %s must be either a "
"CommandListener, ServerHeartbeatListener, "
"ServerListener, or TopologyListener." % (option,))
"ServerListener, TopologyListener, or "
"ConnectionPoolListener." % (option,))
return listeners
@ -304,13 +467,14 @@ def register(listener):
:Parameters:
- `listener`: A subclasses of :class:`CommandListener`,
:class:`ServerHeartbeatListener`, :class:`ServerListener`, or
:class:`TopologyListener`.
:class:`ServerHeartbeatListener`, :class:`ServerListener`,
:class:`TopologyListener`, or :class:`ConnectionPoolListener`.
"""
if not isinstance(listener, _EventListener):
raise TypeError("Listeners for %s must be either a "
"CommandListener, ServerHeartbeatListener, "
"ServerListener, or TopologyListener." % (listener,))
"ServerListener, TopologyListener, or "
"ConnectionPoolListener." % (listener,))
if isinstance(listener, CommandListener):
_LISTENERS.command_listeners.append(listener)
if isinstance(listener, ServerHeartbeatListener):
@ -319,7 +483,8 @@ def register(listener):
_LISTENERS.server_listeners.append(listener)
if isinstance(listener, TopologyListener):
_LISTENERS.topology_listeners.append(listener)
if isinstance(listener, ConnectionPoolListener):
_LISTENERS.cmap_listeners.append(listener)
# Note - to avoid bugs from forgetting which if these is all lowercase and
# which are camelCase, and at the same time avoid having to add a test for
@ -462,6 +627,283 @@ class CommandFailedEvent(_CommandEvent):
return self.__failure
class _PoolEvent(object):
"""Base class for pool events."""
__slots__ = ("__address",)
def __init__(self, address):
self.__address = address
@property
def address(self):
"""The address (host, port) pair of the server the pool is attempting
to connect to.
"""
return self.__address
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.__address)
class PoolCreatedEvent(_PoolEvent):
"""Published when a Connection Pool is created.
:Parameters:
- `address`: The address (host, port) pair of the server this Pool is
attempting to connect to.
.. versionadded:: 3.9
"""
__slots__ = ("__options",)
def __init__(self, address, options):
super(PoolCreatedEvent, self).__init__(address)
self.__options = options
@property
def options(self):
"""Any non-default pool options that were set on this Connection Pool.
"""
return self.__options
def __repr__(self):
return '%s(%r, %r)' % (
self.__class__.__name__, self.address, self.__options)
class PoolClearedEvent(_PoolEvent):
"""Published when a Connection Pool is cleared.
:Parameters:
- `address`: The address (host, port) pair of the server this Pool is
attempting to connect to.
.. versionadded:: 3.9
"""
__slots__ = ()
class PoolClosedEvent(_PoolEvent):
"""Published when a Connection Pool is closed.
:Parameters:
- `address`: The address (host, port) pair of the server this Pool is
attempting to connect to.
.. versionadded:: 3.9
"""
__slots__ = ()
class ConnectionClosedReason(object):
"""An enum that defines values for `reason` on a
:class:`ConnectionClosedEvent`.
.. versionadded:: 3.9
"""
STALE = 'stale'
"""The pool was cleared, making the connection no longer valid."""
IDLE = 'idle'
"""The connection became stale by being idle for too long (maxIdleTimeMS).
"""
ERROR = 'error'
"""The connection experienced an error, making it no longer valid."""
POOL_CLOSED = 'poolClosed'
"""The pool was closed, making the connection no longer valid."""
class ConnectionCheckOutFailedReason(object):
"""An enum that defines values for `reason` on a
:class:`ConnectionCheckOutFailedEvent`.
.. versionadded:: 3.9
"""
TIMEOUT = 'timeout'
"""The connection check out attempt exceeded the specified timeout."""
POOL_CLOSED = 'poolClosed'
"""The pool was previously closed, and cannot provide new connections."""
class _ConnectionEvent(object):
"""Private base class for some connection events."""
__slots__ = ("__address", "__connection_id")
def __init__(self, address, connection_id):
self.__address = address
self.__connection_id = connection_id
@property
def address(self):
"""The address (host, port) pair of the server this connection is
attempting to connect to.
"""
return self.__address
@property
def connection_id(self):
"""The ID of the Connection."""
return self.__connection_id
def __repr__(self):
return '%s(%r, %r)' % (
self.__class__.__name__, self.__address, self.__connection_id)
class ConnectionCreatedEvent(_ConnectionEvent):
"""Published when a Connection Pool creates a Connection object.
NOTE: This connection is not ready for use until the
:class:`ConnectionReadyEvent` is published.
:Parameters:
- `address`: The address (host, port) pair of the server this
Connection is attempting to connect to.
- `connection_id`: The integer ID of the Connection in this Pool.
.. versionadded:: 3.9
"""
__slots__ = ()
class ConnectionReadyEvent(_ConnectionEvent):
"""Published when a Connection has finished its setup, and is ready to use.
:Parameters:
- `address`: The address (host, port) pair of the server this
Connection is attempting to connect to.
- `connection_id`: The integer ID of the Connection in this Pool.
.. versionadded:: 3.9
"""
__slots__ = ()
class ConnectionClosedEvent(_ConnectionEvent):
"""Published when a Connection is closed.
:Parameters:
- `address`: The address (host, port) pair of the server this
Connection is attempting to connect to.
- `connection_id`: The integer ID of the Connection in this Pool.
- `reason`: A reason explaining why this connection was closed.
.. versionadded:: 3.9
"""
__slots__ = ("__reason",)
def __init__(self, address, connection_id, reason):
super(ConnectionClosedEvent, self).__init__(address, connection_id)
self.__reason = reason
@property
def reason(self):
"""A reason explaining why this connection was closed.
The reason must be one of the strings from the
:class:`ConnectionClosedReason` enum.
"""
return self.__reason
def __repr__(self):
return '%s(%r, %r, %r)' % (
self.__class__.__name__, self.address, self.connection_id,
self.__reason)
class ConnectionCheckOutStartedEvent(object):
"""Published when the driver starts attempting to check out a connection.
:Parameters:
- `address`: The address (host, port) pair of the server this
Connection is attempting to connect to.
.. versionadded:: 3.9
"""
__slots__ = ("__address",)
def __init__(self, address):
self.__address = address
@property
def address(self):
"""The address (host, port) pair of the server this connection is
attempting to connect to.
"""
return self.__address
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.__address)
class ConnectionCheckOutFailedEvent(object):
"""Published when the driver's attempt to check out a connection fails.
:Parameters:
- `address`: The address (host, port) pair of the server this
Connection is attempting to connect to.
- `reason`: A reason explaining why connection check out failed.
.. versionadded:: 3.9
"""
__slots__ = ("__address", "__reason")
def __init__(self, address, reason):
self.__address = address
self.__reason = reason
@property
def address(self):
"""The address (host, port) pair of the server this connection is
attempting to connect to.
"""
return self.__address
@property
def reason(self):
"""A reason explaining why connection check out failed.
The reason must be one of the strings from the
:class:`ConnectionCheckOutFailedReason` enum.
"""
return self.__reason
def __repr__(self):
return '%s(%r, %r)' % (
self.__class__.__name__, self.__address, self.__reason)
class ConnectionCheckedOutEvent(_ConnectionEvent):
"""Published when the driver successfully checks out a Connection.
:Parameters:
- `address`: The address (host, port) pair of the server this
Connection is attempting to connect to.
- `connection_id`: The integer ID of the Connection in this Pool.
.. versionadded:: 3.9
"""
__slots__ = ()
class ConnectionCheckedInEvent(_ConnectionEvent):
"""Published when the driver checks in a Connection into the Pool.
:Parameters:
- `address`: The address (host, port) pair of the server this
Connection is attempting to connect to.
- `connection_id`: The integer ID of the Connection in this Pool.
.. versionadded:: 3.9
"""
__slots__ = ()
class _ServerEvent(object):
"""Base class for server events."""
@ -473,7 +915,7 @@ class _ServerEvent(object):
@property
def server_address(self):
"""The address (host/port pair) of the server"""
"""The address (host, port) pair of the server"""
return self.__server_address
@property
@ -671,6 +1113,7 @@ class _EventListeners(object):
lst = _LISTENERS.server_heartbeat_listeners
self.__server_heartbeat_listeners = lst[:]
self.__topology_listeners = _LISTENERS.topology_listeners[:]
self.__cmap_listeners = _LISTENERS.cmap_listeners[:]
if listeners is not None:
for lst in listeners:
if isinstance(lst, CommandListener):
@ -681,11 +1124,14 @@ class _EventListeners(object):
self.__server_heartbeat_listeners.append(lst)
if isinstance(lst, TopologyListener):
self.__topology_listeners.append(lst)
if isinstance(lst, ConnectionPoolListener):
self.__cmap_listeners.append(lst)
self.__enabled_for_commands = bool(self.__command_listeners)
self.__enabled_for_server = bool(self.__server_listeners)
self.__enabled_for_server_heartbeat = bool(
self.__server_heartbeat_listeners)
self.__enabled_for_topology = bool(self.__topology_listeners)
self.__enabled_for_cmap = bool(self.__cmap_listeners)
@property
def enabled_for_commands(self):
@ -707,6 +1153,11 @@ class _EventListeners(object):
"""Are any TopologyListener instances registered?"""
return self.__enabled_for_topology
@property
def enabled_for_cmap(self):
"""Are any ConnectionPoolListener instances registered?"""
return self.__enabled_for_cmap
def event_listeners(self):
"""List of registered event listeners."""
return (self.__command_listeners[:],
@ -789,7 +1240,7 @@ class _EventListeners(object):
listeners.
:Parameters:
- `connection_id`: The address (host/port pair) of the connection.
- `connection_id`: The address (host, port) pair of the connection.
"""
event = ServerHeartbeatStartedEvent(connection_id)
for subscriber in self.__server_heartbeat_listeners:
@ -804,7 +1255,7 @@ class _EventListeners(object):
listeners.
:Parameters:
- `connection_id`: The address (host/port pair) of the connection.
- `connection_id`: The address (host, port) pair of the connection.
- `duration`: The execution time of the event in the highest possible
resolution for the platform.
- `reply`: The command reply.
@ -821,7 +1272,7 @@ class _EventListeners(object):
listeners.
:Parameters:
- `connection_id`: The address (host/port pair) of the connection.
- `connection_id`: The address (host, port) pair of the connection.
- `duration`: The execution time of the event in the highest possible
resolution for the platform.
- `reply`: The command reply.
@ -837,7 +1288,7 @@ class _EventListeners(object):
"""Publish a ServerOpeningEvent to all server listeners.
:Parameters:
- `server_address`: The address (host/port pair) of the server.
- `server_address`: The address (host, port) pair of the server.
- `topology_id`: A unique identifier for the topology this server
is a part of.
"""
@ -852,7 +1303,7 @@ class _EventListeners(object):
"""Publish a ServerClosedEvent to all server listeners.
:Parameters:
- `server_address`: The address (host/port pair) of the server.
- `server_address`: The address (host, port) pair of the server.
- `topology_id`: A unique identifier for the topology this server
is a part of.
"""
@ -870,7 +1321,7 @@ class _EventListeners(object):
:Parameters:
- `previous_description`: The previous server description.
- `server_address`: The address (host/port pair) of the server.
- `server_address`: The address (host, port) pair of the server.
- `new_description`: The new server description.
- `topology_id`: A unique identifier for the topology this server
is a part of.
@ -929,3 +1380,109 @@ class _EventListeners(object):
subscriber.description_changed(event)
except Exception:
_handle_exception()
def publish_pool_created(self, address, options):
"""Publish a :class:`PoolCreatedEvent` to all pool listeners.
"""
event = PoolCreatedEvent(address, options)
for subscriber in self.__cmap_listeners:
try:
subscriber.pool_created(event)
except Exception:
_handle_exception()
def publish_pool_cleared(self, address):
"""Publish a :class:`PoolClearedEvent` to all pool listeners.
"""
event = PoolClearedEvent(address)
for subscriber in self.__cmap_listeners:
try:
subscriber.pool_cleared(event)
except Exception:
_handle_exception()
def publish_pool_closed(self, address):
"""Publish a :class:`PoolClosedEvent` to all pool listeners.
"""
event = PoolClosedEvent(address)
for subscriber in self.__cmap_listeners:
try:
subscriber.pool_closed(event)
except Exception:
_handle_exception()
def publish_connection_created(self, address, connection_id):
"""Publish a :class:`ConnectionCreatedEvent` to all connection
listeners.
"""
event = ConnectionCreatedEvent(address, connection_id)
for subscriber in self.__cmap_listeners:
try:
subscriber.connection_created(event)
except Exception:
_handle_exception()
def publish_connection_ready(self, address, connection_id):
"""Publish a :class:`ConnectionReadyEvent` to all connection listeners.
"""
event = ConnectionReadyEvent(address, connection_id)
for subscriber in self.__cmap_listeners:
try:
subscriber.connection_ready(event)
except Exception:
_handle_exception()
def publish_connection_closed(self, address, connection_id, reason):
"""Publish a :class:`ConnectionClosedEvent` to all connection
listeners.
"""
event = ConnectionClosedEvent(address, connection_id, reason)
for subscriber in self.__cmap_listeners:
try:
subscriber.connection_closed(event)
except Exception:
_handle_exception()
def publish_connection_check_out_started(self, address):
"""Publish a :class:`ConnectionCheckOutStartedEvent` to all connection
listeners.
"""
event = ConnectionCheckOutStartedEvent(address)
for subscriber in self.__cmap_listeners:
try:
subscriber.connection_check_out_started(event)
except Exception:
_handle_exception()
def publish_connection_check_out_failed(self, address, reason):
"""Publish a :class:`ConnectionCheckOutFailedEvent` to all connection
listeners.
"""
event = ConnectionCheckOutFailedEvent(address, reason)
for subscriber in self.__cmap_listeners:
try:
subscriber.connection_check_out_started(event)
except Exception:
_handle_exception()
def publish_connection_checked_out(self, address, connection_id):
"""Publish a :class:`ConnectionCheckedOutEvent` to all connection
listeners.
"""
event = ConnectionCheckedOutEvent(address, connection_id)
for subscriber in self.__cmap_listeners:
try:
subscriber.connection_checked_out(event)
except Exception:
_handle_exception()
def publish_connection_checked_in(self, address, connection_id):
"""Publish a :class:`ConnectionCheckedInEvent` to all connection
listeners.
"""
event = ConnectionCheckedInEvent(address, connection_id)
for subscriber in self.__cmap_listeners:
try:
subscriber.connection_checked_in(event)
except Exception:
_handle_exception()

View File

@ -43,10 +43,14 @@ from bson.son import SON
from pymongo import auth, helpers, thread_util, __version__
from pymongo.client_session import _validate_session_write_concern
from pymongo.common import (MAX_BSON_SIZE,
MAX_IDLE_TIME_SEC,
MAX_MESSAGE_SIZE,
MAX_POOL_SIZE,
MAX_WIRE_VERSION,
MAX_WRITE_BATCH_SIZE,
ORDERED_TYPES)
MIN_POOL_SIZE,
ORDERED_TYPES,
WAIT_QUEUE_TIMEOUT)
from pymongo.errors import (AutoReconnect,
ConnectionFailure,
ConfigurationError,
@ -54,9 +58,12 @@ from pymongo.errors import (AutoReconnect,
DocumentTooLarge,
NetworkTimeout,
NotMasterError,
OperationFailure)
OperationFailure,
PyMongoError)
from pymongo.ismaster import IsMaster
from pymongo.monotonic import time as _time
from pymongo.monitoring import (ConnectionCheckOutFailedReason,
ConnectionClosedReason)
from pymongo.network import (command,
receive_message,
SocketChecker)
@ -293,9 +300,10 @@ class PoolOptions(object):
'__event_listeners', '__appname', '__driver', '__metadata',
'__compression_settings')
def __init__(self, max_pool_size=100, min_pool_size=0,
max_idle_time_seconds=None, connect_timeout=None,
socket_timeout=None, wait_queue_timeout=None,
def __init__(self, max_pool_size=MAX_POOL_SIZE,
min_pool_size=MIN_POOL_SIZE,
max_idle_time_seconds=MAX_IDLE_TIME_SEC, connect_timeout=None,
socket_timeout=None, wait_queue_timeout=WAIT_QUEUE_TIMEOUT,
wait_queue_multiple=None, ssl_context=None,
ssl_match_hostname=True, socket_keepalive=True,
event_listeners=None, appname=None, driver=None,
@ -338,6 +346,23 @@ class PoolOptions(object):
self.__metadata['platform'] = "%s|%s" % (
_METADATA['platform'], driver.platform)
@property
def non_default_options(self):
"""The non-default options this pool was created with.
Added for CMAP's :class:`PoolCreatedEvent`.
"""
opts = {}
if self.__max_pool_size != MAX_POOL_SIZE:
opts['maxPoolSize'] = self.__max_pool_size
if self.__min_pool_size != MIN_POOL_SIZE:
opts['minPoolSize'] = self.__min_pool_size
if self.__max_idle_time_seconds != MAX_IDLE_TIME_SEC:
opts['maxIdleTimeMS'] = self.__max_idle_time_seconds * 1000
if self.__wait_queue_timeout != WAIT_QUEUE_TIMEOUT:
opts['waitQueueTimeoutMS'] = self.__wait_queue_timeout * 1000
return opts
@property
def max_pool_size(self):
"""The maximum allowable number of concurrent connections to each
@ -449,10 +474,12 @@ class SocketInfo(object):
- `sock`: a raw socket object
- `pool`: a Pool instance
- `address`: the server's (host, port)
- `id`: the id of this socket in it's pool
"""
def __init__(self, sock, pool, address):
def __init__(self, sock, pool, address, id):
self.sock = sock
self.address = address
self.id = id
self.authset = set()
self.closed = False
self.last_checkin_time = _time()
@ -466,6 +493,7 @@ class SocketInfo(object):
self.is_mongos = False
self.op_msg_enabled = False
self.listeners = pool.opts.event_listeners
self.enabled_for_cmap = pool.enabled_for_cmap
self.compression_settings = pool.opts.compression_settings
self.compression_context = None
@ -709,7 +737,10 @@ class SocketInfo(object):
'Cannot use session after authenticating with different'
' credentials')
def close(self):
def close_socket(self, reason):
"""Close this connection with a reason."""
if self.closed:
return
self.closed = True
# Avoid exceptions on interpreter shutdown.
try:
@ -717,6 +748,10 @@ class SocketInfo(object):
except Exception:
pass
if reason and self.enabled_for_cmap:
self.listeners.publish_connection_closed(
self.address, self.id, reason)
def send_cluster_time(self, command, session, client):
"""Add cluster time for MongoDB >= 3.6."""
if self.max_wire_version >= 6 and client:
@ -743,7 +778,7 @@ class SocketInfo(object):
# ...) is called in Python code, which experiences the signal as a
# KeyboardInterrupt from the start, rather than as an initial
# socket.error, so we catch that, close the socket, and reraise it.
self.close()
self.close_socket(ConnectionClosedReason.ERROR)
if isinstance(error, socket.error):
_raise_connection_failure(self.address, error)
else:
@ -886,6 +921,13 @@ def _configured_socket(address, options):
return sock
class _PoolClosedError(PyMongoError):
"""Internal error raised when a thread tries to get a connection from a
closed pool.
"""
pass
# Do *not* explicitly inherit from object or Jython won't call __del__
# http://bugs.jython.org/issue1057
class Pool:
@ -905,6 +947,9 @@ class Pool:
self.sockets = collections.deque()
self.lock = threading.Lock()
self.active_sockets = 0
# Monotonically increasing connection ID required for CMAP Events.
self.next_connection_id = 1
self.closed = False
# Keep track of resets, so we notice sockets created before the most
# recent reset and close them.
@ -913,6 +958,11 @@ class Pool:
self.address = address
self.opts = options
self.handshake = handshake
# Don't publish events in Monitor pools.
self.enabled_for_cmap = (
self.handshake and
self.opts.event_listeners is not None and
self.opts.event_listeners.enabled_for_cmap)
if (self.opts.wait_queue_multiple is None or
self.opts.max_pool_size is None):
@ -924,16 +974,41 @@ class Pool:
self._socket_semaphore = thread_util.create_semaphore(
self.opts.max_pool_size, max_waiters)
self.socket_checker = SocketChecker()
if self.enabled_for_cmap:
self.opts.event_listeners.publish_pool_created(
self.address, self.opts.non_default_options)
def reset(self):
def _reset(self, close):
with self.lock:
if self.closed:
return
self.pool_id += 1
self.pid = os.getpid()
sockets, self.sockets = self.sockets, collections.deque()
self.active_sockets = 0
if close:
self.closed = True
for sock_info in sockets:
sock_info.close()
listeners = self.opts.event_listeners
# CMAP spec says that close() MUST close sockets before publishing the
# PoolClosedEvent but that reset() SHOULD close sockets *after*
# publishing the PoolClearedEvent.
if close:
for sock_info in sockets:
sock_info.close_socket(ConnectionClosedReason.POOL_CLOSED)
if self.enabled_for_cmap:
listeners.publish_pool_closed(self.address)
else:
if self.enabled_for_cmap:
listeners.publish_pool_cleared(self.address)
for sock_info in sockets:
sock_info.close_socket(ConnectionClosedReason.STALE)
def reset(self):
self._reset(close=False)
def close(self):
self._reset(close=True)
def remove_stale_sockets(self):
"""Removes stale sockets then adds new ones if pool is too small."""
@ -942,7 +1017,7 @@ class Pool:
while (self.sockets and
self.sockets[-1].idle_time_seconds() > self.opts.max_idle_time_seconds):
sock_info = self.sockets.pop()
sock_info.close()
sock_info.close_socket(ConnectionClosedReason.IDLE)
while True:
with self.lock:
if (len(self.sockets) + self.active_sockets >=
@ -968,17 +1043,34 @@ class Pool:
Note that the pool does not keep a reference to the socket -- you
must call return_socket() when you're done with it.
"""
with self.lock:
conn_id = self.next_connection_id
self.next_connection_id += 1
listeners = self.opts.event_listeners
if self.enabled_for_cmap:
listeners.publish_connection_created(self.address, conn_id)
sock = None
try:
sock = _configured_socket(self.address, self.opts)
except socket.error as error:
if sock is not None:
sock.close()
if self.enabled_for_cmap:
listeners.publish_connection_closed(
self.address, conn_id, ConnectionClosedReason.ERROR)
_raise_connection_failure(self.address, error)
sock_info = SocketInfo(sock, self, self.address)
sock_info = SocketInfo(sock, self, self.address, conn_id)
if self.handshake:
sock_info.ismaster(self.opts.metadata, None)
if self.enabled_for_cmap:
listeners.publish_connection_ready(self.address, conn_id)
return sock_info
@contextlib.contextmanager
@ -1004,11 +1096,17 @@ class Pool:
- `all_credentials`: dict, maps auth source to MongoCredential.
- `checkout` (optional): keep socket checked out.
"""
listeners = self.opts.event_listeners
if self.enabled_for_cmap:
listeners.publish_connection_check_out_started(self.address)
# First get a socket, then attempt authentication. Simplifies
# semaphore management in the face of network errors during auth.
sock_info = self._get_socket_no_auth()
try:
sock_info.check_auth(all_credentials)
if self.enabled_for_cmap:
listeners.publish_connection_checked_out(
self.address, sock_info.id)
yield sock_info
except:
# Exception in caller. Decrement semaphore.
@ -1026,6 +1124,14 @@ class Pool:
if self.pid != os.getpid():
self.reset()
if self.closed:
if self.enabled_for_cmap:
self.opts.event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.POOL_CLOSED)
raise _PoolClosedError(
'Attempted to check out a connection from closed connection '
'pool')
# Get a free socket or create one.
if not self._socket_semaphore.acquire(
True, self.opts.wait_queue_timeout):
@ -1035,18 +1141,17 @@ class Pool:
# We've now acquired the semaphore and must release it on error.
try:
try:
# set.pop() isn't atomic in Jython less than 2.7, see
# http://bugs.jython.org/issue1854
with self.lock:
# Can raise ConnectionFailure.
sock_info = self.sockets.popleft()
except IndexError:
# Can raise ConnectionFailure or CertificateError.
sock_info = self.connect()
else:
# Can raise ConnectionFailure.
sock_info = self._check(sock_info)
sock_info = None
while sock_info is None:
try:
with self.lock:
sock_info = self.sockets.popleft()
except IndexError:
# Can raise ConnectionFailure or CertificateError.
sock_info = self.connect()
else:
if self._perished(sock_info):
sock_info = None
except Exception:
self._socket_semaphore.release()
with self.lock:
@ -1057,11 +1162,16 @@ class Pool:
def return_socket(self, sock_info):
"""Return the socket to the pool, or if it's closed discard it."""
listeners = self.opts.event_listeners
if self.enabled_for_cmap:
listeners.publish_connection_checked_in(self.address, sock_info.id)
if self.pid != os.getpid():
self.reset()
else:
if sock_info.pool_id != self.pool_id:
sock_info.close()
if self.closed:
sock_info.close_socket(ConnectionClosedReason.POOL_CLOSED)
elif sock_info.pool_id != self.pool_id:
sock_info.close_socket(ConnectionClosedReason.STALE)
elif not sock_info.closed:
sock_info.update_last_checkin_time()
with self.lock:
@ -1071,12 +1181,10 @@ class Pool:
with self.lock:
self.active_sockets -= 1
def _check(self, sock_info):
def _perished(self, sock_info):
"""This side-effecty function checks if this socket has been idle for
for longer than the max idle time, or if the socket has been closed by
some external network error, and if so, attempts to create a new
socket. If this connection attempt fails we raise the
ConnectionFailure.
some external network error.
Checking sockets lets us avoid seeing *some*
:class:`~pymongo.errors.AutoReconnect` exceptions on server
@ -1089,25 +1197,31 @@ class Pool:
# If socket is idle, open a new one.
if (self.opts.max_idle_time_seconds is not None and
idle_time_seconds > self.opts.max_idle_time_seconds):
sock_info.close()
return self.connect()
sock_info.close_socket(ConnectionClosedReason.IDLE)
return True
if (self._check_interval_seconds is not None and (
0 == self._check_interval_seconds or
idle_time_seconds > self._check_interval_seconds)):
if self.socket_checker.socket_closed(sock_info.sock):
sock_info.close()
return self.connect()
sock_info.close_socket(ConnectionClosedReason.ERROR)
return True
return sock_info
return False
def _raise_wait_queue_timeout(self):
listeners = self.opts.event_listeners
if self.enabled_for_cmap:
listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.TIMEOUT)
raise ConnectionFailure(
'Timed out waiting for socket from pool with max_size %r and'
' wait_queue_timeout %r' % (
'Timed out while checking out a connection from connection pool '
'with max_size %r and wait_queue_timeout %r' % (
self.opts.max_pool_size, self.opts.wait_queue_timeout))
def __del__(self):
# Avoid ResourceWarnings in Python 3
# Close all sockets without calling reset() or close() because it is
# not safe to acquire a lock in __del__.
for sock_info in self.sockets:
sock_info.close()
sock_info.close_socket(None)

View File

@ -0,0 +1,42 @@
{
"version": 1,
"style": "unit",
"description": "must have an ID number associated with it",
"operations": [
{
"name": "checkOut"
},
{
"name": "checkOut"
}
],
"events": [
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCreated",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCreated",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionPoolClosed",
"ConnectionReady"
]
}

View File

@ -0,0 +1,42 @@
{
"version": 1,
"style": "unit",
"description": "must have IDs assigned in order of creation",
"operations": [
{
"name": "checkOut"
},
{
"name": "checkOut"
}
],
"events": [
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCreated",
"connectionId": 1
},
{
"type": "ConnectionCheckedOut",
"connectionId": 1
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCreated",
"connectionId": 2
},
{
"type": "ConnectionCheckedOut",
"connectionId": 2
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionPoolClosed",
"ConnectionReady"
]
}

View File

@ -0,0 +1,43 @@
{
"version": 1,
"style": "unit",
"description": "must destroy checked in connection if pool has been closed",
"operations": [
{
"name": "checkOut",
"label": "conn"
},
{
"name": "close"
},
{
"name": "checkIn",
"connection": "conn"
}
],
"events": [
{
"type": "ConnectionCheckedOut",
"connectionId": 1
},
{
"type": "ConnectionPoolClosed",
"address": 42
},
{
"type": "ConnectionCheckedIn",
"connectionId": 1
},
{
"type": "ConnectionClosed",
"connectionId": 1,
"reason": "poolClosed"
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionCreated",
"ConnectionReady",
"ConnectionCheckOutStarted"
]
}

View File

@ -0,0 +1,43 @@
{
"version": 1,
"style": "unit",
"description": "must destroy checked in connection if it is stale",
"operations": [
{
"name": "checkOut",
"label": "conn"
},
{
"name": "clear"
},
{
"name": "checkIn",
"connection": "conn"
}
],
"events": [
{
"type": "ConnectionCheckedOut",
"connectionId": 1
},
{
"type": "ConnectionPoolCleared",
"address": 42
},
{
"type": "ConnectionCheckedIn",
"connectionId": 1
},
{
"type": "ConnectionClosed",
"connectionId": 1,
"reason": "stale"
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionCreated",
"ConnectionReady",
"ConnectionCheckOutStarted"
]
}

View File

@ -0,0 +1,38 @@
{
"version": 1,
"style": "unit",
"description": "must make valid checked in connection available",
"operations": [
{
"name": "checkOut",
"label": "conn"
},
{
"name": "checkIn",
"connection": "conn"
},
{
"name": "checkOut"
}
],
"events": [
{
"type": "ConnectionCheckedOut",
"connectionId": 1
},
{
"type": "ConnectionCheckedIn",
"connectionId": 1
},
{
"type": "ConnectionCheckedOut",
"connectionId": 1
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionCreated",
"ConnectionReady",
"ConnectionCheckOutStarted"
]
}

View File

@ -0,0 +1,29 @@
{
"version": 1,
"style": "unit",
"description": "must have a method of allowing the driver to check in a connection",
"operations": [
{
"name": "checkOut",
"label": "conn"
},
{
"name": "checkIn",
"connection": "conn"
}
],
"events": [
{
"type": "ConnectionCheckedIn",
"connectionId": 42
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionCreated",
"ConnectionReady",
"ConnectionClosed",
"ConnectionCheckOutStarted",
"ConnectionCheckedOut"
]
}

View File

@ -0,0 +1,24 @@
{
"version": 1,
"style": "unit",
"description": "must be able to check out a connection",
"operations": [
{
"name": "checkOut"
}
],
"events": [
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCheckedOut",
"connectionId": 1
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionCreated",
"ConnectionReady"
]
}

View File

@ -0,0 +1,55 @@
{
"version": 1,
"style": "unit",
"description": "must throw error if checkOut is called on a closed pool",
"operations": [
{
"name": "checkOut",
"label": "conn1"
},
{
"name": "checkIn",
"connection": "conn1"
},
{
"name": "close"
},
{
"name": "checkOut"
}
],
"error": {
"type": "PoolClosedError",
"message": "Attempted to check out a connection from closed connection pool"
},
"events": [
{
"type": "ConnectionPoolCreated",
"address": 42,
"options": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckedIn",
"connectionId": 42
},
{
"type": "ConnectionPoolClosed",
"address": 42
},
{
"type": "ConnectionCheckOutFailed",
"address": 42,
"reason": "poolClosed"
}
],
"ignore": [
"ConnectionCreated",
"ConnectionReady",
"ConnectionClosed",
"ConnectionCheckOutStarted"
]
}

View File

@ -0,0 +1,63 @@
{
"version": 1,
"style": "unit",
"description": "must be able to check out multiple connections at the same time",
"operations": [
{
"name": "start",
"target": "thread1"
},
{
"name": "start",
"target": "thread2"
},
{
"name": "start",
"target": "thread3"
},
{
"name": "checkOut",
"thread": "thread1"
},
{
"name": "checkOut",
"thread": "thread2"
},
{
"name": "checkOut",
"thread": "thread3"
},
{
"name": "waitForThread",
"target": "thread1"
},
{
"name": "waitForThread",
"target": "thread2"
},
{
"name": "waitForThread",
"target": "thread3"
}
],
"events": [
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
}
],
"ignore": [
"ConnectionCreated",
"ConnectionReady",
"ConnectionPoolCreated",
"ConnectionCheckOutStarted"
]
}

View File

@ -0,0 +1,54 @@
{
"version": 1,
"style": "unit",
"description": "must destroy and must not check out an idle connection if found while iterating available connections",
"poolOptions": {
"maxIdleTimeMS": 10
},
"operations": [
{
"name": "checkOut",
"label": "conn"
},
{
"name": "checkIn",
"connection": "conn"
},
{
"name": "wait",
"ms": 50
},
{
"name": "checkOut"
}
],
"events": [
{
"type": "ConnectionPoolCreated",
"address": 42,
"options": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 1
},
{
"type": "ConnectionCheckedIn",
"connectionId": 1
},
{
"type": "ConnectionClosed",
"connectionId": 1,
"reason": "idle"
},
{
"type": "ConnectionCheckedOut",
"connectionId": 2
}
],
"ignore": [
"ConnectionReady",
"ConnectionCreated",
"ConnectionCheckOutStarted"
]
}

View File

@ -0,0 +1,54 @@
{
"version": 1,
"style": "unit",
"description": "must destroy and must not check out a stale connection if found while iterating available connections",
"operations": [
{
"name": "checkOut",
"label": "conn"
},
{
"name": "checkIn",
"connection": "conn"
},
{
"name": "clear"
},
{
"name": "checkOut"
}
],
"events": [
{
"type": "ConnectionPoolCreated",
"address": 42,
"options": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 1
},
{
"type": "ConnectionCheckedIn",
"connectionId": 1
},
{
"type": "ConnectionPoolCleared",
"address": 42
},
{
"type": "ConnectionClosed",
"connectionId": 1,
"reason": "stale"
},
{
"type": "ConnectionCheckedOut",
"connectionId": 2
}
],
"ignore": [
"ConnectionReady",
"ConnectionCreated",
"ConnectionCheckOutStarted"
]
}

View File

@ -0,0 +1,46 @@
{
"version": 1,
"style": "unit",
"description": "When a pool is closed, it MUST first destroy all available connections in that pool",
"operations": [
{
"name": "checkOut"
},
{
"name": "checkOut",
"label": "conn"
},
{
"name": "checkOut"
},
{
"name": "checkIn",
"connection": "conn"
},
{
"name": "close"
}
],
"events": [
{
"type": "ConnectionCheckedIn",
"connectionId": 2
},
{
"type": "ConnectionClosed",
"connectionId": 2,
"reason": "poolClosed"
},
{
"type": "ConnectionPoolClosed",
"address": 42
}
],
"ignore": [
"ConnectionCreated",
"ConnectionReady",
"ConnectionPoolCreated",
"ConnectionCheckOutStarted",
"ConnectionCheckedOut"
]
}

21
test/cmap/pool-close.json Normal file
View File

@ -0,0 +1,21 @@
{
"version": 1,
"style": "unit",
"description": "must be able to manually close a pool",
"operations": [
{
"name": "close"
}
],
"events": [
{
"type": "ConnectionPoolCreated",
"address": 42,
"options": 42
},
{
"type": "ConnectionPoolClosed",
"address": 42
}
]
}

View File

@ -0,0 +1,114 @@
{
"version": 1,
"style": "unit",
"description": "must never exceed maxPoolSize total connections",
"poolOptions": {
"maxPoolSize": 3
},
"operations": [
{
"name": "checkOut",
"label": "conn1"
},
{
"name": "checkOut"
},
{
"name": "checkOut",
"label": "conn2"
},
{
"name": "checkIn",
"connection": "conn2"
},
{
"name": "checkOut"
},
{
"name": "start",
"target": "thread1"
},
{
"name": "checkOut",
"thread": "thread1"
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutStarted",
"count": 5
},
{
"name": "checkIn",
"connection": "conn1"
},
{
"name": "waitForThread",
"target": "thread1"
}
],
"events": [
{
"type": "ConnectionPoolCreated",
"address": 42,
"options": 42
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCreated",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCreated",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCreated",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckedIn",
"connectionId": 42
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCheckedIn",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
}
],
"ignore": [
"ConnectionReady"
]
}

View File

@ -0,0 +1,46 @@
{
"version": 1,
"style": "unit",
"description": "must be able to start a pool with minPoolSize connections",
"poolOptions": {
"minPoolSize": 3
},
"operations": [
{
"name": "waitForEvent",
"event": "ConnectionCreated",
"count": 3
},
{
"name": "checkOut"
}
],
"events": [
{
"type": "ConnectionPoolCreated",
"address": 42,
"options": 42
},
{
"type": "ConnectionCreated",
"connectionId": 42
},
{
"type": "ConnectionCreated",
"connectionId": 42
},
{
"type": "ConnectionCreated",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
}
],
"ignore": [
"ConnectionReady",
"ConnectionClosed",
"ConnectionCheckOutStarted"
]
}

View File

@ -0,0 +1,32 @@
{
"version": 1,
"style": "unit",
"description": "must be able to start a pool with various options set",
"poolOptions": {
"maxPoolSize": 50,
"minPoolSize": 5,
"maxIdleTimeMS": 100
},
"operations": [
{
"name": "waitForEvent",
"event": "ConnectionPoolCreated",
"count": 1
}
],
"events": [
{
"type": "ConnectionPoolCreated",
"address": 42,
"options": {
"maxPoolSize": 50,
"minPoolSize": 5,
"maxIdleTimeMS": 100
}
}
],
"ignore": [
"ConnectionCreated",
"ConnectionReady"
]
}

View File

@ -0,0 +1,19 @@
{
"version": 1,
"style": "unit",
"description": "must be able to create a pool",
"operations": [
{
"name": "waitForEvent",
"event": "ConnectionPoolCreated",
"count": 1
}
],
"events": [
{
"type": "ConnectionPoolCreated",
"address": 42,
"options": 42
}
]
}

View File

@ -0,0 +1,162 @@
{
"version": 1,
"style": "unit",
"description": "must issue Connections to threads in the order that the threads entered the queue",
"poolOptions": {
"maxPoolSize": 1,
"waitQueueTimeoutMS": 1000
},
"operations": [
{
"name": "checkOut",
"label": "conn0"
},
{
"name": "start",
"target": "thread1"
},
{
"name": "checkOut",
"thread": "thread1",
"label": "conn1"
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutStarted",
"count": 2
},
{
"name": "start",
"target": "thread2"
},
{
"name": "checkOut",
"thread": "thread2",
"label": "conn2"
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutStarted",
"count": 3
},
{
"name": "start",
"target": "thread3"
},
{
"name": "checkOut",
"thread": "thread3",
"label": "conn3"
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutStarted",
"count": 4
},
{
"name": "start",
"target": "thread4"
},
{
"name": "checkOut",
"thread": "thread4",
"label": "conn4"
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutStarted",
"count": 5
},
{
"name": "checkIn",
"connection": "conn0"
},
{
"name": "waitForThread",
"target": "thread1"
},
{
"name": "checkIn",
"connection": "conn1"
},
{
"name": "waitForThread",
"target": "thread2"
},
{
"name": "checkIn",
"connection": "conn2"
},
{
"name": "waitForThread",
"target": "thread3"
},
{
"name": "checkIn",
"connection": "conn3"
},
{
"name": "waitForThread",
"target": "thread4"
}
],
"events": [
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCheckedIn",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckedIn",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckedIn",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckedIn",
"connectionId": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
}
],
"ignore": [
"ConnectionCreated",
"ConnectionReady",
"ConnectionClosed",
"ConnectionPoolCreated"
]
}

View File

@ -0,0 +1,66 @@
{
"version": 1,
"style": "unit",
"description": "must aggressively timeout threads enqueued longer than waitQueueTimeoutMS",
"poolOptions": {
"maxPoolSize": 1,
"waitQueueTimeoutMS": 20
},
"operations": [
{
"name": "checkOut",
"label": "conn0"
},
{
"name": "start",
"target": "thread1"
},
{
"name": "checkOut",
"thread": "thread1"
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutFailed",
"count": 1
},
{
"name": "checkIn",
"connection": "conn0"
},
{
"name": "waitForThread",
"target": "thread1"
}
],
"error": {
"type": "WaitQueueTimeoutError",
"message": "Timed out while checking out a connection from connection pool"
},
"events": [
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42
},
{
"type": "ConnectionCheckOutStarted"
},
{
"type": "ConnectionCheckOutFailed",
"reason": "timeout"
},
{
"type": "ConnectionCheckedIn",
"connectionId": 42
}
],
"ignore": [
"ConnectionCreated",
"ConnectionReady",
"ConnectionClosed",
"ConnectionPoolCreated"
]
}

View File

@ -453,7 +453,7 @@ class TestClient(IntegrationTest):
# Assert that if a socket is closed, a new one takes its place
with server._pool.get_socket({}) as sock_info:
sock_info.close()
sock_info.close_socket(None)
wait_until(lambda: 10 == len(server._pool.sockets),
"a closed socket gets replaced from the pool")
self.assertFalse(sock_info in server._pool.sockets)

404
test/test_cmap.py Normal file
View File

@ -0,0 +1,404 @@
# Copyright 2019-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Execute Transactions Spec tests."""
import os
import sys
import time
import threading
sys.path[0:0] = [""]
from pymongo.errors import (ConnectionFailure,
PyMongoError)
from pymongo.monitoring import (ConnectionPoolListener,
ConnectionCheckedInEvent,
ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
ConnectionCheckOutFailedReason,
ConnectionCheckOutStartedEvent,
ConnectionClosedEvent,
ConnectionClosedReason,
ConnectionCreatedEvent,
ConnectionReadyEvent,
PoolCreatedEvent,
PoolClearedEvent,
PoolClosedEvent)
from pymongo.read_preferences import ReadPreference
from pymongo.pool import _PoolClosedError
from test import (IntegrationTest,
unittest)
from test.utils import (camel_to_snake,
client_context,
get_pool,
get_pools,
rs_or_single_client,
single_client,
TestCreator,
wait_until)
OBJECT_TYPES = {
# Event types.
'ConnectionCheckedIn': ConnectionCheckedInEvent,
'ConnectionCheckedOut': ConnectionCheckedOutEvent,
'ConnectionCheckOutFailed': ConnectionCheckOutFailedEvent,
'ConnectionClosed': ConnectionClosedEvent,
'ConnectionCreated': ConnectionCreatedEvent,
'ConnectionReady': ConnectionReadyEvent,
'ConnectionCheckOutStarted': ConnectionCheckOutStartedEvent,
'ConnectionPoolCreated': PoolCreatedEvent,
'ConnectionPoolCleared': PoolClearedEvent,
'ConnectionPoolClosed': PoolClosedEvent,
# Error types.
'PoolClosedError': _PoolClosedError,
'WaitQueueTimeoutError': ConnectionFailure,
}
class CMAPListener(ConnectionPoolListener):
def __init__(self):
self.events = []
def add_event(self, event):
self.events.append(event)
def event_count(self, event_type):
return len([event for event in self.events[:]
if isinstance(event, event_type)])
def connection_created(self, event):
self.add_event(event)
def connection_ready(self, event):
self.add_event(event)
def connection_closed(self, event):
self.add_event(event)
def connection_check_out_started(self, event):
self.add_event(event)
def connection_check_out_failed(self, event):
self.add_event(event)
def connection_checked_out(self, event):
self.add_event(event)
def connection_checked_in(self, event):
self.add_event(event)
def pool_created(self, event):
self.add_event(event)
def pool_cleared(self, event):
self.add_event(event)
def pool_closed(self, event):
self.add_event(event)
class CMAPThread(threading.Thread):
def __init__(self, name):
super(CMAPThread, self).__init__()
self.name = name
self.exc = None
self.setDaemon(True)
self.cond = threading.Condition()
self.ops = []
self.stopped = False
def schedule(self, work):
self.ops.append(work)
with self.cond:
self.cond.notify()
def stop(self):
self.stopped = True
with self.cond:
self.cond.notify()
def run(self):
while not self.stopped or self.ops:
if not self. ops:
with self.cond:
self.cond.wait(10)
if self.ops:
try:
work = self.ops.pop(0)
work()
except Exception as exc:
self.exc = exc
self.stop()
class TestCMAP(IntegrationTest):
# Location of JSON test specifications.
TEST_PATH = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'cmap')
# Test operations:
def start(self, op):
"""Run the 'start' thread operation."""
target = op['target']
thread = CMAPThread(target)
thread.start()
self.targets[target] = thread
def wait(self, op):
"""Run the 'wait' operation."""
time.sleep(op['ms'] / 1000.0)
def wait_for_thread(self, op):
"""Run the 'waitForThread' operation."""
target = op['target']
thread = self.targets[target]
thread.stop()
thread.join()
if thread.exc:
raise thread.exc
def wait_for_event(self, op):
"""Run the 'waitForEvent' operation."""
event = OBJECT_TYPES[op['event']]
count = op['count']
wait_until(lambda: self.listener.event_count(event) >= count,
'find %s %s event(s)' % (count, event))
def check_out(self, op):
"""Run the 'checkOut' operation."""
label = op['label']
with self.pool.get_socket({}, checkout=True) as sock_info:
if label:
self.labels[label] = sock_info
else:
self.addCleanup(sock_info.close_socket, None)
def check_in(self, op):
"""Run the 'checkIn' operation."""
label = op['connection']
sock_info = self.labels[label]
self.pool.return_socket(sock_info)
def clear(self, op):
"""Run the 'clear' operation."""
self.pool.reset()
def close(self, op):
"""Run the 'close' operation."""
self.pool.close()
def run_operation(self, op):
"""Run a single operation in a test."""
op_name = camel_to_snake(op['name'])
thread = op['thread']
meth = getattr(self, op_name)
if thread:
self.targets[thread].schedule(lambda: meth(op))
else:
meth(op)
def run_operations(self, ops):
"""Run a test's operations."""
for op in ops:
self.run_operation(op)
def check_object(self, actual, expected):
"""Assert that the actual object matches the expected object."""
self.assertEqual(type(actual), OBJECT_TYPES[expected['type']])
for attr, expected_val in expected.items():
if attr == 'type':
continue
c2s = camel_to_snake(attr)
actual_val = getattr(actual, c2s)
if expected_val == 42:
self.assertIsNotNone(actual_val)
else:
self.assertEqual(actual_val, expected_val)
def check_event(self, actual, expected):
"""Assert that the actual event matches the expected event."""
self.check_object(actual, expected)
def actual_events(self, ignore):
"""Return all the non-ignored events."""
ignore = tuple(OBJECT_TYPES[name] for name in ignore)
return [event for event in self.listener.events
if not isinstance(event, ignore)]
def check_events(self, events, ignore):
"""Check the events of a test."""
actual_events = self.actual_events(ignore)
for actual, expected in zip(actual_events, events):
self.check_event(actual, expected)
if len(events) > len(actual_events):
self.fail('missing events: %r' % (events[len(actual_events):],))
elif len(events) < len(actual_events):
self.fail('extra events: %r' % (actual_events[len(events):],))
def check_error(self, actual, expected):
message = expected.pop('message')
self.check_object(actual, expected)
self.assertIn(message, str(actual))
def run_scenario(self, scenario_def, test):
"""Run a CMAP spec test."""
self.assertEqual(scenario_def['version'], 1)
self.assertEqual(scenario_def['style'], 'unit')
self.listener = CMAPListener()
opts = test['poolOptions'].copy()
opts['event_listeners'] = [self.listener]
client = single_client(**opts)
self.addCleanup(client.close)
self.pool = get_pool(client)
# Map of target names to Thread objects.
self.targets = dict()
# Map of label names to Connection objects
self.labels = dict()
def cleanup():
for t in self.targets.values():
t.stop()
for t in self.targets.values():
t.join(5)
for conn in self.labels.values():
conn.close_socket(None)
self.addCleanup(cleanup)
if test['error']:
with self.assertRaises(PyMongoError) as ctx:
self.run_operations(test['operations'])
self.check_error(ctx.exception, test['error'])
else:
self.run_operations(test['operations'])
self.check_events(test['events'], test['ignore'])
POOL_OPTIONS = {
'maxPoolSize': 50,
'minPoolSize': 1,
'maxIdleTimeMS': 10000,
'waitQueueTimeoutMS': 10000
}
#
# Prose tests. Numbers correspond to the prose test number in the spec.
#
def test_1_client_connection_pool_options(self):
client = rs_or_single_client(**self.POOL_OPTIONS)
pool_opts = get_pool(client).opts
self.assertEqual(pool_opts.non_default_options, self.POOL_OPTIONS)
def test_2_all_client_pools_have_same_options(self):
client = rs_or_single_client(**self.POOL_OPTIONS)
client.admin.command('isMaster')
# Discover at least one secondary.
if client_context.has_secondaries:
client.admin.command(
'isMaster', read_preference=ReadPreference.SECONDARY)
pools = get_pools(client)
pool_opts = pools[0].opts
self.assertEqual(pool_opts.non_default_options, self.POOL_OPTIONS)
for pool in pools[1:]:
self.assertEqual(pool.opts, pool_opts)
def test_3_uri_connection_pool_options(self):
opts = '&'.join(['%s=%s' % (k, v)
for k, v in self.POOL_OPTIONS.items()])
uri = 'mongodb://%s/?%s' % (client_context.pair, opts)
client = rs_or_single_client(uri, **self.credentials)
pool_opts = get_pool(client).opts
self.assertEqual(pool_opts.non_default_options, self.POOL_OPTIONS)
def test_4_subscribe_to_events(self):
listener = CMAPListener()
client = single_client(event_listeners=[listener])
self.assertEqual(listener.event_count(PoolCreatedEvent), 1)
# Creates a new connection.
client.admin.command('isMaster')
self.assertEqual(
listener.event_count(ConnectionCheckOutStartedEvent), 1)
self.assertEqual(listener.event_count(ConnectionCreatedEvent), 1)
self.assertEqual(listener.event_count(ConnectionReadyEvent), 1)
self.assertEqual(listener.event_count(ConnectionCheckedOutEvent), 1)
self.assertEqual(listener.event_count(ConnectionCheckedInEvent), 1)
# Uses the existing connection.
client.admin.command('isMaster')
self.assertEqual(
listener.event_count(ConnectionCheckOutStartedEvent), 2)
self.assertEqual(listener.event_count(ConnectionCheckedOutEvent), 2)
self.assertEqual(listener.event_count(ConnectionCheckedInEvent), 2)
client.close()
self.assertEqual(listener.event_count(PoolClearedEvent), 1)
self.assertEqual(listener.event_count(ConnectionClosedEvent), 1)
#
# Extra non-spec tests
#
def assertRepr(self, obj):
new_obj = eval(repr(obj))
self.assertEqual(type(new_obj), type(obj))
self.assertEqual(repr(new_obj), repr(obj))
def test_events_repr(self):
host = ('localhost', 27017)
self.assertRepr(ConnectionCheckedInEvent(host, 1))
self.assertRepr(ConnectionCheckedOutEvent(host, 1))
self.assertRepr(ConnectionCheckOutFailedEvent(
host, ConnectionCheckOutFailedReason.POOL_CLOSED))
self.assertRepr(ConnectionClosedEvent(
host, 1, ConnectionClosedReason.POOL_CLOSED))
self.assertRepr(ConnectionCreatedEvent(host, 1))
self.assertRepr(ConnectionReadyEvent(host, 1))
self.assertRepr(ConnectionCheckOutStartedEvent(host))
self.assertRepr(PoolCreatedEvent(host, {}))
self.assertRepr(PoolClearedEvent(host))
self.assertRepr(PoolClosedEvent(host))
def create_test(scenario_def, test, name):
def run_scenario(self):
self.run_scenario(scenario_def, test)
return run_scenario
class CMAPTestCreator(TestCreator):
def tests(self, scenario_def):
"""Extract the tests from a spec file.
CMAP tests do not have a 'tests' field. The whole file represents
a single test case.
"""
return [scenario_def]
test_creator = CMAPTestCreator(create_test, TestCMAP, TestCMAP.TEST_PATH)
test_creator.create_tests()
if __name__ == "__main__":
unittest.main()

View File

@ -17,7 +17,6 @@
import functools
import warnings
from pymongo import monitoring
from pymongo.collation import (
Collation,
CollationCaseFirst, CollationStrength, CollationAlternate,
@ -95,8 +94,6 @@ class TestCollation(unittest.TestCase):
@client_context.require_connection
def setUpClass(cls):
cls.listener = EventListener()
cls.saved_listeners = monitoring._LISTENERS
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
cls.client = rs_or_single_client(event_listeners=[cls.listener])
cls.db = cls.client.pymongo_test
cls.collation = Collation('en_US')
@ -106,7 +103,6 @@ class TestCollation(unittest.TestCase):
@classmethod
def tearDownClass(cls):
monitoring._LISTENERS = cls.saved_listeners
cls.warn_context.__exit__()
cls.warn_context = None

View File

@ -37,7 +37,6 @@ from bson.py3compat import itervalues
from bson.son import SON
from pymongo import (ASCENDING, DESCENDING, GEO2D,
GEOHAYSTACK, GEOSPHERE, HASHED, TEXT)
from pymongo import monitoring
from pymongo.bulk import BulkWriteError
from pymongo.collection import Collection, ReturnDocument
from pymongo.command_cursor import CommandCursor
@ -2225,8 +2224,6 @@ class TestCollection(IntegrationTest):
def test_find_one_and_write_concern(self):
listener = EventListener()
saved_listeners = monitoring._LISTENERS
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
db = single_client(event_listeners=[listener])[self.db.name]
# non-default WriteConcern.
c_w0 = db.get_collection(
@ -2237,89 +2234,86 @@ class TestCollection(IntegrationTest):
# Authenticate the client and throw out auth commands from the listener.
db.command('ismaster')
results.clear()
try:
if client_context.version.at_least(3, 1, 9, -1):
c_w0.find_and_modify(
if client_context.version.at_least(3, 1, 9, -1):
c_w0.find_and_modify(
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertEqual(
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
c_w0.find_one_and_update(
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertEqual(
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'})
self.assertEqual(
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
c_w0.find_one_and_delete({'_id': 1})
self.assertEqual(
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
# Test write concern errors.
if client_context.is_rs:
c_wc_error = db.get_collection(
'test',
write_concern=WriteConcern(
w=len(client_context.nodes) + 1))
self.assertRaises(
WriteConcernError,
c_wc_error.find_and_modify,
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertEqual(
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_update,
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_replace,
{'w': 0}, results['started'][0].command['writeConcern'])
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_delete,
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
c_w0.find_one_and_update(
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertEqual(
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'})
self.assertEqual(
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
c_w0.find_one_and_delete({'_id': 1})
self.assertEqual(
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
# Test write concern errors.
if client_context.is_rs:
c_wc_error = db.get_collection(
'test',
write_concern=WriteConcern(
w=len(client_context.nodes) + 1))
self.assertRaises(
WriteConcernError,
c_wc_error.find_and_modify,
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_update,
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_replace,
{'w': 0}, results['started'][0].command['writeConcern'])
self.assertRaises(
WriteConcernError,
c_wc_error.find_one_and_delete,
{'w': 0}, results['started'][0].command['writeConcern'])
results.clear()
else:
c_w0.find_and_modify(
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_w0.find_one_and_update(
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_w0.find_one_and_delete({'_id': 1})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_default.find_and_modify({'_id': 1}, {'$set': {'foo': 'bar'}})
else:
c_w0.find_and_modify(
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_default.find_one_and_update({'_id': 1}, {'$set': {'foo': 'bar'}})
c_w0.find_one_and_update(
{'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_default.find_one_and_replace({'_id': 1}, {'foo': 'bar'})
c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_default.find_one_and_delete({'_id': 1})
c_w0.find_one_and_delete({'_id': 1})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
finally:
monitoring._LISTENERS = saved_listeners
c_default.find_and_modify({'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_default.find_one_and_update({'_id': 1}, {'$set': {'foo': 'bar'}})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_default.find_one_and_replace({'_id': 1}, {'foo': 'bar'})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
c_default.find_one_and_delete({'_id': 1})
self.assertNotIn('writeConcern', results['started'][0].command)
results.clear()
def test_find_with_nested(self):
c = self.db.test

View File

@ -23,7 +23,6 @@ sys.path[0:0] = [""]
import pymongo
from bson import json_util
from pymongo import monitoring
from pymongo.errors import OperationFailure
from pymongo.write_concern import WriteConcern
from test import unittest, client_context
@ -48,14 +47,8 @@ class TestAllScenarios(unittest.TestCase):
@client_context.require_connection
def setUpClass(cls):
cls.listener = EventListener()
cls.saved_listeners = monitoring._LISTENERS
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
cls.client = single_client(event_listeners=[cls.listener])
@classmethod
def tearDownClass(cls):
monitoring._LISTENERS = cls.saved_listeners
def tearDown(self):
self.listener.results.clear()

View File

@ -29,8 +29,7 @@ from bson import decode_all
from bson.code import Code
from bson.py3compat import PY3
from bson.son import SON
from pymongo import (monitoring,
ASCENDING,
from pymongo import (ASCENDING,
DESCENDING,
ALL,
OFF)
@ -42,9 +41,8 @@ from pymongo.errors import (ConfigurationError,
OperationFailure)
from pymongo.read_concern import ReadConcern
from test import (client_context,
SkipTest,
unittest,
IntegrationTest, Version)
IntegrationTest)
from test.utils import (EventListener,
ignore_deprecations,
rs_or_single_client,
@ -227,91 +225,85 @@ class TestCursor(IntegrationTest):
self.assertEqual(90, cursor._Cursor__max_await_time_ms)
listener = WhiteListEventListener('find', 'getMore')
saved_listeners = monitoring._LISTENERS
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
coll = rs_or_single_client(
event_listeners=[listener])[self.db.name].pymongo_test
results = listener.results
try:
# Tailable_await defaults.
list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT))
# find
self.assertFalse('maxTimeMS' in results['started'][0].command)
# getMore
self.assertFalse('maxTimeMS' in results['started'][1].command)
results.clear()
# Tailable_await defaults.
list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT))
# find
self.assertFalse('maxTimeMS' in results['started'][0].command)
# getMore
self.assertFalse('maxTimeMS' in results['started'][1].command)
results.clear()
# Tailable_await with max_await_time_ms set.
list(coll.find(
cursor_type=CursorType.TAILABLE_AWAIT).max_await_time_ms(99))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertFalse('maxTimeMS' in results['started'][0].command)
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertTrue('maxTimeMS' in results['started'][1].command)
self.assertEqual(99, results['started'][1].command['maxTimeMS'])
results.clear()
# Tailable_await with max_await_time_ms set.
list(coll.find(
cursor_type=CursorType.TAILABLE_AWAIT).max_await_time_ms(99))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertFalse('maxTimeMS' in results['started'][0].command)
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertTrue('maxTimeMS' in results['started'][1].command)
self.assertEqual(99, results['started'][1].command['maxTimeMS'])
results.clear()
# Tailable_await with max_time_ms
list(coll.find(
cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(99))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertTrue('maxTimeMS' in results['started'][0].command)
self.assertEqual(99, results['started'][0].command['maxTimeMS'])
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertFalse('maxTimeMS' in results['started'][1].command)
results.clear()
# Tailable_await with max_time_ms
list(coll.find(
cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(99))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertTrue('maxTimeMS' in results['started'][0].command)
self.assertEqual(99, results['started'][0].command['maxTimeMS'])
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertFalse('maxTimeMS' in results['started'][1].command)
results.clear()
# Tailable_await with both max_time_ms and max_await_time_ms
list(coll.find(
cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(
99).max_await_time_ms(99))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertTrue('maxTimeMS' in results['started'][0].command)
self.assertEqual(99, results['started'][0].command['maxTimeMS'])
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertTrue('maxTimeMS' in results['started'][1].command)
self.assertEqual(99, results['started'][1].command['maxTimeMS'])
results.clear()
# Tailable_await with both max_time_ms and max_await_time_ms
list(coll.find(
cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(
99).max_await_time_ms(99))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertTrue('maxTimeMS' in results['started'][0].command)
self.assertEqual(99, results['started'][0].command['maxTimeMS'])
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertTrue('maxTimeMS' in results['started'][1].command)
self.assertEqual(99, results['started'][1].command['maxTimeMS'])
results.clear()
# Non tailable_await with max_await_time_ms
list(coll.find(batch_size=1).max_await_time_ms(99))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertFalse('maxTimeMS' in results['started'][0].command)
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertFalse('maxTimeMS' in results['started'][1].command)
results.clear()
# Non tailable_await with max_await_time_ms
list(coll.find(batch_size=1).max_await_time_ms(99))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertFalse('maxTimeMS' in results['started'][0].command)
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertFalse('maxTimeMS' in results['started'][1].command)
results.clear()
# Non tailable_await with max_time_ms
list(coll.find(batch_size=1).max_time_ms(99))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertTrue('maxTimeMS' in results['started'][0].command)
self.assertEqual(99, results['started'][0].command['maxTimeMS'])
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertFalse('maxTimeMS' in results['started'][1].command)
# Non tailable_await with max_time_ms
list(coll.find(batch_size=1).max_time_ms(99))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertTrue('maxTimeMS' in results['started'][0].command)
self.assertEqual(99, results['started'][0].command['maxTimeMS'])
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertFalse('maxTimeMS' in results['started'][1].command)
# Non tailable_await with both max_time_ms and max_await_time_ms
list(coll.find(batch_size=1).max_time_ms(99).max_await_time_ms(88))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertTrue('maxTimeMS' in results['started'][0].command)
self.assertEqual(99, results['started'][0].command['maxTimeMS'])
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertFalse('maxTimeMS' in results['started'][1].command)
finally:
monitoring._LISTENERS = saved_listeners
# Non tailable_await with both max_time_ms and max_await_time_ms
list(coll.find(batch_size=1).max_time_ms(99).max_await_time_ms(88))
# find
self.assertEqual('find', results['started'][0].command_name)
self.assertTrue('maxTimeMS' in results['started'][0].command)
self.assertEqual(99, results['started'][0].command['maxTimeMS'])
# getMore
self.assertEqual('getMore', results['started'][1].command_name)
self.assertFalse('maxTimeMS' in results['started'][1].command)
@client_context.require_test_commands
@client_context.require_no_mongos

View File

@ -53,10 +53,16 @@ class MockPool(object):
self.pool_id = 0
self._lock = threading.Lock()
def reset(self):
def _reset(self):
with self._lock:
self.pool_id += 1
def reset(self):
self._reset()
def close(self):
self._reset()
class MockMonitor(object):
def __init__(self, server_description, topology, pool, topology_settings):

View File

@ -19,7 +19,6 @@ import threading
sys.path[0:0] = [""]
from pymongo import monitoring
from pymongo.errors import ConnectionFailure
from pymongo.ismaster import IsMaster
from pymongo.monitor import Monitor
@ -51,23 +50,21 @@ class MockPool(object):
def return_socket(self, _):
pass
def reset(self):
def _reset(self):
with self._lock:
self.pool_id += 1
def reset(self):
self._reset()
def close(self):
self._reset()
def remove_stale_sockets(self):
pass
class TestHeartbeatMonitoring(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.saved_listeners = monitoring._LISTENERS
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
@classmethod
def tearDownClass(cls):
monitoring._LISTENERS = cls.saved_listeners
def create_mock_monitor(self, responses, uri, expected_results):
listener = HeartbeatEventListener()

View File

@ -45,17 +45,10 @@ class TestCommandMonitoring(PyMongoTestCase):
@client_context.require_connection
def setUpClass(cls):
cls.listener = EventListener()
cls.saved_listeners = monitoring._LISTENERS
# Don't use any global subscribers.
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
cls.client = rs_or_single_client(
event_listeners=[cls.listener],
retryWrites=False)
@classmethod
def tearDownClass(cls):
monitoring._LISTENERS = cls.saved_listeners
def tearDown(self):
self.listener.results.clear()

View File

@ -130,7 +130,7 @@ class SocketGetter(MongoThread):
def __del__(self):
if self.sock:
self.sock.close()
self.sock.close_socket(None)
def run_cases(client, cases):
@ -222,7 +222,7 @@ class TestPooling(_TestPoolingBase):
with cx_pool.get_socket({}) as sock_info:
# Use SocketInfo's API to close the socket.
sock_info.close()
sock_info.close_socket(None)
self.assertEqual(0, len(cx_pool.sockets))
@ -260,6 +260,7 @@ class TestPooling(_TestPoolingBase):
def test_socket_closed_thread_safe(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((client_context.host, client_context.port))
self.addCleanup(s.close)
socket_checker = SocketChecker()
def check_socket():
@ -290,6 +291,7 @@ class TestPooling(_TestPoolingBase):
connect_timeout=1,
wait_queue_timeout=1)
cx_pool._check_interval_seconds = 0 # Always check.
self.addCleanup(cx_pool.close)
with cx_pool.get_socket({}) as sock_info:
# Simulate a closed socket without telling the SocketInfo it's
@ -307,12 +309,13 @@ class TestPooling(_TestPoolingBase):
with cx_pool.get_socket({}, checkout=True) as sock_info:
pass
sock_info.close()
sock_info.close_socket(None)
def test_wait_queue_timeout(self):
wait_queue_timeout = 2 # Seconds
pool = self.create_pool(
max_pool_size=1, wait_queue_timeout=wait_queue_timeout)
self.addCleanup(pool.close)
with pool.get_socket({}) as sock_info:
start = time.time()
@ -326,11 +329,11 @@ class TestPooling(_TestPoolingBase):
"Waited %.2f seconds for a socket, expected %f" % (
duration, wait_queue_timeout))
sock_info.close()
def test_no_wait_queue_timeout(self):
# Verify get_socket() with no wait_queue_timeout blocks forever.
pool = self.create_pool(max_pool_size=1)
self.addCleanup(pool.close)
# Reach max_size.
with pool.get_socket({}) as s1:
@ -347,7 +350,6 @@ class TestPooling(_TestPoolingBase):
self.assertEqual(t.state, 'sock')
self.assertEqual(t.sock, s1)
s1.close()
def test_wait_queue_multiple(self):
wait_queue_multiple = 3
@ -392,7 +394,7 @@ class TestPooling(_TestPoolingBase):
self.assertEqual(t.state, 'get_socket')
for socket_info in socks:
socket_info.close()
socket_info.close_socket(None)
class TestPoolMaxSize(_TestPoolingBase):

View File

@ -15,7 +15,6 @@
"""Test the read_concern module."""
from bson.son import SON
from pymongo import monitoring
from pymongo.errors import ConfigurationError, OperationFailure
from pymongo.read_concern import ReadConcern
@ -29,16 +28,9 @@ class TestReadConcern(PyMongoTestCase):
@client_context.require_connection
def setUpClass(cls):
cls.listener = OvertCommandListener()
cls.saved_listeners = monitoring._LISTENERS
# Don't use any global subscribers.
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
cls.client = single_client(event_listeners=[cls.listener])
cls.db = cls.client.pymongo_test
@classmethod
def tearDownClass(cls):
monitoring._LISTENERS = cls.saved_listeners
def tearDown(self):
self.db.coll.drop()
self.listener.results.clear()

View File

@ -172,12 +172,6 @@ class TestAllScenarios(unittest.TestCase):
@client_context.require_connection
def setUp(cls):
cls.all_listener = ServerAndTopologyEventListener()
cls.saved_listeners = monitoring._LISTENERS
monitoring._LISTENERS = monitoring._Listeners([], [], [], [])
@classmethod
def tearDown(cls):
monitoring._LISTENERS = cls.saved_listeners
def create_test(scenario_def):

View File

@ -63,10 +63,16 @@ class MockPool(object):
def return_socket(self, _):
pass
def reset(self):
def _reset(self):
with self._lock:
self.pool_id += 1
def reset(self):
self._reset()
def close(self):
self._reset()
def remove_stale_sockets(self):
pass

View File

@ -265,6 +265,10 @@ class TestCreator(object):
"runOn not satisfied",
method)
def tests(self, scenario_def):
"""Allow CMAP spec test to override the location of test."""
return scenario_def['tests']
def create_tests(self):
for dirpath, _, filenames in os.walk(self.test_path):
dirname = os.path.split(dirpath)[-1]
@ -277,7 +281,7 @@ class TestCreator(object):
test_type = os.path.splitext(filename)[0]
# Construct test from scenario.
for test_def in scenario_def['tests']:
for test_def in self.tests(scenario_def):
test_name = 'test_%s_%s_%s' % (
dirname,
test_type.replace("-", "_").replace('.', '_'),

View File

@ -50,6 +50,9 @@ class MockPool(object):
def reset(self):
pass
def close(self):
pass
def remove_stale_sockets(self):
pass