PYTHON-3175 Preemptively cancel in progress operations when SDAM heartbeats timeout (#1465)
This commit is contained in:
parent
b8d6bfdf08
commit
c4e4bd638f
@ -23,7 +23,7 @@ from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
|
||||
|
||||
from pymongo import common, periodic_executor
|
||||
from pymongo._csot import MovingMinimum
|
||||
from pymongo.errors import NotPrimaryError, OperationFailure, _OperationCancelled
|
||||
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
|
||||
from pymongo.hello import Hello
|
||||
from pymongo.lock import _create_lock
|
||||
from pymongo.periodic_executor import _shutdown_executors
|
||||
@ -204,7 +204,9 @@ class Monitor(MonitorBase):
|
||||
|
||||
# Update the Topology and clear the server pool on error.
|
||||
self._topology.on_change(
|
||||
self._server_description, reset_pool=self._server_description.error
|
||||
self._server_description,
|
||||
reset_pool=self._server_description.error,
|
||||
interrupt_connections=isinstance(self._server_description.error, NetworkTimeout),
|
||||
)
|
||||
|
||||
if self._stream and (
|
||||
|
||||
@ -874,15 +874,22 @@ class PoolClearedEvent(_PoolEvent):
|
||||
:param address: The address (host, port) pair of the server this Pool is
|
||||
attempting to connect to.
|
||||
:param service_id: The service_id this command was sent to, or ``None``.
|
||||
:param interrupt_connections: True if all active connections were interrupted by the Pool during clearing.
|
||||
|
||||
.. versionadded:: 3.9
|
||||
"""
|
||||
|
||||
__slots__ = ("__service_id",)
|
||||
__slots__ = ("__service_id", "__interrupt_connections")
|
||||
|
||||
def __init__(self, address: _Address, service_id: Optional[ObjectId] = None) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
address: _Address,
|
||||
service_id: Optional[ObjectId] = None,
|
||||
interrupt_connections: bool = False,
|
||||
) -> None:
|
||||
super().__init__(address)
|
||||
self.__service_id = service_id
|
||||
self.__interrupt_connections = interrupt_connections
|
||||
|
||||
@property
|
||||
def service_id(self) -> Optional[ObjectId]:
|
||||
@ -894,8 +901,16 @@ class PoolClearedEvent(_PoolEvent):
|
||||
"""
|
||||
return self.__service_id
|
||||
|
||||
@property
|
||||
def interrupt_connections(self) -> bool:
|
||||
"""If True, active connections are interrupted during clearing.
|
||||
|
||||
.. versionadded:: 4.7
|
||||
"""
|
||||
return self.__interrupt_connections
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.__class__.__name__}({self.address!r}, {self.__service_id!r})"
|
||||
return f"{self.__class__.__name__}({self.address!r}, {self.__service_id!r}, {self.__interrupt_connections!r})"
|
||||
|
||||
|
||||
class PoolClosedEvent(_PoolEvent):
|
||||
@ -1775,9 +1790,14 @@ class _EventListeners:
|
||||
except Exception:
|
||||
_handle_exception()
|
||||
|
||||
def publish_pool_cleared(self, address: _Address, service_id: Optional[ObjectId]) -> None:
|
||||
def publish_pool_cleared(
|
||||
self,
|
||||
address: _Address,
|
||||
service_id: Optional[ObjectId],
|
||||
interrupt_connections: bool = False,
|
||||
) -> None:
|
||||
"""Publish a :class:`PoolClearedEvent` to all pool listeners."""
|
||||
event = PoolClearedEvent(address, service_id)
|
||||
event = PoolClearedEvent(address, service_id, interrupt_connections)
|
||||
for subscriber in self.__cmap_listeners:
|
||||
try:
|
||||
subscriber.pool_cleared(event)
|
||||
|
||||
@ -296,35 +296,35 @@ _POLL_TIMEOUT = 0.5
|
||||
|
||||
def wait_for_read(conn: Connection, deadline: Optional[float]) -> None:
|
||||
"""Block until at least one byte is read, or a timeout, or a cancel."""
|
||||
context = conn.cancel_context
|
||||
# Only Monitor connections can be cancelled.
|
||||
if context:
|
||||
sock = conn.conn
|
||||
timed_out = False
|
||||
while True:
|
||||
# SSLSocket can have buffered data which won't be caught by select.
|
||||
if hasattr(sock, "pending") and sock.pending() > 0:
|
||||
readable = True
|
||||
sock = conn.conn
|
||||
timed_out = False
|
||||
# Check if the connection's socket has been manually closed
|
||||
if sock.fileno() == -1:
|
||||
return
|
||||
while True:
|
||||
# SSLSocket can have buffered data which won't be caught by select.
|
||||
if hasattr(sock, "pending") and sock.pending() > 0:
|
||||
readable = True
|
||||
else:
|
||||
# Wait up to 500ms for the socket to become readable and then
|
||||
# check for cancellation.
|
||||
if deadline:
|
||||
remaining = deadline - time.monotonic()
|
||||
# When the timeout has expired perform one final check to
|
||||
# see if the socket is readable. This helps avoid spurious
|
||||
# timeouts on AWS Lambda and other FaaS environments.
|
||||
if remaining <= 0:
|
||||
timed_out = True
|
||||
timeout = max(min(remaining, _POLL_TIMEOUT), 0)
|
||||
else:
|
||||
# Wait up to 500ms for the socket to become readable and then
|
||||
# check for cancellation.
|
||||
if deadline:
|
||||
remaining = deadline - time.monotonic()
|
||||
# When the timeout has expired perform one final check to
|
||||
# see if the socket is readable. This helps avoid spurious
|
||||
# timeouts on AWS Lambda and other FaaS environments.
|
||||
if remaining <= 0:
|
||||
timed_out = True
|
||||
timeout = max(min(remaining, _POLL_TIMEOUT), 0)
|
||||
else:
|
||||
timeout = _POLL_TIMEOUT
|
||||
readable = conn.socket_checker.select(sock, read=True, timeout=timeout)
|
||||
if context.cancelled:
|
||||
raise _OperationCancelled("hello cancelled")
|
||||
if readable:
|
||||
return
|
||||
if timed_out:
|
||||
raise socket.timeout("timed out")
|
||||
timeout = _POLL_TIMEOUT
|
||||
readable = conn.socket_checker.select(sock, read=True, timeout=timeout)
|
||||
if conn.cancel_context.cancelled:
|
||||
raise _OperationCancelled("operation cancelled")
|
||||
if readable:
|
||||
return
|
||||
if timed_out:
|
||||
raise socket.timeout("timed out")
|
||||
|
||||
|
||||
# Errors raised by sockets (and TLS sockets) when in non-blocking mode.
|
||||
|
||||
@ -739,10 +739,7 @@ class Connection:
|
||||
self.pool_gen = pool.gen
|
||||
self.generation = self.pool_gen.get_overall()
|
||||
self.ready = False
|
||||
self.cancel_context: Optional[_CancellationContext] = None
|
||||
if not pool.handshake:
|
||||
# This is a Monitor connection.
|
||||
self.cancel_context = _CancellationContext()
|
||||
self.cancel_context: _CancellationContext = _CancellationContext()
|
||||
self.opts = pool.opts
|
||||
self.more_to_come: bool = False
|
||||
# For load balancer support.
|
||||
@ -1112,8 +1109,7 @@ class Connection:
|
||||
if self.closed:
|
||||
return
|
||||
self.closed = True
|
||||
if self.cancel_context:
|
||||
self.cancel_context.cancel()
|
||||
self.cancel_context.cancel()
|
||||
# Note: We catch exceptions to avoid spurious errors on interpreter
|
||||
# shutdown.
|
||||
try:
|
||||
@ -1378,6 +1374,7 @@ class Pool:
|
||||
# and returned to pool from the left side. Stale sockets removed
|
||||
# from the right side.
|
||||
self.conns: collections.deque = collections.deque()
|
||||
self.active_contexts: set[_CancellationContext] = set()
|
||||
self.lock = _create_lock()
|
||||
self.active_sockets = 0
|
||||
# Monotonically increasing connection ID required for CMAP Events.
|
||||
@ -1442,7 +1439,11 @@ class Pool:
|
||||
return self.state == PoolState.CLOSED
|
||||
|
||||
def _reset(
|
||||
self, close: bool, pause: bool = True, service_id: Optional[ObjectId] = None
|
||||
self,
|
||||
close: bool,
|
||||
pause: bool = True,
|
||||
service_id: Optional[ObjectId] = None,
|
||||
interrupt_connections: bool = False,
|
||||
) -> None:
|
||||
old_state = self.state
|
||||
with self.size_cond:
|
||||
@ -1475,6 +1476,10 @@ class Pool:
|
||||
self._max_connecting_cond.notify_all()
|
||||
self.size_cond.notify_all()
|
||||
|
||||
if interrupt_connections:
|
||||
for context in self.active_contexts:
|
||||
context.cancel()
|
||||
|
||||
listeners = self.opts._event_listeners
|
||||
# CMAP spec says that close() MUST close sockets before publishing the
|
||||
# PoolClosedEvent but that reset() SHOULD close sockets *after*
|
||||
@ -1488,7 +1493,11 @@ class Pool:
|
||||
else:
|
||||
if old_state != PoolState.PAUSED and self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
listeners.publish_pool_cleared(self.address, service_id=service_id)
|
||||
listeners.publish_pool_cleared(
|
||||
self.address,
|
||||
service_id=service_id,
|
||||
interrupt_connections=interrupt_connections,
|
||||
)
|
||||
for conn in sockets:
|
||||
conn.close_conn(ConnectionClosedReason.STALE)
|
||||
|
||||
@ -1501,8 +1510,10 @@ class Pool:
|
||||
for _socket in self.conns:
|
||||
_socket.update_is_writable(self.is_writable)
|
||||
|
||||
def reset(self, service_id: Optional[ObjectId] = None) -> None:
|
||||
self._reset(close=False, service_id=service_id)
|
||||
def reset(
|
||||
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
|
||||
) -> None:
|
||||
self._reset(close=False, service_id=service_id, interrupt_connections=interrupt_connections)
|
||||
|
||||
def reset_without_pause(self) -> None:
|
||||
self._reset(close=False, pause=False)
|
||||
@ -1558,6 +1569,7 @@ class Pool:
|
||||
conn.close_conn(ConnectionClosedReason.STALE)
|
||||
return
|
||||
self.conns.appendleft(conn)
|
||||
self.active_contexts.discard(conn.cancel_context)
|
||||
finally:
|
||||
if incremented:
|
||||
# Notify after adding the socket to the pool.
|
||||
@ -1602,6 +1614,8 @@ class Pool:
|
||||
raise
|
||||
|
||||
conn = Connection(sock, self, self.address, conn_id) # type: ignore[arg-type]
|
||||
with self.lock:
|
||||
self.active_contexts.add(conn.cancel_context)
|
||||
try:
|
||||
if self.handshake:
|
||||
conn.hello()
|
||||
@ -1644,6 +1658,8 @@ class Pool:
|
||||
assert listeners is not None
|
||||
listeners.publish_connection_checked_out(self.address, conn.id)
|
||||
try:
|
||||
with self.lock:
|
||||
self.active_contexts.add(conn.cancel_context)
|
||||
yield conn
|
||||
except BaseException:
|
||||
# Exception in caller. Ensure the connection gets returned.
|
||||
@ -1731,7 +1747,6 @@ class Pool:
|
||||
with self.lock:
|
||||
self.active_sockets += 1
|
||||
incremented = True
|
||||
|
||||
while conn is None:
|
||||
# CMAP: we MUST wait for either maxConnecting OR for a socket
|
||||
# to be checked back into the pool.
|
||||
@ -1794,6 +1809,8 @@ class Pool:
|
||||
conn.pinned_cursor = False
|
||||
self.__pinned_sockets.discard(conn)
|
||||
listeners = self.opts._event_listeners
|
||||
with self.lock:
|
||||
self.active_contexts.discard(conn.cancel_context)
|
||||
if self.enabled_for_cmap:
|
||||
assert listeners is not None
|
||||
listeners.publish_connection_checked_in(self.address, conn.id)
|
||||
|
||||
@ -330,7 +330,10 @@ class Topology:
|
||||
return self.select_server(any_server_selector, server_selection_timeout, address)
|
||||
|
||||
def _process_change(
|
||||
self, server_description: ServerDescription, reset_pool: bool = False
|
||||
self,
|
||||
server_description: ServerDescription,
|
||||
reset_pool: bool = False,
|
||||
interrupt_connections: bool = False,
|
||||
) -> None:
|
||||
"""Process a new ServerDescription on an opened topology.
|
||||
|
||||
@ -387,12 +390,17 @@ class Topology:
|
||||
if reset_pool:
|
||||
server = self._servers.get(server_description.address)
|
||||
if server:
|
||||
server.pool.reset()
|
||||
server.pool.reset(interrupt_connections=interrupt_connections)
|
||||
|
||||
# Wake waiters in select_servers().
|
||||
self._condition.notify_all()
|
||||
|
||||
def on_change(self, server_description: ServerDescription, reset_pool: bool = False) -> None:
|
||||
def on_change(
|
||||
self,
|
||||
server_description: ServerDescription,
|
||||
reset_pool: bool = False,
|
||||
interrupt_connections: bool = False,
|
||||
) -> None:
|
||||
"""Process a new ServerDescription after an hello call completes."""
|
||||
# We do no I/O holding the lock.
|
||||
with self._lock:
|
||||
@ -405,7 +413,7 @@ class Topology:
|
||||
# change removed it. E.g., we got a host list from the primary
|
||||
# that didn't include this server.
|
||||
if self._opened and self._description.has_server(server_description.address):
|
||||
self._process_change(server_description, reset_pool)
|
||||
self._process_change(server_description, reset_pool, interrupt_connections)
|
||||
|
||||
def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:
|
||||
"""Process a new seedlist on an opened topology.
|
||||
|
||||
77
test/cmap/pool-clear-interrupting-pending-connections.json
Normal file
77
test/cmap/pool-clear-interrupting-pending-connections.json
Normal file
@ -0,0 +1,77 @@
|
||||
{
|
||||
"version": 1,
|
||||
"style": "integration",
|
||||
"description": "clear with interruptInUseConnections = true closes pending connections",
|
||||
"runOn": [
|
||||
{
|
||||
"minServerVersion": "4.9.0"
|
||||
}
|
||||
],
|
||||
"failPoint": {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": "alwaysOn",
|
||||
"data": {
|
||||
"failCommands": [
|
||||
"isMaster",
|
||||
"hello"
|
||||
],
|
||||
"closeConnection": false,
|
||||
"blockConnection": true,
|
||||
"blockTimeMS": 1000
|
||||
}
|
||||
},
|
||||
"poolOptions": {
|
||||
"minPoolSize": 0
|
||||
},
|
||||
"operations": [
|
||||
{
|
||||
"name": "ready"
|
||||
},
|
||||
{
|
||||
"name": "start",
|
||||
"target": "thread1"
|
||||
},
|
||||
{
|
||||
"name": "checkOut",
|
||||
"thread": "thread1"
|
||||
},
|
||||
{
|
||||
"name": "waitForEvent",
|
||||
"event": "ConnectionCreated",
|
||||
"count": 1
|
||||
},
|
||||
{
|
||||
"name": "clear",
|
||||
"interruptInUseConnections": true
|
||||
},
|
||||
{
|
||||
"name": "waitForEvent",
|
||||
"event": "ConnectionCheckOutFailed",
|
||||
"count": 1
|
||||
}
|
||||
],
|
||||
"events": [
|
||||
{
|
||||
"type": "ConnectionCheckOutStarted"
|
||||
},
|
||||
{
|
||||
"type": "ConnectionCreated"
|
||||
},
|
||||
{
|
||||
"type": "ConnectionPoolCleared",
|
||||
"interruptInUseConnections": true
|
||||
},
|
||||
{
|
||||
"type": "ConnectionClosed"
|
||||
},
|
||||
{
|
||||
"type": "ConnectionCheckOutFailed"
|
||||
}
|
||||
],
|
||||
"ignore": [
|
||||
"ConnectionCheckedIn",
|
||||
"ConnectionCheckedOut",
|
||||
"ConnectionPoolCreated",
|
||||
"ConnectionPoolReady"
|
||||
]
|
||||
}
|
||||
@ -0,0 +1,81 @@
|
||||
{
|
||||
"version": 1,
|
||||
"style": "unit",
|
||||
"description": "Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)",
|
||||
"poolOptions": {
|
||||
"backgroundThreadIntervalMS": 10000
|
||||
},
|
||||
"operations": [
|
||||
{
|
||||
"name": "ready"
|
||||
},
|
||||
{
|
||||
"name": "checkOut"
|
||||
},
|
||||
{
|
||||
"name": "checkOut",
|
||||
"label": "conn"
|
||||
},
|
||||
{
|
||||
"name": "checkIn",
|
||||
"connection": "conn"
|
||||
},
|
||||
{
|
||||
"name": "clear",
|
||||
"interruptInUseConnections": false
|
||||
},
|
||||
{
|
||||
"name": "waitForEvent",
|
||||
"event": "ConnectionPoolCleared",
|
||||
"count": 1,
|
||||
"timeout": 1000
|
||||
},
|
||||
{
|
||||
"name": "waitForEvent",
|
||||
"event": "ConnectionClosed",
|
||||
"count": 1,
|
||||
"timeout": 1000
|
||||
},
|
||||
{
|
||||
"name": "close"
|
||||
}
|
||||
],
|
||||
"events": [
|
||||
{
|
||||
"type": "ConnectionCheckedOut",
|
||||
"connectionId": 1,
|
||||
"address": 42
|
||||
},
|
||||
{
|
||||
"type": "ConnectionCheckedOut",
|
||||
"connectionId": 2,
|
||||
"address": 42
|
||||
},
|
||||
{
|
||||
"type": "ConnectionCheckedIn",
|
||||
"connectionId": 2,
|
||||
"address": 42
|
||||
},
|
||||
{
|
||||
"type": "ConnectionPoolCleared",
|
||||
"interruptInUseConnections": false
|
||||
},
|
||||
{
|
||||
"type": "ConnectionClosed",
|
||||
"connectionId": 2,
|
||||
"reason": "stale",
|
||||
"address": 42
|
||||
},
|
||||
{
|
||||
"type": "ConnectionPoolClosed",
|
||||
"address": 42
|
||||
}
|
||||
],
|
||||
"ignore": [
|
||||
"ConnectionCreated",
|
||||
"ConnectionPoolReady",
|
||||
"ConnectionReady",
|
||||
"ConnectionCheckOutStarted",
|
||||
"ConnectionPoolCreated"
|
||||
]
|
||||
}
|
||||
@ -0,0 +1,552 @@
|
||||
{
|
||||
"description": "interruptInUse",
|
||||
"schemaVersion": "1.11",
|
||||
"runOnRequirements": [
|
||||
{
|
||||
"minServerVersion": "4.9",
|
||||
"topologies": [
|
||||
"replicaset",
|
||||
"sharded"
|
||||
],
|
||||
"serverless": "forbid"
|
||||
}
|
||||
],
|
||||
"createEntities": [
|
||||
{
|
||||
"client": {
|
||||
"id": "setupClient",
|
||||
"useMultipleMongoses": false
|
||||
}
|
||||
}
|
||||
],
|
||||
"initialData": [
|
||||
{
|
||||
"collectionName": "interruptInUse",
|
||||
"databaseName": "sdam-tests",
|
||||
"documents": []
|
||||
}
|
||||
],
|
||||
"tests": [
|
||||
{
|
||||
"description": "Connection pool clear uses interruptInUseConnections=true after monitor timeout",
|
||||
"operations": [
|
||||
{
|
||||
"name": "createEntities",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"entities": [
|
||||
{
|
||||
"client": {
|
||||
"id": "client",
|
||||
"useMultipleMongoses": false,
|
||||
"uriOptions": {
|
||||
"connectTimeoutMS": 500,
|
||||
"heartbeatFrequencyMS": 500,
|
||||
"appname": "interruptInUse",
|
||||
"retryReads": false,
|
||||
"minPoolSize": 0
|
||||
},
|
||||
"observeEvents": [
|
||||
"poolClearedEvent",
|
||||
"connectionClosedEvent",
|
||||
"commandStartedEvent",
|
||||
"commandSucceededEvent",
|
||||
"commandFailedEvent",
|
||||
"connectionCheckedOutEvent",
|
||||
"connectionCheckedInEvent"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"database": {
|
||||
"id": "database",
|
||||
"client": "client",
|
||||
"databaseName": "sdam-tests"
|
||||
}
|
||||
},
|
||||
{
|
||||
"collection": {
|
||||
"id": "collection",
|
||||
"database": "database",
|
||||
"collectionName": "interruptInUse"
|
||||
}
|
||||
},
|
||||
{
|
||||
"thread": {
|
||||
"id": "thread1"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "insertOne",
|
||||
"object": "collection",
|
||||
"arguments": {
|
||||
"document": { "_id" : 1 }
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "runOnThread",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"thread": "thread1",
|
||||
"operation": {
|
||||
"name": "find",
|
||||
"object": "collection",
|
||||
"arguments": {
|
||||
"filter": { "$where": "sleep(2000) || true" }
|
||||
},
|
||||
"expectError": {
|
||||
"isError": true
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "failPoint",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"failPoint": {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {
|
||||
"times": 4
|
||||
},
|
||||
"data": {
|
||||
"failCommands": [
|
||||
"hello", "isMaster"
|
||||
],
|
||||
"blockConnection": true,
|
||||
"blockTimeMS": 1500,
|
||||
"appName": "interruptInUse"
|
||||
}
|
||||
},
|
||||
"client": "setupClient"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "waitForThread",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"thread": "thread1"
|
||||
}
|
||||
}
|
||||
],
|
||||
"expectEvents": [
|
||||
{
|
||||
"client": "client",
|
||||
"eventType": "command",
|
||||
"events": [
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "insert"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandSucceededEvent": {
|
||||
"commandName": "insert"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "find"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandFailedEvent": {
|
||||
"commandName": "find"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"client": "client",
|
||||
"eventType": "cmap",
|
||||
"events": [
|
||||
{
|
||||
"connectionCheckedOutEvent": { }
|
||||
},
|
||||
{
|
||||
"connectionCheckedInEvent": { }
|
||||
},
|
||||
{
|
||||
"connectionCheckedOutEvent": { }
|
||||
},
|
||||
{
|
||||
"poolClearedEvent": {
|
||||
"interruptInUseConnections": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"connectionCheckedInEvent": {}
|
||||
},
|
||||
{
|
||||
"connectionClosedEvent": { }
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"outcome": [{
|
||||
"collectionName": "interruptInUse",
|
||||
"databaseName": "sdam-tests",
|
||||
"documents": [{ "_id": 1 }]
|
||||
}]
|
||||
},
|
||||
{
|
||||
"description": "Error returned from connection pool clear with interruptInUseConnections=true is retryable",
|
||||
"operations": [
|
||||
{
|
||||
"name": "createEntities",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"entities": [
|
||||
{
|
||||
"client": {
|
||||
"id": "client",
|
||||
"useMultipleMongoses": false,
|
||||
"uriOptions": {
|
||||
"connectTimeoutMS": 500,
|
||||
"heartbeatFrequencyMS": 500,
|
||||
"appname": "interruptInUseRetryable",
|
||||
"retryReads": true,
|
||||
"minPoolSize": 0
|
||||
},
|
||||
"observeEvents": [
|
||||
"poolClearedEvent",
|
||||
"connectionClosedEvent",
|
||||
"commandFailedEvent",
|
||||
"commandStartedEvent",
|
||||
"commandSucceededEvent",
|
||||
"connectionCheckedOutEvent",
|
||||
"connectionCheckedInEvent"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"database": {
|
||||
"id": "database",
|
||||
"client": "client",
|
||||
"databaseName": "sdam-tests"
|
||||
}
|
||||
},
|
||||
{
|
||||
"collection": {
|
||||
"id": "collection",
|
||||
"database": "database",
|
||||
"collectionName": "interruptInUse"
|
||||
}
|
||||
},
|
||||
{
|
||||
"thread": {
|
||||
"id": "thread1"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "insertOne",
|
||||
"object": "collection",
|
||||
"arguments": {
|
||||
"document": { "_id" : 1 }
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "runOnThread",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"thread": "thread1",
|
||||
"operation": {
|
||||
"name": "find",
|
||||
"object": "collection",
|
||||
"arguments": {
|
||||
"filter": { "$where": "sleep(2000) || true" }
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "failPoint",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"failPoint": {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {
|
||||
"times": 4
|
||||
},
|
||||
"data": {
|
||||
"failCommands": [
|
||||
"hello", "isMaster"
|
||||
],
|
||||
"blockConnection": true,
|
||||
"blockTimeMS": 1500,
|
||||
"appName": "interruptInUseRetryable"
|
||||
}
|
||||
},
|
||||
"client": "setupClient"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "waitForThread",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"thread": "thread1"
|
||||
}
|
||||
}
|
||||
],
|
||||
"expectEvents": [
|
||||
{
|
||||
"client": "client",
|
||||
"eventType": "command",
|
||||
"events": [
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "insert"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandSucceededEvent": {
|
||||
"commandName": "insert"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "find"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandFailedEvent": {
|
||||
"commandName": "find"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "find"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandSucceededEvent": {
|
||||
"commandName": "find"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"client": "client",
|
||||
"eventType": "cmap",
|
||||
"events": [
|
||||
{
|
||||
"connectionCheckedOutEvent": { }
|
||||
},
|
||||
{
|
||||
"connectionCheckedInEvent": { }
|
||||
},
|
||||
{
|
||||
"connectionCheckedOutEvent": { }
|
||||
},
|
||||
{
|
||||
"poolClearedEvent": {
|
||||
"interruptInUseConnections": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"connectionCheckedInEvent": {}
|
||||
},
|
||||
{
|
||||
"connectionClosedEvent": { }
|
||||
},
|
||||
{
|
||||
"connectionCheckedOutEvent": {}
|
||||
},
|
||||
{
|
||||
"connectionCheckedInEvent": {}
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"outcome": [{
|
||||
"collectionName": "interruptInUse",
|
||||
"databaseName": "sdam-tests",
|
||||
"documents": [{ "_id": 1 }]
|
||||
}]
|
||||
},
|
||||
{
|
||||
"description": "Error returned from connection pool clear with interruptInUseConnections=true is retryable for write",
|
||||
"operations": [
|
||||
{
|
||||
"name": "createEntities",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"entities": [
|
||||
{
|
||||
"client": {
|
||||
"id": "client",
|
||||
"useMultipleMongoses": false,
|
||||
"uriOptions": {
|
||||
"connectTimeoutMS": 500,
|
||||
"heartbeatFrequencyMS": 500,
|
||||
"appname": "interruptInUseRetryableWrite",
|
||||
"retryWrites": true,
|
||||
"minPoolSize": 0
|
||||
},
|
||||
"observeEvents": [
|
||||
"poolClearedEvent",
|
||||
"connectionClosedEvent",
|
||||
"commandFailedEvent",
|
||||
"commandStartedEvent",
|
||||
"commandSucceededEvent",
|
||||
"connectionCheckedOutEvent",
|
||||
"connectionCheckedInEvent"
|
||||
]}
|
||||
},
|
||||
{
|
||||
"database": {
|
||||
"id": "database",
|
||||
"client": "client",
|
||||
"databaseName": "sdam-tests"
|
||||
}
|
||||
},
|
||||
{
|
||||
"collection": {
|
||||
"id": "collection",
|
||||
"database": "database",
|
||||
"collectionName": "interruptInUse"
|
||||
}
|
||||
},
|
||||
{
|
||||
"thread": {
|
||||
"id": "thread1"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "insertOne",
|
||||
"object": "collection",
|
||||
"arguments": {
|
||||
"document": { "_id": 1 }
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "runOnThread",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"thread": "thread1",
|
||||
"operation": {
|
||||
"name": "updateOne",
|
||||
"object": "collection",
|
||||
"arguments": {
|
||||
"filter": { "$where": "sleep(2000) || true" },
|
||||
"update": [ { "$set": { "a": "bar" } } ]
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "failPoint",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"failPoint": {
|
||||
"configureFailPoint": "failCommand",
|
||||
"mode": {
|
||||
"times": 4
|
||||
},
|
||||
"data": {
|
||||
"failCommands": [
|
||||
"hello", "isMaster"
|
||||
],
|
||||
"blockConnection": true,
|
||||
"blockTimeMS": 1500,
|
||||
"appName": "interruptInUseRetryableWrite"
|
||||
}
|
||||
},
|
||||
"client": "setupClient"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "waitForThread",
|
||||
"object": "testRunner",
|
||||
"arguments": {
|
||||
"thread": "thread1"
|
||||
}
|
||||
}
|
||||
],
|
||||
"expectEvents": [
|
||||
{
|
||||
"client": "client",
|
||||
"eventType": "command",
|
||||
"events": [
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "insert"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandSucceededEvent": {
|
||||
"commandName": "insert"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "update"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandFailedEvent": {
|
||||
"commandName": "update"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandStartedEvent": {
|
||||
"commandName": "update"
|
||||
}
|
||||
},
|
||||
{
|
||||
"commandSucceededEvent": {
|
||||
"commandName": "update"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"client": "client",
|
||||
"eventType": "cmap",
|
||||
"events": [
|
||||
{
|
||||
"connectionCheckedOutEvent": { }
|
||||
},
|
||||
{
|
||||
"connectionCheckedInEvent": { }
|
||||
},
|
||||
{
|
||||
"connectionCheckedOutEvent": { }
|
||||
},
|
||||
{
|
||||
"poolClearedEvent": {
|
||||
"interruptInUseConnections": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"connectionCheckedInEvent": {}
|
||||
},
|
||||
{
|
||||
"connectionClosedEvent": { }
|
||||
},
|
||||
{
|
||||
"connectionCheckedOutEvent": {}
|
||||
},
|
||||
{
|
||||
"connectionCheckedInEvent": {}
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"outcome": [{
|
||||
"collectionName": "interruptInUse",
|
||||
"databaseName": "sdam-tests",
|
||||
"documents": [{ "_id": 1, "a" : "bar"}]
|
||||
}]
|
||||
}
|
||||
]
|
||||
}
|
||||
@ -623,7 +623,8 @@ class TestClient(IntegrationTest):
|
||||
client = rs_or_single_client(minPoolSize=10)
|
||||
server = client._get_topology().select_server(readable_server_selector)
|
||||
wait_until(
|
||||
lambda: len(server._pool.conns) == 10, "pool initialized with 10 connections"
|
||||
lambda: len(server._pool.conns) == 10,
|
||||
"pool initialized with 10 connections",
|
||||
)
|
||||
|
||||
# Assert that if a socket is closed, a new one takes its place
|
||||
|
||||
@ -144,7 +144,10 @@ class TestCMAP(IntegrationTest):
|
||||
|
||||
def clear(self, op):
|
||||
"""Run the 'clear' operation."""
|
||||
self.pool.reset()
|
||||
if "interruptInUseConnections" in op:
|
||||
self.pool.reset(interrupt_connections=op["interruptInUseConnections"])
|
||||
else:
|
||||
self.pool.reset()
|
||||
|
||||
def close(self, op):
|
||||
"""Run the 'close' operation."""
|
||||
@ -173,6 +176,8 @@ class TestCMAP(IntegrationTest):
|
||||
if attr == "type":
|
||||
continue
|
||||
c2s = camel_to_snake(attr)
|
||||
if c2s == "interrupt_in_use_connections":
|
||||
c2s = "interrupt_connections"
|
||||
actual_val = getattr(actual, c2s)
|
||||
if expected_val == 42:
|
||||
self.assertIsNotNone(actual_val)
|
||||
|
||||
@ -0,0 +1,23 @@
|
||||
{
|
||||
"description": "expectedCmapEvent-poolClearedEvent-interruptInUseConnections-type",
|
||||
"schemaVersion": "1.11",
|
||||
"tests": [
|
||||
{
|
||||
"description": "foo",
|
||||
"operations": [],
|
||||
"expectEvents": [
|
||||
{
|
||||
"client": "client0",
|
||||
"eventType": "cmap",
|
||||
"events": [
|
||||
{
|
||||
"poolClearedEvent": {
|
||||
"interruptInUseConnections": "foo"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
@ -779,6 +779,14 @@ class MatchEvaluatorUtil:
|
||||
else:
|
||||
self.test.assertIsNone(actual.service_id)
|
||||
|
||||
def assertHasInterruptInUseConnections(self, spec, actual):
|
||||
if "interruptInUseConnections" in spec:
|
||||
self.test.assertEqual(
|
||||
spec.get("interruptInUseConnections"), actual.interrupt_connections
|
||||
)
|
||||
else:
|
||||
self.test.assertIsInstance(actual.interrupt_connections, bool)
|
||||
|
||||
def assertHasServerConnectionId(self, spec, actual):
|
||||
if "hasServerConnectionId" in spec:
|
||||
if spec.get("hasServerConnectionId"):
|
||||
@ -836,6 +844,7 @@ class MatchEvaluatorUtil:
|
||||
elif name == "poolClearedEvent":
|
||||
self.test.assertIsInstance(actual, PoolClearedEvent)
|
||||
self.assertHasServiceId(spec, actual)
|
||||
self.assertHasInterruptInUseConnections(spec, actual)
|
||||
elif name == "poolClosedEvent":
|
||||
self.test.assertIsInstance(actual, PoolClosedEvent)
|
||||
elif name == "connectionCreatedEvent":
|
||||
|
||||
@ -319,7 +319,7 @@ class MockPool:
|
||||
def ready(self):
|
||||
pass
|
||||
|
||||
def reset(self, service_id=None):
|
||||
def reset(self, service_id=None, interrupt_connections=False):
|
||||
self._reset()
|
||||
|
||||
def reset_without_pause(self):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user