PYTHON-3841 Add durations to connection pool events (#1538)

This commit is contained in:
Noah Stapp 2024-03-04 11:55:00 -08:00 committed by GitHub
parent 7da5688d00
commit af2d56c5b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 118 additions and 34 deletions

View File

@ -32,6 +32,10 @@ PyMongo 4.7 brings a number of improvements including:
- Fixed a bug appearing in Python 3.12 where "RuntimeError: can't create new thread at interpreter shutdown"
could be written to stderr when a MongoClient's thread starts as the python interpreter is shutting down.
- Added the :attr:`pymongo.monitoring.ConnectionCheckedOutEvent.duration`,
:attr:`pymongo.monitoring.ConnectionCheckOutFailedEvent.duration`, and
:attr:`pymongo.monitoring.ConnectionReadyEvent.duration` properties.
Unavoidable breaking changes
............................

View File

@ -128,7 +128,7 @@ class LogMessage:
if self._redacted:
return
self._kwargs = {k: v for k, v in self._kwargs.items() if v is not None}
if "durationMS" in self._kwargs:
if "durationMS" in self._kwargs and hasattr(self._kwargs["durationMS"], "total_seconds"):
self._kwargs["durationMS"] = self._kwargs["durationMS"].total_seconds() * 1000
if "serviceId" in self._kwargs:
self._kwargs["serviceId"] = str(self._kwargs["serviceId"])

View File

@ -1002,6 +1002,27 @@ class _ConnectionIdEvent(_ConnectionEvent):
return f"{self.__class__.__name__}({self.address!r}, {self.__connection_id!r})"
class _ConnectionDurationEvent(_ConnectionIdEvent):
"""Private base class for connection events with a duration."""
__slots__ = ("__duration",)
def __init__(self, address: _Address, connection_id: int, duration: Optional[float]) -> None:
super().__init__(address, connection_id)
self.__duration = duration
@property
def duration(self) -> Optional[float]:
"""The duration of the connection event.
.. versionadded:: 4.7
"""
return self.__duration
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.address!r}, {self.connection_id!r}, {self.__duration!r})"
class ConnectionCreatedEvent(_ConnectionIdEvent):
"""Published when a Connection Pool creates a Connection object.
@ -1018,7 +1039,7 @@ class ConnectionCreatedEvent(_ConnectionIdEvent):
__slots__ = ()
class ConnectionReadyEvent(_ConnectionIdEvent):
class ConnectionReadyEvent(_ConnectionDurationEvent):
"""Published when a Connection has finished its setup, and is ready to use.
:param address: The address (host, port) pair of the server this
@ -1078,7 +1099,7 @@ class ConnectionCheckOutStartedEvent(_ConnectionEvent):
__slots__ = ()
class ConnectionCheckOutFailedEvent(_ConnectionEvent):
class ConnectionCheckOutFailedEvent(_ConnectionDurationEvent):
"""Published when the driver's attempt to check out a connection fails.
:param address: The address (host, port) pair of the server this
@ -1090,8 +1111,8 @@ class ConnectionCheckOutFailedEvent(_ConnectionEvent):
__slots__ = ("__reason",)
def __init__(self, address: _Address, reason: str) -> None:
super().__init__(address)
def __init__(self, address: _Address, reason: str, duration: Optional[float]) -> None:
super().__init__(address=address, connection_id=0, duration=duration)
self.__reason = reason
@property
@ -1104,10 +1125,10 @@ class ConnectionCheckOutFailedEvent(_ConnectionEvent):
return self.__reason
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.address!r}, {self.__reason!r})"
return f"{self.__class__.__name__}({self.address!r}, {self.__reason!r}, {self.duration!r})"
class ConnectionCheckedOutEvent(_ConnectionIdEvent):
class ConnectionCheckedOutEvent(_ConnectionDurationEvent):
"""Published when the driver successfully checks out a connection.
:param address: The address (host, port) pair of the server this
@ -1824,9 +1845,11 @@ class _EventListeners:
except Exception:
_handle_exception()
def publish_connection_ready(self, address: _Address, connection_id: int) -> None:
def publish_connection_ready(
self, address: _Address, connection_id: int, duration: float
) -> None:
"""Publish a :class:`ConnectionReadyEvent` to all connection listeners."""
event = ConnectionReadyEvent(address, connection_id)
event = ConnectionReadyEvent(address, connection_id, duration)
for subscriber in self.__cmap_listeners:
try:
subscriber.connection_ready(event)
@ -1855,22 +1878,26 @@ class _EventListeners:
except Exception:
_handle_exception()
def publish_connection_check_out_failed(self, address: _Address, reason: str) -> None:
def publish_connection_check_out_failed(
self, address: _Address, reason: str, duration: float
) -> None:
"""Publish a :class:`ConnectionCheckOutFailedEvent` to all connection
listeners.
"""
event = ConnectionCheckOutFailedEvent(address, reason)
event = ConnectionCheckOutFailedEvent(address, reason, duration)
for subscriber in self.__cmap_listeners:
try:
subscriber.connection_check_out_failed(event)
except Exception:
_handle_exception()
def publish_connection_checked_out(self, address: _Address, connection_id: int) -> None:
def publish_connection_checked_out(
self, address: _Address, connection_id: int, duration: float
) -> None:
"""Publish a :class:`ConnectionCheckedOutEvent` to all connection
listeners.
"""
event = ConnectionCheckedOutEvent(address, connection_id)
event = ConnectionCheckedOutEvent(address, connection_id, duration)
for subscriber in self.__cmap_listeners:
try:
subscriber.connection_checked_out(event)

