PYTHON-2462 Avoid connection storms: implement pool PAUSED state (#531)

Mark server unknown and clear the pool when background connections fail.
Eagerly evict threads from the wait queue when pool is paused. Evicted
threads will raise the following error:
AutoReconnect('localhost:27017: connection pool paused')
Introduce PoolClearedEvent and ConnectionPoolListener.pool_ready.

CMAP spec test changes:
- CMAP unit tests should not use real monitors
- Assert that CMAP threads complete all scheduled operations
This commit is contained in:
Shane Harvey 2021-01-06 15:15:37 -08:00 committed by GitHub
parent a9d668c3b9
commit 86b40c195d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 1023 additions and 292 deletions

View File

@ -4,6 +4,11 @@ Changelog
Changes in Version 4.0
----------------------
Breaking Changes in 4.0
```````````````````````
- Removed :mod:`~pymongo.thread_util`.
Issues Resolved
...............

View File

@ -171,6 +171,9 @@ class ConnectionPoolLogger(monitoring.ConnectionPoolListener):
def pool_created(self, event):
logging.info("[pool {0.address}] pool created".format(event))
def pool_ready(self, event):
logging.info("[pool {0.address}] pool ready".format(event))
def pool_cleared(self, event):
logging.info("[pool {0.address}] pool cleared".format(event))

View File

@ -737,7 +737,7 @@ class MongoClient(common.BaseObject):
executor = periodic_executor.PeriodicExecutor(
interval=common.KILL_CURSOR_FREQUENCY,
min_interval=0.5,
min_interval=common.MIN_HEARTBEAT_INTERVAL,
target=target,
name="pymongo_kill_cursors_thread")

View File

@ -255,6 +255,18 @@ class ConnectionPoolListener(_EventListener):
"""
raise NotImplementedError
def pool_ready(self, event):
"""Abstract method to handle a :class:`PoolReadyEvent`.
Emitted when a Connection Pool is marked ready.
:Parameters:
- `event`: An instance of :class:`PoolReadyEvent`.
.. versionadded:: 4.0
"""
raise NotImplementedError
def pool_cleared(self, event):
"""Abstract method to handle a `PoolClearedEvent`.
@ -692,6 +704,18 @@ class PoolCreatedEvent(_PoolEvent):
self.__class__.__name__, self.address, self.__options)
class PoolReadyEvent(_PoolEvent):
"""Published when a Connection Pool is marked ready.
:Parameters:
- `address`: The address (host, port) pair of the server this Pool is
attempting to connect to.
.. versionadded:: 4.0
"""
__slots__ = ()
class PoolClearedEvent(_PoolEvent):
"""Published when a Connection Pool is cleared.
@ -1475,6 +1499,16 @@ class _EventListeners(object):
except Exception:
_handle_exception()
def publish_pool_ready(self, address):
"""Publish a :class:`PoolReadyEvent` to all pool listeners.
"""
event = PoolReadyEvent(address)
for subscriber in self.__cmap_listeners:
try:
subscriber.pool_ready(event)
except Exception:
_handle_exception()
def publish_pool_cleared(self, address):
"""Publish a :class:`PoolClearedEvent` to all pool listeners.
"""

View File

@ -30,7 +30,7 @@ from pymongo.ssl_support import (
from bson import DEFAULT_CODEC_OPTIONS
from bson.py3compat import imap, itervalues, _unicode, PY3
from bson.son import SON
from pymongo import auth, helpers, thread_util, __version__
from pymongo import auth, helpers, __version__
from pymongo.client_session import _validate_session_write_concern
from pymongo.common import (MAX_BSON_SIZE,
MAX_CONNECTING,
@ -46,6 +46,7 @@ from pymongo.errors import (AutoReconnect,
CertificateError,
ConnectionFailure,
ConfigurationError,
ExceededMaxWaiters,
InvalidOperation,
DocumentTooLarge,
NetworkTimeout,
@ -309,7 +310,8 @@ class PoolOptions(object):
'__wait_queue_timeout', '__wait_queue_multiple',
'__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
'__event_listeners', '__appname', '__driver', '__metadata',
'__compression_settings', '__max_connecting')
'__compression_settings', '__max_connecting',
'__pause_enabled')
def __init__(self, max_pool_size=MAX_POOL_SIZE,
min_pool_size=MIN_POOL_SIZE,
@ -318,7 +320,8 @@ class PoolOptions(object):
wait_queue_multiple=None, ssl_context=None,
ssl_match_hostname=True, socket_keepalive=True,
event_listeners=None, appname=None, driver=None,
compression_settings=None, max_connecting=MAX_CONNECTING):
compression_settings=None, max_connecting=MAX_CONNECTING,
pause_enabled=True):
self.__max_pool_size = max_pool_size
self.__min_pool_size = min_pool_size
@ -335,6 +338,7 @@ class PoolOptions(object):
self.__driver = driver
self.__compression_settings = compression_settings
self.__max_connecting = max_connecting
self.__pause_enabled = pause_enabled
self.__metadata = copy.deepcopy(_METADATA)
if appname:
self.__metadata['application'] = {'name': appname}
@ -406,6 +410,10 @@ class PoolOptions(object):
"""
return self.__max_connecting
@property
def pause_enabled(self):
return self.__pause_enabled
@property
def max_idle_time_seconds(self):
"""The maximum number of seconds that a connection can remain
@ -1058,6 +1066,12 @@ class _PoolClosedError(PyMongoError):
pass
class PoolState(object):
PAUSED = 1
READY = 2
CLOSED = 3
# Do *not* explicitly inherit from object or Jython won't call __del__
# http://bugs.jython.org/issue1057
class Pool:
@ -1068,6 +1082,10 @@ class Pool:
- `options`: a PoolOptions instance
- `handshake`: whether to call ismaster for each new SocketInfo
"""
if options.pause_enabled:
self.state = PoolState.PAUSED
else:
self.state = PoolState.READY
# Check a socket's health with socket_closed() every once in a while.
# Can override for testing: 0 to always check, None to never check.
self._check_interval_seconds = 1
@ -1079,7 +1097,6 @@ class Pool:
self.active_sockets = 0
# Monotonically increasing connection ID required for CMAP Events.
self.next_connection_id = 1
self.closed = False
# Track whether the sockets in this pool are writeable or not.
self.is_writable = None
@ -1098,13 +1115,23 @@ class Pool:
if (self.opts.wait_queue_multiple is None or
self.opts.max_pool_size is None):
max_waiters = None
max_waiters = float('inf')
else:
max_waiters = (
self.opts.max_pool_size * self.opts.wait_queue_multiple)
self._socket_semaphore = thread_util.create_semaphore(
self.opts.max_pool_size, max_waiters)
# The first portion of the wait queue.
# Enforces: maxPoolSize and waitQueueMultiple
# Also used for: clearing the wait queue
self.size_cond = threading.Condition(self.lock)
self.requests = 0
self.max_pool_size = self.opts.max_pool_size
if self.max_pool_size is None:
self.max_pool_size = float('inf')
self.waiters = 0
self.max_waiters = max_waiters
# The second portion of the wait queue.
# Enforces: maxConnecting
# Also used for: clearing the wait queue
self._max_connecting_cond = threading.Condition(self.lock)
self._max_connecting = self.opts.max_connecting
self._pending = 0
@ -1114,10 +1141,23 @@ class Pool:
# Similar to active_sockets but includes threads in the wait queue.
self.operation_count = 0
def _reset(self, close):
with self.lock:
def ready(self):
old_state, self.state = self.state, PoolState.READY
if old_state != PoolState.READY:
if self.enabled_for_cmap:
self.opts.event_listeners.publish_pool_ready(self.address)
@property
def closed(self):
return self.state == PoolState.CLOSED
def _reset(self, close, pause=True):
old_state = self.state
with self.size_cond:
if self.closed:
return
if self.opts.pause_enabled and pause:
old_state, self.state = self.state, PoolState.PAUSED
self.generation += 1
newpid = os.getpid()
if self.pid != newpid:
@ -1126,7 +1166,10 @@ class Pool:
self.operation_count = 0
sockets, self.sockets = self.sockets, collections.deque()
if close:
self.closed = True
self.state = PoolState.CLOSED
# Clear the wait queue
self._max_connecting_cond.notify_all()
self.size_cond.notify_all()
listeners = self.opts.event_listeners
# CMAP spec says that close() MUST close sockets before publishing the
@ -1138,7 +1181,7 @@ class Pool:
if self.enabled_for_cmap:
listeners.publish_pool_closed(self.address)
else:
if self.enabled_for_cmap:
if old_state != PoolState.PAUSED and self.enabled_for_cmap:
listeners.publish_pool_cleared(self.address)
for sock_info in sockets:
sock_info.close_socket(ConnectionClosedReason.STALE)
@ -1155,6 +1198,9 @@ class Pool:
def reset(self):
self._reset(close=False)
def reset_without_pause(self):
self._reset(close=False, pause=False)
def close(self):
self._reset(close=True)
@ -1164,6 +1210,9 @@ class Pool:
`generation` at the point in time this operation was requested on the
pool.
"""
if self.state != PoolState.READY:
return
if self.opts.max_idle_time_seconds is not None:
with self.lock:
while (self.sockets and
@ -1172,15 +1221,14 @@ class Pool:
sock_info.close_socket(ConnectionClosedReason.IDLE)
while True:
with self.lock:
with self.size_cond:
# There are enough sockets in the pool.
if (len(self.sockets) + self.active_sockets >=
self.opts.min_pool_size):
# There are enough sockets in the pool.
return
# We must acquire the semaphore to respect max_pool_size.
if not self._socket_semaphore.acquire(False):
return
if self.requests >= self.opts.min_pool_size:
return
self.requests += 1
incremented = False
try:
with self._max_connecting_cond:
@ -1204,7 +1252,10 @@ class Pool:
with self._max_connecting_cond:
self._pending -= 1
self._max_connecting_cond.notify()
self._socket_semaphore.release()
with self.size_cond:
self.requests -= 1
self.size_cond.notify()
def connect(self, all_credentials=None):
"""Connect to Mongo and return a new SocketInfo.
@ -1289,6 +1340,14 @@ class Pool:
if not checkout:
self.return_socket(sock_info)
def _raise_if_not_ready(self, emit_event):
if self.state != PoolState.READY:
if self.enabled_for_cmap and emit_event:
self.opts.event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR)
_raise_connection_failure(
self.address, AutoReconnect('connection pool paused'))
def _get_socket(self, all_credentials):
"""Get or create a SocketInfo. Can raise ConnectionFailure."""
# We use the pid here to avoid issues with fork / multiprocessing.
@ -1313,9 +1372,26 @@ class Pool:
deadline = _time() + self.opts.wait_queue_timeout
else:
deadline = None
if not self._socket_semaphore.acquire(
True, self.opts.wait_queue_timeout):
self._raise_wait_queue_timeout()
with self.size_cond:
self._raise_if_not_ready(emit_event=True)
if self.waiters >= self.max_waiters:
raise ExceededMaxWaiters(
'exceeded max waiters: %s threads already waiting' % (
self.waiters))
self.waiters += 1
try:
while not (self.requests < self.max_pool_size):
if not _cond_wait(self.size_cond, deadline):
# Timed out, notify the next thread to ensure a
# timeout doesn't consume the condition.
if self.requests < self.max_pool_size:
self.size_cond.notify()
self._raise_wait_queue_timeout()
self._raise_if_not_ready(emit_event=True)
finally:
self.waiters -= 1
self.requests += 1
# We've now acquired the semaphore and must release it on error.
sock_info = None
@ -1330,6 +1406,7 @@ class Pool:
# CMAP: we MUST wait for either maxConnecting OR for a socket
# to be checked back into the pool.
with self._max_connecting_cond:
self._raise_if_not_ready(emit_event=False)
while not (self.sockets or
self._pending < self._max_connecting):
if not _cond_wait(self._max_connecting_cond, deadline):
@ -1340,6 +1417,7 @@ class Pool:
self._max_connecting_cond.notify()
emitted_event = True
self._raise_wait_queue_timeout()
self._raise_if_not_ready(emit_event=False)
try:
sock_info = self.sockets.popleft()
@ -1361,11 +1439,11 @@ class Pool:
if sock_info:
# We checked out a socket but authentication failed.
sock_info.close_socket(ConnectionClosedReason.ERROR)
self._socket_semaphore.release()
if incremented:
with self.lock:
with self.size_cond:
self.requests -= 1
if incremented:
self.active_sockets -= 1
self.size_cond.notify()
if self.enabled_for_cmap and not emitted_event:
self.opts.event_listeners.publish_connection_check_out_failed(
@ -1401,10 +1479,11 @@ class Pool:
# Notify any threads waiting to create a connection.
self._max_connecting_cond.notify()
self._socket_semaphore.release()
with self.lock:
with self.size_cond:
self.requests -= 1
self.active_sockets -= 1
self.operation_count -= 1
self.size_cond.notify()
def _perished(self, sock_info):
"""Return True and close the connection if it is "perished".

View File

@ -61,7 +61,7 @@ class Server(object):
self._events.put((self._listener.publish_server_closed,
(self._description.address, self._topology_id)))
self._monitor.close()
self._pool.reset()
self._pool.reset_without_pause()
def request_check(self):
"""Check the server's state soon."""

View File

@ -1,129 +0,0 @@
# Copyright 2012-2015 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for multi-threading support."""
import threading
try:
from time import monotonic as _time
except ImportError:
from time import time as _time
from pymongo.monotonic import time as _time
from pymongo.errors import ExceededMaxWaiters
### Begin backport from CPython 3.2 for timeout support for Semaphore.acquire
class Semaphore:
# After Tim Peters' semaphore class, but not quite the same (no maximum)
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = threading.Condition(threading.Lock())
self._value = value
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._value = self._value - 1
rc = True
return rc
__enter__ = acquire
def release(self):
with self._cond:
self._value = self._value + 1
self._cond.notify()
def __exit__(self, t, v, tb):
self.release()
@property
def counter(self):
return self._value
class BoundedSemaphore(Semaphore):
"""Semaphore that checks that # releases is <= # acquires"""
def __init__(self, value=1):
Semaphore.__init__(self, value)
self._initial_value = value
def release(self):
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")
return Semaphore.release(self)
### End backport from CPython 3.2
class DummySemaphore(object):
def __init__(self, value=None):
pass
def acquire(self, blocking=True, timeout=None):
return True
def release(self):
pass
class MaxWaitersBoundedSemaphore(object):
def __init__(self, semaphore_class, value=1, max_waiters=1):
self.waiter_semaphore = semaphore_class(max_waiters)
self.semaphore = semaphore_class(value)
def acquire(self, blocking=True, timeout=None):
if not self.waiter_semaphore.acquire(False):
raise ExceededMaxWaiters()
try:
return self.semaphore.acquire(blocking, timeout)
finally:
self.waiter_semaphore.release()
def __getattr__(self, name):
return getattr(self.semaphore, name)
class MaxWaitersBoundedSemaphoreThread(MaxWaitersBoundedSemaphore):
def __init__(self, value=1, max_waiters=1):
MaxWaitersBoundedSemaphore.__init__(
self, BoundedSemaphore, value, max_waiters)
def create_semaphore(max_size, max_waiters):
if max_size is None:
return DummySemaphore()
else:
if max_waiters is None:
return BoundedSemaphore(max_size)
else:
return MaxWaitersBoundedSemaphoreThread(max_size, max_waiters)

View File

@ -39,6 +39,7 @@ from pymongo.errors import (ConnectionFailure,
NetworkTimeout,
NotMasterError,
OperationFailure,
PyMongoError,
ServerSelectionTimeoutError)
from pymongo.monitor import SrvMonitor
from pymongo.monotonic import time as _time
@ -282,6 +283,12 @@ class Topology(object):
# This is a stale isMaster response. Ignore it.
return
# CMAP: Ensure the pool is "ready" when the server is selectable.
if server_description.is_server_type_known:
server = self._servers.get(server_description.address)
if server:
server.pool.ready()
suppress_event = ((self._publish_server or self._publish_tp)
and sd_old == server_description)
if self._publish_server and not suppress_event:
@ -444,7 +451,13 @@ class Topology(object):
servers.append((server, server._pool.generation))
for server, generation in servers:
server._pool.remove_stale_sockets(generation, all_credentials)
pool = server._pool
try:
pool.remove_stale_sockets(generation, all_credentials)
except PyMongoError as exc:
ctx = _ErrorContext(exc, 0, generation, False)
self.handle_error(pool.address, ctx)
raise
def close(self):
"""Clear pools and terminate monitors. Topology reopens on demand."""
@ -686,7 +699,9 @@ class Topology(object):
ssl_match_hostname=options.ssl_match_hostname,
event_listeners=options.event_listeners,
appname=options.appname,
driver=options.driver)
driver=options.driver,
pause_enabled=False,
)
return self._settings.pool_class(address, monitor_pool_options,
handshake=False)

View File

@ -687,6 +687,21 @@ class ClientContext(object):
"Sessions not supported",
func=func)
def supports_retryable_writes(self):
if self.storage_engine == 'mmapv1':
return False
if not self.sessions_enabled:
return False
if self.version.at_least(3, 6):
return self.is_mongos or self.is_rs
return False
def require_retryable_writes(self, func):
"""Run a test only if the deployment supports retryable writes."""
return self._require(self.supports_retryable_writes,
"This server does not support retryable writes",
func=func)
def supports_transactions(self):
if self.storage_engine == 'mmapv1':
return False

View File

@ -3,6 +3,9 @@
"style": "unit",
"description": "must have an ID number associated with it",
"operations": [
{
"name": "ready"
},
{
"name": "checkOut"
},
@ -42,6 +45,7 @@
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionPoolReady",
"ConnectionPoolClosed",
"ConnectionReady"
]

View File

@ -3,6 +3,9 @@
"style": "unit",
"description": "must have IDs assigned in order of creation",
"operations": [
{
"name": "ready"
},
{
"name": "checkOut"
},
@ -42,6 +45,7 @@
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionPoolReady",
"ConnectionPoolClosed",
"ConnectionReady"
]

View File

@ -3,6 +3,9 @@
"style": "unit",
"description": "must destroy checked in connection if pool has been closed",
"operations": [
{
"name": "ready"
},
{
"name": "checkOut",
"label": "conn"
@ -39,6 +42,7 @@
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionPoolReady",
"ConnectionCreated",
"ConnectionReady",
"ConnectionCheckOutStarted"

View File

@ -3,6 +3,9 @@
"style": "unit",
"description": "must destroy checked in connection if it is stale",
"operations": [
{
"name": "ready"
},
{
"name": "checkOut",
"label": "conn"
@ -39,6 +42,7 @@
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionPoolReady",
"ConnectionCreated",
"ConnectionReady",
"ConnectionCheckOutStarted"

View File

@ -3,6 +3,9 @@
"style": "unit",
"description": "must make valid checked in connection available",
"operations": [
{
"name": "ready"
},
{
"name": "checkOut",
"label": "conn"
@ -34,6 +37,7 @@
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionPoolReady",
"ConnectionCreated",
"ConnectionReady",
"ConnectionCheckOutStarted"

View File

@ -3,6 +3,9 @@
"style": "unit",
"description": "must have a method of allowing the driver to check in a connection",
"operations": [
{
"name": "ready"
},
{
"name": "checkOut",
"label": "conn"
@ -21,6 +24,7 @@
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionPoolReady",
"ConnectionCreated",
"ConnectionReady",
"ConnectionClosed",

View File

@ -3,6 +3,9 @@
"style": "unit",
"description": "must be able to check out a connection",
"operations": [
{
"name": "ready"
},
{
"name": "checkOut"
}
@ -29,6 +32,7 @@
}
],
"ignore": [
"ConnectionPoolReady",
"ConnectionPoolCreated"
]
}

View File

@ -3,6 +3,9 @@
"style": "unit",
"description": "must throw error if checkOut is called on a closed pool",
"operations": [
{
"name": "ready"
},
{
"name": "checkOut",
"label": "conn1"
@ -57,6 +60,7 @@
}
],
"ignore": [
"ConnectionPoolReady",
"ConnectionCreated",
"ConnectionReady",
"ConnectionClosed"

View File

@ -26,6 +26,9 @@
"waitQueueTimeoutMS": 5000
},
"operations": [
{
"name": "ready"
},
{
"name": "start",
"target": "thread1"
@ -98,6 +101,7 @@
"ConnectionCheckedIn",
"ConnectionCheckedOut",
"ConnectionClosed",
"ConnectionPoolCreated"
"ConnectionPoolCreated",
"ConnectionPoolReady"
]
}

View File

@ -26,6 +26,9 @@
"waitQueueTimeoutMS": 50
},
"operations": [
{
"name": "ready"
},
{
"name": "start",
"target": "thread1"
@ -93,6 +96,7 @@
"ConnectionCheckedIn",
"ConnectionCheckedOut",
"ConnectionClosed",
"ConnectionPoolCreated"
"ConnectionPoolCreated",
"ConnectionPoolReady"
]
}

View File

@ -3,6 +3,9 @@
"style": "unit",
"description": "must be able to check out multiple connections at the same time",
"operations": [
{
"name": "ready"
},
{
"name": "start",
"target": "thread1"
@ -59,6 +62,7 @@
],
"ignore": [
"ConnectionCreated",
"ConnectionPoolReady",
"ConnectionReady",
"ConnectionPoolCreated",
"ConnectionCheckOutStarted"

View File

@ -6,6 +6,9 @@
"maxIdleTimeMS": 10
},
"operations": [
{
"name": "ready"
},
{
"name": "checkOut",
"label": "conn"
@ -52,6 +55,7 @@
],
"ignore": [
"ConnectionReady",
"ConnectionPoolReady",
"ConnectionCreated",
"ConnectionCheckOutStarted"
]

View File

@ -3,6 +3,9 @@
"style": "unit",
"description": "must destroy and must not check out a stale connection if found while iterating available connections",
"operations": [
{
"name": "ready"
},
{
"name": "checkOut",
"label": "conn"
@ -14,6 +17,9 @@
{
"name": "clear"
},
{
"name": "ready"
},
{
"name": "checkOut"
}
@ -52,6 +58,7 @@
],
"ignore": [
"ConnectionReady",
"ConnectionPoolReady",
"ConnectionCreated",
"ConnectionCheckOutStarted"
]

View File

@ -26,6 +26,9 @@
"waitQueueTimeoutMS": 5000
},
"operations": [
{
"name": "ready"
},
{
"name": "checkOut",
"label": "conn0"
@ -111,6 +114,7 @@
}
],
"ignore": [
"ConnectionPoolReady",
"ConnectionClosed",
"ConnectionReady",
"ConnectionPoolCreated",

View File

@ -0,0 +1,104 @@
{
"version": 1,
"style": "unit",
"description": "clearing pool clears the WaitQueue",
"poolOptions": {
"maxPoolSize": 1,
"waitQueueTimeoutMS": 30000
},
"operations": [
{
"name": "ready"
},
{
"name": "checkOut"
},
{
"name": "start",
"target": "thread1"
},
{
"name": "checkOut",
"thread": "thread1"
},
{
"name": "start",
"target": "thread2"
},
{
"name": "checkOut",
"thread": "thread2"
},
{
"name": "start",
"target": "thread3"
},
{
"name": "checkOut",
"thread": "thread3"
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutStarted",
"count": 4
},
{
"name": "clear"
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutFailed",
"count": 3,
"timeout": 1000
}
],
"events": [
{
"type": "ConnectionCheckOutStarted",
"address": 42
},
{
"type": "ConnectionCheckedOut",
"address": 42
},
{
"type": "ConnectionCheckOutStarted",
"address": 42
},
{
"type": "ConnectionCheckOutStarted",
"address": 42
},
{
"type": "ConnectionCheckOutStarted",
"address": 42
},
{
"type": "ConnectionPoolCleared",
"address": 42
},
{
"type": "ConnectionCheckOutFailed",
"reason": "connectionError",
"address": 42
},
{
"type": "ConnectionCheckOutFailed",
"reason": "connectionError",
"address": 42
},
{
"type": "ConnectionCheckOutFailed",
"reason": "connectionError",
"address": 42
}
],
"ignore": [
"ConnectionPoolReady",
"ConnectionPoolCreated",
"ConnectionCreated",
"ConnectionReady",
"ConnectionCheckedIn",
"ConnectionClosed"
]
}

View File

@ -0,0 +1,67 @@
{
"version": 1,
"style": "unit",
"description": "pool clear halts background minPoolSize establishments",
"poolOptions": {
"minPoolSize": 1
},
"operations": [
{
"name": "ready"
},
{
"name": "waitForEvent",
"event": "ConnectionReady",
"count": 1
},
{
"name": "clear"
},
{
"name": "wait",
"ms": 200
},
{
"name": "ready"
},
{
"name": "waitForEvent",
"event": "ConnectionReady",
"count": 2
}
],
"events": [
{
"type": "ConnectionPoolReady",
"address": 42
},
{
"type": "ConnectionCreated",
"address": 42
},
{
"type": "ConnectionReady",
"address": 42
},
{
"type": "ConnectionPoolCleared",
"address": 42
},
{
"type": "ConnectionPoolReady",
"address": 42
},
{
"type": "ConnectionCreated",
"address": 42
},
{
"type": "ConnectionReady",
"address": 42
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionClosed"
]
}

View File

@ -0,0 +1,32 @@
{
"version": 1,
"style": "unit",
"description": "clearing a paused pool emits no events",
"operations": [
{
"name": "clear"
},
{
"name": "ready"
},
{
"name": "clear"
},
{
"name": "clear"
}
],
"events": [
{
"type": "ConnectionPoolReady",
"address": 42
},
{
"type": "ConnectionPoolCleared",
"address": 42
}
],
"ignore": [
"ConnectionPoolCreated"
]
}

View File

@ -0,0 +1,69 @@
{
"version": 1,
"style": "unit",
"description": "after clear, cannot check out connections until pool ready",
"operations": [
{
"name": "ready"
},
{
"name": "checkOut"
},
{
"name": "clear"
},
{
"name": "start",
"target": "thread1"
},
{
"name": "checkOut",
"thread": "thread1"
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutFailed",
"count": 1
},
{
"name": "ready"
},
{
"name": "checkOut"
}
],
"events": [
{
"type": "ConnectionPoolReady",
"address": 42
},
{
"type": "ConnectionCheckedOut",
"address": 42,
"connectionId": 42
},
{
"type": "ConnectionPoolCleared",
"address": 42
},
{
"type": "ConnectionCheckOutFailed",
"address": 42,
"reason": "connectionError"
},
{
"type": "ConnectionPoolReady",
"address": 42
},
{
"type": "ConnectionCheckedOut",
"address": 42
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionReady",
"ConnectionCheckOutStarted",
"ConnectionCreated"
]
}

View File

@ -3,6 +3,9 @@
"style": "unit",
"description": "When a pool is closed, it MUST first destroy all available connections in that pool",
"operations": [
{
"name": "ready"
},
{
"name": "checkOut"
},
@ -40,6 +43,7 @@
],
"ignore": [
"ConnectionCreated",
"ConnectionPoolReady",
"ConnectionReady",
"ConnectionPoolCreated",
"ConnectionCheckOutStarted",

View File

@ -6,6 +6,9 @@
"maxPoolSize": 3
},
"operations": [
{
"name": "ready"
},
{
"name": "checkOut",
"label": "conn1"
@ -124,6 +127,7 @@
}
],
"ignore": [
"ConnectionReady"
"ConnectionReady",
"ConnectionPoolReady"
]
}

View File

@ -0,0 +1,62 @@
{
"version": 1,
"style": "integration",
"description": "error during minPoolSize population clears pool",
"runOn": [
{
"minServerVersion": "4.2.0"
}
],
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 50
},
"data": {
"failCommands": [
"isMaster"
],
"closeConnection": true
}
},
"poolOptions": {
"minPoolSize": 1
},
"operations": [
{
"name": "ready"
},
{
"name": "waitForEvent",
"event": "ConnectionClosed",
"count": 1
},
{
"name": "wait",
"ms": 200
}
],
"events": [
{
"type": "ConnectionPoolReady",
"address": 42
},
{
"type": "ConnectionCreated",
"address": 42
},
{
"type": "ConnectionClosed",
"address": 42,
"connectionId": 42,
"reason": "error"
},
{
"type": "ConnectionPoolCleared",
"address": 42
}
],
"ignore": [
"ConnectionPoolCreated"
]
}

View File

@ -6,6 +6,13 @@
"minPoolSize": 3
},
"operations": [
{
"name": "wait",
"ms": 200
},
{
"name": "ready"
},
{
"name": "waitForEvent",
"event": "ConnectionCreated",
@ -26,6 +33,10 @@
"address": 42,
"options": 42
},
{
"type": "ConnectionPoolReady",
"address": 42
},
{
"type": "ConnectionCreated",
"connectionId": 42,

View File

@ -0,0 +1,39 @@
{
"version": 1,
"style": "unit",
"description": "readying a ready pool emits no events",
"operations": [
{
"name": "ready"
},
{
"name": "ready"
},
{
"name": "ready"
},
{
"name": "clear"
},
{
"name": "ready"
}
],
"events": [
{
"type": "ConnectionPoolReady",
"address": 42
},
{
"type": "ConnectionPoolCleared",
"address": 42
},
{
"type": "ConnectionPoolReady",
"address": 42
}
],
"ignore": [
"ConnectionPoolCreated"
]
}

57
test/cmap/pool-ready.json Normal file
View File

@ -0,0 +1,57 @@
{
"version": 1,
"style": "unit",
"description": "pool starts as cleared and becomes ready",
"operations": [
{
"name": "start",
"target": "thread1"
},
{
"name": "checkOut",
"thread": "thread1"
},
{
"name": "waitForEvent",
"event": "ConnectionCheckOutFailed",
"count": 1
},
{
"name": "ready"
},
{
"name": "checkOut"
}
],
"events": [
{
"type": "ConnectionCheckOutStarted",
"address": 42
},
{
"type": "ConnectionCheckOutFailed",
"reason": "connectionError",
"address": 42
},
{
"type": "ConnectionPoolReady",
"address": 42
},
{
"type": "ConnectionCheckOutStarted",
"address": 42
},
{
"type": "ConnectionCreated",
"address": 42
},
{
"type": "ConnectionCheckedOut",
"address": 42
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionReady"
]
}

View File

@ -7,6 +7,9 @@
"waitQueueTimeoutMS": 20
},
"operations": [
{
"name": "ready"
},
{
"name": "checkOut",
"label": "conn0"
@ -66,6 +69,7 @@
"ConnectionCreated",
"ConnectionReady",
"ConnectionClosed",
"ConnectionPoolCreated"
"ConnectionPoolCreated",
"ConnectionPoolReady"
]
}

View File

@ -39,14 +39,6 @@
"count": 1
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"event": "PoolClearedEvent",
"count": 1
}
},
{
"name": "insertMany",
"object": "collection",

View File

@ -38,14 +38,6 @@
"count": 1
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"event": "PoolClearedEvent",
"count": 1
}
},
{
"name": "insertMany",
"object": "collection",

View File

@ -39,14 +39,6 @@
"count": 1
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"event": "PoolClearedEvent",
"count": 1
}
},
{
"name": "insertMany",
"object": "collection",

View File

@ -0,0 +1,101 @@
{
"runOn": [
{
"minServerVersion": "4.9"
}
],
"database_name": "sdam-tests",
"collection_name": "sdam-minPoolSize-error",
"data": [],
"tests": [
{
"description": "Network error on minPoolSize background creation",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"skip": 3
},
"data": {
"failCommands": [
"isMaster"
],
"appName": "SDAMminPoolSizeError",
"closeConnection": true
}
},
"clientOptions": {
"heartbeatFrequencyMS": 10000,
"appname": "SDAMminPoolSizeError",
"minPoolSize": 10,
"serverSelectionTimeoutMS": 1000,
"directConnection": true
},
"operations": [
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"event": "PoolReadyEvent",
"count": 1
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"event": "PoolClearedEvent",
"count": 1
}
},
{
"name": "waitForEvent",
"object": "testRunner",
"arguments": {
"event": "ServerMarkedUnknownEvent",
"count": 1
}
},
{
"name": "runCommand",
"object": "database",
"command_name": "ping",
"arguments": {
"command": {
"ping": {}
}
},
"error": true
},
{
"name": "configureFailPoint",
"object": "testRunner",
"arguments": {
"failPoint": {
"configureFailPoint": "failCommand",
"mode": "off"
}
}
},
{
"name": "runCommand",
"object": "database",
"command_name": "ping",
"arguments": {
"command": {
"ping": 1
}
},
"error": false
},
{
"name": "assertEventCount",
"object": "testRunner",
"arguments": {
"event": "PoolReadyEvent",
"count": 2
}
}
]
}
]
}

View File

@ -57,6 +57,27 @@ class MockPool(Pool):
yield sock_info
class DummyMonitor(object):
def __init__(self, server_description, topology, pool, topology_settings):
self._server_description = server_description
self.opened = False
def cancel_check(self):
pass
def join(self):
pass
def open(self):
self.opened = True
def request_check(self):
pass
def close(self):
self.opened = False
class MockMonitor(Monitor):
def __init__(
self,

View File

@ -62,6 +62,7 @@ from pymongo.server_selectors import (any_server_selector,
writable_server_selector)
from pymongo.server_type import SERVER_TYPE
from pymongo.settings import TOPOLOGY_TYPE
from pymongo.topology import _ErrorContext
from pymongo.srv_resolver import _HAVE_DNSPYTHON
from pymongo.write_concern import WriteConcern
from test import (client_context,
@ -1090,7 +1091,8 @@ class TestClient(IntegrationTest):
client = rs_or_single_client(maxPoolSize=3, waitQueueMultiple=2)
pool = get_pool(client)
self.assertEqual(pool.opts.wait_queue_multiple, 2)
self.assertEqual(pool._socket_semaphore.waiter_semaphore.counter, 6)
self.assertEqual(pool.max_waiters, 6)
self.assertEqual(pool.max_pool_size, 3)
def test_socketKeepAlive(self):
for socketKeepAlive in [True, False]:
@ -1341,7 +1343,7 @@ class TestClient(IntegrationTest):
self.assertTrue(sock_info.closed)
# The semaphore was decremented despite the error.
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
self.assertEqual(0, pool.requests)
@client_context.require_auth
def test_auth_network_error(self):
@ -1546,7 +1548,9 @@ class TestClient(IntegrationTest):
def run(self):
while self.running:
self.pool.reset()
exc = AutoReconnect('mock pool error')
ctx = _ErrorContext(exc, 0, pool.generation, False)
client._topology.handle_error(pool.address, ctx)
time.sleep(0.001)
t = ResetPoolThread(pool)
@ -1680,7 +1684,7 @@ class TestExhaustCursor(IntegrationTest):
# The socket was checked in and the semaphore was decremented.
self.assertIn(sock_info, pool.sockets)
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
self.assertEqual(0, pool.requests)
def test_exhaust_getmore_server_error(self):
# When doing a getmore on an exhaust cursor, the socket stays checked
@ -1739,7 +1743,7 @@ class TestExhaustCursor(IntegrationTest):
# The socket was closed and the semaphore was decremented.
self.assertNotIn(sock_info, pool.sockets)
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
self.assertEqual(0, pool.requests)
def test_exhaust_getmore_network_error(self):
# When doing a getmore on an exhaust cursor, the socket stays checked
@ -1766,7 +1770,7 @@ class TestExhaustCursor(IntegrationTest):
# The socket was closed and the semaphore was decremented.
self.assertNotIn(sock_info, pool.sockets)
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
self.assertEqual(0, pool.requests)
class TestClientLazyConnect(IntegrationTest):

View File

@ -16,6 +16,7 @@
import os
import sys
import threading
import time
sys.path[0:0] = [""]
@ -35,23 +36,27 @@ from pymongo.monitoring import (ConnectionCheckedInEvent,
ConnectionCreatedEvent,
ConnectionReadyEvent,
PoolCreatedEvent,
PoolReadyEvent,
PoolClearedEvent,
PoolClosedEvent)
from pymongo.read_preferences import ReadPreference
from pymongo.pool import _PoolClosedError
from pymongo.pool import _PoolClosedError, PoolState
from test import (IntegrationTest,
from test import (client_knobs,
IntegrationTest,
unittest)
from test.utils import (camel_to_snake,
client_context,
CMAPListener,
get_pool,
get_pools,
OvertCommandListener,
rs_or_single_client,
single_client,
TestCreator,
wait_until)
from test.utils_spec_runner import SpecRunnerThread
from test.pymongo_mocks import DummyMonitor
OBJECT_TYPES = {
@ -64,6 +69,7 @@ OBJECT_TYPES = {
'ConnectionReady': ConnectionReadyEvent,
'ConnectionCheckOutStarted': ConnectionCheckOutStartedEvent,
'ConnectionPoolCreated': PoolCreatedEvent,
'ConnectionPoolReady': PoolReadyEvent,
'ConnectionPoolCleared': PoolClearedEvent,
'ConnectionPoolClosed': PoolClosedEvent,
# Error types.
@ -98,13 +104,15 @@ class TestCMAP(IntegrationTest):
thread.join()
if thread.exc:
raise thread.exc
self.assertFalse(thread.ops)
def wait_for_event(self, op):
"""Run the 'waitForEvent' operation."""
event = OBJECT_TYPES[op['event']]
count = op['count']
timeout = op.get('timeout', 10000) / 1000.0
wait_until(lambda: self.listener.event_count(event) >= count,
'find %s %s event(s)' % (count, event))
'find %s %s event(s)' % (count, event), timeout=timeout)
def check_out(self, op):
"""Run the 'checkOut' operation."""
@ -121,6 +129,10 @@ class TestCMAP(IntegrationTest):
sock_info = self.labels[label]
self.pool.return_socket(sock_info)
def ready(self, op):
"""Run the 'ready' operation."""
self.pool.ready()
def clear(self, op):
"""Run the 'clear' operation."""
self.pool.reset()
@ -213,9 +225,13 @@ class TestCMAP(IntegrationTest):
opts = test['poolOptions'].copy()
opts['event_listeners'] = [self.listener]
client = single_client(**opts)
opts['_monitor_class'] = DummyMonitor
with client_knobs(kill_cursor_frequency=.05,
min_heartbeat_interval=.05):
client = single_client(**opts)
self.addCleanup(client.close)
self.pool = get_pool(client)
# self.pool = get_pools(client)[0]
self.pool = list(client._get_topology()._servers.values())[0].pool
# Map of target names to Thread objects.
self.targets = dict()
@ -342,13 +358,14 @@ class TestCMAP(IntegrationTest):
client.admin.command('isMaster')
self.assertIsInstance(listener.events[0], PoolCreatedEvent)
self.assertIsInstance(listener.events[1],
ConnectionCheckOutStartedEvent)
self.assertIsInstance(listener.events[1], PoolReadyEvent)
self.assertIsInstance(listener.events[2],
ConnectionCheckOutStartedEvent)
self.assertIsInstance(listener.events[3],
ConnectionCheckOutFailedEvent)
self.assertIsInstance(listener.events[3], PoolClearedEvent)
self.assertIsInstance(listener.events[4], PoolClearedEvent)
failed_event = listener.events[2]
failed_event = listener.events[3]
self.assertEqual(
failed_event.reason, ConnectionCheckOutFailedReason.CONN_ERROR)
@ -363,17 +380,16 @@ class TestCMAP(IntegrationTest):
client.admin.command('isMaster')
self.assertIsInstance(listener.events[0], PoolCreatedEvent)
self.assertIsInstance(listener.events[1],
self.assertIsInstance(listener.events[1], PoolReadyEvent)
self.assertIsInstance(listener.events[2],
ConnectionCheckOutStartedEvent)
self.assertIsInstance(listener.events[2], ConnectionCreatedEvent)
self.assertIsInstance(listener.events[3], ConnectionCreatedEvent)
# Error happens here.
self.assertIsInstance(listener.events[3], ConnectionClosedEvent)
self.assertIsInstance(listener.events[4],
self.assertIsInstance(listener.events[4], ConnectionClosedEvent)
self.assertIsInstance(listener.events[5],
ConnectionCheckOutFailedEvent)
failed_event = listener.events[4]
self.assertEqual(
failed_event.reason, ConnectionCheckOutFailedReason.CONN_ERROR)
self.assertEqual(listener.events[5].reason,
ConnectionCheckOutFailedReason.CONN_ERROR)
#
# Extra non-spec tests
@ -398,6 +414,73 @@ class TestCMAP(IntegrationTest):
self.assertRepr(PoolClearedEvent(host))
self.assertRepr(PoolClosedEvent(host))
def test_close_leaves_pool_unpaused(self):
# Needed until we implement PYTHON-2463. This test is related to
# test_threads.TestThreads.test_client_disconnect
listener = CMAPListener()
client = single_client(event_listeners=[listener])
client.admin.command('ping')
pool = get_pool(client)
client.close()
self.assertEqual(1, listener.event_count(PoolClearedEvent))
self.assertEqual(PoolState.READY, pool.state)
# Checking out a connection should succeed
with pool.get_socket({}):
pass
@client_context.require_version_max(4, 3) # Remove after SERVER-53624.
@client_context.require_retryable_writes
@client_context.require_failCommand_fail_point
def test_pool_paused_error_is_retryable(self):
cmap_listener = CMAPListener()
cmd_listener = OvertCommandListener()
client = rs_or_single_client(
maxPoolSize=1,
heartbeatFrequencyMS=500,
event_listeners=[cmap_listener, cmd_listener])
self.addCleanup(client.close)
threads = [InsertThread(client.pymongo_test.test) for _ in range(3)]
fail_command = {
'mode': {'times': 1},
'data': {
'failCommands': ['insert'],
'blockConnection': True,
'blockTimeMS': 1000,
'errorCode': 91
},
}
with self.fail_point(fail_command):
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for thread in threads:
self.assertTrue(thread.passed)
# The two threads in the wait queue fail the initial connection check
# out attempt and then succeed on retry.
self.assertEqual(
2, cmap_listener.event_count(ConnectionCheckOutFailedEvent))
# Connection check out failures are not reflected in command
# monitoring because we only publish command events _after_ checking
# out a connection.
self.assertEqual(4, len(cmd_listener.results['started']))
self.assertEqual(3, len(cmd_listener.results['succeeded']))
self.assertEqual(1, len(cmd_listener.results['failed']))
class InsertThread(threading.Thread):
def __init__(self, collection):
super(InsertThread, self).__init__()
self.daemon = True
self.collection = collection
self.passed = False
def run(self):
self.collection.insert_one({})
self.passed = True
def create_test(scenario_def, test, name):
def run_scenario(self):

View File

@ -39,14 +39,18 @@ from pymongo.uri_parser import parse_uri
from test import unittest, IntegrationTest
from test.utils import (assertion_context,
cdecimal_patched,
CMAPListener,
client_context,
Barrier,
get_pool,
HeartbeatEventListener,
server_name_to_type,
rs_or_single_client,
single_client,
TestCreator,
wait_until)
from test.utils_spec_runner import SpecRunner, SpecRunnerThread
from test.pymongo_mocks import DummyMonitor
# Location of JSON test specifications.
@ -54,27 +58,7 @@ _TEST_PATH = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'discovery_and_monitoring')
class MockMonitor(object):
def __init__(self, server_description, topology, pool, topology_settings):
self._server_description = server_description
def cancel_check(self):
pass
def open(self):
pass
def close(self):
pass
def join(self):
pass
def request_check(self):
pass
def create_mock_topology(uri, monitor_class=MockMonitor):
def create_mock_topology(uri, monitor_class=DummyMonitor):
parsed_uri = parse_uri(uri)
replica_set_name = None
direct_connection = None
@ -318,6 +302,46 @@ class TestIgnoreStaleErrors(IntegrationTest):
client.admin.command('ping')
class CMAPHeartbeatListener(HeartbeatEventListener, CMAPListener):
pass
class TestPoolManagement(IntegrationTest):
@client_context.require_failCommand_appName
def test_pool_unpause(self):
# This test implements the prose test "Connection Pool Management"
listener = CMAPHeartbeatListener()
client = single_client(appName="SDAMPoolManagementTest",
heartbeatFrequencyMS=500,
event_listeners=[listener])
self.addCleanup(client.close)
# Assert that ConnectionPoolReadyEvent occurs after the first
# ServerHeartbeatSucceededEvent.
listener.wait_for_event(monitoring.PoolReadyEvent, 1)
pool_ready = listener.events_by_type(monitoring.PoolReadyEvent)[0]
hb_succeeded = listener.events_by_type(
monitoring.ServerHeartbeatSucceededEvent)[0]
self.assertGreater(
listener.events.index(pool_ready),
listener.events.index(hb_succeeded))
listener.reset()
fail_ismaster = {
'mode': {'times': 2},
'data': {
'failCommands': ['isMaster'],
'errorCode': 1234,
'appName': 'SDAMPoolManagementTest',
},
}
with self.fail_point(fail_ismaster):
listener.wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1)
listener.wait_for_event(monitoring.PoolClearedEvent, 1)
listener.wait_for_event(
monitoring.ServerHeartbeatSucceededEvent, 1)
listener.wait_for_event(monitoring.PoolReadyEvent, 1)
class TestIntegration(SpecRunner):
# Location of JSON test specifications.
TEST_PATH = os.path.join(

View File

@ -51,12 +51,12 @@ class TestHeartbeatMonitoring(unittest.TestCase):
# monitor thread may run multiple times during the execution
# of this test.
wait_until(
lambda: len(listener.results) >= expected_len,
lambda: len(listener.events) >= expected_len,
"publish all events")
try:
# zip gives us len(expected_results) pairs.
for expected, actual in zip(expected_results, listener.results):
for expected, actual in zip(expected_results, listener.events):
self.assertEqual(expected,
actual.__class__.__name__)
self.assertEqual(actual.connection_id,

View File

@ -176,7 +176,9 @@ class _TestPoolingBase(unittest.TestCase):
pool_options = client_context.client._topology_settings.pool_options
kwargs['ssl_context'] = pool_options.ssl_context
kwargs['ssl_match_hostname'] = pool_options.ssl_match_hostname
return Pool(pair, PoolOptions(*args, **kwargs))
pool = Pool(pair, PoolOptions(*args, **kwargs))
pool.ready()
return pool
class TestPooling(_TestPoolingBase):
@ -483,7 +485,7 @@ class TestPoolMaxSize(_TestPoolingBase):
joinall(threads)
self.assertEqual(nthreads, self.n_passed)
self.assertTrue(len(cx_pool.sockets) > 1)
self.assertEqual(max_pool_size, cx_pool._socket_semaphore.counter)
self.assertEqual(0, cx_pool.requests)
def test_max_pool_size_none(self):
c = rs_or_single_client(maxPoolSize=None)
@ -529,6 +531,7 @@ class TestPoolMaxSize(_TestPoolingBase):
connect_timeout=1,
socket_timeout=1,
wait_queue_timeout=1))
test_pool.ready()
# First call to get_socket fails; if pool doesn't release its semaphore
# then the second call raises "ConnectionFailure: Timed out waiting for

View File

@ -203,7 +203,7 @@ class TestStreamingProtocol(IntegrationTest):
self.assertTrue(hb_failed_events[0].awaited)
# Depending on thread scheduling, the failed heartbeat could occur on
# the second or third check.
events = [type(e) for e in hb_listener.results[:4]]
events = [type(e) for e in hb_listener.events[:4]]
if events == [monitoring.ServerHeartbeatStartedEvent,
monitoring.ServerHeartbeatSucceededEvent,
monitoring.ServerHeartbeatStartedEvent,

View File

@ -37,24 +37,7 @@ from pymongo.server_selectors import (any_server_selector,
from pymongo.settings import TopologySettings
from test import client_knobs, unittest
from test.utils import MockPool, wait_until
class MockMonitor(object):
def __init__(self, server_description, topology, pool, topology_settings):
self._server_description = server_description
self.opened = False
def cancel_check(self):
pass
def open(self):
self.opened = True
def request_check(self):
pass
def close(self):
self.opened = False
from test.pymongo_mocks import DummyMonitor
class SetNameDiscoverySettings(TopologySettings):
@ -68,7 +51,7 @@ address = ('a', 27017)
def create_mock_topology(
seeds=None,
replica_set_name=None,
monitor_class=MockMonitor):
monitor_class=DummyMonitor):
partitioned_seeds = list(imap(common.partition_node, seeds or ['a']))
topology_settings = TopologySettings(
partitioned_seeds,
@ -501,7 +484,7 @@ class TestMultiServerTopology(TopologyTest):
topology_settings = SetNameDiscoverySettings(
seeds=[address],
pool_class=MockPool,
monitor_class=MockMonitor)
monitor_class=DummyMonitor)
t = Topology(topology_settings)
self.assertEqual(t.description.replica_set_name, None)
@ -537,7 +520,7 @@ class TestMultiServerTopology(TopologyTest):
topology_settings = SetNameDiscoverySettings(
seeds=[address],
pool_class=MockPool,
monitor_class=MockMonitor)
monitor_class=DummyMonitor)
t = Topology(topology_settings)
self.assertEqual(t.description.replica_set_name, None)

View File

@ -63,7 +63,7 @@ else:
IMPOSSIBLE_WRITE_CONCERN = WriteConcern(w=50)
class CMAPListener(ConnectionPoolListener):
class BaseListener(object):
def __init__(self):
self.events = []
@ -74,9 +74,26 @@ class CMAPListener(ConnectionPoolListener):
self.events.append(event)
def event_count(self, event_type):
return len([event for event in self.events[:]
if isinstance(event, event_type)])
return len(self.events_by_type(event_type))
def events_by_type(self, event_type):
"""Return the matching events by event class.
event_type can be a single class or a tuple of classes.
"""
return self.matching(lambda e: isinstance(e, event_type))
def matching(self, matcher):
"""Return the matching events."""
return [event for event in self.events[:] if matcher(event)]
def wait_for_event(self, event, count):
"""Wait for a number of events to be published, or fail."""
wait_until(lambda: self.event_count(event) >= count,
'find %s %s event(s)' % (count, event))
class CMAPListener(BaseListener, monitoring.ConnectionPoolListener):
def connection_created(self, event):
self.add_event(event)
@ -101,6 +118,9 @@ class CMAPListener(ConnectionPoolListener):
def pool_created(self, event):
self.add_event(event)
def pool_ready(self, event):
self.add_event(event)
def pool_cleared(self, event):
self.add_event(event)
@ -199,25 +219,17 @@ class ServerAndTopologyEventListener(ServerEventListener,
"""Listens to Server and Topology events."""
class HeartbeatEventListener(monitoring.ServerHeartbeatListener):
class HeartbeatEventListener(BaseListener, monitoring.ServerHeartbeatListener):
"""Listens to only server heartbeat events."""
def __init__(self):
self.results = []
def started(self, event):
self.results.append(event)
self.add_event(event)
def succeeded(self, event):
self.results.append(event)
self.add_event(event)
def failed(self, event):
self.results.append(event)
def matching(self, matcher):
"""Return the matching events."""
results = self.results[:]
return [event for event in results if matcher(event)]
self.add_event(event)
class MockSocketInfo(object):
@ -252,9 +264,15 @@ class MockPool(object):
with self._lock:
self.generation += 1
def ready(self):
pass
def reset(self):
self._reset()
def reset_without_pause(self):
self._reset()
def close(self):
self._reset()

View File

@ -30,23 +30,7 @@ from pymongo.server_selectors import writable_server_selector
from pymongo.topology import Topology
from test import unittest
from test.utils import MockPool, parse_read_preference
class MockMonitor(object):
def __init__(self, server_description, topology, pool, topology_settings):
pass
def cancel_check(self):
pass
def open(self):
pass
def request_check(self):
pass
def close(self):
pass
from test.pymongo_mocks import DummyMonitor
def get_addresses(server_list):
@ -122,7 +106,7 @@ def get_topology_type_name(scenario_def):
def get_topology_settings_dict(**kwargs):
settings = dict(
monitor_class=MockMonitor,
monitor_class=DummyMonitor,
heartbeat_frequency=HEARTBEAT_FREQUENCY,
pool_class=MockPool
)