Compare commits

...

20 Commits

Author SHA1 Message Date
mongodb-dbx-release-bot[bot]
966d6c7d99
BUMP 4.15.6.dev0
Signed-off-by: mongodb-dbx-release-bot[bot] <167856002+mongodb-dbx-release-bot[bot]@users.noreply.github.com>
2025-12-02 18:48:51 +00:00
Casey Clements
7a8420caad
PYTHON-5642 Prep for 4.15.5 patch release. (#2633) 2025-12-02 13:22:37 -05:00
Noah Stapp
0480525c22
[v4.15] PYTHON-5642 - getMore operations should do server selection if the server is unknown (#2624) 2025-11-24 14:25:54 -05:00
mongodb-dbx-release-bot[bot]
dfecbdb438
BUMP 4.15.5.dev0
Signed-off-by: mongodb-dbx-release-bot[bot] <167856002+mongodb-dbx-release-bot[bot]@users.noreply.github.com>
2025-11-11 20:56:50 +00:00
Steven Silvester
07d51b10a3
PYTHON-5639 Prep for 4.15.4 release (#2614) 2025-11-11 14:32:15 -06:00
Steven Silvester
477217f666
[TASK]-[PYTHON-5623]: [v4.15] Change with_transaction callback return type to Awaitable (#2612)
Co-authored-by: Noah Stapp <noah.stapp@mongodb.com>
Co-authored-by: Logan Pulley <logan@pulley.host>
2025-11-11 13:42:52 -06:00
Noah Stapp
f3ea3738bf
PYTHON-5624 - Add Python 3.14 trove classifier (#2593) 2025-10-20 12:32:15 -04:00
mongodb-dbx-release-bot[bot]
a5a50a89df
BUMP 4.15.4.dev0
Signed-off-by: mongodb-dbx-release-bot[bot] <167856002+mongodb-dbx-release-bot[bot]@users.noreply.github.com>
2025-10-07 22:01:45 +00:00
Steven Silvester
6116811407
PYTHON-5609 Prepare for 4.15.3 Release (#2584) 2025-10-07 15:51:44 -05:00
mongodb-dbx-release-bot[bot]
fef10d26aa
PYTHON-5596 Fix return type for distinct methods (#2576) [v4.15] (#2583)
Co-authored-by: Steven Silvester <steven.silvester@ieee.org>
2025-10-07 14:08:33 -05:00
Noah Stapp
039c35ba63
[v4.15] PYTHON-5571 - Fix memory leak when raising InvalidDocument with C extensions (#2579) 2025-10-07 08:02:42 -04:00
mongodb-dbx-release-bot[bot]
a71c96d2ed
BUMP 4.15.3.dev0
Signed-off-by: mongodb-dbx-release-bot[bot] <167856002+mongodb-dbx-release-bot[bot]@users.noreply.github.com>
2025-10-01 21:29:53 +00:00
Casey Clements
eda41f3e87
final preparation for v4.15.2 release (#2569) 2025-10-01 17:04:10 -04:00
Steven Silvester
f33e832beb
PYTHON-5584 Add wheels for Python 3.14 and 3.14t (#2568) 2025-09-30 11:11:21 -05:00
Steven Silvester
e0b96544fb
PYTHON-5544 [v4.15] Revert changes to base protocol layer (#2538) 2025-09-16 10:54:58 -05:00
mongodb-dbx-release-bot[bot]
09fa287839
PYTHON-5537 Update typing dependencies (#2524) [v4.15] (#2536)
Co-authored-by: Steven Silvester <steven.silvester@ieee.org>
2025-09-16 09:16:22 -05:00
Jeffrey A. Clark
7b2b221eec
Prep 4.15.1 (#2530)
Co-authored-by: Noah Stapp <noah@noahstapp.com>
2025-09-12 12:52:51 -04:00
Steven Silvester
ebe8bfb564
PYTHON-5542 Prepare for 4.15.1 Release (#2527) 2025-09-11 16:41:31 -04:00
mongodb-dbx-release-bot[bot]
bc4ee39aac
PYTHON-5540 Fix usage of text_opts for older versions of pymongocrypt (#2525) [v4.15] (#2526)
Co-authored-by: Steven Silvester <steven.silvester@ieee.org>
2025-09-11 05:32:34 -05:00
mongodb-dbx-release-bot[bot]
e2107c22dd
Prep branch v4.15
Signed-off-by: mongodb-dbx-release-bot[bot] <167856002+mongodb-dbx-release-bot[bot]@users.noreply.github.com>
2025-09-10 17:31:27 +00:00
35 changed files with 963 additions and 548 deletions

View File

@ -70,24 +70,16 @@ jobs:
platforms: all platforms: all
- name: Install cibuildwheel - name: Install cibuildwheel
# Note: the default manylinux is manylinux2014 # Note: the default manylinux is manylinux_2_28
run: | run: |
python -m pip install -U pip python -m pip install -U pip
python -m pip install "cibuildwheel>=2.20,<3" python -m pip install "cibuildwheel>=3.2.0,<4"
- name: Build wheels - name: Build wheels
env: env:
CIBW_BUILD: ${{ matrix.buildplat[2] }} CIBW_BUILD: ${{ matrix.buildplat[2] }}
run: python -m cibuildwheel --output-dir wheelhouse run: python -m cibuildwheel --output-dir wheelhouse
- name: Build manylinux1 wheels
if: ${{ matrix.buildplat[1] == 'manylinux_x86_64' || matrix.buildplat[1] == 'manylinux_i686' }}
env:
CIBW_MANYLINUX_X86_64_IMAGE: manylinux1
CIBW_MANYLINUX_I686_IMAGE: manylinux1
CIBW_BUILD: "cp39-${{ matrix.buildplat[1] }} cp39-${{ matrix.buildplat[1] }}"
run: python -m cibuildwheel --output-dir wheelhouse
- name: Assert all versions in wheelhouse - name: Assert all versions in wheelhouse
if: ${{ ! startsWith(matrix.buildplat[1], 'macos') }} if: ${{ ! startsWith(matrix.buildplat[1], 'macos') }}
run: | run: |
@ -96,8 +88,9 @@ jobs:
ls wheelhouse/*cp311*.whl ls wheelhouse/*cp311*.whl
ls wheelhouse/*cp312*.whl ls wheelhouse/*cp312*.whl
ls wheelhouse/*cp313*.whl ls wheelhouse/*cp313*.whl
ls wheelhouse/*cp314*.whl
# Free-threading builds: # Free-threading builds:
ls wheelhouse/*cp313t*.whl ls wheelhouse/*cp314t*.whl
- uses: actions/upload-artifact@v4 - uses: actions/upload-artifact@v4
with: with:
@ -107,7 +100,7 @@ jobs:
make_sdist: make_sdist:
name: Make SDist name: Make SDist
runs-on: macos-13 runs-on: macos-14
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v5
with: with:

View File

@ -16,7 +16,7 @@ env:
# Changes per repo # Changes per repo
PRODUCT_NAME: PyMongo PRODUCT_NAME: PyMongo
# Changes per branch # Changes per branch
EVERGREEN_PROJECT: mongo-python-driver EVERGREEN_PROJECT: mongo-python-driver-release
# Constant # Constant
# inputs will be empty on a scheduled run. so, we only set dry_run # inputs will be empty on a scheduled run. so, we only set dry_run
# to 'false' when the input is set to 'false'. # to 'false' when the input is set to 'false'.

View File

@ -1657,10 +1657,10 @@ void handle_invalid_doc_error(PyObject* dict) {
} }
if (evalue && PyErr_GivenExceptionMatches(etype, InvalidDocument)) { if (evalue && PyErr_GivenExceptionMatches(etype, InvalidDocument)) {
PyObject *msg = PyObject_Str(evalue); msg = PyObject_Str(evalue);
if (msg) { if (msg) {
// Prepend doc to the existing message // Prepend doc to the existing message
PyObject *dict_str = PyObject_Str(dict); dict_str = PyObject_Str(dict);
if (dict_str == NULL) { if (dict_str == NULL) {
goto cleanup; goto cleanup;
} }
@ -1672,15 +1672,17 @@ void handle_invalid_doc_error(PyObject* dict) {
if (msg_utf8 == NULL) { if (msg_utf8 == NULL) {
goto cleanup; goto cleanup;
} }
PyObject *new_msg = PyUnicode_FromFormat("Invalid document %s | %s", dict_str_utf8, msg_utf8); new_msg = PyUnicode_FromFormat("Invalid document %s | %s", dict_str_utf8, msg_utf8);
Py_DECREF(evalue); Py_DECREF(evalue);
Py_DECREF(etype); Py_DECREF(etype);
etype = InvalidDocument; etype = InvalidDocument;
InvalidDocument = NULL; InvalidDocument = NULL;
if (new_msg) { if (new_msg) {
evalue = new_msg; evalue = new_msg;
new_msg = NULL;
} else { } else {
evalue = msg; evalue = msg;
msg = NULL;
} }
} }
PyErr_NormalizeException(&etype, &evalue, &etrace); PyErr_NormalizeException(&etype, &evalue, &etrace);

View File

@ -298,7 +298,7 @@ class Binary(bytes):
def __new__( def __new__(
cls: Type[Binary], cls: Type[Binary],
data: Union[memoryview, bytes, _mmap, _array[Any]], data: Union[memoryview, bytes, bytearray, _mmap, _array[Any]],
subtype: int = BINARY_SUBTYPE, subtype: int = BINARY_SUBTYPE,
) -> Binary: ) -> Binary:
if not isinstance(subtype, int): if not isinstance(subtype, int):

View File

@ -60,7 +60,9 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS as DEFAULT
def _inflate_bson( def _inflate_bson(
bson_bytes: bytes, codec_options: CodecOptions[RawBSONDocument], raw_array: bool = False bson_bytes: bytes | memoryview,
codec_options: CodecOptions[RawBSONDocument],
raw_array: bool = False,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Inflates the top level fields of a BSON document. """Inflates the top level fields of a BSON document.
@ -85,7 +87,9 @@ class RawBSONDocument(Mapping[str, Any]):
__codec_options: CodecOptions[RawBSONDocument] __codec_options: CodecOptions[RawBSONDocument]
def __init__( def __init__(
self, bson_bytes: bytes, codec_options: Optional[CodecOptions[RawBSONDocument]] = None self,
bson_bytes: bytes | memoryview,
codec_options: Optional[CodecOptions[RawBSONDocument]] = None,
) -> None: ) -> None:
"""Create a new :class:`RawBSONDocument` """Create a new :class:`RawBSONDocument`
@ -135,7 +139,7 @@ class RawBSONDocument(Mapping[str, Any]):
_get_object_size(bson_bytes, 0, len(bson_bytes)) _get_object_size(bson_bytes, 0, len(bson_bytes))
@property @property
def raw(self) -> bytes: def raw(self) -> bytes | memoryview:
"""The raw BSON bytes composing this document.""" """The raw BSON bytes composing this document."""
return self.__raw return self.__raw
@ -153,7 +157,7 @@ class RawBSONDocument(Mapping[str, Any]):
@staticmethod @staticmethod
def _inflate_bson( def _inflate_bson(
bson_bytes: bytes, codec_options: CodecOptions[RawBSONDocument] bson_bytes: bytes | memoryview, codec_options: CodecOptions[RawBSONDocument]
) -> Mapping[str, Any]: ) -> Mapping[str, Any]:
return _inflate_bson(bson_bytes, codec_options) return _inflate_bson(bson_bytes, codec_options)
@ -180,7 +184,7 @@ class _RawArrayBSONDocument(RawBSONDocument):
@staticmethod @staticmethod
def _inflate_bson( def _inflate_bson(
bson_bytes: bytes, codec_options: CodecOptions[RawBSONDocument] bson_bytes: bytes | memoryview, codec_options: CodecOptions[RawBSONDocument]
) -> Mapping[str, Any]: ) -> Mapping[str, Any]:
return _inflate_bson(bson_bytes, codec_options, raw_array=True) return _inflate_bson(bson_bytes, codec_options, raw_array=True)

View File

@ -143,7 +143,7 @@ class SON(Dict[_Key, _Value]):
del self[k] del self[k]
return (k, v) return (k, v)
def update(self, other: Optional[Any] = None, **kwargs: _Value) -> None: # type: ignore[override] def update(self, other: Optional[Any] = None, **kwargs: _Value) -> None:
# Make progressively weaker assumptions about "other" # Make progressively weaker assumptions about "other"
if other is None: if other is None:
pass pass

View File

@ -28,4 +28,4 @@ if TYPE_CHECKING:
_DocumentOut = Union[MutableMapping[str, Any], "RawBSONDocument"] _DocumentOut = Union[MutableMapping[str, Any], "RawBSONDocument"]
_DocumentType = TypeVar("_DocumentType", bound=Mapping[str, Any]) _DocumentType = TypeVar("_DocumentType", bound=Mapping[str, Any])
_DocumentTypeArg = TypeVar("_DocumentTypeArg", bound=Mapping[str, Any]) _DocumentTypeArg = TypeVar("_DocumentTypeArg", bound=Mapping[str, Any])
_ReadableBuffer = Union[bytes, memoryview, "mmap", "array"] # type: ignore[type-arg] _ReadableBuffer = Union[bytes, memoryview, bytearray, "mmap", "array"] # type: ignore[type-arg]

View File

@ -1,6 +1,92 @@
Changelog Changelog
========= =========
Changes in Version 4.15.5 (2025/12/02)
--------------------------------------
Version 4.15.5 is a bug fix release.
- Fixed a bug that could cause ``AutoReconnect("connection pool paused")`` errors when cursors fetched more documents from the database after SDAM heartbeat failures.
Issues Resolved
...............
See the `PyMongo 4.15.5 release notes in JIRA`_ for the list of resolved issues
in this release.
.. _PyMongo 4.15.5 release notes in JIRA: https://jira.mongodb.org/secure/ReleaseNote.jspa?projectId=10004&version=47640
Changes in Version 4.15.4 (2025/10/21)
--------------------------------------
Version 4.15.4 is a bug fix release.
- Relaxed the callback type of :meth:`~pymongo.asynchronous.client_session.AsyncClientSession.with_transaction` to allow the broader Awaitable type rather than only Coroutine objects.
- Added the missing Python 3.14 trove classifier to the package metadata.
Issues Resolved
...............
See the `PyMongo 4.15.4 release notes in JIRA`_ for the list of resolved issues
in this release.
.. _PyMongo 4.15.4 release notes in JIRA: https://jira.mongodb.org/secure/ReleaseNote.jspa?projectId=10004&version=47237
Changes in Version 4.15.3 (2025/10/07)
--------------------------------------
Version 4.15.3 is a bug fix release.
- Fixed a memory leak when raising :class:`bson.errors.InvalidDocument` with C extensions.
- Fixed the return type of the :meth:`~pymongo.asynchronous.collection.AsyncCollection.distinct`,
:meth:`~pymongo.synchronous.collection.Collection.distinct`, :meth:`pymongo.asynchronous.cursor.AsyncCursor.distinct`,
and :meth:`pymongo.asynchronous.cursor.AsyncCursor.distinct` methods.
Issues Resolved
...............
See the `PyMongo 4.15.3 release notes in JIRA`_ for the list of resolved issues
in this release.
.. _PyMongo 4.15.3 release notes in JIRA: https://jira.mongodb.org/secure/ReleaseNote.jspa?projectId=10004&version=47293
Changes in Version 4.15.2 (2025/10/01)
--------------------------------------
Version 4.15.2 is a bug fix release.
- Add wheels for Python 3.14 and 3.14t that were missing from 4.15.0 release. Drop the 3.13t wheel.
Issues Resolved
...............
See the `PyMongo 4.15.2 release notes in JIRA`_ for the list of resolved issues
in this release.
.. _PyMongo 4.15.2 release notes in JIRA: https://jira.mongodb.org/secure/ReleaseNote.jspa?projectId=10004&version=47186
Changes in Version 4.15.1 (2025/09/16)
--------------------------------------
Version 4.15.1 is a bug fix release.
- Fixed a bug in :meth:`~pymongo.synchronous.encryption.ClientEncryption.encrypt`
and :meth:`~pymongo.asynchronous.encryption.AsyncClientEncryption.encrypt`
that would cause a ``TypeError`` when using ``pymongocrypt<1.16`` by passing
an unsupported ``type_opts`` parameter even if Queryable Encryption text
queries beta was not used.
- Fixed a bug in ``AsyncMongoClient`` that caused a ``ServerSelectionTimeoutError``
when used with ``uvicorn``, ``FastAPI``, or ``uvloop``.
Issues Resolved
...............
See the `PyMongo 4.15.1 release notes in JIRA`_ for the list of resolved issues
in this release.
.. _PyMongo 4.15.1 release notes in JIRA: https://jira.mongodb.org/secure/ReleaseNote.jspa?projectId=10004&version=46486
Changes in Version 4.15.0 (2025/09/10) Changes in Version 4.15.0 (2025/09/10)
-------------------------------------- --------------------------------------
@ -13,8 +99,10 @@ PyMongo 4.15 brings a number of changes including:
:attr:`~pymongo.encryption.QueryType.SUBSTRINGPREVIEW`, :attr:`~pymongo.encryption.QueryType.SUBSTRINGPREVIEW`,
as part of the experimental Queryable Encryption text queries beta. as part of the experimental Queryable Encryption text queries beta.
``pymongocrypt>=1.16`` is required for text query support. ``pymongocrypt>=1.16`` is required for text query support.
- Added :class:`bson.decimal128.DecimalEncoder` and :class:`bson.decimal128.DecimalDecoder` - Added :class:`bson.decimal128.DecimalEncoder` and
to support encoding and decoding of BSON Decimal128 values to decimal.Decimal values using the TypeRegistry API. :class:`bson.decimal128.DecimalDecoder`
to support encoding and decoding of BSON Decimal128 values to
decimal.Decimal values using the TypeRegistry API.
- Added support for Windows ``arm64`` wheels. - Added support for Windows ``arm64`` wheels.
Changes in Version 4.14.1 (2025/08/19) Changes in Version 4.14.1 (2025/08/19)
@ -22,8 +110,9 @@ Changes in Version 4.14.1 (2025/08/19)
Version 4.14.1 is a bug fix release. Version 4.14.1 is a bug fix release.
- Fixed a bug in ``MongoClient.append_metadata()`` and ``AsyncMongoClient.append_metadata()`` - Fixed a bug in ``MongoClient.append_metadata()`` and
that allowed duplicate ``DriverInfo.name`` to be appended to the metadata. ``AsyncMongoClient.append_metadata()``
that allowed duplicate ``DriverInfo.name`` to be appended to the metadata.
Issues Resolved Issues Resolved
............... ...............

View File

@ -22,7 +22,7 @@ work with MongoDB from Python.
Getting Help Getting Help
------------ ------------
If you're having trouble or have questions about PyMongo, ask your question on If you're having trouble or have questions about PyMongo, ask your question on
our `MongoDB Community Forum <https://www.mongodb.com/community/forums/tag/python>`_. one of the platforms listed on `Technical Support <https://www.mongodb.com/docs/manual/support/>`_.
You may also want to consider a You may also want to consider a
`commercial support subscription <https://support.mongodb.com/welcome>`_. `commercial support subscription <https://support.mongodb.com/welcome>`_.
Once you get an answer, it'd be great if you could work it back into this Once you get an answer, it'd be great if you could work it back into this
@ -37,7 +37,7 @@ project.
Feature Requests / Feedback Feature Requests / Feedback
--------------------------- ---------------------------
Use our `feedback engine <https://feedback.mongodb.com/forums/924286-drivers>`_ Use our `feedback engine <https://feedback.mongodb.com/?category=7548141816650747033>`_
to send us feature requests and general feedback about PyMongo. to send us feature requests and general feedback about PyMongo.
Contributing Contributing

View File

@ -18,7 +18,7 @@ from __future__ import annotations
import re import re
from typing import List, Tuple, Union from typing import List, Tuple, Union
__version__ = "4.16.0.dev0" __version__ = "4.15.6.dev0"
def get_version_tuple(version: str) -> Tuple[Union[int, str], ...]: def get_version_tuple(version: str) -> Tuple[Union[int, str], ...]:

View File

@ -143,8 +143,8 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
AsyncContextManager, AsyncContextManager,
Awaitable,
Callable, Callable,
Coroutine,
Mapping, Mapping,
MutableMapping, MutableMapping,
NoReturn, NoReturn,
@ -600,7 +600,7 @@ class AsyncClientSession:
async def with_transaction( async def with_transaction(
self, self,
callback: Callable[[AsyncClientSession], Coroutine[Any, Any, _T]], callback: Callable[[AsyncClientSession], Awaitable[_T]],
read_concern: Optional[ReadConcern] = None, read_concern: Optional[ReadConcern] = None,
write_concern: Optional[WriteConcern] = None, write_concern: Optional[WriteConcern] = None,
read_preference: Optional[_ServerMode] = None, read_preference: Optional[_ServerMode] = None,

View File

@ -3150,7 +3150,7 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]):
comment: Optional[Any] = None, comment: Optional[Any] = None,
hint: Optional[_IndexKeyHint] = None, hint: Optional[_IndexKeyHint] = None,
**kwargs: Any, **kwargs: Any,
) -> list[str]: ) -> list[Any]:
"""Get a list of distinct values for `key` among all documents """Get a list of distinct values for `key` among all documents
in this collection. in this collection.

View File

@ -1009,7 +1009,7 @@ class AsyncCursor(Generic[_DocumentType]):
else: else:
if not isinstance(key, RE_TYPE): if not isinstance(key, RE_TYPE):
key = copy.deepcopy(key, memo) # noqa: PLW2901 key = copy.deepcopy(key, memo) # noqa: PLW2901
y[key] = value y[key] = value # type:ignore[index]
return y return y
def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]: def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]:
@ -1064,7 +1064,7 @@ class AsyncCursor(Generic[_DocumentType]):
"""Explicitly close / kill this cursor.""" """Explicitly close / kill this cursor."""
await self._die_lock() await self._die_lock()
async def distinct(self, key: str) -> list[str]: async def distinct(self, key: str) -> list[Any]:
"""Get a list of distinct values for `key` among all documents """Get a list of distinct values for `key` among all documents
in the result set of this query. in the result set of this query.

View File

@ -64,7 +64,6 @@ from pymongo.asynchronous.collection import AsyncCollection
from pymongo.asynchronous.cursor import AsyncCursor from pymongo.asynchronous.cursor import AsyncCursor
from pymongo.asynchronous.database import AsyncDatabase from pymongo.asynchronous.database import AsyncDatabase
from pymongo.asynchronous.mongo_client import AsyncMongoClient from pymongo.asynchronous.mongo_client import AsyncMongoClient
from pymongo.asynchronous.pool import AsyncBaseConnection
from pymongo.common import CONNECT_TIMEOUT from pymongo.common import CONNECT_TIMEOUT
from pymongo.daemon import _spawn_daemon from pymongo.daemon import _spawn_daemon
from pymongo.encryption_options import AutoEncryptionOpts, RangeOpts, TextOpts from pymongo.encryption_options import AutoEncryptionOpts, RangeOpts, TextOpts
@ -77,11 +76,11 @@ from pymongo.errors import (
ServerSelectionTimeoutError, ServerSelectionTimeoutError,
) )
from pymongo.helpers_shared import _get_timeout_details from pymongo.helpers_shared import _get_timeout_details
from pymongo.network_layer import PyMongoKMSProtocol, async_receive_kms, async_sendall from pymongo.network_layer import async_socket_sendall
from pymongo.operations import UpdateOne from pymongo.operations import UpdateOne
from pymongo.pool_options import PoolOptions from pymongo.pool_options import PoolOptions
from pymongo.pool_shared import ( from pymongo.pool_shared import (
_configured_protocol_interface, _async_configured_socket,
_raise_connection_failure, _raise_connection_failure,
) )
from pymongo.read_concern import ReadConcern from pymongo.read_concern import ReadConcern
@ -94,8 +93,10 @@ from pymongo.write_concern import WriteConcern
if TYPE_CHECKING: if TYPE_CHECKING:
from pymongocrypt.mongocrypt import MongoCryptKmsContext from pymongocrypt.mongocrypt import MongoCryptKmsContext
from pymongo.pyopenssl_context import _sslConn
from pymongo.typings import _Address from pymongo.typings import _Address
_IS_SYNC = False _IS_SYNC = False
_HTTPS_PORT = 443 _HTTPS_PORT = 443
@ -110,10 +111,9 @@ _DATA_KEY_OPTS: CodecOptions[dict[str, Any]] = CodecOptions(
_KEY_VAULT_OPTS = CodecOptions(document_class=RawBSONDocument) _KEY_VAULT_OPTS = CodecOptions(document_class=RawBSONDocument)
async def _connect_kms(address: _Address, opts: PoolOptions) -> AsyncBaseConnection: async def _connect_kms(address: _Address, opts: PoolOptions) -> Union[socket.socket, _sslConn]:
try: try:
interface = await _configured_protocol_interface(address, opts, PyMongoKMSProtocol) return await _async_configured_socket(address, opts)
return AsyncBaseConnection(interface, opts)
except Exception as exc: except Exception as exc:
_raise_connection_failure(address, exc, timeout_details=_get_timeout_details(opts)) _raise_connection_failure(address, exc, timeout_details=_get_timeout_details(opts))
@ -198,11 +198,19 @@ class _EncryptionIO(AsyncMongoCryptCallback): # type: ignore[misc]
try: try:
conn = await _connect_kms(address, opts) conn = await _connect_kms(address, opts)
try: try:
await async_sendall(conn.conn.get_conn, message) await async_socket_sendall(conn, message)
while kms_context.bytes_needed > 0: while kms_context.bytes_needed > 0:
# CSOT: update timeout. # CSOT: update timeout.
conn.set_conn_timeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0)) conn.settimeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0))
data = await async_receive_kms(conn, kms_context.bytes_needed) data: memoryview | bytes
if _IS_SYNC:
data = conn.recv(kms_context.bytes_needed)
else:
from pymongo.network_layer import ( # type: ignore[attr-defined]
async_receive_data_socket,
)
data = await async_receive_data_socket(conn, kms_context.bytes_needed)
if not data: if not data:
raise OSError("KMS connection closed") raise OSError("KMS connection closed")
kms_context.feed(data) kms_context.feed(data)
@ -221,7 +229,7 @@ class _EncryptionIO(AsyncMongoCryptCallback): # type: ignore[misc]
address, exc, msg_prefix=msg_prefix, timeout_details=_get_timeout_details(opts) address, exc, msg_prefix=msg_prefix, timeout_details=_get_timeout_details(opts)
) )
finally: finally:
await conn.close_conn(None) conn.close()
except MongoCryptError: except MongoCryptError:
raise # Propagate MongoCryptError errors directly. raise # Propagate MongoCryptError errors directly.
except Exception as exc: except Exception as exc:
@ -264,7 +272,7 @@ class _EncryptionIO(AsyncMongoCryptCallback): # type: ignore[misc]
args.extend(self.opts._mongocryptd_spawn_args) args.extend(self.opts._mongocryptd_spawn_args)
_spawn_daemon(args) _spawn_daemon(args)
async def mark_command(self, database: str, cmd: bytes) -> bytes: async def mark_command(self, database: str, cmd: bytes) -> bytes | memoryview:
"""Mark a command for encryption. """Mark a command for encryption.
:param database: The database on which to run this command. :param database: The database on which to run this command.
@ -291,7 +299,7 @@ class _EncryptionIO(AsyncMongoCryptCallback): # type: ignore[misc]
) )
return res.raw return res.raw
async def fetch_keys(self, filter: bytes) -> AsyncGenerator[bytes, None]: async def fetch_keys(self, filter: bytes) -> AsyncGenerator[bytes | memoryview, None]:
"""Yields one or more keys from the key vault. """Yields one or more keys from the key vault.
:param filter: The filter to pass to find. :param filter: The filter to pass to find.
@ -463,7 +471,7 @@ class _Encrypter:
# TODO: PYTHON-1922 avoid decoding the encrypted_cmd. # TODO: PYTHON-1922 avoid decoding the encrypted_cmd.
return _inflate_bson(encrypted_cmd, DEFAULT_RAW_BSON_OPTIONS) return _inflate_bson(encrypted_cmd, DEFAULT_RAW_BSON_OPTIONS)
async def decrypt(self, response: bytes) -> Optional[bytes]: async def decrypt(self, response: bytes | memoryview) -> Optional[bytes]:
"""Decrypt a MongoDB command response. """Decrypt a MongoDB command response.
:param response: A MongoDB command response as BSON. :param response: A MongoDB command response as BSON.
@ -935,7 +943,8 @@ class AsyncClientEncryption(Generic[_DocumentType]):
contention_factor=contention_factor, contention_factor=contention_factor,
range_opts=range_opts_bytes, range_opts=range_opts_bytes,
is_expression=is_expression, is_expression=is_expression,
text_opts=text_opts_bytes, # For compatibility with pymongocrypt < 1.16:
**{"text_opts": text_opts_bytes} if text_opts_bytes else {},
) )
return decode(encrypted_doc)["v"] return decode(encrypted_doc)["v"]

View File

@ -78,7 +78,7 @@ async def _getaddrinfo(
socket.SocketKind, socket.SocketKind,
int, int,
str, str,
tuple[str, int] | tuple[str, int, int, int], tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
] ]
]: ]:
if not _IS_SYNC: if not _IS_SYNC:

View File

@ -123,19 +123,74 @@ except ImportError:
_IS_SYNC = False _IS_SYNC = False
class AsyncBaseConnection: class AsyncConnection:
"""A base connection object for server and kms connections.""" """Store a connection with some metadata.
def __init__(self, conn: AsyncNetworkingInterface, opts: PoolOptions): :param conn: a raw connection object
:param pool: a Pool instance
:param address: the server's (host, port)
:param id: the id of this socket in it's pool
:param is_sdam: SDAM connections do not call hello on creation
"""
def __init__(
self,
conn: AsyncNetworkingInterface,
pool: Pool,
address: tuple[str, int],
id: int,
is_sdam: bool,
):
self.pool_ref = weakref.ref(pool)
self.conn = conn self.conn = conn
self.socket_checker: SocketChecker = SocketChecker() self.address = address
self.cancel_context: _CancellationContext = _CancellationContext() self.id = id
self.is_sdam = False self.is_sdam = is_sdam
self.closed = False self.closed = False
self.last_timeout: float | None = None self.last_checkin_time = time.monotonic()
self.more_to_come = False self.performed_handshake = False
self.opts = opts self.is_writable: bool = False
self.max_wire_version = -1 self.max_wire_version = MAX_WIRE_VERSION
self.max_bson_size = MAX_BSON_SIZE
self.max_message_size = MAX_MESSAGE_SIZE
self.max_write_batch_size = MAX_WRITE_BATCH_SIZE
self.supports_sessions = False
self.hello_ok: bool = False
self.is_mongos = False
self.op_msg_enabled = False
self.listeners = pool.opts._event_listeners
self.enabled_for_cmap = pool.enabled_for_cmap
self.enabled_for_logging = pool.enabled_for_logging
self.compression_settings = pool.opts._compression_settings
self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None
self.socket_checker: SocketChecker = SocketChecker()
self.oidc_token_gen_id: Optional[int] = None
# Support for mechanism negotiation on the initial handshake.
self.negotiated_mechs: Optional[list[str]] = None
self.auth_ctx: Optional[_AuthContext] = None
# The pool's generation changes with each reset() so we can close
# sockets created before the last reset.
self.pool_gen = pool.gen
self.generation = self.pool_gen.get_overall()
self.ready = False
self.cancel_context: _CancellationContext = _CancellationContext()
self.opts = pool.opts
self.more_to_come: bool = False
# For load balancer support.
self.service_id: Optional[ObjectId] = None
self.server_connection_id: Optional[int] = None
# When executing a transaction in load balancing mode, this flag is
# set to true to indicate that the session now owns the connection.
self.pinned_txn = False
self.pinned_cursor = False
self.active = False
self.last_timeout = self.opts.socket_timeout
self.connect_rtt = 0.0
self._client_id = pool._client_id
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None
def set_conn_timeout(self, timeout: Optional[float]) -> None: def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout.""" """Cache last timeout to avoid duplicate calls to conn.settimeout."""
@ -164,111 +219,17 @@ class AsyncBaseConnection:
formatted = format_timeout_details(timeout_details) formatted = format_timeout_details(timeout_details)
# CSOT: raise an error without running the command since we know it will time out. # CSOT: raise an error without running the command since we know it will time out.
errmsg = f"operation would exceed time limit, remaining timeout:{timeout:.5f} <= network round trip time:{rtt:.5f} {formatted}" errmsg = f"operation would exceed time limit, remaining timeout:{timeout:.5f} <= network round trip time:{rtt:.5f} {formatted}"
if self.max_wire_version != -1: raise ExecutionTimeout(
raise ExecutionTimeout( errmsg,
errmsg, 50,
50, {"ok": 0, "errmsg": errmsg, "code": 50},
{"ok": 0, "errmsg": errmsg, "code": 50}, self.max_wire_version,
self.max_wire_version, )
)
else:
raise TimeoutError(errmsg)
if cmd is not None: if cmd is not None:
cmd["maxTimeMS"] = int(max_time_ms * 1000) cmd["maxTimeMS"] = int(max_time_ms * 1000)
self.set_conn_timeout(timeout) self.set_conn_timeout(timeout)
return timeout return timeout
async def close_conn(self, reason: Optional[str]) -> None:
"""Close this connection with a reason."""
if self.closed:
return
await self._close_conn()
async def _close_conn(self) -> None:
"""Close this connection."""
if self.closed:
return
self.closed = True
self.cancel_context.cancel()
# Note: We catch exceptions to avoid spurious errors on interpreter
# shutdown.
try:
await self.conn.close()
except Exception: # noqa: S110
pass
def conn_closed(self) -> bool:
"""Return True if we know socket has been closed, False otherwise."""
if _IS_SYNC:
return self.socket_checker.socket_closed(self.conn.get_conn)
else:
return self.conn.is_closing()
class AsyncConnection(AsyncBaseConnection):
"""Store a connection with some metadata.
:param conn: a raw connection object
:param pool: a Pool instance
:param address: the server's (host, port)
:param id: the id of this socket in it's pool
:param is_sdam: SDAM connections do not call hello on creation
"""
def __init__(
self,
conn: AsyncNetworkingInterface,
pool: Pool,
address: tuple[str, int],
id: int,
is_sdam: bool,
):
super().__init__(conn, pool.opts)
self.pool_ref = weakref.ref(pool)
self.address: tuple[str, int] = address
self.id: int = id
self.is_sdam = is_sdam
self.last_checkin_time = time.monotonic()
self.performed_handshake = False
self.is_writable: bool = False
self.max_wire_version = MAX_WIRE_VERSION
self.max_bson_size: int = MAX_BSON_SIZE
self.max_message_size: int = MAX_MESSAGE_SIZE
self.max_write_batch_size: int = MAX_WRITE_BATCH_SIZE
self.supports_sessions = False
self.hello_ok: bool = False
self.is_mongos: bool = False
self.op_msg_enabled = False
self.listeners = pool.opts._event_listeners
self.enabled_for_cmap = pool.enabled_for_cmap
self.enabled_for_logging = pool.enabled_for_logging
self.compression_settings = pool.opts._compression_settings
self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None
self.oidc_token_gen_id: Optional[int] = None
# Support for mechanism negotiation on the initial handshake.
self.negotiated_mechs: Optional[list[str]] = None
self.auth_ctx: Optional[_AuthContext] = None
# The pool's generation changes with each reset() so we can close
# sockets created before the last reset.
self.pool_gen = pool.gen
self.generation = self.pool_gen.get_overall()
self.ready = False
# For load balancer support.
self.service_id: Optional[ObjectId] = None
self.server_connection_id: Optional[int] = None
# When executing a transaction in load balancing mode, this flag is
# set to true to indicate that the session now owns the connection.
self.pinned_txn = False
self.pinned_cursor = False
self.active = False
self.last_timeout = self.opts.socket_timeout
self.connect_rtt = 0.0
self._client_id = pool._client_id
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None
def pin_txn(self) -> None: def pin_txn(self) -> None:
self.pinned_txn = True self.pinned_txn = True
assert not self.pinned_cursor assert not self.pinned_cursor
@ -612,6 +573,26 @@ class AsyncConnection(AsyncBaseConnection):
error=reason, error=reason,
) )
async def _close_conn(self) -> None:
"""Close this connection."""
if self.closed:
return
self.closed = True
self.cancel_context.cancel()
# Note: We catch exceptions to avoid spurious errors on interpreter
# shutdown.
try:
await self.conn.close()
except Exception: # noqa: S110
pass
def conn_closed(self) -> bool:
"""Return True if we know socket has been closed, False otherwise."""
if _IS_SYNC:
return self.socket_checker.socket_closed(self.conn.get_conn)
else:
return self.conn.is_closing()
def send_cluster_time( def send_cluster_time(
self, self,
command: MutableMapping[str, Any], command: MutableMapping[str, Any],

View File

@ -152,7 +152,7 @@ class ZstdContext:
return zstandard.ZstdCompressor().compress(data) return zstandard.ZstdCompressor().compress(data)
def decompress(data: bytes, compressor_id: int) -> bytes: def decompress(data: bytes | memoryview, compressor_id: int) -> bytes:
if compressor_id == SnappyContext.compressor_id: if compressor_id == SnappyContext.compressor_id:
# python-snappy doesn't support the buffer interface. # python-snappy doesn't support the buffer interface.
# https://github.com/andrix/python-snappy/issues/65 # https://github.com/andrix/python-snappy/issues/65

View File

@ -1352,7 +1352,9 @@ class _OpReply:
UNPACK_FROM = struct.Struct("<iqii").unpack_from UNPACK_FROM = struct.Struct("<iqii").unpack_from
OP_CODE = 1 OP_CODE = 1
def __init__(self, flags: int, cursor_id: int, number_returned: int, documents: bytes): def __init__(
self, flags: int, cursor_id: int, number_returned: int, documents: bytes | memoryview
):
self.flags = flags self.flags = flags
self.cursor_id = Int64(cursor_id) self.cursor_id = Int64(cursor_id)
self.number_returned = number_returned self.number_returned = number_returned
@ -1360,7 +1362,7 @@ class _OpReply:
def raw_response( def raw_response(
self, cursor_id: Optional[int] = None, user_fields: Optional[Mapping[str, Any]] = None self, cursor_id: Optional[int] = None, user_fields: Optional[Mapping[str, Any]] = None
) -> list[bytes]: ) -> list[bytes | memoryview]:
"""Check the response header from the database, without decoding BSON. """Check the response header from the database, without decoding BSON.
Check the response for errors and unpack. Check the response for errors and unpack.
@ -1448,7 +1450,7 @@ class _OpReply:
return False return False
@classmethod @classmethod
def unpack(cls, msg: bytes) -> _OpReply: def unpack(cls, msg: bytes | memoryview) -> _OpReply:
"""Construct an _OpReply from raw bytes.""" """Construct an _OpReply from raw bytes."""
# PYTHON-945: ignore starting_from field. # PYTHON-945: ignore starting_from field.
flags, cursor_id, _, number_returned = cls.UNPACK_FROM(msg) flags, cursor_id, _, number_returned = cls.UNPACK_FROM(msg)
@ -1470,7 +1472,7 @@ class _OpMsg:
MORE_TO_COME = 1 << 1 MORE_TO_COME = 1 << 1
EXHAUST_ALLOWED = 1 << 16 # Only present on requests. EXHAUST_ALLOWED = 1 << 16 # Only present on requests.
def __init__(self, flags: int, payload_document: bytes): def __init__(self, flags: int, payload_document: bytes | memoryview):
self.flags = flags self.flags = flags
self.payload_document = payload_document self.payload_document = payload_document
@ -1512,7 +1514,7 @@ class _OpMsg:
"""Unpack a command response.""" """Unpack a command response."""
return self.unpack_response(codec_options=codec_options)[0] return self.unpack_response(codec_options=codec_options)[0]
def raw_command_response(self) -> bytes: def raw_command_response(self) -> bytes | memoryview:
"""Return the bytes of the command response.""" """Return the bytes of the command response."""
return self.payload_document return self.payload_document
@ -1522,7 +1524,7 @@ class _OpMsg:
return bool(self.flags & self.MORE_TO_COME) return bool(self.flags & self.MORE_TO_COME)
@classmethod @classmethod
def unpack(cls, msg: bytes) -> _OpMsg: def unpack(cls, msg: bytes | memoryview) -> _OpMsg:
"""Construct an _OpMsg from raw bytes.""" """Construct an _OpMsg from raw bytes."""
flags, first_payload_type, first_payload_size = cls.UNPACK_FROM(msg) flags, first_payload_type, first_payload_size = cls.UNPACK_FROM(msg)
if flags != 0: if flags != 0:
@ -1541,7 +1543,7 @@ class _OpMsg:
return cls(flags, payload_document) return cls(flags, payload_document)
_UNPACK_REPLY: dict[int, Callable[[bytes], Union[_OpReply, _OpMsg]]] = { _UNPACK_REPLY: dict[int, Callable[[bytes | memoryview], Union[_OpReply, _OpMsg]]] = {
_OpReply.OP_CODE: _OpReply.unpack, _OpReply.OP_CODE: _OpReply.unpack,
_OpMsg.OP_CODE: _OpMsg.unpack, _OpMsg.OP_CODE: _OpMsg.unpack,
} }

View File

@ -22,11 +22,10 @@ import socket
import struct import struct
import sys import sys
import time import time
from asyncio import BaseTransport, BufferedProtocol, Future, Protocol, Transport from asyncio import AbstractEventLoop, BaseTransport, BufferedProtocol, Future, Transport
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
Callable,
Optional, Optional,
Union, Union,
) )
@ -39,30 +38,208 @@ from pymongo.errors import ProtocolError, _OperationCancelled
from pymongo.message import _UNPACK_REPLY, _OpMsg, _OpReply from pymongo.message import _UNPACK_REPLY, _OpMsg, _OpReply
from pymongo.socket_checker import _errno_from_exception from pymongo.socket_checker import _errno_from_exception
if TYPE_CHECKING: try:
from pymongo.asynchronous.pool import AsyncBaseConnection, AsyncConnection from ssl import SSLError, SSLSocket
_HAVE_SSL = True
except ImportError:
_HAVE_SSL = False
try:
from pymongo.pyopenssl_context import _sslConn from pymongo.pyopenssl_context import _sslConn
from pymongo.synchronous.pool import BaseConnection, Connection
_HAVE_PYOPENSSL = True
except ImportError:
_HAVE_PYOPENSSL = False
_sslConn = SSLSocket # type: ignore[assignment, misc]
from pymongo.ssl_support import (
BLOCKING_IO_LOOKUP_ERROR,
BLOCKING_IO_READ_ERROR,
BLOCKING_IO_WRITE_ERROR,
)
if TYPE_CHECKING:
from pymongo.asynchronous.pool import AsyncConnection
from pymongo.synchronous.pool import Connection
_UNPACK_HEADER = struct.Struct("<iiii").unpack _UNPACK_HEADER = struct.Struct("<iiii").unpack
_UNPACK_COMPRESSION_HEADER = struct.Struct("<iiB").unpack _UNPACK_COMPRESSION_HEADER = struct.Struct("<iiB").unpack
_POLL_TIMEOUT = 0.5 _POLL_TIMEOUT = 0.5
_PYPY = "PyPy" in sys.version
_WINDOWS = sys.platform == "win32"
# Errors raised by sockets (and TLS sockets) when in non-blocking mode. # Errors raised by sockets (and TLS sockets) when in non-blocking mode.
BLOCKING_IO_ERRORS = ( BLOCKING_IO_ERRORS = (BlockingIOError, *BLOCKING_IO_LOOKUP_ERROR, *ssl_support.BLOCKING_IO_ERRORS)
BlockingIOError,
*ssl_support.BLOCKING_IO_LOOKUP_ERROR,
*ssl_support.BLOCKING_IO_ERRORS, # These socket-based I/O methods are for KMS requests and any other network operations that do not use
) # the MongoDB wire protocol
async def async_socket_sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None:
timeout = sock.gettimeout()
sock.settimeout(0.0)
loop = asyncio.get_running_loop()
try:
if _HAVE_SSL and isinstance(sock, (SSLSocket, _sslConn)):
await asyncio.wait_for(_async_socket_sendall_ssl(sock, buf, loop), timeout=timeout)
else:
await asyncio.wait_for(loop.sock_sendall(sock, buf), timeout=timeout) # type: ignore[arg-type]
except asyncio.TimeoutError as exc:
# Convert the asyncio.wait_for timeout error to socket.timeout which pool.py understands.
raise socket.timeout("timed out") from exc
finally:
sock.settimeout(timeout)
if sys.platform != "win32":
async def _async_socket_sendall_ssl(
sock: Union[socket.socket, _sslConn], buf: bytes, loop: AbstractEventLoop
) -> None:
view = memoryview(buf)
sent = 0
def _is_ready(fut: Future[Any]) -> None:
if fut.done():
return
fut.set_result(None)
while sent < len(buf):
try:
sent += sock.send(view[sent:]) # type:ignore[arg-type]
except BLOCKING_IO_ERRORS as exc:
fd = sock.fileno()
# Check for closed socket.
if fd == -1:
raise SSLError("Underlying socket has been closed") from None
if isinstance(exc, BLOCKING_IO_READ_ERROR):
fut = loop.create_future()
loop.add_reader(fd, _is_ready, fut)
try:
await fut
finally:
loop.remove_reader(fd)
if isinstance(exc, BLOCKING_IO_WRITE_ERROR):
fut = loop.create_future()
loop.add_writer(fd, _is_ready, fut)
try:
await fut
finally:
loop.remove_writer(fd)
if _HAVE_PYOPENSSL and isinstance(exc, BLOCKING_IO_LOOKUP_ERROR):
fut = loop.create_future()
loop.add_reader(fd, _is_ready, fut)
try:
loop.add_writer(fd, _is_ready, fut)
await fut
finally:
loop.remove_reader(fd)
loop.remove_writer(fd)
async def _async_socket_receive_ssl(
conn: _sslConn, length: int, loop: AbstractEventLoop, once: Optional[bool] = False
) -> memoryview:
mv = memoryview(bytearray(length))
total_read = 0
def _is_ready(fut: Future[Any]) -> None:
if fut.done():
return
fut.set_result(None)
while total_read < length:
try:
read = conn.recv_into(mv[total_read:])
if read == 0:
raise OSError("connection closed")
# KMS responses update their expected size after the first batch, stop reading after one loop
if once:
return mv[:read]
total_read += read
except BLOCKING_IO_ERRORS as exc:
fd = conn.fileno()
# Check for closed socket.
if fd == -1:
raise SSLError("Underlying socket has been closed") from None
if isinstance(exc, BLOCKING_IO_READ_ERROR):
fut = loop.create_future()
loop.add_reader(fd, _is_ready, fut)
try:
await fut
finally:
loop.remove_reader(fd)
if isinstance(exc, BLOCKING_IO_WRITE_ERROR):
fut = loop.create_future()
loop.add_writer(fd, _is_ready, fut)
try:
await fut
finally:
loop.remove_writer(fd)
if _HAVE_PYOPENSSL and isinstance(exc, BLOCKING_IO_LOOKUP_ERROR):
fut = loop.create_future()
loop.add_reader(fd, _is_ready, fut)
try:
loop.add_writer(fd, _is_ready, fut)
await fut
finally:
loop.remove_reader(fd)
loop.remove_writer(fd)
return mv
else:
# The default Windows asyncio event loop does not support loop.add_reader/add_writer:
# https://docs.python.org/3/library/asyncio-platforms.html#asyncio-platform-support
# Note: In PYTHON-4493 we plan to replace this code with asyncio streams.
async def _async_socket_sendall_ssl(
sock: Union[socket.socket, _sslConn], buf: bytes, dummy: AbstractEventLoop
) -> None:
view = memoryview(buf)
total_length = len(buf)
total_sent = 0
# Backoff starts at 1ms, doubles on timeout up to 512ms, and halves on success
# down to 1ms.
backoff = 0.001
while total_sent < total_length:
try:
sent = sock.send(view[total_sent:])
except BLOCKING_IO_ERRORS:
await asyncio.sleep(backoff)
sent = 0
if sent > 0:
backoff = max(backoff / 2, 0.001)
else:
backoff = min(backoff * 2, 0.512)
total_sent += sent
async def _async_socket_receive_ssl(
conn: _sslConn, length: int, dummy: AbstractEventLoop, once: Optional[bool] = False
) -> memoryview:
mv = memoryview(bytearray(length))
total_read = 0
# Backoff starts at 1ms, doubles on timeout up to 512ms, and halves on success
# down to 1ms.
backoff = 0.001
while total_read < length:
try:
read = conn.recv_into(mv[total_read:])
if read == 0:
raise OSError("connection closed")
# KMS responses update their expected size after the first batch, stop reading after one loop
if once:
return mv[:read]
except BLOCKING_IO_ERRORS:
await asyncio.sleep(backoff)
read = 0
if read > 0:
backoff = max(backoff / 2, 0.001)
else:
backoff = min(backoff * 2, 0.512)
total_read += read
return mv
def sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None: def sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None:
sock.sendall(buf) sock.sendall(buf)
async def _poll_cancellation(conn: AsyncBaseConnection) -> None: async def _poll_cancellation(conn: AsyncConnection) -> None:
while True: while True:
if conn.cancel_context.cancelled: if conn.cancel_context.cancelled:
return return
@ -70,7 +247,49 @@ async def _poll_cancellation(conn: AsyncBaseConnection) -> None:
await asyncio.sleep(_POLL_TIMEOUT) await asyncio.sleep(_POLL_TIMEOUT)
def wait_for_read(conn: BaseConnection, deadline: Optional[float]) -> None: async def async_receive_data_socket(
sock: Union[socket.socket, _sslConn], length: int
) -> memoryview:
sock_timeout = sock.gettimeout()
timeout = sock_timeout
sock.settimeout(0.0)
loop = asyncio.get_running_loop()
try:
if _HAVE_SSL and isinstance(sock, (SSLSocket, _sslConn)):
return await asyncio.wait_for(
_async_socket_receive_ssl(sock, length, loop, once=True), # type: ignore[arg-type]
timeout=timeout,
)
else:
return await asyncio.wait_for(
_async_socket_receive(sock, length, loop), # type: ignore[arg-type]
timeout=timeout,
)
except asyncio.TimeoutError as err:
raise socket.timeout("timed out") from err
finally:
sock.settimeout(sock_timeout)
async def _async_socket_receive(
conn: socket.socket, length: int, loop: AbstractEventLoop
) -> memoryview:
mv = memoryview(bytearray(length))
bytes_read = 0
while bytes_read < length:
chunk_length = await loop.sock_recv_into(conn, mv[bytes_read:])
if chunk_length == 0:
raise OSError("connection closed")
bytes_read += chunk_length
return mv
_PYPY = "PyPy" in sys.version
_WINDOWS = sys.platform == "win32"
def wait_for_read(conn: Connection, deadline: Optional[float]) -> None:
"""Block until at least one byte is read, or a timeout, or a cancel.""" """Block until at least one byte is read, or a timeout, or a cancel."""
sock = conn.conn.sock sock = conn.conn.sock
timed_out = False timed_out = False
@ -103,7 +322,7 @@ def wait_for_read(conn: BaseConnection, deadline: Optional[float]) -> None:
raise socket.timeout("timed out") raise socket.timeout("timed out")
def receive_data(conn: BaseConnection, length: int, deadline: Optional[float]) -> memoryview: def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> memoryview:
buf = bytearray(length) buf = bytearray(length)
mv = memoryview(buf) mv = memoryview(buf)
bytes_read = 0 bytes_read = 0
@ -193,7 +412,7 @@ class NetworkingInterfaceBase:
class AsyncNetworkingInterface(NetworkingInterfaceBase): class AsyncNetworkingInterface(NetworkingInterfaceBase):
def __init__(self, conn: tuple[Transport, PyMongoBaseProtocol]): def __init__(self, conn: tuple[Transport, PyMongoProtocol]):
super().__init__(conn) super().__init__(conn)
@property @property
@ -211,7 +430,7 @@ class AsyncNetworkingInterface(NetworkingInterfaceBase):
return self.conn[0].is_closing() return self.conn[0].is_closing()
@property @property
def get_conn(self) -> PyMongoBaseProtocol: def get_conn(self) -> PyMongoProtocol:
return self.conn[1] return self.conn[1]
@property @property
@ -246,55 +465,13 @@ class NetworkingInterface(NetworkingInterfaceBase):
def fileno(self) -> int: def fileno(self) -> int:
return self.conn.fileno() return self.conn.fileno()
def recv_into(self, buffer: bytes) -> int: def recv_into(self, buffer: bytes | memoryview) -> int:
return self.conn.recv_into(buffer) return self.conn.recv_into(buffer)
class PyMongoBaseProtocol(Protocol): class PyMongoProtocol(BufferedProtocol):
def __init__(self, timeout: Optional[float] = None): def __init__(self, timeout: Optional[float] = None):
self.transport: Transport = None # type: ignore[assignment] self.transport: Transport = None # type: ignore[assignment]
self._timeout = timeout
self._closed = asyncio.get_running_loop().create_future()
self._connection_lost = False
def settimeout(self, timeout: float | None) -> None:
self._timeout = timeout
@property
def gettimeout(self) -> float | None:
"""The configured timeout for the socket that underlies our protocol pair."""
return self._timeout
def close(self, exc: Optional[Exception] = None) -> None:
self.transport.abort()
self._resolve_pending(exc)
self._connection_lost = True
def connection_lost(self, exc: Optional[Exception] = None) -> None:
self._resolve_pending(exc)
if not self._closed.done():
self._closed.set_result(None)
def _resolve_pending(self, exc: Optional[Exception] = None) -> None:
pass
async def wait_closed(self) -> None:
await self._closed
async def write(self, message: bytes) -> None:
"""Write a message to this connection's transport."""
if self.transport.is_closing():
raise OSError("Connection is closed")
self.transport.write(message)
self.transport.resume_reading()
async def read(self, *args: Any) -> Any:
raise NotImplementedError
class PyMongoProtocol(PyMongoBaseProtocol, BufferedProtocol):
def __init__(self, timeout: Optional[float] = None):
super().__init__(timeout)
# Each message is reader in 2-3 parts: header, compression header, and message body # Each message is reader in 2-3 parts: header, compression header, and message body
# The message buffer is allocated after the header is read. # The message buffer is allocated after the header is read.
self._header = memoryview(bytearray(16)) self._header = memoryview(bytearray(16))
@ -308,14 +485,25 @@ class PyMongoProtocol(PyMongoBaseProtocol, BufferedProtocol):
self._expecting_compression = False self._expecting_compression = False
self._message_size = 0 self._message_size = 0
self._op_code = 0 self._op_code = 0
self._connection_lost = False
self._read_waiter: Optional[Future[Any]] = None self._read_waiter: Optional[Future[Any]] = None
self._timeout = timeout
self._is_compressed = False self._is_compressed = False
self._compressor_id: Optional[int] = None self._compressor_id: Optional[int] = None
self._max_message_size = MAX_MESSAGE_SIZE self._max_message_size = MAX_MESSAGE_SIZE
self._response_to: Optional[int] = None self._response_to: Optional[int] = None
self._closed = asyncio.get_running_loop().create_future()
self._pending_messages: collections.deque[Future[Any]] = collections.deque() self._pending_messages: collections.deque[Future[Any]] = collections.deque()
self._done_messages: collections.deque[Future[Any]] = collections.deque() self._done_messages: collections.deque[Future[Any]] = collections.deque()
def settimeout(self, timeout: float | None) -> None:
self._timeout = timeout
@property
def gettimeout(self) -> float | None:
"""The configured timeout for the socket that underlies our protocol pair."""
return self._timeout
def connection_made(self, transport: BaseTransport) -> None: def connection_made(self, transport: BaseTransport) -> None:
"""Called exactly once when a connection is made. """Called exactly once when a connection is made.
The transport argument is the transport representing the write side of the connection. The transport argument is the transport representing the write side of the connection.
@ -323,6 +511,13 @@ class PyMongoProtocol(PyMongoBaseProtocol, BufferedProtocol):
self.transport = transport # type: ignore[assignment] self.transport = transport # type: ignore[assignment]
self.transport.set_write_buffer_limits(MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE) self.transport.set_write_buffer_limits(MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE)
async def write(self, message: bytes) -> None:
"""Write a message to this connection's transport."""
if self.transport.is_closing():
raise OSError("Connection is closed")
self.transport.write(message)
self.transport.resume_reading()
async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[bytes, int]: async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[bytes, int]:
"""Read a single MongoDB Wire Protocol message from this connection.""" """Read a single MongoDB Wire Protocol message from this connection."""
if self.transport: if self.transport:
@ -465,7 +660,7 @@ class PyMongoProtocol(PyMongoBaseProtocol, BufferedProtocol):
op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER(self._compression_header) op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER(self._compression_header)
return op_code, compressor_id return op_code, compressor_id
def _resolve_pending(self, exc: Optional[Exception] = None) -> None: def _resolve_pending_messages(self, exc: Optional[Exception] = None) -> None:
pending = list(self._pending_messages) pending = list(self._pending_messages)
for msg in pending: for msg in pending:
if not msg.done(): if not msg.done():
@ -475,92 +670,21 @@ class PyMongoProtocol(PyMongoBaseProtocol, BufferedProtocol):
msg.set_exception(exc) msg.set_exception(exc)
self._done_messages.append(msg) self._done_messages.append(msg)
def close(self, exc: Optional[Exception] = None) -> None:
self.transport.abort()
self._resolve_pending_messages(exc)
self._connection_lost = True
class PyMongoKMSProtocol(PyMongoBaseProtocol): def connection_lost(self, exc: Optional[Exception] = None) -> None:
def __init__(self, timeout: Optional[float] = None): self._resolve_pending_messages(exc)
super().__init__(timeout) if not self._closed.done():
self._buffers: collections.deque[memoryview[bytes]] = collections.deque() self._closed.set_result(None)
self._bytes_ready = 0
self._pending_reads: collections.deque[int] = collections.deque()
self._pending_listeners: collections.deque[Future[Any]] = collections.deque()
def connection_made(self, transport: BaseTransport) -> None: async def wait_closed(self) -> None:
"""Called exactly once when a connection is made. await self._closed
The transport argument is the transport representing the write side of the connection.
"""
self.transport = transport # type: ignore[assignment]
def data_received(self, data: bytes) -> None:
if self._connection_lost:
return
self._bytes_ready += len(data)
self._buffers.append(memoryview(data))
if not len(self._pending_reads):
return
bytes_needed = self._pending_reads.popleft()
data = self._read(bytes_needed)
waiter = self._pending_listeners.popleft()
waiter.set_result(data)
async def read(self, bytes_needed: int) -> bytes:
"""Read up to the requested bytes from this connection."""
# Note: all reads are "up-to" bytes_needed because we don't know if the kms_context
# has processed a Content-Length header and is requesting a response or not.
# Wait for other listeners first.
if len(self._pending_listeners):
await asyncio.gather(*self._pending_listeners)
# If there are bytes ready, then there is no need to wait further.
if self._bytes_ready > 0:
return self._read(bytes_needed)
if self.transport:
try:
self.transport.resume_reading()
# Known bug in SSL Protocols, fixed in Python 3.11: https://github.com/python/cpython/issues/89322
except AttributeError:
raise OSError("connection is already closed") from None
if self.transport and self.transport.is_closing():
raise OSError("connection is already closed")
self._pending_reads.append(bytes_needed)
read_waiter = asyncio.get_running_loop().create_future()
self._pending_listeners.append(read_waiter)
return await read_waiter
def _resolve_pending(self, exc: Optional[Exception] = None) -> None:
while self._pending_listeners:
fut = self._pending_listeners.popleft()
fut.set_result(b"")
def _read(self, bytes_needed: int) -> memoryview:
"""Read bytes."""
# Send the bytes to the listener.
if self._bytes_ready < bytes_needed:
bytes_needed = self._bytes_ready
self._bytes_ready -= bytes_needed
output_buf = bytearray(bytes_needed)
n_remaining = bytes_needed
out_index = 0
while n_remaining > 0:
buffer = self._buffers.popleft()
buf_size = len(buffer)
# if we didn't exhaust the buffer, read the partial data and return the buffer.
if buf_size > n_remaining:
output_buf[out_index : n_remaining + out_index] = buffer[:n_remaining]
buffer = buffer[n_remaining:]
n_remaining = 0
self._buffers.appendleft(buffer)
# otherwise exhaust the buffer.
else:
output_buf[out_index : out_index + buf_size] = buffer[:]
out_index += buf_size
n_remaining -= buf_size
return memoryview(output_buf)
async def async_sendall(conn: PyMongoBaseProtocol, buf: bytes) -> None: async def async_sendall(conn: PyMongoProtocol, buf: bytes) -> None:
try: try:
await asyncio.wait_for(conn.write(buf), timeout=conn.gettimeout) await asyncio.wait_for(conn.write(buf), timeout=conn.gettimeout)
except asyncio.TimeoutError as exc: except asyncio.TimeoutError as exc:
@ -568,18 +692,12 @@ async def async_sendall(conn: PyMongoBaseProtocol, buf: bytes) -> None:
raise socket.timeout("timed out") from exc raise socket.timeout("timed out") from exc
async def async_receive_kms(conn: AsyncBaseConnection, bytes_needed: int) -> bytes: async def async_receive_message(
"""Receive raw bytes from the kms connection.""" conn: AsyncConnection,
request_id: Optional[int],
def callback(result: Any) -> bytes: max_message_size: int = MAX_MESSAGE_SIZE,
return result ) -> Union[_OpReply, _OpMsg]:
"""Receive a raw BSON message or raise socket.error."""
return await _async_receive_data(conn, callback, bytes_needed)
async def _async_receive_data(
conn: AsyncBaseConnection, callback: Callable[..., Any], *args: Any
) -> Any:
timeout: Optional[Union[float, int]] timeout: Optional[Union[float, int]]
timeout = conn.conn.gettimeout timeout = conn.conn.gettimeout
if _csot.get_timeout(): if _csot.get_timeout():
@ -595,8 +713,8 @@ async def _async_receive_data(
# timeouts on AWS Lambda and other FaaS environments. # timeouts on AWS Lambda and other FaaS environments.
timeout = max(deadline - time.monotonic(), 0) timeout = max(deadline - time.monotonic(), 0)
read_task = create_task(conn.conn.get_conn.read(*args))
cancellation_task = create_task(_poll_cancellation(conn)) cancellation_task = create_task(_poll_cancellation(conn))
read_task = create_task(conn.conn.get_conn.read(request_id, max_message_size))
tasks = [read_task, cancellation_task] tasks = [read_task, cancellation_task]
try: try:
done, pending = await asyncio.wait( done, pending = await asyncio.wait(
@ -609,7 +727,14 @@ async def _async_receive_data(
if len(done) == 0: if len(done) == 0:
raise socket.timeout("timed out") raise socket.timeout("timed out")
if read_task in done: if read_task in done:
return callback(read_task.result()) data, op_code = read_task.result()
try:
unpack_reply = _UNPACK_REPLY[op_code]
except KeyError:
raise ProtocolError(
f"Got opcode {op_code!r} but expected {_UNPACK_REPLY.keys()!r}"
) from None
return unpack_reply(data)
raise _OperationCancelled("operation cancelled") raise _OperationCancelled("operation cancelled")
except asyncio.CancelledError: except asyncio.CancelledError:
for task in tasks: for task in tasks:
@ -618,31 +743,6 @@ async def _async_receive_data(
raise raise
async def async_receive_message(
conn: AsyncConnection,
request_id: Optional[int],
max_message_size: int = MAX_MESSAGE_SIZE,
) -> Union[_OpReply, _OpMsg]:
"""Receive a raw BSON message or raise socket.error."""
def callback(result: Any) -> _OpMsg | _OpReply:
data, op_code = result
try:
unpack_reply = _UNPACK_REPLY[op_code]
except KeyError:
raise ProtocolError(
f"Got opcode {op_code!r} but expected {_UNPACK_REPLY.keys()!r}"
) from None
return unpack_reply(data)
return await _async_receive_data(conn, callback, request_id, max_message_size)
def receive_kms(conn: BaseConnection, bytes_needed: int) -> bytes:
"""Receive raw bytes from the kms connection."""
return conn.conn.sock.recv(bytes_needed)
def receive_message( def receive_message(
conn: Connection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE conn: Connection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE
) -> Union[_OpReply, _OpMsg]: ) -> Union[_OpReply, _OpMsg]:
@ -670,6 +770,7 @@ def receive_message(
f"Message length ({length!r}) is larger than server max " f"Message length ({length!r}) is larger than server max "
f"message size ({max_message_size!r})" f"message size ({max_message_size!r})"
) )
data: memoryview | bytes
if op_code == 2012: if op_code == 2012:
op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER(receive_data(conn, 9, deadline)) op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER(receive_data(conn, 9, deadline))
data = decompress(receive_data(conn, length - 25, deadline), compressor_id) data = decompress(receive_data(conn, length - 25, deadline), compressor_id)

View File

@ -16,6 +16,7 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import functools
import socket import socket
import ssl import ssl
import sys import sys
@ -24,6 +25,7 @@ from typing import (
Any, Any,
NoReturn, NoReturn,
Optional, Optional,
Union,
) )
from pymongo import _csot from pymongo import _csot
@ -35,17 +37,13 @@ from pymongo.errors import ( # type:ignore[attr-defined]
_CertificateError, _CertificateError,
) )
from pymongo.helpers_shared import _get_timeout_details, format_timeout_details from pymongo.helpers_shared import _get_timeout_details, format_timeout_details
from pymongo.network_layer import ( from pymongo.network_layer import AsyncNetworkingInterface, NetworkingInterface, PyMongoProtocol
AsyncNetworkingInterface,
NetworkingInterface,
PyMongoBaseProtocol,
PyMongoProtocol,
)
from pymongo.pool_options import PoolOptions from pymongo.pool_options import PoolOptions
from pymongo.ssl_support import PYSSLError, SSLError, _has_sni from pymongo.ssl_support import PYSSLError, SSLError, _has_sni
SSLErrors = (PYSSLError, SSLError) SSLErrors = (PYSSLError, SSLError)
if TYPE_CHECKING: if TYPE_CHECKING:
from pymongo.pyopenssl_context import _sslConn
from pymongo.typings import _Address from pymongo.typings import _Address
try: try:
@ -246,10 +244,64 @@ async def _async_create_connection(address: _Address, options: PoolOptions) -> s
raise OSError("getaddrinfo failed") raise OSError("getaddrinfo failed")
async def _async_configured_socket(
address: _Address, options: PoolOptions
) -> Union[socket.socket, _sslConn]:
"""Given (host, port) and PoolOptions, return a raw configured socket.
Can raise socket.error, ConnectionFailure, or _CertificateError.
Sets socket's SSL and timeout options.
"""
sock = await _async_create_connection(address, options)
ssl_context = options._ssl_context
if ssl_context is None:
sock.settimeout(options.socket_timeout)
return sock
host = address[0]
try:
# We have to pass hostname / ip address to wrap_socket
# to use SSLContext.check_hostname.
if _has_sni(False):
loop = asyncio.get_running_loop()
ssl_sock = await loop.run_in_executor(
None,
functools.partial(ssl_context.wrap_socket, sock, server_hostname=host), # type: ignore[assignment, misc, unused-ignore]
)
else:
loop = asyncio.get_running_loop()
ssl_sock = await loop.run_in_executor(None, ssl_context.wrap_socket, sock) # type: ignore[assignment, misc, unused-ignore]
except _CertificateError:
sock.close()
# Raise _CertificateError directly like we do after match_hostname
# below.
raise
except (OSError, *SSLErrors) as exc:
sock.close()
# We raise AutoReconnect for transient and permanent SSL handshake
# failures alike. Permanent handshake failures, like protocol
# mismatch, will be turned into ServerSelectionTimeoutErrors later.
details = _get_timeout_details(options)
_raise_connection_failure(address, exc, "SSL handshake failed: ", timeout_details=details)
if (
ssl_context.verify_mode
and not ssl_context.check_hostname
and not options.tls_allow_invalid_hostnames
):
try:
ssl.match_hostname(ssl_sock.getpeercert(), hostname=host) # type:ignore[attr-defined, unused-ignore]
except _CertificateError:
ssl_sock.close()
raise
ssl_sock.settimeout(options.socket_timeout)
return ssl_sock
async def _configured_protocol_interface( async def _configured_protocol_interface(
address: _Address, address: _Address, options: PoolOptions
options: PoolOptions,
protocol_kls: type[PyMongoBaseProtocol] = PyMongoProtocol,
) -> AsyncNetworkingInterface: ) -> AsyncNetworkingInterface:
"""Given (host, port) and PoolOptions, return a configured AsyncNetworkingInterface. """Given (host, port) and PoolOptions, return a configured AsyncNetworkingInterface.
@ -264,7 +316,7 @@ async def _configured_protocol_interface(
if ssl_context is None: if ssl_context is None:
return AsyncNetworkingInterface( return AsyncNetworkingInterface(
await asyncio.get_running_loop().create_connection( await asyncio.get_running_loop().create_connection(
lambda: protocol_kls(timeout=timeout), sock=sock lambda: PyMongoProtocol(timeout=timeout), sock=sock
) )
) )
@ -273,7 +325,7 @@ async def _configured_protocol_interface(
# We have to pass hostname / ip address to wrap_socket # We have to pass hostname / ip address to wrap_socket
# to use SSLContext.check_hostname. # to use SSLContext.check_hostname.
transport, protocol = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload] transport, protocol = await asyncio.get_running_loop().create_connection( # type: ignore[call-overload]
lambda: protocol_kls(timeout=timeout), lambda: PyMongoProtocol(timeout=timeout),
sock=sock, sock=sock,
server_hostname=host, server_hostname=host,
ssl=ssl_context, ssl=ssl_context,
@ -373,9 +425,56 @@ def _create_connection(address: _Address, options: PoolOptions) -> socket.socket
raise OSError("getaddrinfo failed") raise OSError("getaddrinfo failed")
def _configured_socket_interface( def _configured_socket(address: _Address, options: PoolOptions) -> Union[socket.socket, _sslConn]:
address: _Address, options: PoolOptions, *args: Any """Given (host, port) and PoolOptions, return a raw configured socket.
) -> NetworkingInterface:
Can raise socket.error, ConnectionFailure, or _CertificateError.
Sets socket's SSL and timeout options.
"""
sock = _create_connection(address, options)
ssl_context = options._ssl_context
if ssl_context is None:
sock.settimeout(options.socket_timeout)
return sock
host = address[0]
try:
# We have to pass hostname / ip address to wrap_socket
# to use SSLContext.check_hostname.
if _has_sni(True):
ssl_sock = ssl_context.wrap_socket(sock, server_hostname=host) # type: ignore[assignment, misc, unused-ignore]
else:
ssl_sock = ssl_context.wrap_socket(sock) # type: ignore[assignment, misc, unused-ignore]
except _CertificateError:
sock.close()
# Raise _CertificateError directly like we do after match_hostname
# below.
raise
except (OSError, *SSLErrors) as exc:
sock.close()
# We raise AutoReconnect for transient and permanent SSL handshake
# failures alike. Permanent handshake failures, like protocol
# mismatch, will be turned into ServerSelectionTimeoutErrors later.
details = _get_timeout_details(options)
_raise_connection_failure(address, exc, "SSL handshake failed: ", timeout_details=details)
if (
ssl_context.verify_mode
and not ssl_context.check_hostname
and not options.tls_allow_invalid_hostnames
):
try:
ssl.match_hostname(ssl_sock.getpeercert(), hostname=host) # type:ignore[attr-defined, unused-ignore]
except _CertificateError:
ssl_sock.close()
raise
ssl_sock.settimeout(options.socket_timeout)
return ssl_sock
def _configured_socket_interface(address: _Address, options: PoolOptions) -> NetworkingInterface:
"""Given (host, port) and PoolOptions, return a NetworkingInterface wrapping a configured socket. """Given (host, port) and PoolOptions, return a NetworkingInterface wrapping a configured socket.
Can raise socket.error, ConnectionFailure, or _CertificateError. Can raise socket.error, ConnectionFailure, or _CertificateError.

View File

@ -3143,7 +3143,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
comment: Optional[Any] = None, comment: Optional[Any] = None,
hint: Optional[_IndexKeyHint] = None, hint: Optional[_IndexKeyHint] = None,
**kwargs: Any, **kwargs: Any,
) -> list[str]: ) -> list[Any]:
"""Get a list of distinct values for `key` among all documents """Get a list of distinct values for `key` among all documents
in this collection. in this collection.

View File

@ -1007,7 +1007,7 @@ class Cursor(Generic[_DocumentType]):
else: else:
if not isinstance(key, RE_TYPE): if not isinstance(key, RE_TYPE):
key = copy.deepcopy(key, memo) # noqa: PLW2901 key = copy.deepcopy(key, memo) # noqa: PLW2901
y[key] = value y[key] = value # type:ignore[index]
return y return y
def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]: def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]:
@ -1062,7 +1062,7 @@ class Cursor(Generic[_DocumentType]):
"""Explicitly close / kill this cursor.""" """Explicitly close / kill this cursor."""
self._die_lock() self._die_lock()
def distinct(self, key: str) -> list[str]: def distinct(self, key: str) -> list[Any]:
"""Get a list of distinct values for `key` among all documents """Get a list of distinct values for `key` among all documents
in the result set of this query. in the result set of this query.

View File

@ -71,11 +71,11 @@ from pymongo.errors import (
ServerSelectionTimeoutError, ServerSelectionTimeoutError,
) )
from pymongo.helpers_shared import _get_timeout_details from pymongo.helpers_shared import _get_timeout_details
from pymongo.network_layer import PyMongoKMSProtocol, receive_kms, sendall from pymongo.network_layer import sendall
from pymongo.operations import UpdateOne from pymongo.operations import UpdateOne
from pymongo.pool_options import PoolOptions from pymongo.pool_options import PoolOptions
from pymongo.pool_shared import ( from pymongo.pool_shared import (
_configured_socket_interface, _configured_socket,
_raise_connection_failure, _raise_connection_failure,
) )
from pymongo.read_concern import ReadConcern from pymongo.read_concern import ReadConcern
@ -85,7 +85,6 @@ from pymongo.synchronous.collection import Collection
from pymongo.synchronous.cursor import Cursor from pymongo.synchronous.cursor import Cursor
from pymongo.synchronous.database import Database from pymongo.synchronous.database import Database
from pymongo.synchronous.mongo_client import MongoClient from pymongo.synchronous.mongo_client import MongoClient
from pymongo.synchronous.pool import BaseConnection
from pymongo.typings import _DocumentType, _DocumentTypeArg from pymongo.typings import _DocumentType, _DocumentTypeArg
from pymongo.uri_parser_shared import _parse_kms_tls_options, parse_host from pymongo.uri_parser_shared import _parse_kms_tls_options, parse_host
from pymongo.write_concern import WriteConcern from pymongo.write_concern import WriteConcern
@ -93,8 +92,10 @@ from pymongo.write_concern import WriteConcern
if TYPE_CHECKING: if TYPE_CHECKING:
from pymongocrypt.mongocrypt import MongoCryptKmsContext from pymongocrypt.mongocrypt import MongoCryptKmsContext
from pymongo.pyopenssl_context import _sslConn
from pymongo.typings import _Address from pymongo.typings import _Address
_IS_SYNC = True _IS_SYNC = True
_HTTPS_PORT = 443 _HTTPS_PORT = 443
@ -109,10 +110,9 @@ _DATA_KEY_OPTS: CodecOptions[dict[str, Any]] = CodecOptions(
_KEY_VAULT_OPTS = CodecOptions(document_class=RawBSONDocument) _KEY_VAULT_OPTS = CodecOptions(document_class=RawBSONDocument)
def _connect_kms(address: _Address, opts: PoolOptions) -> BaseConnection: def _connect_kms(address: _Address, opts: PoolOptions) -> Union[socket.socket, _sslConn]:
try: try:
interface = _configured_socket_interface(address, opts, PyMongoKMSProtocol) return _configured_socket(address, opts)
return BaseConnection(interface, opts)
except Exception as exc: except Exception as exc:
_raise_connection_failure(address, exc, timeout_details=_get_timeout_details(opts)) _raise_connection_failure(address, exc, timeout_details=_get_timeout_details(opts))
@ -197,11 +197,19 @@ class _EncryptionIO(MongoCryptCallback): # type: ignore[misc]
try: try:
conn = _connect_kms(address, opts) conn = _connect_kms(address, opts)
try: try:
sendall(conn.conn.get_conn, message) sendall(conn, message)
while kms_context.bytes_needed > 0: while kms_context.bytes_needed > 0:
# CSOT: update timeout. # CSOT: update timeout.
conn.set_conn_timeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0)) conn.settimeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0))
data = receive_kms(conn, kms_context.bytes_needed) data: memoryview | bytes
if _IS_SYNC:
data = conn.recv(kms_context.bytes_needed)
else:
from pymongo.network_layer import ( # type: ignore[attr-defined]
receive_data_socket,
)
data = receive_data_socket(conn, kms_context.bytes_needed)
if not data: if not data:
raise OSError("KMS connection closed") raise OSError("KMS connection closed")
kms_context.feed(data) kms_context.feed(data)
@ -220,7 +228,7 @@ class _EncryptionIO(MongoCryptCallback): # type: ignore[misc]
address, exc, msg_prefix=msg_prefix, timeout_details=_get_timeout_details(opts) address, exc, msg_prefix=msg_prefix, timeout_details=_get_timeout_details(opts)
) )
finally: finally:
conn.close_conn(None) conn.close()
except MongoCryptError: except MongoCryptError:
raise # Propagate MongoCryptError errors directly. raise # Propagate MongoCryptError errors directly.
except Exception as exc: except Exception as exc:
@ -261,7 +269,7 @@ class _EncryptionIO(MongoCryptCallback): # type: ignore[misc]
args.extend(self.opts._mongocryptd_spawn_args) args.extend(self.opts._mongocryptd_spawn_args)
_spawn_daemon(args) _spawn_daemon(args)
def mark_command(self, database: str, cmd: bytes) -> bytes: def mark_command(self, database: str, cmd: bytes) -> bytes | memoryview:
"""Mark a command for encryption. """Mark a command for encryption.
:param database: The database on which to run this command. :param database: The database on which to run this command.
@ -288,7 +296,7 @@ class _EncryptionIO(MongoCryptCallback): # type: ignore[misc]
) )
return res.raw return res.raw
def fetch_keys(self, filter: bytes) -> Generator[bytes, None]: def fetch_keys(self, filter: bytes) -> Generator[bytes | memoryview, None]:
"""Yields one or more keys from the key vault. """Yields one or more keys from the key vault.
:param filter: The filter to pass to find. :param filter: The filter to pass to find.
@ -460,7 +468,7 @@ class _Encrypter:
# TODO: PYTHON-1922 avoid decoding the encrypted_cmd. # TODO: PYTHON-1922 avoid decoding the encrypted_cmd.
return _inflate_bson(encrypted_cmd, DEFAULT_RAW_BSON_OPTIONS) return _inflate_bson(encrypted_cmd, DEFAULT_RAW_BSON_OPTIONS)
def decrypt(self, response: bytes) -> Optional[bytes]: def decrypt(self, response: bytes | memoryview) -> Optional[bytes]:
"""Decrypt a MongoDB command response. """Decrypt a MongoDB command response.
:param response: A MongoDB command response as BSON. :param response: A MongoDB command response as BSON.
@ -928,7 +936,8 @@ class ClientEncryption(Generic[_DocumentType]):
contention_factor=contention_factor, contention_factor=contention_factor,
range_opts=range_opts_bytes, range_opts=range_opts_bytes,
is_expression=is_expression, is_expression=is_expression,
text_opts=text_opts_bytes, # For compatibility with pymongocrypt < 1.16:
**{"text_opts": text_opts_bytes} if text_opts_bytes else {},
) )
return decode(encrypted_doc)["v"] return decode(encrypted_doc)["v"]

View File

@ -78,7 +78,7 @@ def _getaddrinfo(
socket.SocketKind, socket.SocketKind,
int, int,
str, str,
tuple[str, int] | tuple[str, int, int, int], tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
] ]
]: ]:
if not _IS_SYNC: if not _IS_SYNC:

View File

@ -123,19 +123,74 @@ except ImportError:
_IS_SYNC = True _IS_SYNC = True
class BaseConnection: class Connection:
"""A base connection object for server and kms connections.""" """Store a connection with some metadata.
def __init__(self, conn: NetworkingInterface, opts: PoolOptions): :param conn: a raw connection object
:param pool: a Pool instance
:param address: the server's (host, port)
:param id: the id of this socket in it's pool
:param is_sdam: SDAM connections do not call hello on creation
"""
def __init__(
self,
conn: NetworkingInterface,
pool: Pool,
address: tuple[str, int],
id: int,
is_sdam: bool,
):
self.pool_ref = weakref.ref(pool)
self.conn = conn self.conn = conn
self.socket_checker: SocketChecker = SocketChecker() self.address = address
self.cancel_context: _CancellationContext = _CancellationContext() self.id = id
self.is_sdam = False self.is_sdam = is_sdam
self.closed = False self.closed = False
self.last_timeout: float | None = None self.last_checkin_time = time.monotonic()
self.more_to_come = False self.performed_handshake = False
self.opts = opts self.is_writable: bool = False
self.max_wire_version = -1 self.max_wire_version = MAX_WIRE_VERSION
self.max_bson_size = MAX_BSON_SIZE
self.max_message_size = MAX_MESSAGE_SIZE
self.max_write_batch_size = MAX_WRITE_BATCH_SIZE
self.supports_sessions = False
self.hello_ok: bool = False
self.is_mongos = False
self.op_msg_enabled = False
self.listeners = pool.opts._event_listeners
self.enabled_for_cmap = pool.enabled_for_cmap
self.enabled_for_logging = pool.enabled_for_logging
self.compression_settings = pool.opts._compression_settings
self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None
self.socket_checker: SocketChecker = SocketChecker()
self.oidc_token_gen_id: Optional[int] = None
# Support for mechanism negotiation on the initial handshake.
self.negotiated_mechs: Optional[list[str]] = None
self.auth_ctx: Optional[_AuthContext] = None
# The pool's generation changes with each reset() so we can close
# sockets created before the last reset.
self.pool_gen = pool.gen
self.generation = self.pool_gen.get_overall()
self.ready = False
self.cancel_context: _CancellationContext = _CancellationContext()
self.opts = pool.opts
self.more_to_come: bool = False
# For load balancer support.
self.service_id: Optional[ObjectId] = None
self.server_connection_id: Optional[int] = None
# When executing a transaction in load balancing mode, this flag is
# set to true to indicate that the session now owns the connection.
self.pinned_txn = False
self.pinned_cursor = False
self.active = False
self.last_timeout = self.opts.socket_timeout
self.connect_rtt = 0.0
self._client_id = pool._client_id
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None
def set_conn_timeout(self, timeout: Optional[float]) -> None: def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout.""" """Cache last timeout to avoid duplicate calls to conn.settimeout."""
@ -164,111 +219,17 @@ class BaseConnection:
formatted = format_timeout_details(timeout_details) formatted = format_timeout_details(timeout_details)
# CSOT: raise an error without running the command since we know it will time out. # CSOT: raise an error without running the command since we know it will time out.
errmsg = f"operation would exceed time limit, remaining timeout:{timeout:.5f} <= network round trip time:{rtt:.5f} {formatted}" errmsg = f"operation would exceed time limit, remaining timeout:{timeout:.5f} <= network round trip time:{rtt:.5f} {formatted}"
if self.max_wire_version != -1: raise ExecutionTimeout(
raise ExecutionTimeout( errmsg,
errmsg, 50,
50, {"ok": 0, "errmsg": errmsg, "code": 50},
{"ok": 0, "errmsg": errmsg, "code": 50}, self.max_wire_version,
self.max_wire_version, )
)
else:
raise TimeoutError(errmsg)
if cmd is not None: if cmd is not None:
cmd["maxTimeMS"] = int(max_time_ms * 1000) cmd["maxTimeMS"] = int(max_time_ms * 1000)
self.set_conn_timeout(timeout) self.set_conn_timeout(timeout)
return timeout return timeout
def close_conn(self, reason: Optional[str]) -> None:
"""Close this connection with a reason."""
if self.closed:
return
self._close_conn()
def _close_conn(self) -> None:
"""Close this connection."""
if self.closed:
return
self.closed = True
self.cancel_context.cancel()
# Note: We catch exceptions to avoid spurious errors on interpreter
# shutdown.
try:
self.conn.close()
except Exception: # noqa: S110
pass
def conn_closed(self) -> bool:
"""Return True if we know socket has been closed, False otherwise."""
if _IS_SYNC:
return self.socket_checker.socket_closed(self.conn.get_conn)
else:
return self.conn.is_closing()
class Connection(BaseConnection):
"""Store a connection with some metadata.
:param conn: a raw connection object
:param pool: a Pool instance
:param address: the server's (host, port)
:param id: the id of this socket in it's pool
:param is_sdam: SDAM connections do not call hello on creation
"""
def __init__(
self,
conn: NetworkingInterface,
pool: Pool,
address: tuple[str, int],
id: int,
is_sdam: bool,
):
super().__init__(conn, pool.opts)
self.pool_ref = weakref.ref(pool)
self.address: tuple[str, int] = address
self.id: int = id
self.is_sdam = is_sdam
self.last_checkin_time = time.monotonic()
self.performed_handshake = False
self.is_writable: bool = False
self.max_wire_version = MAX_WIRE_VERSION
self.max_bson_size: int = MAX_BSON_SIZE
self.max_message_size: int = MAX_MESSAGE_SIZE
self.max_write_batch_size: int = MAX_WRITE_BATCH_SIZE
self.supports_sessions = False
self.hello_ok: bool = False
self.is_mongos: bool = False
self.op_msg_enabled = False
self.listeners = pool.opts._event_listeners
self.enabled_for_cmap = pool.enabled_for_cmap
self.enabled_for_logging = pool.enabled_for_logging
self.compression_settings = pool.opts._compression_settings
self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None
self.oidc_token_gen_id: Optional[int] = None
# Support for mechanism negotiation on the initial handshake.
self.negotiated_mechs: Optional[list[str]] = None
self.auth_ctx: Optional[_AuthContext] = None
# The pool's generation changes with each reset() so we can close
# sockets created before the last reset.
self.pool_gen = pool.gen
self.generation = self.pool_gen.get_overall()
self.ready = False
# For load balancer support.
self.service_id: Optional[ObjectId] = None
self.server_connection_id: Optional[int] = None
# When executing a transaction in load balancing mode, this flag is
# set to true to indicate that the session now owns the connection.
self.pinned_txn = False
self.pinned_cursor = False
self.active = False
self.last_timeout = self.opts.socket_timeout
self.connect_rtt = 0.0
self._client_id = pool._client_id
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None
def pin_txn(self) -> None: def pin_txn(self) -> None:
self.pinned_txn = True self.pinned_txn = True
assert not self.pinned_cursor assert not self.pinned_cursor
@ -610,6 +571,26 @@ class Connection(BaseConnection):
error=reason, error=reason,
) )
def _close_conn(self) -> None:
"""Close this connection."""
if self.closed:
return
self.closed = True
self.cancel_context.cancel()
# Note: We catch exceptions to avoid spurious errors on interpreter
# shutdown.
try:
self.conn.close()
except Exception: # noqa: S110
pass
def conn_closed(self) -> bool:
"""Return True if we know socket has been closed, False otherwise."""
if _IS_SYNC:
return self.socket_checker.socket_closed(self.conn.get_conn)
else:
return self.conn.is_closing()
def send_cluster_time( def send_cluster_time(
self, self,
command: MutableMapping[str, Any], command: MutableMapping[str, Any],

View File

@ -322,7 +322,7 @@ class TopologyDescription:
if address: if address:
# Ignore selectors when explicit address is requested. # Ignore selectors when explicit address is requested.
description = self.server_descriptions().get(address) description = self.server_descriptions().get(address)
return [description] if description else [] return [description] if description and description.is_server_type_known else []
# Primary selection fast path. # Primary selection fast path.
if self.topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary and type(selector) is Primary: if self.topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary and type(selector) is Primary:

View File

@ -35,6 +35,7 @@ classifiers = [
"Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13", "Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Topic :: Database", "Topic :: Database",
"Typing :: Typed", "Typing :: Typed",
] ]
@ -270,8 +271,6 @@ partial_branches = ["if (.*and +)*not _use_c( and.*)*:"]
directory = "htmlcov" directory = "htmlcov"
[tool.cibuildwheel] [tool.cibuildwheel]
# Enable free-threaded support
free-threaded-support = true
skip = "pp* *-musllinux*" skip = "pp* *-musllinux*"
build-frontend = "build" build-frontend = "build"
test-command = "python {project}/tools/fail_if_no_c.py" test-command = "python {project}/tools/fail_if_no_c.py"

View File

@ -17,9 +17,10 @@ from __future__ import annotations
import os import os
import sys import sys
import time
from pathlib import Path from pathlib import Path
from pymongo import AsyncMongoClient, ReadPreference from pymongo import AsyncMongoClient, ReadPreference, monitoring
from pymongo.asynchronous.settings import TopologySettings from pymongo.asynchronous.settings import TopologySettings
from pymongo.asynchronous.topology import Topology from pymongo.asynchronous.topology import Topology
from pymongo.errors import ServerSelectionTimeoutError from pymongo.errors import ServerSelectionTimeoutError
@ -30,7 +31,7 @@ from pymongo.typings import strip_optional
sys.path[0:0] = [""] sys.path[0:0] = [""]
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest from test.asynchronous import AsyncIntegrationTest, async_client_context, client_knobs, unittest
from test.asynchronous.utils import async_wait_until from test.asynchronous.utils import async_wait_until
from test.asynchronous.utils_selection_tests import ( from test.asynchronous.utils_selection_tests import (
create_selection_tests, create_selection_tests,
@ -42,6 +43,7 @@ from test.utils_selection_tests_shared import (
) )
from test.utils_shared import ( from test.utils_shared import (
FunctionCallRecorder, FunctionCallRecorder,
HeartbeatEventListener,
OvertCommandListener, OvertCommandListener,
) )
@ -207,6 +209,40 @@ class TestCustomServerSelectorFunction(AsyncIntegrationTest):
) )
self.assertEqual(selector.call_count, 0) self.assertEqual(selector.call_count, 0)
@async_client_context.require_replica_set
@async_client_context.require_failCommand_appName
async def test_server_selection_getMore_blocks(self):
hb_listener = HeartbeatEventListener()
client = await self.async_rs_client(
event_listeners=[hb_listener], heartbeatFrequencyMS=500, appName="heartbeatFailedClient"
)
coll = client.db.test
await coll.drop()
docs = [{"x": 1} for _ in range(5)]
await coll.insert_many(docs)
fail_heartbeat = {
"configureFailPoint": "failCommand",
"mode": {"times": 4},
"data": {
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
"closeConnection": True,
"appName": "heartbeatFailedClient",
},
}
def hb_failed(event):
return isinstance(event, monitoring.ServerHeartbeatFailedEvent)
cursor = coll.find({}, batch_size=1)
await cursor.next() # force initial query that will pin the address for the getMore
async with self.fail_point(fail_heartbeat):
await async_wait_until(
lambda: hb_listener.matching(hb_failed), "published failed event"
)
self.assertEqual(len(await cursor.to_list()), 4)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -15,6 +15,7 @@
"""Execute Transactions Spec tests.""" """Execute Transactions Spec tests."""
from __future__ import annotations from __future__ import annotations
import asyncio
import sys import sys
from io import BytesIO from io import BytesIO
from test.asynchronous.utils_spec_runner import AsyncSpecRunner from test.asynchronous.utils_spec_runner import AsyncSpecRunner
@ -469,6 +470,17 @@ class TestTransactionsConvenientAPI(AsyncTransactionsBase):
async with self.client.start_session() as s: async with self.client.start_session() as s:
self.assertEqual(await s.with_transaction(callback2), "Foo") self.assertEqual(await s.with_transaction(callback2), "Foo")
@async_client_context.require_transactions
@async_client_context.require_async
async def test_callback_awaitable_no_coroutine(self):
def callback(_):
future = asyncio.Future()
future.set_result("Foo")
return future
async with self.client.start_session() as s:
self.assertEqual(await s.with_transaction(callback), "Foo")
@async_client_context.require_transactions @async_client_context.require_transactions
async def test_callback_not_retried_after_timeout(self): async def test_callback_not_retried_after_timeout(self):
listener = OvertCommandListener() listener = OvertCommandListener()

View File

@ -17,9 +17,10 @@ from __future__ import annotations
import os import os
import sys import sys
import time
from pathlib import Path from pathlib import Path
from pymongo import MongoClient, ReadPreference from pymongo import MongoClient, ReadPreference, monitoring
from pymongo.errors import ServerSelectionTimeoutError from pymongo.errors import ServerSelectionTimeoutError
from pymongo.hello import HelloCompat from pymongo.hello import HelloCompat
from pymongo.operations import _Op from pymongo.operations import _Op
@ -30,7 +31,7 @@ from pymongo.typings import strip_optional
sys.path[0:0] = [""] sys.path[0:0] = [""]
from test import IntegrationTest, client_context, unittest from test import IntegrationTest, client_context, client_knobs, unittest
from test.utils import wait_until from test.utils import wait_until
from test.utils_selection_tests import ( from test.utils_selection_tests import (
create_selection_tests, create_selection_tests,
@ -42,6 +43,7 @@ from test.utils_selection_tests_shared import (
) )
from test.utils_shared import ( from test.utils_shared import (
FunctionCallRecorder, FunctionCallRecorder,
HeartbeatEventListener,
OvertCommandListener, OvertCommandListener,
) )
@ -205,6 +207,38 @@ class TestCustomServerSelectorFunction(IntegrationTest):
topology.select_server(writable_server_selector, _Op.TEST, server_selection_timeout=0.1) topology.select_server(writable_server_selector, _Op.TEST, server_selection_timeout=0.1)
self.assertEqual(selector.call_count, 0) self.assertEqual(selector.call_count, 0)
@client_context.require_replica_set
@client_context.require_failCommand_appName
def test_server_selection_getMore_blocks(self):
hb_listener = HeartbeatEventListener()
client = self.rs_client(
event_listeners=[hb_listener], heartbeatFrequencyMS=500, appName="heartbeatFailedClient"
)
coll = client.db.test
coll.drop()
docs = [{"x": 1} for _ in range(5)]
coll.insert_many(docs)
fail_heartbeat = {
"configureFailPoint": "failCommand",
"mode": {"times": 4},
"data": {
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
"closeConnection": True,
"appName": "heartbeatFailedClient",
},
}
def hb_failed(event):
return isinstance(event, monitoring.ServerHeartbeatFailedEvent)
cursor = coll.find({}, batch_size=1)
cursor.next() # force initial query that will pin the address for the getMore
with self.fail_point(fail_heartbeat):
wait_until(lambda: hb_listener.matching(hb_failed), "published failed event")
self.assertEqual(len(cursor.to_list()), 4)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -15,6 +15,7 @@
"""Execute Transactions Spec tests.""" """Execute Transactions Spec tests."""
from __future__ import annotations from __future__ import annotations
import asyncio
import sys import sys
from io import BytesIO from io import BytesIO
from test.utils_spec_runner import SpecRunner from test.utils_spec_runner import SpecRunner
@ -461,6 +462,17 @@ class TestTransactionsConvenientAPI(TransactionsBase):
with self.client.start_session() as s: with self.client.start_session() as s:
self.assertEqual(s.with_transaction(callback2), "Foo") self.assertEqual(s.with_transaction(callback2), "Foo")
@client_context.require_transactions
@client_context.require_async
def test_callback_awaitable_no_coroutine(self):
def callback(_):
future = asyncio.Future()
future.set_result("Foo")
return future
with self.client.start_session() as s:
self.assertEqual(s.with_transaction(callback), "Foo")
@client_context.require_transactions @client_context.require_transactions
def test_callback_not_retried_after_timeout(self): def test_callback_not_retried_after_timeout(self):
listener = OvertCommandListener() listener = OvertCommandListener()

View File

@ -69,7 +69,7 @@ sys.path[0:0] = [""]
from test import IntegrationTest, PyMongoTestCase, client_context from test import IntegrationTest, PyMongoTestCase, client_context
from bson import CodecOptions, decode, decode_all, decode_file_iter, decode_iter, encode from bson import CodecOptions, ObjectId, decode, decode_all, decode_file_iter, decode_iter, encode
from bson.raw_bson import RawBSONDocument from bson.raw_bson import RawBSONDocument
from bson.son import SON from bson.son import SON
from pymongo import ASCENDING, MongoClient from pymongo import ASCENDING, MongoClient
@ -141,6 +141,32 @@ class TestPymongo(IntegrationTest):
docs = to_list(cursor) docs = to_list(cursor)
self.assertTrue(docs) self.assertTrue(docs)
def test_distinct(self) -> None:
self.coll.delete_many({})
self.coll.insert_many(
[
{"_id": None},
{"_id": 0},
{"_id": ""},
{"_id": ObjectId()},
{"_id": True},
]
)
def collection_distinct(
collection: Collection,
) -> list[None | int | str | ObjectId | bool]:
return collection.distinct("_id")
def cursor_distinct(
collection: Collection,
) -> list[None | int | str | ObjectId | bool]:
cursor = collection.find()
return cursor.distinct("_id")
collection_distinct(self.coll)
cursor_distinct(self.coll)
@only_type_check @only_type_check
def test_bulk_write(self) -> None: def test_bulk_write(self) -> None:
self.coll.insert_one({}) self.coll.insert_one({})
@ -475,7 +501,7 @@ class TestDocumentType(PyMongoTestCase):
# This should fail because the output is a Movie. # This should fail because the output is a Movie.
assert out["foo"] # type:ignore[typeddict-item] assert out["foo"] # type:ignore[typeddict-item]
# pyright gives reportTypedDictNotRequiredAccess for the following: # pyright gives reportTypedDictNotRequiredAccess for the following:
assert out["_id"] # type:ignore assert out["_id"] # type:ignore[unused-ignore]
@only_type_check @only_type_check
def test_typeddict_empty_document_type(self) -> None: def test_typeddict_empty_document_type(self) -> None:
@ -496,7 +522,7 @@ class TestDocumentType(PyMongoTestCase):
out = coll.find_one({}) out = coll.find_one({})
assert out is not None assert out is not None
# pyright gives reportTypedDictNotRequiredAccess for the following: # pyright gives reportTypedDictNotRequiredAccess for the following:
assert out["_id"] # type:ignore assert out["_id"] # type:ignore[unused-ignore]
@only_type_check @only_type_check
def test_raw_bson_document_type(self) -> None: def test_raw_bson_document_type(self) -> None:

View File

@ -37,7 +37,7 @@ from test.helpers_shared import (
LOCAL_MASTER_KEY, LOCAL_MASTER_KEY,
) )
from test.utils_shared import CMAPListener, camel_to_snake, parse_collection_options from test.utils_shared import CMAPListener, camel_to_snake, parse_collection_options
from typing import Any, Union from typing import Any, MutableMapping, Union
from bson import ( from bson import (
RE_TYPE, RE_TYPE,
@ -162,7 +162,9 @@ def with_metaclass(meta, *bases):
return meta(name, resolved_bases, d) return meta(name, resolved_bases, d)
@classmethod @classmethod
def __prepare__(cls, name, this_bases): def __prepare__(
cls, name: str, this_bases: tuple[type, ...], /, **kwds: Any
) -> MutableMapping[str, object]:
return meta.__prepare__(name, bases) return meta.__prepare__(name, bases)
return type.__new__(metaclass, "temporary_class", (), {}) return type.__new__(metaclass, "temporary_class", (), {})

View File

@ -120,9 +120,9 @@ replacements = {
"_async_create_lock": "_create_lock", "_async_create_lock": "_create_lock",
"_async_create_condition": "_create_condition", "_async_create_condition": "_create_condition",
"_async_cond_wait": "_cond_wait", "_async_cond_wait": "_cond_wait",
"async_receive_kms": "receive_kms",
"AsyncNetworkingInterface": "NetworkingInterface", "AsyncNetworkingInterface": "NetworkingInterface",
"_configured_protocol_interface": "_configured_socket_interface", "_configured_protocol_interface": "_configured_socket_interface",
"_async_configured_socket": "_configured_socket",
"SpecRunnerTask": "SpecRunnerThread", "SpecRunnerTask": "SpecRunnerThread",
"AsyncMockConnection": "MockConnection", "AsyncMockConnection": "MockConnection",
"AsyncMockPool": "MockPool", "AsyncMockPool": "MockPool",
@ -322,6 +322,14 @@ def translate_coroutine_types(lines: list[str]) -> list[str]:
index = lines.index(type) index = lines.index(type)
new = type.replace(old, res.group(3)) new = type.replace(old, res.group(3))
lines[index] = new lines[index] = new
coroutine_types = [line for line in lines if "Awaitable[" in line]
for type in coroutine_types:
res = re.search(r"Awaitable\[([A-z]+)\]", type)
if res:
old = res[0]
index = lines.index(type)
new = type.replace(old, res.group(1))
lines[index] = new
return lines return lines

96
uv.lock generated
View File

@ -1,5 +1,5 @@
version = 1 version = 1
revision = 2 revision = 3
requires-python = ">=3.9" requires-python = ">=3.9"
resolution-markers = [ resolution-markers = [
"python_full_version == '3.14.*'", "python_full_version == '3.14.*'",
@ -1047,46 +1047,53 @@ dependencies = [
[[package]] [[package]]
name = "mypy" name = "mypy"
version = "1.14.1" version = "1.17.1"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "mypy-extensions" }, { name = "mypy-extensions" },
{ name = "pathspec" },
{ name = "tomli", marker = "python_full_version < '3.11'" }, { name = "tomli", marker = "python_full_version < '3.11'" },
{ name = "typing-extensions" }, { name = "typing-extensions" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/b9/eb/2c92d8ea1e684440f54fa49ac5d9a5f19967b7b472a281f419e69a8d228e/mypy-1.14.1.tar.gz", hash = "sha256:7ec88144fe9b510e8475ec2f5f251992690fcf89ccb4500b214b4226abcd32d6", size = 3216051, upload-time = "2024-12-30T16:39:07.335Z" } sdist = { url = "https://files.pythonhosted.org/packages/8e/22/ea637422dedf0bf36f3ef238eab4e455e2a0dcc3082b5cc067615347ab8e/mypy-1.17.1.tar.gz", hash = "sha256:25e01ec741ab5bb3eec8ba9cdb0f769230368a22c959c4937360efb89b7e9f01", size = 3352570, upload-time = "2025-07-31T07:54:19.204Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/9b/7a/87ae2adb31d68402da6da1e5f30c07ea6063e9f09b5e7cfc9dfa44075e74/mypy-1.14.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:52686e37cf13d559f668aa398dd7ddf1f92c5d613e4f8cb262be2fb4fedb0fcb", size = 11211002, upload-time = "2024-12-30T16:37:22.435Z" }, { url = "https://files.pythonhosted.org/packages/77/a9/3d7aa83955617cdf02f94e50aab5c830d205cfa4320cf124ff64acce3a8e/mypy-1.17.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:3fbe6d5555bf608c47203baa3e72dbc6ec9965b3d7c318aa9a4ca76f465bd972", size = 11003299, upload-time = "2025-07-31T07:54:06.425Z" },
{ url = "https://files.pythonhosted.org/packages/e1/23/eada4c38608b444618a132be0d199b280049ded278b24cbb9d3fc59658e4/mypy-1.14.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1fb545ca340537d4b45d3eecdb3def05e913299ca72c290326be19b3804b39c0", size = 10358400, upload-time = "2024-12-30T16:37:53.526Z" }, { url = "https://files.pythonhosted.org/packages/83/e8/72e62ff837dd5caaac2b4a5c07ce769c8e808a00a65e5d8f94ea9c6f20ab/mypy-1.17.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:80ef5c058b7bce08c83cac668158cb7edea692e458d21098c7d3bce35a5d43e7", size = 10125451, upload-time = "2025-07-31T07:53:52.974Z" },
{ url = "https://files.pythonhosted.org/packages/43/c9/d6785c6f66241c62fd2992b05057f404237deaad1566545e9f144ced07f5/mypy-1.14.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:90716d8b2d1f4cd503309788e51366f07c56635a3309b0f6a32547eaaa36a64d", size = 12095172, upload-time = "2024-12-30T16:37:50.332Z" }, { url = "https://files.pythonhosted.org/packages/7d/10/f3f3543f6448db11881776f26a0ed079865926b0c841818ee22de2c6bbab/mypy-1.17.1-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c4a580f8a70c69e4a75587bd925d298434057fe2a428faaf927ffe6e4b9a98df", size = 11916211, upload-time = "2025-07-31T07:53:18.879Z" },
{ url = "https://files.pythonhosted.org/packages/c3/62/daa7e787770c83c52ce2aaf1a111eae5893de9e004743f51bfcad9e487ec/mypy-1.14.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2ae753f5c9fef278bcf12e1a564351764f2a6da579d4a81347e1d5a15819997b", size = 12828732, upload-time = "2024-12-30T16:37:29.96Z" }, { url = "https://files.pythonhosted.org/packages/06/bf/63e83ed551282d67bb3f7fea2cd5561b08d2bb6eb287c096539feb5ddbc5/mypy-1.17.1-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:dd86bb649299f09d987a2eebb4d52d10603224500792e1bee18303bbcc1ce390", size = 12652687, upload-time = "2025-07-31T07:53:30.544Z" },
{ url = "https://files.pythonhosted.org/packages/1b/a2/5fb18318a3637f29f16f4e41340b795da14f4751ef4f51c99ff39ab62e52/mypy-1.14.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:e0fe0f5feaafcb04505bcf439e991c6d8f1bf8b15f12b05feeed96e9e7bf1427", size = 13012197, upload-time = "2024-12-30T16:38:05.037Z" }, { url = "https://files.pythonhosted.org/packages/69/66/68f2eeef11facf597143e85b694a161868b3b006a5fbad50e09ea117ef24/mypy-1.17.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:a76906f26bd8d51ea9504966a9c25419f2e668f012e0bdf3da4ea1526c534d94", size = 12896322, upload-time = "2025-07-31T07:53:50.74Z" },
{ url = "https://files.pythonhosted.org/packages/28/99/e153ce39105d164b5f02c06c35c7ba958aaff50a2babba7d080988b03fe7/mypy-1.14.1-cp310-cp310-win_amd64.whl", hash = "sha256:7d54bd85b925e501c555a3227f3ec0cfc54ee8b6930bd6141ec872d1c572f81f", size = 9780836, upload-time = "2024-12-30T16:37:19.726Z" }, { url = "https://files.pythonhosted.org/packages/a3/87/8e3e9c2c8bd0d7e071a89c71be28ad088aaecbadf0454f46a540bda7bca6/mypy-1.17.1-cp310-cp310-win_amd64.whl", hash = "sha256:e79311f2d904ccb59787477b7bd5d26f3347789c06fcd7656fa500875290264b", size = 9507962, upload-time = "2025-07-31T07:53:08.431Z" },
{ url = "https://files.pythonhosted.org/packages/da/11/a9422850fd506edbcdc7f6090682ecceaf1f87b9dd847f9df79942da8506/mypy-1.14.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:f995e511de847791c3b11ed90084a7a0aafdc074ab88c5a9711622fe4751138c", size = 11120432, upload-time = "2024-12-30T16:37:11.533Z" }, { url = "https://files.pythonhosted.org/packages/46/cf/eadc80c4e0a70db1c08921dcc220357ba8ab2faecb4392e3cebeb10edbfa/mypy-1.17.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ad37544be07c5d7fba814eb370e006df58fed8ad1ef33ed1649cb1889ba6ff58", size = 10921009, upload-time = "2025-07-31T07:53:23.037Z" },
{ url = "https://files.pythonhosted.org/packages/b6/9e/47e450fd39078d9c02d620545b2cb37993a8a8bdf7db3652ace2f80521ca/mypy-1.14.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d64169ec3b8461311f8ce2fd2eb5d33e2d0f2c7b49116259c51d0d96edee48d1", size = 10279515, upload-time = "2024-12-30T16:37:40.724Z" }, { url = "https://files.pythonhosted.org/packages/5d/c1/c869d8c067829ad30d9bdae051046561552516cfb3a14f7f0347b7d973ee/mypy-1.17.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:064e2ff508e5464b4bd807a7c1625bc5047c5022b85c70f030680e18f37273a5", size = 10047482, upload-time = "2025-07-31T07:53:26.151Z" },
{ url = "https://files.pythonhosted.org/packages/01/b5/6c8d33bd0f851a7692a8bfe4ee75eb82b6983a3cf39e5e32a5d2a723f0c1/mypy-1.14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ba24549de7b89b6381b91fbc068d798192b1b5201987070319889e93038967a8", size = 12025791, upload-time = "2024-12-30T16:36:58.73Z" }, { url = "https://files.pythonhosted.org/packages/98/b9/803672bab3fe03cee2e14786ca056efda4bb511ea02dadcedde6176d06d0/mypy-1.17.1-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:70401bbabd2fa1aa7c43bb358f54037baf0586f41e83b0ae67dd0534fc64edfd", size = 11832883, upload-time = "2025-07-31T07:53:47.948Z" },
{ url = "https://files.pythonhosted.org/packages/f0/4c/e10e2c46ea37cab5c471d0ddaaa9a434dc1d28650078ac1b56c2d7b9b2e4/mypy-1.14.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:183cf0a45457d28ff9d758730cd0210419ac27d4d3f285beda038c9083363b1f", size = 12749203, upload-time = "2024-12-30T16:37:03.741Z" }, { url = "https://files.pythonhosted.org/packages/88/fb/fcdac695beca66800918c18697b48833a9a6701de288452b6715a98cfee1/mypy-1.17.1-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e92bdc656b7757c438660f775f872a669b8ff374edc4d18277d86b63edba6b8b", size = 12566215, upload-time = "2025-07-31T07:54:04.031Z" },
{ url = "https://files.pythonhosted.org/packages/88/55/beacb0c69beab2153a0f57671ec07861d27d735a0faff135a494cd4f5020/mypy-1.14.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:f2a0ecc86378f45347f586e4163d1769dd81c5a223d577fe351f26b179e148b1", size = 12885900, upload-time = "2024-12-30T16:37:57.948Z" }, { url = "https://files.pythonhosted.org/packages/7f/37/a932da3d3dace99ee8eb2043b6ab03b6768c36eb29a02f98f46c18c0da0e/mypy-1.17.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:c1fdf4abb29ed1cb091cf432979e162c208a5ac676ce35010373ff29247bcad5", size = 12751956, upload-time = "2025-07-31T07:53:36.263Z" },
{ url = "https://files.pythonhosted.org/packages/a2/75/8c93ff7f315c4d086a2dfcde02f713004357d70a163eddb6c56a6a5eff40/mypy-1.14.1-cp311-cp311-win_amd64.whl", hash = "sha256:ad3301ebebec9e8ee7135d8e3109ca76c23752bac1e717bc84cd3836b4bf3eae", size = 9777869, upload-time = "2024-12-30T16:37:33.428Z" }, { url = "https://files.pythonhosted.org/packages/8c/cf/6438a429e0f2f5cab8bc83e53dbebfa666476f40ee322e13cac5e64b79e7/mypy-1.17.1-cp311-cp311-win_amd64.whl", hash = "sha256:ff2933428516ab63f961644bc49bc4cbe42bbffb2cd3b71cc7277c07d16b1a8b", size = 9507307, upload-time = "2025-07-31T07:53:59.734Z" },
{ url = "https://files.pythonhosted.org/packages/43/1b/b38c079609bb4627905b74fc6a49849835acf68547ac33d8ceb707de5f52/mypy-1.14.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:30ff5ef8519bbc2e18b3b54521ec319513a26f1bba19a7582e7b1f58a6e69f14", size = 11266668, upload-time = "2024-12-30T16:38:02.211Z" }, { url = "https://files.pythonhosted.org/packages/17/a2/7034d0d61af8098ec47902108553122baa0f438df8a713be860f7407c9e6/mypy-1.17.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:69e83ea6553a3ba79c08c6e15dbd9bfa912ec1e493bf75489ef93beb65209aeb", size = 11086295, upload-time = "2025-07-31T07:53:28.124Z" },
{ url = "https://files.pythonhosted.org/packages/6b/75/2ed0d2964c1ffc9971c729f7a544e9cd34b2cdabbe2d11afd148d7838aa2/mypy-1.14.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:cb9f255c18052343c70234907e2e532bc7e55a62565d64536dbc7706a20b78b9", size = 10254060, upload-time = "2024-12-30T16:37:46.131Z" }, { url = "https://files.pythonhosted.org/packages/14/1f/19e7e44b594d4b12f6ba8064dbe136505cec813549ca3e5191e40b1d3cc2/mypy-1.17.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1b16708a66d38abb1e6b5702f5c2c87e133289da36f6a1d15f6a5221085c6403", size = 10112355, upload-time = "2025-07-31T07:53:21.121Z" },
{ url = "https://files.pythonhosted.org/packages/a1/5f/7b8051552d4da3c51bbe8fcafffd76a6823779101a2b198d80886cd8f08e/mypy-1.14.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8b4e3413e0bddea671012b063e27591b953d653209e7a4fa5e48759cda77ca11", size = 11933167, upload-time = "2024-12-30T16:37:43.534Z" }, { url = "https://files.pythonhosted.org/packages/5b/69/baa33927e29e6b4c55d798a9d44db5d394072eef2bdc18c3e2048c9ed1e9/mypy-1.17.1-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:89e972c0035e9e05823907ad5398c5a73b9f47a002b22359b177d40bdaee7056", size = 11875285, upload-time = "2025-07-31T07:53:55.293Z" },
{ url = "https://files.pythonhosted.org/packages/04/90/f53971d3ac39d8b68bbaab9a4c6c58c8caa4d5fd3d587d16f5927eeeabe1/mypy-1.14.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:553c293b1fbdebb6c3c4030589dab9fafb6dfa768995a453d8a5d3b23784af2e", size = 12864341, upload-time = "2024-12-30T16:37:36.249Z" }, { url = "https://files.pythonhosted.org/packages/90/13/f3a89c76b0a41e19490b01e7069713a30949d9a6c147289ee1521bcea245/mypy-1.17.1-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:03b6d0ed2b188e35ee6d5c36b5580cffd6da23319991c49ab5556c023ccf1341", size = 12737895, upload-time = "2025-07-31T07:53:43.623Z" },
{ url = "https://files.pythonhosted.org/packages/03/d2/8bc0aeaaf2e88c977db41583559319f1821c069e943ada2701e86d0430b7/mypy-1.14.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:fad79bfe3b65fe6a1efaed97b445c3d37f7be9fdc348bdb2d7cac75579607c89", size = 12972991, upload-time = "2024-12-30T16:37:06.743Z" }, { url = "https://files.pythonhosted.org/packages/23/a1/c4ee79ac484241301564072e6476c5a5be2590bc2e7bfd28220033d2ef8f/mypy-1.17.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:c837b896b37cd103570d776bda106eabb8737aa6dd4f248451aecf53030cdbeb", size = 12931025, upload-time = "2025-07-31T07:54:17.125Z" },
{ url = "https://files.pythonhosted.org/packages/6f/17/07815114b903b49b0f2cf7499f1c130e5aa459411596668267535fe9243c/mypy-1.14.1-cp312-cp312-win_amd64.whl", hash = "sha256:8fa2220e54d2946e94ab6dbb3ba0a992795bd68b16dc852db33028df2b00191b", size = 9879016, upload-time = "2024-12-30T16:37:15.02Z" }, { url = "https://files.pythonhosted.org/packages/89/b8/7409477be7919a0608900e6320b155c72caab4fef46427c5cc75f85edadd/mypy-1.17.1-cp312-cp312-win_amd64.whl", hash = "sha256:665afab0963a4b39dff7c1fa563cc8b11ecff7910206db4b2e64dd1ba25aed19", size = 9584664, upload-time = "2025-07-31T07:54:12.842Z" },
{ url = "https://files.pythonhosted.org/packages/9e/15/bb6a686901f59222275ab228453de741185f9d54fecbaacec041679496c6/mypy-1.14.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:92c3ed5afb06c3a8e188cb5da4984cab9ec9a77ba956ee419c68a388b4595255", size = 11252097, upload-time = "2024-12-30T16:37:25.144Z" }, { url = "https://files.pythonhosted.org/packages/5b/82/aec2fc9b9b149f372850291827537a508d6c4d3664b1750a324b91f71355/mypy-1.17.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:93378d3203a5c0800c6b6d850ad2f19f7a3cdf1a3701d3416dbf128805c6a6a7", size = 11075338, upload-time = "2025-07-31T07:53:38.873Z" },
{ url = "https://files.pythonhosted.org/packages/f8/b3/8b0f74dfd072c802b7fa368829defdf3ee1566ba74c32a2cb2403f68024c/mypy-1.14.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:dbec574648b3e25f43d23577309b16534431db4ddc09fda50841f1e34e64ed34", size = 10239728, upload-time = "2024-12-30T16:38:08.634Z" }, { url = "https://files.pythonhosted.org/packages/07/ac/ee93fbde9d2242657128af8c86f5d917cd2887584cf948a8e3663d0cd737/mypy-1.17.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:15d54056f7fe7a826d897789f53dd6377ec2ea8ba6f776dc83c2902b899fee81", size = 10113066, upload-time = "2025-07-31T07:54:14.707Z" },
{ url = "https://files.pythonhosted.org/packages/c5/9b/4fd95ab20c52bb5b8c03cc49169be5905d931de17edfe4d9d2986800b52e/mypy-1.14.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8c6d94b16d62eb3e947281aa7347d78236688e21081f11de976376cf010eb31a", size = 11924965, upload-time = "2024-12-30T16:38:12.132Z" }, { url = "https://files.pythonhosted.org/packages/5a/68/946a1e0be93f17f7caa56c45844ec691ca153ee8b62f21eddda336a2d203/mypy-1.17.1-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:209a58fed9987eccc20f2ca94afe7257a8f46eb5df1fb69958650973230f91e6", size = 11875473, upload-time = "2025-07-31T07:53:14.504Z" },
{ url = "https://files.pythonhosted.org/packages/56/9d/4a236b9c57f5d8f08ed346914b3f091a62dd7e19336b2b2a0d85485f82ff/mypy-1.14.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:d4b19b03fdf54f3c5b2fa474c56b4c13c9dbfb9a2db4370ede7ec11a2c5927d9", size = 12867660, upload-time = "2024-12-30T16:38:17.342Z" }, { url = "https://files.pythonhosted.org/packages/9f/0f/478b4dce1cb4f43cf0f0d00fba3030b21ca04a01b74d1cd272a528cf446f/mypy-1.17.1-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:099b9a5da47de9e2cb5165e581f158e854d9e19d2e96b6698c0d64de911dd849", size = 12744296, upload-time = "2025-07-31T07:53:03.896Z" },
{ url = "https://files.pythonhosted.org/packages/40/88/a61a5497e2f68d9027de2bb139c7bb9abaeb1be1584649fa9d807f80a338/mypy-1.14.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:0c911fde686394753fff899c409fd4e16e9b294c24bfd5e1ea4675deae1ac6fd", size = 12969198, upload-time = "2024-12-30T16:38:32.839Z" }, { url = "https://files.pythonhosted.org/packages/ca/70/afa5850176379d1b303f992a828de95fc14487429a7139a4e0bdd17a8279/mypy-1.17.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:fa6ffadfbe6994d724c5a1bb6123a7d27dd68fc9c059561cd33b664a79578e14", size = 12914657, upload-time = "2025-07-31T07:54:08.576Z" },
{ url = "https://files.pythonhosted.org/packages/54/da/3d6fc5d92d324701b0c23fb413c853892bfe0e1dbe06c9138037d459756b/mypy-1.14.1-cp313-cp313-win_amd64.whl", hash = "sha256:8b21525cb51671219f5307be85f7e646a153e5acc656e5cebf64bfa076c50107", size = 9885276, upload-time = "2024-12-30T16:38:20.828Z" }, { url = "https://files.pythonhosted.org/packages/53/f9/4a83e1c856a3d9c8f6edaa4749a4864ee98486e9b9dbfbc93842891029c2/mypy-1.17.1-cp313-cp313-win_amd64.whl", hash = "sha256:9a2b7d9180aed171f033c9f2fc6c204c1245cf60b0cb61cf2e7acc24eea78e0a", size = 9593320, upload-time = "2025-07-31T07:53:01.341Z" },
{ url = "https://files.pythonhosted.org/packages/ca/1f/186d133ae2514633f8558e78cd658070ba686c0e9275c5a5c24a1e1f0d67/mypy-1.14.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:3888a1816d69f7ab92092f785a462944b3ca16d7c470d564165fe703b0970c35", size = 11200493, upload-time = "2024-12-30T16:38:26.935Z" }, { url = "https://files.pythonhosted.org/packages/38/56/79c2fac86da57c7d8c48622a05873eaab40b905096c33597462713f5af90/mypy-1.17.1-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:15a83369400454c41ed3a118e0cc58bd8123921a602f385cb6d6ea5df050c733", size = 11040037, upload-time = "2025-07-31T07:54:10.942Z" },
{ url = "https://files.pythonhosted.org/packages/af/fc/4842485d034e38a4646cccd1369f6b1ccd7bc86989c52770d75d719a9941/mypy-1.14.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:46c756a444117c43ee984bd055db99e498bc613a70bbbc120272bd13ca579fbc", size = 10357702, upload-time = "2024-12-30T16:38:50.623Z" }, { url = "https://files.pythonhosted.org/packages/4d/c3/adabe6ff53638e3cad19e3547268482408323b1e68bf082c9119000cd049/mypy-1.17.1-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:55b918670f692fc9fba55c3298d8a3beae295c5cded0a55dccdc5bbead814acd", size = 10131550, upload-time = "2025-07-31T07:53:41.307Z" },
{ url = "https://files.pythonhosted.org/packages/b4/e6/457b83f2d701e23869cfec013a48a12638f75b9d37612a9ddf99072c1051/mypy-1.14.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:27fc248022907e72abfd8e22ab1f10e903915ff69961174784a3900a8cba9ad9", size = 12091104, upload-time = "2024-12-30T16:38:53.735Z" }, { url = "https://files.pythonhosted.org/packages/b8/c5/2e234c22c3bdeb23a7817af57a58865a39753bde52c74e2c661ee0cfc640/mypy-1.17.1-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:62761474061feef6f720149d7ba876122007ddc64adff5ba6f374fda35a018a0", size = 11872963, upload-time = "2025-07-31T07:53:16.878Z" },
{ url = "https://files.pythonhosted.org/packages/f1/bf/76a569158db678fee59f4fd30b8e7a0d75bcbaeef49edd882a0d63af6d66/mypy-1.14.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:499d6a72fb7e5de92218db961f1a66d5f11783f9ae549d214617edab5d4dbdbb", size = 12830167, upload-time = "2024-12-30T16:38:56.437Z" }, { url = "https://files.pythonhosted.org/packages/ab/26/c13c130f35ca8caa5f2ceab68a247775648fdcd6c9a18f158825f2bc2410/mypy-1.17.1-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c49562d3d908fd49ed0938e5423daed8d407774a479b595b143a3d7f87cdae6a", size = 12710189, upload-time = "2025-07-31T07:54:01.962Z" },
{ url = "https://files.pythonhosted.org/packages/43/bc/0bc6b694b3103de9fed61867f1c8bd33336b913d16831431e7cb48ef1c92/mypy-1.14.1-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:57961db9795eb566dc1d1b4e9139ebc4c6b0cb6e7254ecde69d1552bf7613f60", size = 13013834, upload-time = "2024-12-30T16:38:59.204Z" }, { url = "https://files.pythonhosted.org/packages/82/df/c7d79d09f6de8383fe800521d066d877e54d30b4fb94281c262be2df84ef/mypy-1.17.1-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:397fba5d7616a5bc60b45c7ed204717eaddc38f826e3645402c426057ead9a91", size = 12900322, upload-time = "2025-07-31T07:53:10.551Z" },
{ url = "https://files.pythonhosted.org/packages/b0/79/5f5ec47849b6df1e6943d5fd8e6632fbfc04b4fd4acfa5a5a9535d11b4e2/mypy-1.14.1-cp39-cp39-win_amd64.whl", hash = "sha256:07ba89fdcc9451f2ebb02853deb6aaaa3d2239a236669a63ab3801bbf923ef5c", size = 9781231, upload-time = "2024-12-30T16:39:05.124Z" }, { url = "https://files.pythonhosted.org/packages/b8/98/3d5a48978b4f708c55ae832619addc66d677f6dc59f3ebad71bae8285ca6/mypy-1.17.1-cp314-cp314-win_amd64.whl", hash = "sha256:9d6b20b97d373f41617bd0708fd46aa656059af57f2ef72aa8c7d6a2b73b74ed", size = 9751879, upload-time = "2025-07-31T07:52:56.683Z" },
{ url = "https://files.pythonhosted.org/packages/a0/b5/32dd67b69a16d088e533962e5044e51004176a9952419de0370cdaead0f8/mypy-1.14.1-py3-none-any.whl", hash = "sha256:b66a60cc4073aeb8ae00057f9c1f64d49e90f918fbcef9a977eb121da8b8f1d1", size = 2752905, upload-time = "2024-12-30T16:38:42.021Z" }, { url = "https://files.pythonhosted.org/packages/29/cb/673e3d34e5d8de60b3a61f44f80150a738bff568cd6b7efb55742a605e98/mypy-1.17.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:5d1092694f166a7e56c805caaf794e0585cabdbf1df36911c414e4e9abb62ae9", size = 10992466, upload-time = "2025-07-31T07:53:57.574Z" },
{ url = "https://files.pythonhosted.org/packages/0c/d0/fe1895836eea3a33ab801561987a10569df92f2d3d4715abf2cfeaa29cb2/mypy-1.17.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:79d44f9bfb004941ebb0abe8eff6504223a9c1ac51ef967d1263c6572bbebc99", size = 10117638, upload-time = "2025-07-31T07:53:34.256Z" },
{ url = "https://files.pythonhosted.org/packages/97/f3/514aa5532303aafb95b9ca400a31054a2bd9489de166558c2baaeea9c522/mypy-1.17.1-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b01586eed696ec905e61bd2568f48740f7ac4a45b3a468e6423a03d3788a51a8", size = 11915673, upload-time = "2025-07-31T07:52:59.361Z" },
{ url = "https://files.pythonhosted.org/packages/ab/c3/c0805f0edec96fe8e2c048b03769a6291523d509be8ee7f56ae922fa3882/mypy-1.17.1-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:43808d9476c36b927fbcd0b0255ce75efe1b68a080154a38ae68a7e62de8f0f8", size = 12649022, upload-time = "2025-07-31T07:53:45.92Z" },
{ url = "https://files.pythonhosted.org/packages/45/3e/d646b5a298ada21a8512fa7e5531f664535a495efa672601702398cea2b4/mypy-1.17.1-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:feb8cc32d319edd5859da2cc084493b3e2ce5e49a946377663cc90f6c15fb259", size = 12895536, upload-time = "2025-07-31T07:53:06.17Z" },
{ url = "https://files.pythonhosted.org/packages/14/55/e13d0dcd276975927d1f4e9e2ec4fd409e199f01bdc671717e673cc63a22/mypy-1.17.1-cp39-cp39-win_amd64.whl", hash = "sha256:d7598cf74c3e16539d4e2f0b8d8c318e00041553d83d4861f87c7a72e95ac24d", size = 9512564, upload-time = "2025-07-31T07:53:12.346Z" },
{ url = "https://files.pythonhosted.org/packages/1d/f3/8fcd2af0f5b806f6cf463efaffd3c9548a28f84220493ecd38d127b6b66d/mypy-1.17.1-py3-none-any.whl", hash = "sha256:a9f52c0351c21fe24c21d8c0eb1f62967b262d6729393397b6f443c3b773c3b9", size = 2283411, upload-time = "2025-07-31T07:53:24.664Z" },
] ]
[[package]] [[package]]
@ -1116,6 +1123,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" }, { url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" },
] ]
[[package]]
name = "pathspec"
version = "0.12.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ca/bc/f35b8446f4531a7cb215605d100cd88b7ac6f44ab3fc94870c120ab3adbf/pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712", size = 51043, upload-time = "2023-12-10T22:30:45Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cc/20/ff623b09d963f88bfde16306a54e12ee5ea43e9b597108672ff3a408aad6/pathspec-0.12.1-py3-none-any.whl", hash = "sha256:a0d503e138a4c123b27490a4f7beda6a01c6f288df0e4a8b79c7eb0dc7b4cc08", size = 31191, upload-time = "2023-12-10T22:30:43.14Z" },
]
[[package]] [[package]]
name = "pip" name = "pip"
version = "25.2" version = "25.2"
@ -1315,7 +1331,7 @@ provides-extras = ["aws", "docs", "encryption", "gssapi", "ocsp", "snappy", "tes
[package.metadata.requires-dev] [package.metadata.requires-dev]
coverage = [ coverage = [
{ name = "coverage", specifier = ">=5,<=7.5" }, { name = "coverage", specifier = ">=5,<=7.10.6" },
{ name = "pytest-cov" }, { name = "pytest-cov" },
] ]
dev = [{ name = "pre-commit", specifier = ">=4.0" }] dev = [{ name = "pre-commit", specifier = ">=4.0" }]
@ -1329,9 +1345,9 @@ perf = [{ name = "simplejson" }]
pip = [{ name = "pip" }] pip = [{ name = "pip" }]
pymongocrypt-source = [{ name = "pymongocrypt", git = "https://github.com/mongodb/libmongocrypt?subdirectory=bindings%2Fpython&rev=master" }] pymongocrypt-source = [{ name = "pymongocrypt", git = "https://github.com/mongodb/libmongocrypt?subdirectory=bindings%2Fpython&rev=master" }]
typing = [ typing = [
{ name = "mypy", specifier = "==1.14.1" }, { name = "mypy", specifier = "==1.17.1" },
{ name = "pip" }, { name = "pip" },
{ name = "pyright", specifier = "==1.1.392.post0" }, { name = "pyright", specifier = "==1.1.405" },
{ name = "typing-extensions" }, { name = "typing-extensions" },
] ]
@ -1375,15 +1391,15 @@ wheels = [
[[package]] [[package]]
name = "pyright" name = "pyright"
version = "1.1.392.post0" version = "1.1.405"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "nodeenv" }, { name = "nodeenv" },
{ name = "typing-extensions" }, { name = "typing-extensions" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/66/df/3c6f6b08fba7ccf49b114dfc4bb33e25c299883fd763f93fad47ef8bc58d/pyright-1.1.392.post0.tar.gz", hash = "sha256:3b7f88de74a28dcfa90c7d90c782b6569a48c2be5f9d4add38472bdaac247ebd", size = 3789911, upload-time = "2025-01-15T15:01:20.913Z" } sdist = { url = "https://files.pythonhosted.org/packages/fb/6c/ba4bbee22e76af700ea593a1d8701e3225080956753bee9750dcc25e2649/pyright-1.1.405.tar.gz", hash = "sha256:5c2a30e1037af27eb463a1cc0b9f6d65fec48478ccf092c1ac28385a15c55763", size = 4068319, upload-time = "2025-09-04T03:37:06.776Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/e7/b1/a18de17f40e4f61ca58856b9ef9b0febf74ff88978c3f7776f910071f567/pyright-1.1.392.post0-py3-none-any.whl", hash = "sha256:252f84458a46fa2f0fd4e2f91fc74f50b9ca52c757062e93f6c250c0d8329eb2", size = 5595487, upload-time = "2025-01-15T15:01:17.775Z" }, { url = "https://files.pythonhosted.org/packages/d5/1a/524f832e1ff1962a22a1accc775ca7b143ba2e9f5924bb6749dce566784a/pyright-1.1.405-py3-none-any.whl", hash = "sha256:a2cb13700b5508ce8e5d4546034cb7ea4aedb60215c6c33f56cec7f53996035a", size = 5905038, upload-time = "2025-09-04T03:37:04.913Z" },
] ]
[[package]] [[package]]