diff --git a/pymongo/pool.py b/pymongo/pool.py new file mode 100644 index 000000000..fbbb70fc6 --- /dev/null +++ b/pymongo/pool.py @@ -0,0 +1,22 @@ +# Copyright 2024-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Re-import of synchronous Pool API for compatibility.""" +from __future__ import annotations + +from pymongo.synchronous.pool import * # noqa: F403 +from pymongo.synchronous.pool import __doc__ as original_doc + +__doc__ = original_doc +__all__ = ["PoolOptions"] # noqa: F405 diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index bbd49339d..94a1d1043 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -14,23 +14,22 @@ from __future__ import annotations +import asyncio import collections import contextlib -import copy +import functools import logging import os -import platform import socket import ssl import sys import threading import time import weakref -from pathlib import Path from typing import ( TYPE_CHECKING, Any, - Iterator, + Generator, Mapping, MutableMapping, NoReturn, @@ -39,21 +38,14 @@ from typing import ( Union, ) -import bson from bson import DEFAULT_CODEC_OPTIONS -from pymongo import __version__, _csot, auth, helpers -from pymongo.client_session import _validate_session_write_concern +from pymongo import _csot, helpers_shared from pymongo.common import ( MAX_BSON_SIZE, - MAX_CONNECTING, - MAX_IDLE_TIME_SEC, MAX_MESSAGE_SIZE, - MAX_POOL_SIZE, MAX_WIRE_VERSION, MAX_WRITE_BATCH_SIZE, - MIN_POOL_SIZE, ORDERED_TYPES, - WAIT_QUEUE_TIMEOUT, ) from pymongo.errors import ( # type:ignore[attr-defined] AutoReconnect, @@ -70,7 +62,6 @@ from pymongo.errors import ( # type:ignore[attr-defined] _CertificateError, ) from pymongo.hello import Hello, HelloCompat -from pymongo.helpers import _handle_reauth from pymongo.lock import _create_lock from pymongo.logger import ( _CONNECTION_LOGGER, @@ -81,33 +72,33 @@ from pymongo.logger import ( from pymongo.monitoring import ( ConnectionCheckOutFailedReason, ConnectionClosedReason, - _EventListeners, ) -from pymongo.network import command, receive_message +from pymongo.network_layer import sendall +from pymongo.pool_options import PoolOptions from pymongo.read_preferences import ReadPreference from pymongo.server_api import _add_to_command from pymongo.server_type import SERVER_TYPE from pymongo.socket_checker import SocketChecker from pymongo.ssl_support import HAS_SNI, SSLError +from pymongo.synchronous.client_session import _validate_session_write_concern +from pymongo.synchronous.helpers import _handle_reauth +from pymongo.synchronous.network import command, receive_message if TYPE_CHECKING: from bson import CodecOptions from bson.objectid import ObjectId - from pymongo.auth import MongoCredential, _AuthContext - from pymongo.client_session import ClientSession from pymongo.compression_support import ( - CompressionSettings, SnappyContext, ZlibContext, ZstdContext, ) - from pymongo.driver_info import DriverInfo from pymongo.message import _OpMsg, _OpReply - from pymongo.mongo_client import MongoClient, _MongoClientErrorHandler - from pymongo.pyopenssl_context import SSLContext, _sslConn + from pymongo.pyopenssl_context import _sslConn from pymongo.read_concern import ReadConcern from pymongo.read_preferences import _ServerMode - from pymongo.server_api import ServerApi + from pymongo.synchronous.auth import _AuthContext + from pymongo.synchronous.client_session import ClientSession + from pymongo.synchronous.mongo_client import MongoClient, _MongoClientErrorHandler from pymongo.typings import ClusterTime, _Address, _CollationIn from pymongo.write_concern import WriteConcern @@ -127,6 +118,8 @@ except ImportError: """Dummy function for platforms that don't provide fcntl.""" +_IS_SYNC = True + _MAX_TCP_KEEPIDLE = 120 _MAX_TCP_KEEPINTVL = 10 _MAX_TCP_KEEPCNT = 9 @@ -186,217 +179,6 @@ else: _set_tcp_option(sock, "TCP_KEEPCNT", _MAX_TCP_KEEPCNT) -_METADATA: dict[str, Any] = {"driver": {"name": "PyMongo", "version": __version__}} - -if sys.platform.startswith("linux"): - # platform.linux_distribution was deprecated in Python 3.5 - # and removed in Python 3.8. Starting in Python 3.5 it - # raises DeprecationWarning - # DeprecationWarning: dist() and linux_distribution() functions are deprecated in Python 3.5 - _name = platform.system() - _METADATA["os"] = { - "type": _name, - "name": _name, - "architecture": platform.machine(), - # Kernel version (e.g. 4.4.0-17-generic). - "version": platform.release(), - } -elif sys.platform == "darwin": - _METADATA["os"] = { - "type": platform.system(), - "name": platform.system(), - "architecture": platform.machine(), - # (mac|i|tv)OS(X) version (e.g. 10.11.6) instead of darwin - # kernel version. - "version": platform.mac_ver()[0], - } -elif sys.platform == "win32": - _METADATA["os"] = { - "type": platform.system(), - # "Windows XP", "Windows 7", "Windows 10", etc. - "name": " ".join((platform.system(), platform.release())), - "architecture": platform.machine(), - # Windows patch level (e.g. 5.1.2600-SP3) - "version": "-".join(platform.win32_ver()[1:3]), - } -elif sys.platform.startswith("java"): - _name, _ver, _arch = platform.java_ver()[-1] - _METADATA["os"] = { - # Linux, Windows 7, Mac OS X, etc. - "type": _name, - "name": _name, - # x86, x86_64, AMD64, etc. - "architecture": _arch, - # Linux kernel version, OSX version, etc. - "version": _ver, - } -else: - # Get potential alias (e.g. SunOS 5.11 becomes Solaris 2.11) - _aliased = platform.system_alias(platform.system(), platform.release(), platform.version()) - _METADATA["os"] = { - "type": platform.system(), - "name": " ".join([part for part in _aliased[:2] if part]), - "architecture": platform.machine(), - "version": _aliased[2], - } - -if platform.python_implementation().startswith("PyPy"): - _METADATA["platform"] = " ".join( - ( - platform.python_implementation(), - ".".join(map(str, sys.pypy_version_info)), # type: ignore - "(Python %s)" % ".".join(map(str, sys.version_info)), - ) - ) -elif sys.platform.startswith("java"): - _METADATA["platform"] = " ".join( - ( - platform.python_implementation(), - ".".join(map(str, sys.version_info)), - "(%s)" % " ".join((platform.system(), platform.release())), - ) - ) -else: - _METADATA["platform"] = " ".join( - (platform.python_implementation(), ".".join(map(str, sys.version_info))) - ) - -DOCKER_ENV_PATH = "/.dockerenv" -ENV_VAR_K8S = "KUBERNETES_SERVICE_HOST" - -RUNTIME_NAME_DOCKER = "docker" -ORCHESTRATOR_NAME_K8S = "kubernetes" - - -def get_container_env_info() -> dict[str, str]: - """Returns the runtime and orchestrator of a container. - If neither value is present, the metadata client.env.container field will be omitted.""" - container = {} - - if Path(DOCKER_ENV_PATH).exists(): - container["runtime"] = RUNTIME_NAME_DOCKER - if os.getenv(ENV_VAR_K8S): - container["orchestrator"] = ORCHESTRATOR_NAME_K8S - - return container - - -def _is_lambda() -> bool: - if os.getenv("AWS_LAMBDA_RUNTIME_API"): - return True - env = os.getenv("AWS_EXECUTION_ENV") - if env: - return env.startswith("AWS_Lambda_") - return False - - -def _is_azure_func() -> bool: - return bool(os.getenv("FUNCTIONS_WORKER_RUNTIME")) - - -def _is_gcp_func() -> bool: - return bool(os.getenv("K_SERVICE") or os.getenv("FUNCTION_NAME")) - - -def _is_vercel() -> bool: - return bool(os.getenv("VERCEL")) - - -def _is_faas() -> bool: - return _is_lambda() or _is_azure_func() or _is_gcp_func() or _is_vercel() - - -def _getenv_int(key: str) -> Optional[int]: - """Like os.getenv but returns an int, or None if the value is missing/malformed.""" - val = os.getenv(key) - if not val: - return None - try: - return int(val) - except ValueError: - return None - - -def _metadata_env() -> dict[str, Any]: - env: dict[str, Any] = {} - container = get_container_env_info() - if container: - env["container"] = container - # Skip if multiple (or no) envs are matched. - if (_is_lambda(), _is_azure_func(), _is_gcp_func(), _is_vercel()).count(True) != 1: - return env - if _is_lambda(): - env["name"] = "aws.lambda" - region = os.getenv("AWS_REGION") - if region: - env["region"] = region - memory_mb = _getenv_int("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") - if memory_mb is not None: - env["memory_mb"] = memory_mb - elif _is_azure_func(): - env["name"] = "azure.func" - elif _is_gcp_func(): - env["name"] = "gcp.func" - region = os.getenv("FUNCTION_REGION") - if region: - env["region"] = region - memory_mb = _getenv_int("FUNCTION_MEMORY_MB") - if memory_mb is not None: - env["memory_mb"] = memory_mb - timeout_sec = _getenv_int("FUNCTION_TIMEOUT_SEC") - if timeout_sec is not None: - env["timeout_sec"] = timeout_sec - elif _is_vercel(): - env["name"] = "vercel" - region = os.getenv("VERCEL_REGION") - if region: - env["region"] = region - return env - - -_MAX_METADATA_SIZE = 512 - - -# See: https://github.com/mongodb/specifications/blob/5112bcc/source/mongodb-handshake/handshake.rst#limitations -def _truncate_metadata(metadata: MutableMapping[str, Any]) -> None: - """Perform metadata truncation.""" - if len(bson.encode(metadata)) <= _MAX_METADATA_SIZE: - return - # 1. Omit fields from env except env.name. - env_name = metadata.get("env", {}).get("name") - if env_name: - metadata["env"] = {"name": env_name} - if len(bson.encode(metadata)) <= _MAX_METADATA_SIZE: - return - # 2. Omit fields from os except os.type. - os_type = metadata.get("os", {}).get("type") - if os_type: - metadata["os"] = {"type": os_type} - if len(bson.encode(metadata)) <= _MAX_METADATA_SIZE: - return - # 3. Omit the env document entirely. - metadata.pop("env", None) - encoded_size = len(bson.encode(metadata)) - if encoded_size <= _MAX_METADATA_SIZE: - return - # 4. Truncate platform. - overflow = encoded_size - _MAX_METADATA_SIZE - plat = metadata.get("platform", "") - if plat: - plat = plat[:-overflow] - if plat: - metadata["platform"] = plat - else: - metadata.pop("platform", None) - - -# If the first getaddrinfo call of this interpreter's life is on a thread, -# while the main thread holds the import lock, getaddrinfo deadlocks trying -# to import the IDNA codec. Import it here, where presumably we're on the -# main thread, to avoid the deadlock. See PYTHON-607. -"foo".encode("idna") - - def _raise_connection_failure( address: Any, error: Exception, @@ -457,238 +239,6 @@ def format_timeout_details(details: Optional[dict[str, float]]) -> str: return result -class PoolOptions: - """Read only connection pool options for a MongoClient. - - Should not be instantiated directly by application developers. Access - a client's pool options via - :attr:`~pymongo.client_options.ClientOptions.pool_options` instead:: - - pool_opts = client.options.pool_options - pool_opts.max_pool_size - pool_opts.min_pool_size - - """ - - __slots__ = ( - "__max_pool_size", - "__min_pool_size", - "__max_idle_time_seconds", - "__connect_timeout", - "__socket_timeout", - "__wait_queue_timeout", - "__ssl_context", - "__tls_allow_invalid_hostnames", - "__event_listeners", - "__appname", - "__driver", - "__metadata", - "__compression_settings", - "__max_connecting", - "__pause_enabled", - "__server_api", - "__load_balanced", - "__credentials", - ) - - def __init__( - self, - max_pool_size: int = MAX_POOL_SIZE, - min_pool_size: int = MIN_POOL_SIZE, - max_idle_time_seconds: Optional[int] = MAX_IDLE_TIME_SEC, - connect_timeout: Optional[float] = None, - socket_timeout: Optional[float] = None, - wait_queue_timeout: Optional[int] = WAIT_QUEUE_TIMEOUT, - ssl_context: Optional[SSLContext] = None, - tls_allow_invalid_hostnames: bool = False, - event_listeners: Optional[_EventListeners] = None, - appname: Optional[str] = None, - driver: Optional[DriverInfo] = None, - compression_settings: Optional[CompressionSettings] = None, - max_connecting: int = MAX_CONNECTING, - pause_enabled: bool = True, - server_api: Optional[ServerApi] = None, - load_balanced: Optional[bool] = None, - credentials: Optional[MongoCredential] = None, - ): - self.__max_pool_size = max_pool_size - self.__min_pool_size = min_pool_size - self.__max_idle_time_seconds = max_idle_time_seconds - self.__connect_timeout = connect_timeout - self.__socket_timeout = socket_timeout - self.__wait_queue_timeout = wait_queue_timeout - self.__ssl_context = ssl_context - self.__tls_allow_invalid_hostnames = tls_allow_invalid_hostnames - self.__event_listeners = event_listeners - self.__appname = appname - self.__driver = driver - self.__compression_settings = compression_settings - self.__max_connecting = max_connecting - self.__pause_enabled = pause_enabled - self.__server_api = server_api - self.__load_balanced = load_balanced - self.__credentials = credentials - self.__metadata = copy.deepcopy(_METADATA) - if appname: - self.__metadata["application"] = {"name": appname} - - # Combine the "driver" MongoClient option with PyMongo's info, like: - # { - # 'driver': { - # 'name': 'PyMongo|MyDriver', - # 'version': '4.2.0|1.2.3', - # }, - # 'platform': 'CPython 3.8.0|MyPlatform' - # } - if driver: - if driver.name: - self.__metadata["driver"]["name"] = "{}|{}".format( - _METADATA["driver"]["name"], - driver.name, - ) - if driver.version: - self.__metadata["driver"]["version"] = "{}|{}".format( - _METADATA["driver"]["version"], - driver.version, - ) - if driver.platform: - self.__metadata["platform"] = "{}|{}".format(_METADATA["platform"], driver.platform) - - env = _metadata_env() - if env: - self.__metadata["env"] = env - - _truncate_metadata(self.__metadata) - - @property - def _credentials(self) -> Optional[MongoCredential]: - """A :class:`~pymongo.auth.MongoCredentials` instance or None.""" - return self.__credentials - - @property - def non_default_options(self) -> dict[str, Any]: - """The non-default options this pool was created with. - - Added for CMAP's :class:`PoolCreatedEvent`. - """ - opts = {} - if self.__max_pool_size != MAX_POOL_SIZE: - opts["maxPoolSize"] = self.__max_pool_size - if self.__min_pool_size != MIN_POOL_SIZE: - opts["minPoolSize"] = self.__min_pool_size - if self.__max_idle_time_seconds != MAX_IDLE_TIME_SEC: - assert self.__max_idle_time_seconds is not None - opts["maxIdleTimeMS"] = self.__max_idle_time_seconds * 1000 - if self.__wait_queue_timeout != WAIT_QUEUE_TIMEOUT: - assert self.__wait_queue_timeout is not None - opts["waitQueueTimeoutMS"] = self.__wait_queue_timeout * 1000 - if self.__max_connecting != MAX_CONNECTING: - opts["maxConnecting"] = self.__max_connecting - return opts - - @property - def max_pool_size(self) -> float: - """The maximum allowable number of concurrent connections to each - connected server. Requests to a server will block if there are - `maxPoolSize` outstanding connections to the requested server. - Defaults to 100. Cannot be 0. - - When a server's pool has reached `max_pool_size`, operations for that - server block waiting for a socket to be returned to the pool. If - ``waitQueueTimeoutMS`` is set, a blocked operation will raise - :exc:`~pymongo.errors.ConnectionFailure` after a timeout. - By default ``waitQueueTimeoutMS`` is not set. - """ - return self.__max_pool_size - - @property - def min_pool_size(self) -> int: - """The minimum required number of concurrent connections that the pool - will maintain to each connected server. Default is 0. - """ - return self.__min_pool_size - - @property - def max_connecting(self) -> int: - """The maximum number of concurrent connection creation attempts per - pool. Defaults to 2. - """ - return self.__max_connecting - - @property - def pause_enabled(self) -> bool: - return self.__pause_enabled - - @property - def max_idle_time_seconds(self) -> Optional[int]: - """The maximum number of seconds that a connection can remain - idle in the pool before being removed and replaced. Defaults to - `None` (no limit). - """ - return self.__max_idle_time_seconds - - @property - def connect_timeout(self) -> Optional[float]: - """How long a connection can take to be opened before timing out.""" - return self.__connect_timeout - - @property - def socket_timeout(self) -> Optional[float]: - """How long a send or receive on a socket can take before timing out.""" - return self.__socket_timeout - - @property - def wait_queue_timeout(self) -> Optional[int]: - """How long a thread will wait for a socket from the pool if the pool - has no free sockets. - """ - return self.__wait_queue_timeout - - @property - def _ssl_context(self) -> Optional[SSLContext]: - """An SSLContext instance or None.""" - return self.__ssl_context - - @property - def tls_allow_invalid_hostnames(self) -> bool: - """If True skip ssl.match_hostname.""" - return self.__tls_allow_invalid_hostnames - - @property - def _event_listeners(self) -> Optional[_EventListeners]: - """An instance of pymongo.monitoring._EventListeners.""" - return self.__event_listeners - - @property - def appname(self) -> Optional[str]: - """The application name, for sending with hello in server handshake.""" - return self.__appname - - @property - def driver(self) -> Optional[DriverInfo]: - """Driver name and version, for sending with hello in handshake.""" - return self.__driver - - @property - def _compression_settings(self) -> Optional[CompressionSettings]: - return self.__compression_settings - - @property - def metadata(self) -> dict[str, Any]: - """A dict of metadata about the application, driver, os, and platform.""" - return self.__metadata.copy() - - @property - def server_api(self) -> Optional[ServerApi]: - """A pymongo.server_api.ServerApi or None.""" - return self.__server_api - - @property - def load_balanced(self) -> Optional[bool]: - """True if this Pool is configured in load balanced mode.""" - return self.__load_balanced - - class _CancellationContext: def __init__(self) -> None: self._cancelled = False @@ -733,6 +283,7 @@ class Connection: self.op_msg_enabled = False self.listeners = pool.opts._event_listeners self.enabled_for_cmap = pool.enabled_for_cmap + self.enabled_for_logging = pool.enabled_for_logging self.compression_settings = pool.opts._compression_settings self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None self.socket_checker: SocketChecker = SocketChecker() @@ -824,7 +375,7 @@ class Connection: else: return {HelloCompat.LEGACY_CMD: 1, "helloOk": True} - def hello(self) -> Hello[dict[str, Any]]: + def hello(self) -> Hello: return self._hello(None, None, None) def _hello( @@ -859,6 +410,8 @@ class Connection: if creds: if creds.mechanism == "DEFAULT" and creds.username: cmd["saslSupportedMechs"] = creds.source + "." + creds.username + from pymongo.synchronous import auth + auth_ctx = auth._AuthContext.from_credentials(creds, self.address) if auth_ctx: speculative_authenticate = auth_ctx.speculate_command() @@ -919,7 +472,7 @@ class Connection: self.more_to_come = reply.more_to_come unpacked_docs = reply.unpack_response() response_doc = unpacked_docs[0] - helpers._check_command_response(response_doc, self.max_wire_version) + helpers_shared._check_command_response(response_doc, self.max_wire_version) return response_doc @_handle_reauth @@ -1024,7 +577,7 @@ class Connection: ) try: - self.conn.sendall(message) + sendall(self.conn, message) except BaseException as error: self._raise_connection_failure(error) @@ -1072,7 +625,7 @@ class Connection: result = reply.command_response(codec_options) # Raises NotPrimaryError or OperationFailure. - helpers._check_command_response(result, self.max_wire_version) + helpers_shared._check_command_response(result, self.max_wire_version) return result def authenticate(self, reauthenticate: bool = False) -> None: @@ -1090,22 +643,24 @@ class Connection: if not self.ready: creds = self.opts._credentials if creds: + from pymongo.synchronous import auth + auth.authenticate(creds, self, reauthenticate=reauthenticate) self.ready = True + duration = time.monotonic() - self.creation_time if self.enabled_for_cmap: assert self.listeners is not None - duration = time.monotonic() - self.creation_time self.listeners.publish_connection_ready(self.address, self.id, duration) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_READY, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=self.id, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_READY, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=self.id, + durationMS=duration, + ) def validate_session( self, client: Optional[MongoClient], session: Optional[ClientSession] @@ -1123,10 +678,11 @@ class Connection: if self.closed: return self._close_conn() - if reason and self.enabled_for_cmap: - assert self.listeners is not None - self.listeners.publish_connection_closed(self.address, self.id, reason) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if reason: + if self.enabled_for_cmap: + assert self.listeners is not None + self.listeners.publish_connection_closed(self.address, self.id, reason) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1315,9 +871,26 @@ def _configured_socket(address: _Address, options: PoolOptions) -> Union[socket. # We have to pass hostname / ip address to wrap_socket # to use SSLContext.check_hostname. if HAS_SNI: - ssl_sock = ssl_context.wrap_socket(sock, server_hostname=host) + if _IS_SYNC: + ssl_sock = ssl_context.wrap_socket(sock, server_hostname=host) + else: + if hasattr(ssl_context, "a_wrap_socket"): + ssl_sock = ssl_context.a_wrap_socket(sock, server_hostname=host) # type: ignore[assignment, misc] + else: + loop = asyncio.get_running_loop() + ssl_sock = loop.run_in_executor( + None, + functools.partial(ssl_context.wrap_socket, sock, server_hostname=host), # type: ignore[assignment, misc] + ) else: - ssl_sock = ssl_context.wrap_socket(sock) + if _IS_SYNC: + ssl_sock = ssl_context.wrap_socket(sock) + else: + if hasattr(ssl_context, "a_wrap_socket"): + ssl_sock = ssl_context.a_wrap_socket(sock) # type: ignore[assignment, misc] + else: + loop = asyncio.get_running_loop() + ssl_sock = loop.run_in_executor(None, ssl_context.wrap_socket, sock) # type: ignore[assignment, misc] except _CertificateError: sock.close() # Raise _CertificateError directly like we do after match_hostname @@ -1430,17 +1003,18 @@ class Pool: self.address = address self.opts = options self.handshake = handshake - # Don't publish events in Monitor pools. + # Don't publish events or logs in Monitor pools. self.enabled_for_cmap = ( self.handshake and self.opts._event_listeners is not None and self.opts._event_listeners.enabled_for_cmap ) + self.enabled_for_logging = self.handshake # The first portion of the wait queue. # Enforces: maxPoolSize # Also used for: clearing the wait queue - self.size_cond = threading.Condition(self.lock) + self.size_cond = threading.Condition(self.lock) # type: ignore[arg-type] self.requests = 0 self.max_pool_size = self.opts.max_pool_size if not self.max_pool_size: @@ -1448,7 +1022,7 @@ class Pool: # The second portion of the wait queue. # Enforces: maxConnecting # Also used for: clearing the wait queue - self._max_connecting_cond = threading.Condition(self.lock) + self._max_connecting_cond = threading.Condition(self.lock) # type: ignore[arg-type] self._max_connecting = self.opts.max_connecting self._pending = 0 self._client_id = client_id @@ -1457,15 +1031,15 @@ class Pool: self.opts._event_listeners.publish_pool_created( self.address, self.opts.non_default_options ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_CREATED, - serverHost=self.address[0], - serverPort=self.address[1], - **self.opts.non_default_options, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_CREATED, + serverHost=self.address[0], + serverPort=self.address[1], + **self.opts.non_default_options, + ) # Similar to active_sockets but includes threads in the wait queue. self.operation_count: int = 0 # Retain references to pinned connections to prevent the CPython GC @@ -1483,14 +1057,14 @@ class Pool: if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_ready(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_READY, - serverHost=self.address[0], - serverPort=self.address[1], - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_READY, + serverHost=self.address[0], + serverPort=self.address[1], + ) @property def closed(self) -> bool: @@ -1548,23 +1122,24 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_pool_closed(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.POOL_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - ) - else: - if old_state != PoolState.PAUSED and self.enabled_for_cmap: - assert listeners is not None - listeners.publish_pool_cleared( - self.address, - service_id=service_id, - interrupt_connections=interrupt_connections, + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.POOL_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + else: + if old_state != PoolState.PAUSED: + if self.enabled_for_cmap: + assert listeners is not None + listeners.publish_pool_cleared( + self.address, + service_id=service_id, + interrupt_connections=interrupt_connections, + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1672,15 +1247,15 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_created(self.address, conn_id) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CREATED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CREATED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + ) try: sock = _configured_socket(self.address, self.opts) @@ -1690,17 +1265,17 @@ class Pool: listeners.publish_connection_closed( self.address, conn_id, ConnectionClosedReason.ERROR ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), + error=ConnectionClosedReason.ERROR, + ) if isinstance(error, (IOError, OSError, SSLError)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1725,7 +1300,9 @@ class Pool: return conn @contextlib.contextmanager - def checkout(self, handler: Optional[_MongoClientErrorHandler] = None) -> Iterator[Connection]: + def checkout( + self, handler: Optional[_MongoClientErrorHandler] = None + ) -> Generator[Connection, None]: """Get a connection from the pool. Use with a "with" statement. Returns a :class:`Connection` object wrapping a connected @@ -1746,31 +1323,31 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_check_out_started(self.address) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_STARTED, - serverHost=self.address[0], - serverPort=self.address[1], - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_STARTED, + serverHost=self.address[0], + serverPort=self.address[1], + ) conn = self._get_conn(checkout_started_time, handler=handler) + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert listeners is not None - duration = time.monotonic() - checkout_started_time listeners.publish_connection_checked_out(self.address, conn.id, duration) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + durationMS=duration, + ) try: with self.lock: self.active_contexts.add(conn.cancel_context) @@ -1802,13 +1379,14 @@ class Pool: def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: if self.state != PoolState.READY: - if self.enabled_for_cmap and emit_event: - assert self.opts._event_listeners is not None + if emit_event: duration = time.monotonic() - checkout_started_time - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1836,23 +1414,23 @@ class Pool: self.reset_without_pause() if self.closed: + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert self.opts._event_listeners is not None - duration = time.monotonic() - checkout_started_time self.opts._event_listeners.publish_connection_check_out_failed( self.address, ConnectionCheckOutFailedReason.POOL_CLOSED, duration ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Connection pool was closed", - error=ConnectionCheckOutFailedReason.POOL_CLOSED, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_FAILED, + serverHost=self.address[0], + serverPort=self.address[1], + reason="Connection pool was closed", + error=ConnectionCheckOutFailedReason.POOL_CLOSED, + durationMS=duration, + ) raise _PoolClosedError( "Attempted to check out a connection from closed connection pool" ) @@ -1928,13 +1506,14 @@ class Pool: self.active_sockets -= 1 self.size_cond.notify() - if self.enabled_for_cmap and not emitted_event: - assert self.opts._event_listeners is not None + if not emitted_event: duration = time.monotonic() - checkout_started_time - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_connection_check_out_failed( + self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration + ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _CONNECTION_LOGGER, clientId=self._client_id, @@ -1967,15 +1546,15 @@ class Pool: if self.enabled_for_cmap: assert listeners is not None listeners.publish_connection_checked_in(self.address, conn.id) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKEDIN, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKEDIN, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + ) if self.pid != os.getpid(): self.reset_without_pause() else: @@ -1988,17 +1567,17 @@ class Pool: listeners.publish_connection_closed( self.address, conn.id, ConnectionClosedReason.ERROR ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CONN_CLOSED, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CONN_CLOSED, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn.id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), + error=ConnectionClosedReason.ERROR, + ) else: with self.lock: # Hold the lock to ensure this section does not race with @@ -2060,23 +1639,23 @@ class Pool: def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn: listeners = self.opts._event_listeners + duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: assert listeners is not None - duration = time.monotonic() - checkout_started_time listeners.publish_connection_check_out_failed( self.address, ConnectionCheckOutFailedReason.TIMEOUT, duration ) - if _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - clientId=self._client_id, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Wait queue timeout elapsed without a connection becoming available", - error=ConnectionCheckOutFailedReason.TIMEOUT, - durationMS=duration, - ) + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + clientId=self._client_id, + message=_ConnectionStatusMessage.CHECKOUT_FAILED, + serverHost=self.address[0], + serverPort=self.address[1], + reason="Wait queue timeout elapsed without a connection becoming available", + error=ConnectionCheckOutFailedReason.TIMEOUT, + durationMS=duration, + ) timeout = _csot.get_timeout() or self.opts.wait_queue_timeout if self.opts.load_balanced: other_ops = self.active_sockets - self.ncursors - self.ntxns