diff --git a/.github/workflows/release-python.yml b/.github/workflows/release-python.yml index 8ce4eaa84..5643ee1e3 100644 --- a/.github/workflows/release-python.yml +++ b/.github/workflows/release-python.yml @@ -19,6 +19,7 @@ env: PRODUCT_NAME: PyMongo # Changes per branch SILK_ASSET_GROUP: mongodb-python-driver + EVERGREEN_PROJECT: mongodb-python-driver defaults: run: @@ -55,7 +56,7 @@ jobs: needs: [pre-publish] uses: ./.github/workflows/codeql.yml with: - ref: ${{ inputs.version }} + ref: ${{ github.ref }} publish: needs: [build-dist, static-scan] @@ -82,5 +83,6 @@ jobs: following_version: ${{ inputs.following_version }} product_name: ${{ env.PRODUCT_NAME }} silk_asset_group: ${{ env.SILK_ASSET_GROUP }} + evergreen_project: ${{ env.EVERGREEN_PROJECT }} token: ${{ github.token }} dry_run: ${{ inputs.dry_run }} diff --git a/pymongo/asynchronous/helpers.py b/pymongo/asynchronous/helpers.py index 2b7420bbc..7ec1c18c5 100644 --- a/pymongo/asynchronous/helpers.py +++ b/pymongo/asynchronous/helpers.py @@ -313,9 +313,10 @@ def _handle_reauth(func: F) -> F: return cast(F, inner) -async def anext(cls: Any) -> Any: - """Compatibility function until we drop 3.9 support: https://docs.python.org/3/library/functions.html#anext.""" - if sys.version_info >= (3, 10): - return await builtins.anext(cls) - else: +if sys.version_info >= (3, 10): + anext = builtins.anext +else: + + async def anext(cls: Any) -> Any: + """Compatibility function until we drop 3.9 support: https://docs.python.org/3/library/functions.html#anext.""" return await cls.__anext__() diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 75e151d95..c743c569d 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -112,7 +112,6 @@ from pymongo.server_type import SERVER_TYPE from pymongo.write_concern import DEFAULT_WRITE_CONCERN, WriteConcern if TYPE_CHECKING: - import sys from types import TracebackType from bson.objectid import ObjectId @@ -126,11 +125,6 @@ if TYPE_CHECKING: from pymongo.asynchronous.server_selectors import Selection from pymongo.read_concern import ReadConcern - if sys.version_info[:2] >= (3, 9): - pass - else: - # Deprecated since version 3.9: collections.abc.Generator now supports []. - pass T = TypeVar("T") diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 3cd7485a9..27597a6db 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -194,14 +194,9 @@ else: _METADATA: dict[str, Any] = {"driver": {"name": "PyMongo", "version": __version__}} if sys.platform.startswith("linux"): - # platform.linux_distribution was deprecated in Python 3.5 - # and removed in Python 3.8. Starting in Python 3.5 it - # raises DeprecationWarning - # DeprecationWarning: dist() and linux_distribution() functions are deprecated in Python 3.5 - _name = platform.system() _METADATA["os"] = { - "type": _name, - "name": _name, + "type": platform.system(), + "name": platform.system(), "architecture": platform.machine(), # Kernel version (e.g. 4.4.0-17-generic). "version": platform.release(), @@ -762,6 +757,7 @@ class Connection: self.op_msg_enabled = False self.listeners = pool.opts._event_listeners self.enabled_for_cmap = pool.enabled_for_cmap + self.enabled_for_logging = pool.enabled_for_logging self.compression_settings = pool.opts._compression_settings self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None self.socket_checker: SocketChecker = SocketChecker() @@ -1125,20 +1121,20 @@ class Connection: await auth.authenticate(creds, self, reauthenticate=reauthenticate) self.ready = True + duration = time.monotonic() - self.creation_time if self.enabled_for_cmap: assert self.listeners is not None - duration = time.monotonic() - self.creation_time self.listeners.publish_connection_ready(self.address, self.id, duration) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_READY, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=self.id, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_READY, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=self.id, + durationMS=duration, + ) def validate_session( self, client: Optional[AsyncMongoClient], session: Optional[ClientSession] @@ -1158,10 +1154,11 @@ class Connection: if self.closed: return self._close_conn() - if reason and self.enabled_for_cmap: - assert self.listeners is not None - self.listeners.publish_connection_closed(self.address, self.id, reason) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if reason: + if self.enabled_for_cmap: + assert self.listeners is not None + self.listeners.publish_connection_closed(self.address, self.id, reason) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1473,12 +1470,13 @@ class Pool: self.address = address self.opts = options self.handshake = handshake - # Don't publish events in Monitor pools. + # Don't publish events or logs in Monitor pools. self.enabled_for_cmap = ( self.handshake and self.opts._event_listeners is not None and self.opts._event_listeners.enabled_for_cmap ) + self.enabled_for_logging = self.handshake # The first portion of the wait queue. # Enforces: maxPoolSize @@ -1500,15 +1498,15 @@ class Pool: self.opts._event_listeners.publish_pool_created( self.address, self.opts.non_default_options ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_CREATED, - serverHost=self.address[0], - serverPort=self.address[1], - **self.opts.non_default_options, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_CREATED, + serverHost=self.address[0], + serverPort=self.address[1], + **self.opts.non_default_options, + ) # Similar to active_sockets but includes threads in the wait queue. self.operation_count: int = 0 # Retain references to pinned connections to prevent the CPython GC @@ -1526,14 +1524,14 @@ class Pool: if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_ready(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_READY, - serverHost=self.address[0], - serverPort=self.address[1], - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_READY, + serverHost=self.address[0], + serverPort=self.address[1], + ) @property def closed(self) -> bool: @@ -1591,23 +1589,24 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_pool_closed(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - ) - else: - if old_state != PoolState.PAUSED and self.enabled_for_cmap: - assert listeners is not None - listeners.publish_pool_cleared( - self.address, - service_id=service_id, - interrupt_connections=interrupt_connections, + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + else: + if old_state != PoolState.PAUSED: + if self.enabled_for_cmap: + assert listeners is not None + listeners.publish_pool_cleared( + self.address, + service_id=service_id, + interrupt_connections=interrupt_connections, + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1717,15 +1716,15 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_created(self.address, conn_id) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CREATED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CREATED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + ) try: sock = await _configured_socket(self.address, self.opts) @@ -1735,17 +1734,17 @@ class Pool: listeners.publish_connection_closed( self.address, conn_id, ConnectionClosedReason.ERROR ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), + error=ConnectionClosedReason.ERROR, + ) if isinstance(error, (IOError, OSError, SSLError)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1793,31 +1792,31 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_check_out_started(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_STARTED, - serverHost=self.address[0], - serverPort=self.address[1], - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_STARTED, + serverHost=self.address[0], + serverPort=self.address[1], + ) conn = await self._get_conn(checkout_started_time, handler=handler) + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert listeners is not None - duration = time.monotonic() - checkout_started_time listeners.publish_connection_checked_out(self.address, conn.id, duration) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + durationMS=duration, + ) try: async with self.lock: self.active_contexts.add(conn.cancel_context) @@ -1849,13 +1848,14 @@ class Pool: def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: if self.state != PoolState.READY: - if self.enabled_for_cmap and emit_event: - assert self.opts._event_listeners is not None + if emit_event: duration = time.monotonic() - checkout_started_time - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1883,23 +1883,23 @@ class Pool: await self.reset_without_pause() if self.closed: + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert self.opts._event_listeners is not None - duration = time.monotonic() - checkout_started_time self.opts._event_listeners.publish_connection_check_out_failed( self.address, ConnectionCheckOutFailedReason.POOL_CLOSED, duration ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Connection pool was closed", - error=ConnectionCheckOutFailedReason.POOL_CLOSED, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_FAILED, + serverHost=self.address[0], + serverPort=self.address[1], + reason="Connection pool was closed", + error=ConnectionCheckOutFailedReason.POOL_CLOSED, + durationMS=duration, + ) raise _PoolClosedError( "Attempted to check out a connection from closed connection pool" ) @@ -1975,13 +1975,14 @@ class Pool: self.active_sockets -= 1 self.size_cond.notify() - if self.enabled_for_cmap and not emitted_event: - assert self.opts._event_listeners is not None + if not emitted_event: duration = time.monotonic() - checkout_started_time - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -2014,15 +2015,15 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_checked_in(self.address, conn.id) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKEDIN, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKEDIN, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + ) if self.pid != os.getpid(): await self.reset_without_pause() else: @@ -2035,17 +2036,17 @@ class Pool: listeners.publish_connection_closed( self.address, conn.id, ConnectionClosedReason.ERROR ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), + error=ConnectionClosedReason.ERROR, + ) else: async with self.lock: # Hold the lock to ensure this section does not race with @@ -2107,23 +2108,23 @@ class Pool: def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn: listeners = self.opts._event_listeners + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert listeners is not None - duration = time.monotonic() - checkout_started_time listeners.publish_connection_check_out_failed( self.address, ConnectionCheckOutFailedReason.TIMEOUT, duration ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Wait queue timeout elapsed without a connection becoming available", - error=ConnectionCheckOutFailedReason.TIMEOUT, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_FAILED, + serverHost=self.address[0], + serverPort=self.address[1], + reason="Wait queue timeout elapsed without a connection becoming available", + error=ConnectionCheckOutFailedReason.TIMEOUT, + durationMS=duration, + ) timeout = _csot.get_timeout() or self.opts.wait_queue_timeout if self.opts.load_balanced: other_ops = self.active_sockets - self.ncursors - self.ntxns diff --git a/pymongo/synchronous/helpers.py b/pymongo/synchronous/helpers.py index 892d6a93e..3f0df2a3c 100644 --- a/pymongo/synchronous/helpers.py +++ b/pymongo/synchronous/helpers.py @@ -313,9 +313,10 @@ def _handle_reauth(func: F) -> F: return cast(F, inner) -def next(cls: Any) -> Any: - """Compatibility function until we drop 3.9 support: https://docs.python.org/3/library/functions.html#anext.""" - if sys.version_info >= (3, 10): - return builtins.next(cls) - else: +if sys.version_info >= (3, 10): + next = builtins.next +else: + + def next(cls: Any) -> Any: + """Compatibility function until we drop 3.9 support: https://docs.python.org/3/library/functions.html#anext.""" return cls.__next__() diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index ddc9ae9a2..31b9cbc5f 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -111,7 +111,6 @@ from pymongo.synchronous.uri_parser import ( from pymongo.write_concern import DEFAULT_WRITE_CONCERN, WriteConcern if TYPE_CHECKING: - import sys from types import TracebackType from bson.objectid import ObjectId @@ -125,11 +124,6 @@ if TYPE_CHECKING: from pymongo.synchronous.server import Server from pymongo.synchronous.server_selectors import Selection - if sys.version_info[:2] >= (3, 9): - pass - else: - # Deprecated since version 3.9: collections.abc.Generator now supports []. - pass T = TypeVar("T") diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index b71cec220..f77f78cd7 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -194,14 +194,9 @@ else: _METADATA: dict[str, Any] = {"driver": {"name": "PyMongo", "version": __version__}} if sys.platform.startswith("linux"): - # platform.linux_distribution was deprecated in Python 3.5 - # and removed in Python 3.8. Starting in Python 3.5 it - # raises DeprecationWarning - # DeprecationWarning: dist() and linux_distribution() functions are deprecated in Python 3.5 - _name = platform.system() _METADATA["os"] = { - "type": _name, - "name": _name, + "type": platform.system(), + "name": platform.system(), "architecture": platform.machine(), # Kernel version (e.g. 4.4.0-17-generic). "version": platform.release(), @@ -762,6 +757,7 @@ class Connection: self.op_msg_enabled = False self.listeners = pool.opts._event_listeners self.enabled_for_cmap = pool.enabled_for_cmap + self.enabled_for_logging = pool.enabled_for_logging self.compression_settings = pool.opts._compression_settings self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None self.socket_checker: SocketChecker = SocketChecker() @@ -1125,20 +1121,20 @@ class Connection: auth.authenticate(creds, self, reauthenticate=reauthenticate) self.ready = True + duration = time.monotonic() - self.creation_time if self.enabled_for_cmap: assert self.listeners is not None - duration = time.monotonic() - self.creation_time self.listeners.publish_connection_ready(self.address, self.id, duration) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_READY, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=self.id, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_READY, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=self.id, + durationMS=duration, + ) def validate_session( self, client: Optional[MongoClient], session: Optional[ClientSession] @@ -1156,10 +1152,11 @@ class Connection: if self.closed: return self._close_conn() - if reason and self.enabled_for_cmap: - assert self.listeners is not None - self.listeners.publish_connection_closed(self.address, self.id, reason) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if reason: + if self.enabled_for_cmap: + assert self.listeners is not None + self.listeners.publish_connection_closed(self.address, self.id, reason) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1469,12 +1466,13 @@ class Pool: self.address = address self.opts = options self.handshake = handshake - # Don't publish events in Monitor pools. + # Don't publish events or logs in Monitor pools. self.enabled_for_cmap = ( self.handshake and self.opts._event_listeners is not None and self.opts._event_listeners.enabled_for_cmap ) + self.enabled_for_logging = self.handshake # The first portion of the wait queue. # Enforces: maxPoolSize @@ -1496,15 +1494,15 @@ class Pool: self.opts._event_listeners.publish_pool_created( self.address, self.opts.non_default_options ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_CREATED, - serverHost=self.address[0], - serverPort=self.address[1], - **self.opts.non_default_options, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_CREATED, + serverHost=self.address[0], + serverPort=self.address[1], + **self.opts.non_default_options, + ) # Similar to active_sockets but includes threads in the wait queue. self.operation_count: int = 0 # Retain references to pinned connections to prevent the CPython GC @@ -1522,14 +1520,14 @@ class Pool: if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_ready(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_READY, - serverHost=self.address[0], - serverPort=self.address[1], - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_READY, + serverHost=self.address[0], + serverPort=self.address[1], + ) @property def closed(self) -> bool: @@ -1587,23 +1585,24 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_pool_closed(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - ) - else: - if old_state != PoolState.PAUSED and self.enabled_for_cmap: - assert listeners is not None - listeners.publish_pool_cleared( - self.address, - service_id=service_id, - interrupt_connections=interrupt_connections, + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + else: + if old_state != PoolState.PAUSED: + if self.enabled_for_cmap: + assert listeners is not None + listeners.publish_pool_cleared( + self.address, + service_id=service_id, + interrupt_connections=interrupt_connections, + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1711,15 +1710,15 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_created(self.address, conn_id) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CREATED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CREATED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + ) try: sock = _configured_socket(self.address, self.opts) @@ -1729,17 +1728,17 @@ class Pool: listeners.publish_connection_closed( self.address, conn_id, ConnectionClosedReason.ERROR ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), + error=ConnectionClosedReason.ERROR, + ) if isinstance(error, (IOError, OSError, SSLError)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1787,31 +1786,31 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_check_out_started(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_STARTED, - serverHost=self.address[0], - serverPort=self.address[1], - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_STARTED, + serverHost=self.address[0], + serverPort=self.address[1], + ) conn = self._get_conn(checkout_started_time, handler=handler) + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert listeners is not None - duration = time.monotonic() - checkout_started_time listeners.publish_connection_checked_out(self.address, conn.id, duration) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + durationMS=duration, + ) try: with self.lock: self.active_contexts.add(conn.cancel_context) @@ -1843,13 +1842,14 @@ class Pool: def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: if self.state != PoolState.READY: - if self.enabled_for_cmap and emit_event: - assert self.opts._event_listeners is not None + if emit_event: duration = time.monotonic() - checkout_started_time - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1877,23 +1877,23 @@ class Pool: self.reset_without_pause() if self.closed: + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert self.opts._event_listeners is not None - duration = time.monotonic() - checkout_started_time self.opts._event_listeners.publish_connection_check_out_failed( self.address, ConnectionCheckOutFailedReason.POOL_CLOSED, duration ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Connection pool was closed", - error=ConnectionCheckOutFailedReason.POOL_CLOSED, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_FAILED, + serverHost=self.address[0], + serverPort=self.address[1], + reason="Connection pool was closed", + error=ConnectionCheckOutFailedReason.POOL_CLOSED, + durationMS=duration, + ) raise _PoolClosedError( "Attempted to check out a connection from closed connection pool" ) @@ -1969,13 +1969,14 @@ class Pool: self.active_sockets -= 1 self.size_cond.notify() - if self.enabled_for_cmap and not emitted_event: - assert self.opts._event_listeners is not None + if not emitted_event: duration = time.monotonic() - checkout_started_time - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -2008,15 +2009,15 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_checked_in(self.address, conn.id) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKEDIN, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKEDIN, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + ) if self.pid != os.getpid(): self.reset_without_pause() else: @@ -2029,17 +2030,17 @@ class Pool: listeners.publish_connection_closed( self.address, conn.id, ConnectionClosedReason.ERROR ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), + error=ConnectionClosedReason.ERROR, + ) else: with self.lock: # Hold the lock to ensure this section does not race with @@ -2101,23 +2102,23 @@ class Pool: def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn: listeners = self.opts._event_listeners + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert listeners is not None - duration = time.monotonic() - checkout_started_time listeners.publish_connection_check_out_failed( self.address, ConnectionCheckOutFailedReason.TIMEOUT, duration ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Wait queue timeout elapsed without a connection becoming available", - error=ConnectionCheckOutFailedReason.TIMEOUT, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_FAILED, + serverHost=self.address[0], + serverPort=self.address[1], + reason="Wait queue timeout elapsed without a connection becoming available", + error=ConnectionCheckOutFailedReason.TIMEOUT, + durationMS=duration, + ) timeout = _csot.get_timeout() or self.opts.wait_queue_timeout if self.opts.load_balanced: other_ops = self.active_sockets - self.ncursors - self.ntxns diff --git a/test/test_logger.py b/test/test_logger.py index d1f84a844..60abcadc4 100644 --- a/test/test_logger.py +++ b/test/test_logger.py @@ -16,6 +16,7 @@ from __future__ import annotations import os from test import unittest from test.test_client import IntegrationTest +from test.utils import single_client from unittest.mock import patch from bson import json_util @@ -82,6 +83,19 @@ class TestLogger(IntegrationTest): self.assertEqual(last_3_bytes, str_to_repeat) + def test_logging_without_listeners(self): + c = single_client() + self.assertEqual(len(c._event_listeners.event_listeners()), 0) + with self.assertLogs("pymongo.connection", level="DEBUG") as cm: + c.db.test.insert_one({"x": "1"}) + self.assertGreater(len(cm.records), 0) + with self.assertLogs("pymongo.command", level="DEBUG") as cm: + c.db.test.insert_one({"x": "1"}) + self.assertGreater(len(cm.records), 0) + with self.assertLogs("pymongo.serverSelection", level="DEBUG") as cm: + c.db.test.insert_one({"x": "1"}) + self.assertGreater(len(cm.records), 0) + if __name__ == "__main__": unittest.main() diff --git a/test/unified_format.py b/test/unified_format.py index fe1419c0d..0c200a15a 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -195,14 +195,10 @@ def with_metaclass(meta, *bases): # the actual metaclass. class metaclass(type): def __new__(cls, name, this_bases, d): - if sys.version_info[:2] >= (3, 7): # noqa: UP036 - # This version introduced PEP 560 that requires a bit - # of extra care (we mimic what is done by __build_class__). - resolved_bases = types.resolve_bases(bases) - if resolved_bases is not bases: - d["__orig_bases__"] = bases - else: - resolved_bases = bases + # __orig_bases__ is required by PEP 560. + resolved_bases = types.resolve_bases(bases) + if resolved_bases is not bases: + d["__orig_bases__"] = bases return meta(name, resolved_bases, d) @classmethod