diff --git a/doc/changelog.rst b/doc/changelog.rst index 43f3fc3fa..fb07aad6b 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -32,6 +32,10 @@ PyMongo 4.7 brings a number of improvements including: - Fixed a bug appearing in Python 3.12 where "RuntimeError: can't create new thread at interpreter shutdown" could be written to stderr when a MongoClient's thread starts as the python interpreter is shutting down. +- Added the :attr:`pymongo.monitoring.ConnectionCheckedOutEvent.duration`, + :attr:`pymongo.monitoring.ConnectionCheckOutFailedEvent.duration`, and + :attr:`pymongo.monitoring.ConnectionReadyEvent.duration` properties. + Unavoidable breaking changes ............................ diff --git a/pymongo/logger.py b/pymongo/logger.py index 25b3d0262..472a18a52 100644 --- a/pymongo/logger.py +++ b/pymongo/logger.py @@ -128,7 +128,7 @@ class LogMessage: if self._redacted: return self._kwargs = {k: v for k, v in self._kwargs.items() if v is not None} - if "durationMS" in self._kwargs: + if "durationMS" in self._kwargs and hasattr(self._kwargs["durationMS"], "total_seconds"): self._kwargs["durationMS"] = self._kwargs["durationMS"].total_seconds() * 1000 if "serviceId" in self._kwargs: self._kwargs["serviceId"] = str(self._kwargs["serviceId"]) diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 07c7f5f47..aff11a9f4 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -1002,6 +1002,27 @@ class _ConnectionIdEvent(_ConnectionEvent): return f"{self.__class__.__name__}({self.address!r}, {self.__connection_id!r})" +class _ConnectionDurationEvent(_ConnectionIdEvent): + """Private base class for connection events with a duration.""" + + __slots__ = ("__duration",) + + def __init__(self, address: _Address, connection_id: int, duration: Optional[float]) -> None: + super().__init__(address, connection_id) + self.__duration = duration + + @property + def duration(self) -> Optional[float]: + """The duration of the connection event. + + .. versionadded:: 4.7 + """ + return self.__duration + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self.address!r}, {self.connection_id!r}, {self.__duration!r})" + + class ConnectionCreatedEvent(_ConnectionIdEvent): """Published when a Connection Pool creates a Connection object. @@ -1018,7 +1039,7 @@ class ConnectionCreatedEvent(_ConnectionIdEvent): __slots__ = () -class ConnectionReadyEvent(_ConnectionIdEvent): +class ConnectionReadyEvent(_ConnectionDurationEvent): """Published when a Connection has finished its setup, and is ready to use. :param address: The address (host, port) pair of the server this @@ -1078,7 +1099,7 @@ class ConnectionCheckOutStartedEvent(_ConnectionEvent): __slots__ = () -class ConnectionCheckOutFailedEvent(_ConnectionEvent): +class ConnectionCheckOutFailedEvent(_ConnectionDurationEvent): """Published when the driver's attempt to check out a connection fails. :param address: The address (host, port) pair of the server this @@ -1090,8 +1111,8 @@ class ConnectionCheckOutFailedEvent(_ConnectionEvent): __slots__ = ("__reason",) - def __init__(self, address: _Address, reason: str) -> None: - super().__init__(address) + def __init__(self, address: _Address, reason: str, duration: Optional[float]) -> None: + super().__init__(address=address, connection_id=0, duration=duration) self.__reason = reason @property @@ -1104,10 +1125,10 @@ class ConnectionCheckOutFailedEvent(_ConnectionEvent): return self.__reason def __repr__(self) -> str: - return f"{self.__class__.__name__}({self.address!r}, {self.__reason!r})" + return f"{self.__class__.__name__}({self.address!r}, {self.__reason!r}, {self.duration!r})" -class ConnectionCheckedOutEvent(_ConnectionIdEvent): +class ConnectionCheckedOutEvent(_ConnectionDurationEvent): """Published when the driver successfully checks out a connection. :param address: The address (host, port) pair of the server this @@ -1824,9 +1845,11 @@ class _EventListeners: except Exception: _handle_exception() - def publish_connection_ready(self, address: _Address, connection_id: int) -> None: + def publish_connection_ready( + self, address: _Address, connection_id: int, duration: float + ) -> None: """Publish a :class:`ConnectionReadyEvent` to all connection listeners.""" - event = ConnectionReadyEvent(address, connection_id) + event = ConnectionReadyEvent(address, connection_id, duration) for subscriber in self.__cmap_listeners: try: subscriber.connection_ready(event) @@ -1855,22 +1878,26 @@ class _EventListeners: except Exception: _handle_exception() - def publish_connection_check_out_failed(self, address: _Address, reason: str) -> None: + def publish_connection_check_out_failed( + self, address: _Address, reason: str, duration: float + ) -> None: """Publish a :class:`ConnectionCheckOutFailedEvent` to all connection listeners. """ - event = ConnectionCheckOutFailedEvent(address, reason) + event = ConnectionCheckOutFailedEvent(address, reason, duration) for subscriber in self.__cmap_listeners: try: subscriber.connection_check_out_failed(event) except Exception: _handle_exception() - def publish_connection_checked_out(self, address: _Address, connection_id: int) -> None: + def publish_connection_checked_out( + self, address: _Address, connection_id: int, duration: float + ) -> None: """Publish a :class:`ConnectionCheckedOutEvent` to all connection listeners. """ - event = ConnectionCheckedOutEvent(address, connection_id) + event = ConnectionCheckedOutEvent(address, connection_id, duration) for subscriber in self.__cmap_listeners: try: subscriber.connection_checked_out(event) diff --git a/pymongo/pool.py b/pymongo/pool.py index 0ece6084c..6a8cb54b9 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -760,6 +760,7 @@ class Connection: self.last_timeout = self.opts.socket_timeout self.connect_rtt = 0.0 self._client_id = pool._client_id + self.creation_time = time.monotonic() def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" @@ -1093,7 +1094,8 @@ class Connection: self.ready = True if self.enabled_for_cmap: assert self.listeners is not None - self.listeners.publish_connection_ready(self.address, self.id) + duration = time.monotonic() - self.creation_time + self.listeners.publish_connection_ready(self.address, self.id, duration) if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, @@ -1102,6 +1104,7 @@ class Connection: serverHost=self.address[0], serverPort=self.address[1], driverConnectionId=self.id, + durationMS=duration, ) def validate_session( @@ -1739,6 +1742,7 @@ class Pool: :param handler: A _MongoClientErrorHandler. """ listeners = self.opts._event_listeners + checkout_started_time = time.monotonic() if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_check_out_started(self.address) @@ -1751,11 +1755,12 @@ class Pool: serverPort=self.address[1], ) - conn = self._get_conn(handler=handler) + conn = self._get_conn(checkout_started_time, handler=handler) if self.enabled_for_cmap: assert listeners is not None - listeners.publish_connection_checked_out(self.address, conn.id) + duration = time.monotonic() - checkout_started_time + listeners.publish_connection_checked_out(self.address, conn.id, duration) if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, @@ -1764,6 +1769,7 @@ class Pool: serverHost=self.address[0], serverPort=self.address[1], driverConnectionId=conn.id, + durationMS=duration, ) try: with self.lock: @@ -1794,12 +1800,13 @@ class Pool: elif conn.active: self.checkin(conn) - def _raise_if_not_ready(self, emit_event: bool) -> None: + def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: if self.state != PoolState.READY: if self.enabled_for_cmap and emit_event: assert self.opts._event_listeners is not None + duration = time.monotonic() - checkout_started_time self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration ) if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( @@ -1810,6 +1817,7 @@ class Pool: serverPort=self.address[1], reason="An error occurred while trying to establish a new connection", error=ConnectionCheckOutFailedReason.CONN_ERROR, + durationMS=duration, ) details = _get_timeout_details(self.opts) @@ -1817,7 +1825,9 @@ class Pool: self.address, AutoReconnect("connection pool paused"), timeout_details=details ) - def _get_conn(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection: + def _get_conn( + self, checkout_started_time: float, handler: Optional[_MongoClientErrorHandler] = None + ) -> Connection: """Get or create a Connection. Can raise ConnectionFailure.""" # We use the pid here to avoid issues with fork / multiprocessing. # See test.test_client:TestClient.test_fork for an example of @@ -1828,8 +1838,9 @@ class Pool: if self.closed: if self.enabled_for_cmap: assert self.opts._event_listeners is not None + duration = time.monotonic() - checkout_started_time self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.POOL_CLOSED + self.address, ConnectionCheckOutFailedReason.POOL_CLOSED, duration ) if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( @@ -1840,6 +1851,7 @@ class Pool: serverPort=self.address[1], reason="Connection pool was closed", error=ConnectionCheckOutFailedReason.POOL_CLOSED, + durationMS=duration, ) raise _PoolClosedError( "Attempted to check out a connection from closed connection pool" @@ -1857,15 +1869,15 @@ class Pool: deadline = None with self.size_cond: - self._raise_if_not_ready(emit_event=True) + self._raise_if_not_ready(checkout_started_time, emit_event=True) while not (self.requests < self.max_pool_size): if not _cond_wait(self.size_cond, deadline): # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. if self.requests < self.max_pool_size: self.size_cond.notify() - self._raise_wait_queue_timeout() - self._raise_if_not_ready(emit_event=True) + self._raise_wait_queue_timeout(checkout_started_time) + self._raise_if_not_ready(checkout_started_time, emit_event=True) self.requests += 1 # We've now acquired the semaphore and must release it on error. @@ -1880,7 +1892,7 @@ class Pool: # CMAP: we MUST wait for either maxConnecting OR for a socket # to be checked back into the pool. with self._max_connecting_cond: - self._raise_if_not_ready(emit_event=False) + self._raise_if_not_ready(checkout_started_time, emit_event=False) while not (self.conns or self._pending < self._max_connecting): if not _cond_wait(self._max_connecting_cond, deadline): # Timed out, notify the next thread to ensure a @@ -1888,8 +1900,8 @@ class Pool: if self.conns or self._pending < self._max_connecting: self._max_connecting_cond.notify() emitted_event = True - self._raise_wait_queue_timeout() - self._raise_if_not_ready(emit_event=False) + self._raise_wait_queue_timeout(checkout_started_time) + self._raise_if_not_ready(checkout_started_time, emit_event=False) try: conn = self.conns.popleft() @@ -1918,8 +1930,9 @@ class Pool: if self.enabled_for_cmap and not emitted_event: assert self.opts._event_listeners is not None + duration = time.monotonic() - checkout_started_time self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration ) if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( @@ -1930,6 +1943,7 @@ class Pool: serverPort=self.address[1], reason="An error occurred while trying to establish a new connection", error=ConnectionCheckOutFailedReason.CONN_ERROR, + durationMS=duration, ) raise @@ -2044,12 +2058,13 @@ class Pool: return False - def _raise_wait_queue_timeout(self) -> NoReturn: + def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn: listeners = self.opts._event_listeners if self.enabled_for_cmap: assert listeners is not None + duration = time.monotonic() - checkout_started_time listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.TIMEOUT + self.address, ConnectionCheckOutFailedReason.TIMEOUT, duration ) if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( @@ -2060,6 +2075,7 @@ class Pool: serverPort=self.address[1], reason="Wait queue timeout elapsed without a connection becoming available", error=ConnectionCheckOutFailedReason.TIMEOUT, + durationMS=duration, ) timeout = _csot.get_timeout() or self.opts.wait_queue_timeout if self.opts.load_balanced: diff --git a/test/connection_logging/connection-logging.json b/test/connection_logging/connection-logging.json index 5b3f8c1d4..bfbdbe863 100644 --- a/test/connection_logging/connection-logging.json +++ b/test/connection_logging/connection-logging.json @@ -140,6 +140,13 @@ "int", "long" ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] } } }, @@ -162,6 +169,13 @@ "int", "long" ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] } } }, @@ -222,6 +236,13 @@ "int", "long" ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] } } }, @@ -468,6 +489,13 @@ "reason": "An error occurred while trying to establish a new connection", "error": { "$$exists": true + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] } } }, @@ -492,4 +520,4 @@ ] } ] -} +} \ No newline at end of file diff --git a/test/connection_logging/connection-pool-options.json b/test/connection_logging/connection-pool-options.json index 8b79b504f..21520bf31 100644 --- a/test/connection_logging/connection-pool-options.json +++ b/test/connection_logging/connection-pool-options.json @@ -128,6 +128,13 @@ "int", "long" ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] } } } @@ -450,4 +457,4 @@ ] } ] -} +} \ No newline at end of file diff --git a/test/test_connection_monitoring.py b/test/test_connection_monitoring.py index ead1d033f..ee0655388 100644 --- a/test/test_connection_monitoring.py +++ b/test/test_connection_monitoring.py @@ -426,13 +426,15 @@ class TestCMAP(IntegrationTest): def test_events_repr(self): host = ("localhost", 27017) self.assertRepr(ConnectionCheckedInEvent(host, 1)) - self.assertRepr(ConnectionCheckedOutEvent(host, 1)) + self.assertRepr(ConnectionCheckedOutEvent(host, 1, time.monotonic())) self.assertRepr( - ConnectionCheckOutFailedEvent(host, ConnectionCheckOutFailedReason.POOL_CLOSED) + ConnectionCheckOutFailedEvent( + host, ConnectionCheckOutFailedReason.POOL_CLOSED, time.monotonic() + ) ) self.assertRepr(ConnectionClosedEvent(host, 1, ConnectionClosedReason.POOL_CLOSED)) self.assertRepr(ConnectionCreatedEvent(host, 1)) - self.assertRepr(ConnectionReadyEvent(host, 1)) + self.assertRepr(ConnectionReadyEvent(host, 1, time.monotonic())) self.assertRepr(ConnectionCheckOutStartedEvent(host)) self.assertRepr(PoolCreatedEvent(host, {})) self.assertRepr(PoolClearedEvent(host))