diff --git a/.evergreen/generated_configs/variants.yml b/.evergreen/generated_configs/variants.yml index 5e769173f..dbfec82f9 100644 --- a/.evergreen/generated_configs/variants.yml +++ b/.evergreen/generated_configs/variants.yml @@ -319,14 +319,6 @@ buildvariants: tags: [] # Green framework tests - - name: green-eventlet-rhel8 - tasks: - - name: .test-standard .python-3.9 .sync - display_name: Green Eventlet RHEL8 - run_on: - - rhel87-small - expansions: - GREEN_FRAMEWORK: eventlet - name: green-gevent-rhel8 tasks: - name: .test-standard .sync diff --git a/.evergreen/scripts/generate_config.py b/.evergreen/scripts/generate_config.py index a04a64d30..7b17b127f 100644 --- a/.evergreen/scripts/generate_config.py +++ b/.evergreen/scripts/generate_config.py @@ -300,12 +300,8 @@ def create_stable_api_variants(): def create_green_framework_variants(): variants = [] host = DEFAULT_HOST - for framework in ["eventlet", "gevent"]: + for framework in ["gevent"]: tasks = [".test-standard .sync"] - if framework == "eventlet": - # Eventlet has issues with dnspython > 2.0 and newer versions of CPython - # https://jira.mongodb.org/browse/PYTHON-5284 - tasks = [".test-standard .python-3.9 .sync"] expansions = dict(GREEN_FRAMEWORK=framework) display_name = get_variant_name(f"Green {framework.capitalize()}", host) variant = create_variant(tasks, display_name, host=host, expansions=expansions) diff --git a/.evergreen/scripts/run_tests.py b/.evergreen/scripts/run_tests.py index 3a1c15a41..fd4fd13e5 100644 --- a/.evergreen/scripts/run_tests.py +++ b/.evergreen/scripts/run_tests.py @@ -67,13 +67,7 @@ def handle_perf(start_time: datetime): def handle_green_framework() -> None: - if GREEN_FRAMEWORK == "eventlet": - import eventlet - - # https://github.com/eventlet/eventlet/issues/401 - eventlet.sleep() - eventlet.monkey_patch() - elif GREEN_FRAMEWORK == "gevent": + if GREEN_FRAMEWORK == "gevent": from gevent import monkey monkey.patch_all() diff --git a/.evergreen/scripts/setup-dev-env.sh b/.evergreen/scripts/setup-dev-env.sh index 6e6b5965b..38824f32b 100755 --- a/.evergreen/scripts/setup-dev-env.sh +++ b/.evergreen/scripts/setup-dev-env.sh @@ -47,13 +47,13 @@ if [ -f $HOME/.visualStudioEnv.sh ]; then SSH_TTY=1 source $HOME/.visualStudioEnv.sh set -u fi -uv sync --frozen +uv sync echo "Setting up python environment... done." # Ensure there is a pre-commit hook if there is a git checkout. if [ -d .git ] && [ ! -f .git/hooks/pre-commit ]; then - uv run --frozen pre-commit install + uv run pre-commit install fi popd > /dev/null diff --git a/.evergreen/scripts/utils.py b/.evergreen/scripts/utils.py index 8d1b466b6..50894ff63 100644 --- a/.evergreen/scripts/utils.py +++ b/.evergreen/scripts/utils.py @@ -104,7 +104,7 @@ def get_test_options( parser.add_argument( "--green-framework", nargs=1, - choices=["eventlet", "gevent"], + choices=["gevent"], help="Optional green framework to test against.", ) parser.add_argument( diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ac129f95f..d2b9d9a17 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -105,12 +105,6 @@ repos: # - test/test_client.py:188: te ==> the, be, we, to args: ["-L", "fle,fo,infinit,isnt,nin,te,aks"] -- repo: https://github.com/astral-sh/uv-pre-commit - # uv version. - rev: 0.8.17 - hooks: - - id: uv-lock - - repo: local hooks: - id: executable-shell @@ -128,3 +122,14 @@ repos: language: python require_serial: true additional_dependencies: ["shrub.py>=3.10.0", "pyyaml>=6.0.2"] + + - id: uv-lock + name: uv-lock + entry: uv lock + language: python + require_serial: true + files: ^(uv\.lock|pyproject\.toml|requirements.txt|requirements/.*\.txt)$ + pass_filenames: false + fail_fast: true + additional_dependencies: + - "uv>=0.8.4" diff --git a/bson/__init__.py b/bson/__init__.py index e677d8cfd..d260fb876 100644 --- a/bson/__init__.py +++ b/bson/__init__.py @@ -1009,7 +1009,7 @@ def _dict_to_bson( try: elements.append(_element_to_bson(key, value, check_keys, opts)) except InvalidDocument as err: - raise InvalidDocument(f"Invalid document {doc} | {err}") from err + raise InvalidDocument(f"Invalid document: {err}", doc) from err except AttributeError: raise TypeError(f"encoder expected a mapping type but got: {doc!r}") from None diff --git a/bson/_cbsonmodule.c b/bson/_cbsonmodule.c index be91e4173..bee719856 100644 --- a/bson/_cbsonmodule.c +++ b/bson/_cbsonmodule.c @@ -1645,11 +1645,11 @@ fail: } -/* Update Invalid Document error message to include doc. +/* Update Invalid Document error to include doc as a property. */ void handle_invalid_doc_error(PyObject* dict) { PyObject *etype = NULL, *evalue = NULL, *etrace = NULL; - PyObject *msg = NULL, *dict_str = NULL, *new_msg = NULL; + PyObject *msg = NULL, *new_msg = NULL, *new_evalue = NULL; PyErr_Fetch(&etype, &evalue, &etrace); PyObject *InvalidDocument = _error("InvalidDocument"); if (InvalidDocument == NULL) { @@ -1659,26 +1659,22 @@ void handle_invalid_doc_error(PyObject* dict) { if (evalue && PyErr_GivenExceptionMatches(etype, InvalidDocument)) { PyObject *msg = PyObject_Str(evalue); if (msg) { - // Prepend doc to the existing message - PyObject *dict_str = PyObject_Str(dict); - if (dict_str == NULL) { - goto cleanup; - } - const char * dict_str_utf8 = PyUnicode_AsUTF8(dict_str); - if (dict_str_utf8 == NULL) { - goto cleanup; - } const char * msg_utf8 = PyUnicode_AsUTF8(msg); if (msg_utf8 == NULL) { goto cleanup; } - PyObject *new_msg = PyUnicode_FromFormat("Invalid document %s | %s", dict_str_utf8, msg_utf8); + PyObject *new_msg = PyUnicode_FromFormat("Invalid document: %s", msg_utf8); + if (new_msg == NULL) { + goto cleanup; + } + // Add doc to the error instance as a property. + PyObject *new_evalue = PyObject_CallFunctionObjArgs(InvalidDocument, new_msg, dict, NULL); Py_DECREF(evalue); Py_DECREF(etype); etype = InvalidDocument; InvalidDocument = NULL; - if (new_msg) { - evalue = new_msg; + if (new_evalue) { + evalue = new_evalue; } else { evalue = msg; } @@ -1689,7 +1685,7 @@ cleanup: PyErr_Restore(etype, evalue, etrace); Py_XDECREF(msg); Py_XDECREF(InvalidDocument); - Py_XDECREF(dict_str); + Py_XDECREF(new_evalue); Py_XDECREF(new_msg); } diff --git a/bson/errors.py b/bson/errors.py index a3699e704..ffc117f7a 100644 --- a/bson/errors.py +++ b/bson/errors.py @@ -15,6 +15,8 @@ """Exceptions raised by the BSON package.""" from __future__ import annotations +from typing import Any, Optional + class BSONError(Exception): """Base class for all BSON exceptions.""" @@ -31,6 +33,17 @@ class InvalidStringData(BSONError): class InvalidDocument(BSONError): """Raised when trying to create a BSON object from an invalid document.""" + def __init__(self, message: str, document: Optional[Any] = None) -> None: + super().__init__(message) + self._document = document + + @property + def document(self) -> Any: + """The invalid document that caused the error. + + ..versionadded:: 4.16""" + return self._document + class InvalidId(BSONError): """Raised when trying to create an ObjectId from invalid data.""" diff --git a/doc/changelog.rst b/doc/changelog.rst index 082c22faf..6dcb80497 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -1,6 +1,17 @@ Changelog ========= +Changes in Version 4.16.0 (XXXX/XX/XX) +-------------------------------------- + +PyMongo 4.16 brings a number of changes including: + +- Removed invalid documents from :class:`bson.errors.InvalidDocument` error messages as + doing so may leak sensitive user data. + Instead, invalid documents are stored in :attr:`bson.errors.InvalidDocument.document`. +- Removed support for Eventlet. + Eventlet is actively being sunset by its maintainers and has compatibility issues with PyMongo's dnspython dependency. + Changes in Version 4.15.1 (2025/09/16) -------------------------------------- diff --git a/doc/contributors.rst b/doc/contributors.rst index 4a7f5424b..08296e959 100644 --- a/doc/contributors.rst +++ b/doc/contributors.rst @@ -103,3 +103,7 @@ The following is a list of people who have contributed to - Terry Patterson - Romain Morotti - Navjot Singh (navjots18) +- Jib Adegunloye (Jibola) +- Jeffrey A. Clark (aclark4life) +- Steven Silvester (blink1073) +- Noah Stapp (NoahStapp) diff --git a/justfile b/justfile index c129b2c19..4645a4a47 100644 --- a/justfile +++ b/justfile @@ -1,7 +1,5 @@ # See https://just.systems/man/en/ for instructions set shell := ["bash", "-c"] -# Do not modify the lock file when running justfile commands. -export UV_FROZEN := "1" # Commonly used command segments. typing_run := "uv run --group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd" @@ -16,7 +14,7 @@ default: [private] resync: - @uv sync --quiet --frozen + @uv sync --quiet install: bash .evergreen/scripts/setup-dev-env.sh diff --git a/pymongo/asynchronous/aggregation.py b/pymongo/asynchronous/aggregation.py index 059d69877..6ca60ad9c 100644 --- a/pymongo/asynchronous/aggregation.py +++ b/pymongo/asynchronous/aggregation.py @@ -50,7 +50,6 @@ class _AggregationCommand: cursor_class: type[AsyncCommandCursor[Any]], pipeline: _Pipeline, options: MutableMapping[str, Any], - explicit_session: bool, let: Optional[Mapping[str, Any]] = None, user_fields: Optional[MutableMapping[str, Any]] = None, result_processor: Optional[Callable[[Mapping[str, Any], AsyncConnection], None]] = None, @@ -92,7 +91,6 @@ class _AggregationCommand: self._options["cursor"]["batchSize"] = self._batch_size self._cursor_class = cursor_class - self._explicit_session = explicit_session self._user_fields = user_fields self._result_processor = result_processor @@ -197,7 +195,6 @@ class _AggregationCommand: batch_size=self._batch_size or 0, max_await_time_ms=self._max_await_time_ms, session=session, - explicit_session=self._explicit_session, comment=self._options.get("comment"), ) await cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/asynchronous/change_stream.py b/pymongo/asynchronous/change_stream.py index 3940111df..b2b78b066 100644 --- a/pymongo/asynchronous/change_stream.py +++ b/pymongo/asynchronous/change_stream.py @@ -236,7 +236,7 @@ class AsyncChangeStream(Generic[_DocumentType]): ) async def _run_aggregation_cmd( - self, session: Optional[AsyncClientSession], explicit_session: bool + self, session: Optional[AsyncClientSession] ) -> AsyncCommandCursor: # type: ignore[type-arg] """Run the full aggregation pipeline for this AsyncChangeStream and return the corresponding AsyncCommandCursor. @@ -246,7 +246,6 @@ class AsyncChangeStream(Generic[_DocumentType]): AsyncCommandCursor, self._aggregation_pipeline(), self._command_options(), - explicit_session, result_processor=self._process_result, comment=self._comment, ) @@ -258,10 +257,8 @@ class AsyncChangeStream(Generic[_DocumentType]): ) async def _create_cursor(self) -> AsyncCommandCursor: # type: ignore[type-arg] - async with self._client._tmp_session(self._session, close=False) as s: - return await self._run_aggregation_cmd( - session=s, explicit_session=self._session is not None - ) + async with self._client._tmp_session(self._session) as s: + return await self._run_aggregation_cmd(session=s) async def _resume(self) -> None: """Reestablish this change stream after a resumable error.""" diff --git a/pymongo/asynchronous/client_bulk.py b/pymongo/asynchronous/client_bulk.py index 45812b340..151942c8a 100644 --- a/pymongo/asynchronous/client_bulk.py +++ b/pymongo/asynchronous/client_bulk.py @@ -440,6 +440,8 @@ class _AsyncClientBulk: ) -> None: """Internal helper for processing the server reply command cursor.""" if result.get("cursor"): + if session: + session._leave_alive = True coll = AsyncCollection( database=AsyncDatabase(self.client, "admin"), name="$cmd.bulkWrite", @@ -449,7 +451,6 @@ class _AsyncClientBulk: result["cursor"], conn.address, session=session, - explicit_session=session is not None, comment=self.comment, ) await cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index be02295ce..8674e9844 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -513,6 +513,10 @@ class AsyncClientSession: # Is this an implicitly created session? self._implicit = implicit self._transaction = _Transaction(None, client) + # Is this session attached to a cursor? + self._attached_to_cursor = False + # Should we leave the session alive when the cursor is closed? + self._leave_alive = False async def end_session(self) -> None: """Finish this session. If a transaction has started, abort it. @@ -535,7 +539,7 @@ class AsyncClientSession: def _end_implicit_session(self) -> None: # Implicit sessions can't be part of transactions or pinned connections - if self._server_session is not None: + if not self._leave_alive and self._server_session is not None: self._client._return_server_session(self._server_session) self._server_session = None diff --git a/pymongo/asynchronous/collection.py b/pymongo/asynchronous/collection.py index 064231ccf..6af1f4f78 100644 --- a/pymongo/asynchronous/collection.py +++ b/pymongo/asynchronous/collection.py @@ -2549,7 +2549,6 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]): self.with_options(codec_options=codec_options, read_preference=ReadPreference.PRIMARY), ) read_pref = (session and session._txn_read_preference()) or ReadPreference.PRIMARY - explicit_session = session is not None async def _cmd( session: Optional[AsyncClientSession], @@ -2576,13 +2575,12 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]): cursor, conn.address, session=session, - explicit_session=explicit_session, comment=cmd.get("comment"), ) await cmd_cursor._maybe_pin_connection(conn) return cmd_cursor - async with self._database.client._tmp_session(session, False) as s: + async with self._database.client._tmp_session(session) as s: return await self._database.client._retryable_read( _cmd, read_pref, s, operation=_Op.LIST_INDEXES ) @@ -2678,7 +2676,6 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]): AsyncCommandCursor, pipeline, kwargs, - explicit_session=session is not None, comment=comment, user_fields={"cursor": {"firstBatch": 1}}, ) @@ -2900,7 +2897,6 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]): pipeline: _Pipeline, cursor_class: Type[AsyncCommandCursor], # type: ignore[type-arg] session: Optional[AsyncClientSession], - explicit_session: bool, let: Optional[Mapping[str, Any]] = None, comment: Optional[Any] = None, **kwargs: Any, @@ -2912,7 +2908,6 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]): cursor_class, pipeline, kwargs, - explicit_session, let, user_fields={"cursor": {"firstBatch": 1}}, ) @@ -3018,13 +3013,12 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]): .. _aggregate command: https://mongodb.com/docs/manual/reference/command/aggregate """ - async with self._database.client._tmp_session(session, close=False) as s: + async with self._database.client._tmp_session(session) as s: return await self._aggregate( _CollectionAggregationCommand, pipeline, AsyncCommandCursor, session=s, - explicit_session=session is not None, let=let, comment=comment, **kwargs, @@ -3065,7 +3059,7 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]): raise InvalidOperation("aggregate_raw_batches does not support auto encryption") if comment is not None: kwargs["comment"] = comment - async with self._database.client._tmp_session(session, close=False) as s: + async with self._database.client._tmp_session(session) as s: return cast( AsyncRawBatchCursor[_DocumentType], await self._aggregate( @@ -3073,7 +3067,6 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]): pipeline, AsyncRawBatchCommandCursor, session=s, - explicit_session=session is not None, **kwargs, ), ) diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index db7c2b663..e18b3a330 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -64,7 +64,6 @@ class AsyncCommandCursor(Generic[_DocumentType]): batch_size: int = 0, max_await_time_ms: Optional[int] = None, session: Optional[AsyncClientSession] = None, - explicit_session: bool = False, comment: Any = None, ) -> None: """Create a new command cursor.""" @@ -80,7 +79,8 @@ class AsyncCommandCursor(Generic[_DocumentType]): self._max_await_time_ms = max_await_time_ms self._timeout = self._collection.database.client.options.timeout self._session = session - self._explicit_session = explicit_session + if self._session is not None: + self._session._attached_to_cursor = True self._killed = self._id == 0 self._comment = comment if self._killed: @@ -197,7 +197,7 @@ class AsyncCommandCursor(Generic[_DocumentType]): .. versionadded:: 3.6 """ - if self._explicit_session: + if self._session and not self._session._implicit: return self._session return None @@ -218,9 +218,10 @@ class AsyncCommandCursor(Generic[_DocumentType]): """Closes this cursor without acquiring a lock.""" cursor_id, address = self._prepare_to_die() self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session, self._explicit_session + cursor_id, address, self._sock_mgr, self._session ) - if not self._explicit_session: + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None @@ -232,14 +233,15 @@ class AsyncCommandCursor(Generic[_DocumentType]): address, self._sock_mgr, self._session, - self._explicit_session, ) - if not self._explicit_session: + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None def _end_session(self) -> None: - if self._session and not self._explicit_session: + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session._end_implicit_session() self._session = None @@ -430,7 +432,6 @@ class AsyncRawBatchCommandCursor(AsyncCommandCursor[_DocumentType]): batch_size: int = 0, max_await_time_ms: Optional[int] = None, session: Optional[AsyncClientSession] = None, - explicit_session: bool = False, comment: Any = None, ) -> None: """Create a new cursor / iterator over raw batches of BSON data. @@ -449,7 +450,6 @@ class AsyncRawBatchCommandCursor(AsyncCommandCursor[_DocumentType]): batch_size, max_await_time_ms, session, - explicit_session, comment, ) diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index d9fdd576f..df060a4fa 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -138,10 +138,9 @@ class AsyncCursor(Generic[_DocumentType]): if session: self._session = session - self._explicit_session = True + self._session._attached_to_cursor = True else: self._session = None - self._explicit_session = False spec: Mapping[str, Any] = filter or {} validate_is_mapping("filter", spec) @@ -150,7 +149,7 @@ class AsyncCursor(Generic[_DocumentType]): if not isinstance(limit, int): raise TypeError(f"limit must be an instance of int, not {type(limit)}") validate_boolean("no_cursor_timeout", no_cursor_timeout) - if no_cursor_timeout and not self._explicit_session: + if no_cursor_timeout and self._session and self._session._implicit: warnings.warn( "use an explicit session with no_cursor_timeout=True " "otherwise the cursor may still timeout after " @@ -283,7 +282,7 @@ class AsyncCursor(Generic[_DocumentType]): def _clone(self, deepcopy: bool = True, base: Optional[AsyncCursor] = None) -> AsyncCursor: # type: ignore[type-arg] """Internal clone helper.""" if not base: - if self._explicit_session: + if self._session and not self._session._implicit: base = self._clone_base(self._session) else: base = self._clone_base(None) @@ -945,7 +944,7 @@ class AsyncCursor(Generic[_DocumentType]): .. versionadded:: 3.6 """ - if self._explicit_session: + if self._session and not self._session._implicit: return self._session return None @@ -1034,9 +1033,10 @@ class AsyncCursor(Generic[_DocumentType]): cursor_id, address = self._prepare_to_die(already_killed) self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session, self._explicit_session + cursor_id, address, self._sock_mgr, self._session ) - if not self._explicit_session: + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None @@ -1054,9 +1054,9 @@ class AsyncCursor(Generic[_DocumentType]): address, self._sock_mgr, self._session, - self._explicit_session, ) - if not self._explicit_session: + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None diff --git a/pymongo/asynchronous/database.py b/pymongo/asynchronous/database.py index f70c2b403..8e0afc9dc 100644 --- a/pymongo/asynchronous/database.py +++ b/pymongo/asynchronous/database.py @@ -611,6 +611,8 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]): common.validate_is_mapping("clusteredIndex", clustered_index) async with self._client._tmp_session(session) as s: + if s and not s.in_transaction: + s._leave_alive = True # Skip this check in a transaction where listCollections is not # supported. if ( @@ -619,6 +621,8 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]): and name in await self._list_collection_names(filter={"name": name}, session=s) ): raise CollectionInvalid("collection %s already exists" % name) + if s: + s._leave_alive = False coll = AsyncCollection( self, name, @@ -699,13 +703,12 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]): .. _aggregate command: https://mongodb.com/docs/manual/reference/command/aggregate """ - async with self.client._tmp_session(session, close=False) as s: + async with self.client._tmp_session(session) as s: cmd = _DatabaseAggregationCommand( self, AsyncCommandCursor, pipeline, kwargs, - session is not None, user_fields={"cursor": {"firstBatch": 1}}, ) return await self.client._retryable_read( @@ -1011,7 +1014,7 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]): else: command_name = next(iter(command)) - async with self._client._tmp_session(session, close=False) as tmp_session: + async with self._client._tmp_session(session) as tmp_session: opts = codec_options or DEFAULT_CODEC_OPTIONS if read_preference is None: @@ -1043,7 +1046,6 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]): conn.address, max_await_time_ms=max_await_time_ms, session=tmp_session, - explicit_session=session is not None, comment=comment, ) await cmd_cursor._maybe_pin_connection(conn) @@ -1089,7 +1091,7 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]): ) cmd = {"listCollections": 1, "cursor": {}} cmd.update(kwargs) - async with self._client._tmp_session(session, close=False) as tmp_session: + async with self._client._tmp_session(session) as tmp_session: cursor = ( await self._command(conn, cmd, read_preference=read_preference, session=tmp_session) )["cursor"] @@ -1098,7 +1100,6 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]): cursor, conn.address, session=tmp_session, - explicit_session=session is not None, comment=cmd.get("comment"), ) await cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index b61664779..d9bf808d5 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2048,17 +2048,18 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): retryable = bool( retryable and self.options.retry_reads and not (session and session.in_transaction) ) - return await self._retry_internal( - func, - session, - None, - operation, - is_read=True, - address=address, - read_pref=read_pref, - retryable=retryable, - operation_id=operation_id, - ) + async with self._tmp_session(session) as s: + return await self._retry_internal( + func, + s, + None, + operation, + is_read=True, + address=address, + read_pref=read_pref, + retryable=retryable, + operation_id=operation_id, + ) async def _retryable_write( self, @@ -2091,7 +2092,6 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): address: Optional[_CursorAddress], conn_mgr: _ConnectionManager, session: Optional[AsyncClientSession], - explicit_session: bool, ) -> None: """Cleanup a cursor from __del__ without locking. @@ -2106,7 +2106,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): # The cursor will be closed later in a different session. if cursor_id or conn_mgr: self._close_cursor_soon(cursor_id, address, conn_mgr) - if session and not explicit_session: + if session and session._implicit and not session._leave_alive: session._end_implicit_session() async def _cleanup_cursor_lock( @@ -2115,7 +2115,6 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): address: Optional[_CursorAddress], conn_mgr: _ConnectionManager, session: Optional[AsyncClientSession], - explicit_session: bool, ) -> None: """Cleanup a cursor from cursor.close() using a lock. @@ -2127,7 +2126,6 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): :param address: The _CursorAddress. :param conn_mgr: The _ConnectionManager for the pinned connection or None. :param session: The cursor's session. - :param explicit_session: True if the session was passed explicitly. """ if cursor_id: if conn_mgr and conn_mgr.more_to_come: @@ -2140,7 +2138,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) if conn_mgr: await conn_mgr.close() - if session and not explicit_session: + if session and session._implicit and not session._leave_alive: session._end_implicit_session() async def _close_cursor_now( @@ -2221,7 +2219,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): for address, cursor_id, conn_mgr in pinned_cursors: try: - await self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False) + await self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None) except Exception as exc: if isinstance(exc, InvalidOperation) and self._topology._closed: # Raise the exception when client is closed so that it @@ -2266,7 +2264,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): @contextlib.asynccontextmanager async def _tmp_session( - self, session: Optional[client_session.AsyncClientSession], close: bool = True + self, session: Optional[client_session.AsyncClientSession] ) -> AsyncGenerator[Optional[client_session.AsyncClientSession], None]: """If provided session is None, lend a temporary session.""" if session is not None: @@ -2291,7 +2289,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]): raise finally: # Call end_session when we exit this scope. - if close: + if not s._attached_to_cursor: await s.end_session() else: yield None diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 196ec9040..f521091e3 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -628,7 +628,7 @@ class AsyncConnection: # signals and throws KeyboardInterrupt into the current frame on the # main thread. # - # But in Gevent and Eventlet, the polling mechanism (epoll, kqueue, + # But in Gevent, the polling mechanism (epoll, kqueue, # ..) is called in Python code, which experiences the signal as a # KeyboardInterrupt from the start, rather than as an initial # socket.error, so we catch that, close the socket, and reraise it. diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index ac562af54..8db26ccea 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -138,13 +138,11 @@ def _raise_connection_failure( msg = msg_prefix + msg if "configured timeouts" not in msg: msg += format_timeout_details(timeout_details) - if isinstance(error, socket.timeout): - raise NetworkTimeout(msg) from error - elif isinstance(error, SSLErrors) and "timed out" in str(error): - # Eventlet does not distinguish TLS network timeouts from other - # SSLErrors (https://github.com/eventlet/eventlet/issues/692). - # Luckily, we can work around this limitation because the phrase - # 'timed out' appears in all the timeout related SSLErrors raised. + if ( + isinstance(error, socket.timeout) + or isinstance(error, SSLErrors) + and "timed out" in str(error) + ): raise NetworkTimeout(msg) from error else: raise AutoReconnect(msg) from error diff --git a/pymongo/synchronous/aggregation.py b/pymongo/synchronous/aggregation.py index 9845f28b0..486768ab7 100644 --- a/pymongo/synchronous/aggregation.py +++ b/pymongo/synchronous/aggregation.py @@ -50,7 +50,6 @@ class _AggregationCommand: cursor_class: type[CommandCursor[Any]], pipeline: _Pipeline, options: MutableMapping[str, Any], - explicit_session: bool, let: Optional[Mapping[str, Any]] = None, user_fields: Optional[MutableMapping[str, Any]] = None, result_processor: Optional[Callable[[Mapping[str, Any], Connection], None]] = None, @@ -92,7 +91,6 @@ class _AggregationCommand: self._options["cursor"]["batchSize"] = self._batch_size self._cursor_class = cursor_class - self._explicit_session = explicit_session self._user_fields = user_fields self._result_processor = result_processor @@ -197,7 +195,6 @@ class _AggregationCommand: batch_size=self._batch_size or 0, max_await_time_ms=self._max_await_time_ms, session=session, - explicit_session=self._explicit_session, comment=self._options.get("comment"), ) cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/synchronous/change_stream.py b/pymongo/synchronous/change_stream.py index f5f635218..7e34d7b84 100644 --- a/pymongo/synchronous/change_stream.py +++ b/pymongo/synchronous/change_stream.py @@ -235,9 +235,7 @@ class ChangeStream(Generic[_DocumentType]): f"response : {result!r}" ) - def _run_aggregation_cmd( - self, session: Optional[ClientSession], explicit_session: bool - ) -> CommandCursor: # type: ignore[type-arg] + def _run_aggregation_cmd(self, session: Optional[ClientSession]) -> CommandCursor: # type: ignore[type-arg] """Run the full aggregation pipeline for this ChangeStream and return the corresponding CommandCursor. """ @@ -246,7 +244,6 @@ class ChangeStream(Generic[_DocumentType]): CommandCursor, self._aggregation_pipeline(), self._command_options(), - explicit_session, result_processor=self._process_result, comment=self._comment, ) @@ -258,8 +255,8 @@ class ChangeStream(Generic[_DocumentType]): ) def _create_cursor(self) -> CommandCursor: # type: ignore[type-arg] - with self._client._tmp_session(self._session, close=False) as s: - return self._run_aggregation_cmd(session=s, explicit_session=self._session is not None) + with self._client._tmp_session(self._session) as s: + return self._run_aggregation_cmd(session=s) def _resume(self) -> None: """Reestablish this change stream after a resumable error.""" diff --git a/pymongo/synchronous/client_bulk.py b/pymongo/synchronous/client_bulk.py index 1076ceba9..a606d028e 100644 --- a/pymongo/synchronous/client_bulk.py +++ b/pymongo/synchronous/client_bulk.py @@ -438,6 +438,8 @@ class _ClientBulk: ) -> None: """Internal helper for processing the server reply command cursor.""" if result.get("cursor"): + if session: + session._leave_alive = True coll = Collection( database=Database(self.client, "admin"), name="$cmd.bulkWrite", @@ -447,7 +449,6 @@ class _ClientBulk: result["cursor"], conn.address, session=session, - explicit_session=session is not None, comment=self.comment, ) cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index 72a5b8e88..9b547dc94 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -512,6 +512,10 @@ class ClientSession: # Is this an implicitly created session? self._implicit = implicit self._transaction = _Transaction(None, client) + # Is this session attached to a cursor? + self._attached_to_cursor = False + # Should we leave the session alive when the cursor is closed? + self._leave_alive = False def end_session(self) -> None: """Finish this session. If a transaction has started, abort it. @@ -534,7 +538,7 @@ class ClientSession: def _end_implicit_session(self) -> None: # Implicit sessions can't be part of transactions or pinned connections - if self._server_session is not None: + if not self._leave_alive and self._server_session is not None: self._client._return_server_session(self._server_session) self._server_session = None diff --git a/pymongo/synchronous/collection.py b/pymongo/synchronous/collection.py index e5cc816cd..b68e4befe 100644 --- a/pymongo/synchronous/collection.py +++ b/pymongo/synchronous/collection.py @@ -2546,7 +2546,6 @@ class Collection(common.BaseObject, Generic[_DocumentType]): self.with_options(codec_options=codec_options, read_preference=ReadPreference.PRIMARY), ) read_pref = (session and session._txn_read_preference()) or ReadPreference.PRIMARY - explicit_session = session is not None def _cmd( session: Optional[ClientSession], @@ -2573,13 +2572,12 @@ class Collection(common.BaseObject, Generic[_DocumentType]): cursor, conn.address, session=session, - explicit_session=explicit_session, comment=cmd.get("comment"), ) cmd_cursor._maybe_pin_connection(conn) return cmd_cursor - with self._database.client._tmp_session(session, False) as s: + with self._database.client._tmp_session(session) as s: return self._database.client._retryable_read( _cmd, read_pref, s, operation=_Op.LIST_INDEXES ) @@ -2675,7 +2673,6 @@ class Collection(common.BaseObject, Generic[_DocumentType]): CommandCursor, pipeline, kwargs, - explicit_session=session is not None, comment=comment, user_fields={"cursor": {"firstBatch": 1}}, ) @@ -2893,7 +2890,6 @@ class Collection(common.BaseObject, Generic[_DocumentType]): pipeline: _Pipeline, cursor_class: Type[CommandCursor], # type: ignore[type-arg] session: Optional[ClientSession], - explicit_session: bool, let: Optional[Mapping[str, Any]] = None, comment: Optional[Any] = None, **kwargs: Any, @@ -2905,7 +2901,6 @@ class Collection(common.BaseObject, Generic[_DocumentType]): cursor_class, pipeline, kwargs, - explicit_session, let, user_fields={"cursor": {"firstBatch": 1}}, ) @@ -3011,13 +3006,12 @@ class Collection(common.BaseObject, Generic[_DocumentType]): .. _aggregate command: https://mongodb.com/docs/manual/reference/command/aggregate """ - with self._database.client._tmp_session(session, close=False) as s: + with self._database.client._tmp_session(session) as s: return self._aggregate( _CollectionAggregationCommand, pipeline, CommandCursor, session=s, - explicit_session=session is not None, let=let, comment=comment, **kwargs, @@ -3058,7 +3052,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): raise InvalidOperation("aggregate_raw_batches does not support auto encryption") if comment is not None: kwargs["comment"] = comment - with self._database.client._tmp_session(session, close=False) as s: + with self._database.client._tmp_session(session) as s: return cast( RawBatchCursor[_DocumentType], self._aggregate( @@ -3066,7 +3060,6 @@ class Collection(common.BaseObject, Generic[_DocumentType]): pipeline, RawBatchCommandCursor, session=s, - explicit_session=session is not None, **kwargs, ), ) diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index bcdeed5f9..a09a67efc 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -64,7 +64,6 @@ class CommandCursor(Generic[_DocumentType]): batch_size: int = 0, max_await_time_ms: Optional[int] = None, session: Optional[ClientSession] = None, - explicit_session: bool = False, comment: Any = None, ) -> None: """Create a new command cursor.""" @@ -80,7 +79,8 @@ class CommandCursor(Generic[_DocumentType]): self._max_await_time_ms = max_await_time_ms self._timeout = self._collection.database.client.options.timeout self._session = session - self._explicit_session = explicit_session + if self._session is not None: + self._session._attached_to_cursor = True self._killed = self._id == 0 self._comment = comment if self._killed: @@ -197,7 +197,7 @@ class CommandCursor(Generic[_DocumentType]): .. versionadded:: 3.6 """ - if self._explicit_session: + if self._session and not self._session._implicit: return self._session return None @@ -218,9 +218,10 @@ class CommandCursor(Generic[_DocumentType]): """Closes this cursor without acquiring a lock.""" cursor_id, address = self._prepare_to_die() self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session, self._explicit_session + cursor_id, address, self._sock_mgr, self._session ) - if not self._explicit_session: + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None @@ -232,14 +233,15 @@ class CommandCursor(Generic[_DocumentType]): address, self._sock_mgr, self._session, - self._explicit_session, ) - if not self._explicit_session: + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None def _end_session(self) -> None: - if self._session and not self._explicit_session: + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session._end_implicit_session() self._session = None @@ -430,7 +432,6 @@ class RawBatchCommandCursor(CommandCursor[_DocumentType]): batch_size: int = 0, max_await_time_ms: Optional[int] = None, session: Optional[ClientSession] = None, - explicit_session: bool = False, comment: Any = None, ) -> None: """Create a new cursor / iterator over raw batches of BSON data. @@ -449,7 +450,6 @@ class RawBatchCommandCursor(CommandCursor[_DocumentType]): batch_size, max_await_time_ms, session, - explicit_session, comment, ) diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index 3dd550f4d..2cecc5b38 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -138,10 +138,9 @@ class Cursor(Generic[_DocumentType]): if session: self._session = session - self._explicit_session = True + self._session._attached_to_cursor = True else: self._session = None - self._explicit_session = False spec: Mapping[str, Any] = filter or {} validate_is_mapping("filter", spec) @@ -150,7 +149,7 @@ class Cursor(Generic[_DocumentType]): if not isinstance(limit, int): raise TypeError(f"limit must be an instance of int, not {type(limit)}") validate_boolean("no_cursor_timeout", no_cursor_timeout) - if no_cursor_timeout and not self._explicit_session: + if no_cursor_timeout and self._session and self._session._implicit: warnings.warn( "use an explicit session with no_cursor_timeout=True " "otherwise the cursor may still timeout after " @@ -283,7 +282,7 @@ class Cursor(Generic[_DocumentType]): def _clone(self, deepcopy: bool = True, base: Optional[Cursor] = None) -> Cursor: # type: ignore[type-arg] """Internal clone helper.""" if not base: - if self._explicit_session: + if self._session and not self._session._implicit: base = self._clone_base(self._session) else: base = self._clone_base(None) @@ -943,7 +942,7 @@ class Cursor(Generic[_DocumentType]): .. versionadded:: 3.6 """ - if self._explicit_session: + if self._session and not self._session._implicit: return self._session return None @@ -1032,9 +1031,10 @@ class Cursor(Generic[_DocumentType]): cursor_id, address = self._prepare_to_die(already_killed) self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session, self._explicit_session + cursor_id, address, self._sock_mgr, self._session ) - if not self._explicit_session: + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None @@ -1052,9 +1052,9 @@ class Cursor(Generic[_DocumentType]): address, self._sock_mgr, self._session, - self._explicit_session, ) - if not self._explicit_session: + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None diff --git a/pymongo/synchronous/database.py b/pymongo/synchronous/database.py index e30f97817..0d129ba97 100644 --- a/pymongo/synchronous/database.py +++ b/pymongo/synchronous/database.py @@ -611,6 +611,8 @@ class Database(common.BaseObject, Generic[_DocumentType]): common.validate_is_mapping("clusteredIndex", clustered_index) with self._client._tmp_session(session) as s: + if s and not s.in_transaction: + s._leave_alive = True # Skip this check in a transaction where listCollections is not # supported. if ( @@ -619,6 +621,8 @@ class Database(common.BaseObject, Generic[_DocumentType]): and name in self._list_collection_names(filter={"name": name}, session=s) ): raise CollectionInvalid("collection %s already exists" % name) + if s: + s._leave_alive = False coll = Collection( self, name, @@ -699,13 +703,12 @@ class Database(common.BaseObject, Generic[_DocumentType]): .. _aggregate command: https://mongodb.com/docs/manual/reference/command/aggregate """ - with self.client._tmp_session(session, close=False) as s: + with self.client._tmp_session(session) as s: cmd = _DatabaseAggregationCommand( self, CommandCursor, pipeline, kwargs, - session is not None, user_fields={"cursor": {"firstBatch": 1}}, ) return self.client._retryable_read( @@ -1009,7 +1012,7 @@ class Database(common.BaseObject, Generic[_DocumentType]): else: command_name = next(iter(command)) - with self._client._tmp_session(session, close=False) as tmp_session: + with self._client._tmp_session(session) as tmp_session: opts = codec_options or DEFAULT_CODEC_OPTIONS if read_preference is None: @@ -1039,7 +1042,6 @@ class Database(common.BaseObject, Generic[_DocumentType]): conn.address, max_await_time_ms=max_await_time_ms, session=tmp_session, - explicit_session=session is not None, comment=comment, ) cmd_cursor._maybe_pin_connection(conn) @@ -1085,7 +1087,7 @@ class Database(common.BaseObject, Generic[_DocumentType]): ) cmd = {"listCollections": 1, "cursor": {}} cmd.update(kwargs) - with self._client._tmp_session(session, close=False) as tmp_session: + with self._client._tmp_session(session) as tmp_session: cursor = ( self._command(conn, cmd, read_preference=read_preference, session=tmp_session) )["cursor"] @@ -1094,7 +1096,6 @@ class Database(common.BaseObject, Generic[_DocumentType]): cursor, conn.address, session=tmp_session, - explicit_session=session is not None, comment=cmd.get("comment"), ) cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index ef0663584..6e716402f 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2044,17 +2044,18 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): retryable = bool( retryable and self.options.retry_reads and not (session and session.in_transaction) ) - return self._retry_internal( - func, - session, - None, - operation, - is_read=True, - address=address, - read_pref=read_pref, - retryable=retryable, - operation_id=operation_id, - ) + with self._tmp_session(session) as s: + return self._retry_internal( + func, + s, + None, + operation, + is_read=True, + address=address, + read_pref=read_pref, + retryable=retryable, + operation_id=operation_id, + ) def _retryable_write( self, @@ -2087,7 +2088,6 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): address: Optional[_CursorAddress], conn_mgr: _ConnectionManager, session: Optional[ClientSession], - explicit_session: bool, ) -> None: """Cleanup a cursor from __del__ without locking. @@ -2102,7 +2102,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): # The cursor will be closed later in a different session. if cursor_id or conn_mgr: self._close_cursor_soon(cursor_id, address, conn_mgr) - if session and not explicit_session: + if session and session._implicit and not session._leave_alive: session._end_implicit_session() def _cleanup_cursor_lock( @@ -2111,7 +2111,6 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): address: Optional[_CursorAddress], conn_mgr: _ConnectionManager, session: Optional[ClientSession], - explicit_session: bool, ) -> None: """Cleanup a cursor from cursor.close() using a lock. @@ -2123,7 +2122,6 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): :param address: The _CursorAddress. :param conn_mgr: The _ConnectionManager for the pinned connection or None. :param session: The cursor's session. - :param explicit_session: True if the session was passed explicitly. """ if cursor_id: if conn_mgr and conn_mgr.more_to_come: @@ -2136,7 +2134,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) if conn_mgr: conn_mgr.close() - if session and not explicit_session: + if session and session._implicit and not session._leave_alive: session._end_implicit_session() def _close_cursor_now( @@ -2217,7 +2215,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): for address, cursor_id, conn_mgr in pinned_cursors: try: - self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False) + self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None) except Exception as exc: if isinstance(exc, InvalidOperation) and self._topology._closed: # Raise the exception when client is closed so that it @@ -2262,7 +2260,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): @contextlib.contextmanager def _tmp_session( - self, session: Optional[client_session.ClientSession], close: bool = True + self, session: Optional[client_session.ClientSession] ) -> Generator[Optional[client_session.ClientSession], None]: """If provided session is None, lend a temporary session.""" if session is not None: @@ -2287,7 +2285,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): raise finally: # Call end_session when we exit this scope. - if close: + if not s._attached_to_cursor: s.end_session() else: yield None diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index f7f6a26c6..66258fda1 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -626,7 +626,7 @@ class Connection: # signals and throws KeyboardInterrupt into the current frame on the # main thread. # - # But in Gevent and Eventlet, the polling mechanism (epoll, kqueue, + # But in Gevent, the polling mechanism (epoll, kqueue, # ..) is called in Python code, which experiences the signal as a # KeyboardInterrupt from the start, rather than as an initial # socket.error, so we catch that, close the socket, and reraise it. diff --git a/pyproject.toml b/pyproject.toml index 53fbfc8c1..2130c61a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,6 @@ dev = [ pip = ["pip"] # TODO: PYTHON-5464 gevent = ["gevent", "cffi>=2.0.0b1;python_version=='3.14'"] -eventlet = ["eventlet"] coverage = [ "pytest-cov", "coverage>=5,<=7.10.6" @@ -113,15 +112,12 @@ filterwarnings = [ "module:.*WindowsSelectorEventLoopPolicy:DeprecationWarning", "module:.*et_event_loop_policy:DeprecationWarning", # TODO: Remove as part of PYTHON-3923. - "module:unclosed