View File

@ -760,6 +760,7 @@ class Connection:
self.last_timeout = self.opts.socket_timeout
self.connect_rtt = 0.0
self._client_id = pool._client_id
self.creation_time = time.monotonic()
def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
@ -1093,7 +1094,8 @@ class Connection:
self.ready = True
if self.enabled_for_cmap:
assert self.listeners is not None
self.listeners.publish_connection_ready(self.address, self.id)
duration = time.monotonic() - self.creation_time
self.listeners.publish_connection_ready(self.address, self.id, duration)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
@ -1102,6 +1104,7 @@ class Connection:
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=self.id,
durationMS=duration,
)
def validate_session(
@ -1739,6 +1742,7 @@ class Pool:
:param handler: A _MongoClientErrorHandler.
"""
listeners = self.opts._event_listeners
checkout_started_time = time.monotonic()
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_check_out_started(self.address)
@ -1751,11 +1755,12 @@ class Pool:
serverPort=self.address[1],
)
conn = self._get_conn(handler=handler)
conn = self._get_conn(checkout_started_time, handler=handler)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_connection_checked_out(self.address, conn.id)
duration = time.monotonic() - checkout_started_time
listeners.publish_connection_checked_out(self.address, conn.id, duration)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
@ -1764,6 +1769,7 @@ class Pool:
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn.id,
durationMS=duration,
)
try:
with self.lock:
@ -1794,12 +1800,13 @@ class Pool:
elif conn.active:
self.checkin(conn)
def _raise_if_not_ready(self, emit_event: bool) -> None:
def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None:
if self.state != PoolState.READY:
if self.enabled_for_cmap and emit_event:
assert self.opts._event_listeners is not None
duration = time.monotonic() - checkout_started_time
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR
self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
@ -1810,6 +1817,7 @@ class Pool:
serverPort=self.address[1],
reason="An error occurred while trying to establish a new connection",
error=ConnectionCheckOutFailedReason.CONN_ERROR,
durationMS=duration,
)
details = _get_timeout_details(self.opts)
@ -1817,7 +1825,9 @@ class Pool:
self.address, AutoReconnect("connection pool paused"), timeout_details=details
)
def _get_conn(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection:
def _get_conn(
self, checkout_started_time: float, handler: Optional[_MongoClientErrorHandler] = None
) -> Connection:
"""Get or create a Connection. Can raise ConnectionFailure."""
# We use the pid here to avoid issues with fork / multiprocessing.
# See test.test_client:TestClient.test_fork for an example of
@ -1828,8 +1838,9 @@ class Pool:
if self.closed:
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
duration = time.monotonic() - checkout_started_time
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.POOL_CLOSED
self.address, ConnectionCheckOutFailedReason.POOL_CLOSED, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
@ -1840,6 +1851,7 @@ class Pool:
serverPort=self.address[1],
reason="Connection pool was closed",
error=ConnectionCheckOutFailedReason.POOL_CLOSED,
durationMS=duration,
)
raise _PoolClosedError(
"Attempted to check out a connection from closed connection pool"
@ -1857,15 +1869,15 @@ class Pool:
deadline = None
with self.size_cond:
self._raise_if_not_ready(emit_event=True)
self._raise_if_not_ready(checkout_started_time, emit_event=True)
while not (self.requests < self.max_pool_size):
if not _cond_wait(self.size_cond, deadline):
# Timed out, notify the next thread to ensure a
# timeout doesn't consume the condition.
if self.requests < self.max_pool_size:
self.size_cond.notify()
self._raise_wait_queue_timeout()
self._raise_if_not_ready(emit_event=True)
self._raise_wait_queue_timeout(checkout_started_time)
self._raise_if_not_ready(checkout_started_time, emit_event=True)
self.requests += 1
# We've now acquired the semaphore and must release it on error.
@ -1880,7 +1892,7 @@ class Pool:
# CMAP: we MUST wait for either maxConnecting OR for a socket
# to be checked back into the pool.
with self._max_connecting_cond:
self._raise_if_not_ready(emit_event=False)
self._raise_if_not_ready(checkout_started_time, emit_event=False)
while not (self.conns or self._pending < self._max_connecting):
if not _cond_wait(self._max_connecting_cond, deadline):
# Timed out, notify the next thread to ensure a
@ -1888,8 +1900,8 @@ class Pool:
if self.conns or self._pending < self._max_connecting:
self._max_connecting_cond.notify()
emitted_event = True
self._raise_wait_queue_timeout()
self._raise_if_not_ready(emit_event=False)
self._raise_wait_queue_timeout(checkout_started_time)
self._raise_if_not_ready(checkout_started_time, emit_event=False)
try:
conn = self.conns.popleft()
@ -1918,8 +1930,9 @@ class Pool:
if self.enabled_for_cmap and not emitted_event:
assert self.opts._event_listeners is not None
duration = time.monotonic() - checkout_started_time
self.opts._event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR
self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
@ -1930,6 +1943,7 @@ class Pool:
serverPort=self.address[1],
reason="An error occurred while trying to establish a new connection",
error=ConnectionCheckOutFailedReason.CONN_ERROR,
durationMS=duration,
)
raise
@ -2044,12 +2058,13 @@ class Pool:
return False
def _raise_wait_queue_timeout(self) -> NoReturn:
def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn:
listeners = self.opts._event_listeners
if self.enabled_for_cmap:
assert listeners is not None
duration = time.monotonic() - checkout_started_time
listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.TIMEOUT
self.address, ConnectionCheckOutFailedReason.TIMEOUT, duration
)
if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
@ -2060,6 +2075,7 @@ class Pool:
serverPort=self.address[1],
reason="Wait queue timeout elapsed without a connection becoming available",
error=ConnectionCheckOutFailedReason.TIMEOUT,
durationMS=duration,
)
timeout = _csot.get_timeout() or self.opts.wait_queue_timeout
if self.opts.load_balanced:

View File

@ -140,6 +140,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
},
@ -162,6 +169,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
},
@ -222,6 +236,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
},
@ -468,6 +489,13 @@
"reason": "An error occurred while trying to establish a new connection",
"error": {
"$$exists": true
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
},
@ -492,4 +520,4 @@
]
}
]
}
}

View File

@ -128,6 +128,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
}
@ -450,4 +457,4 @@
]
}
]
}
}

View File

@ -426,13 +426,15 @@ class TestCMAP(IntegrationTest):
def test_events_repr(self):
host = ("localhost", 27017)
self.assertRepr(ConnectionCheckedInEvent(host, 1))
self.assertRepr(ConnectionCheckedOutEvent(host, 1))
self.assertRepr(ConnectionCheckedOutEvent(host, 1, time.monotonic()))
self.assertRepr(
ConnectionCheckOutFailedEvent(host, ConnectionCheckOutFailedReason.POOL_CLOSED)
ConnectionCheckOutFailedEvent(
host, ConnectionCheckOutFailedReason.POOL_CLOSED, time.monotonic()
)
)
self.assertRepr(ConnectionClosedEvent(host, 1, ConnectionClosedReason.POOL_CLOSED))
self.assertRepr(ConnectionCreatedEvent(host, 1))
self.assertRepr(ConnectionReadyEvent(host, 1))
self.assertRepr(ConnectionReadyEvent(host, 1, time.monotonic()))
self.assertRepr(ConnectionCheckOutStartedEvent(host))
self.assertRepr(PoolCreatedEvent(host, {}))
self.assertRepr(PoolClearedEvent(host))