diff --git a/.evergreen/generated_configs/tasks.yml b/.evergreen/generated_configs/tasks.yml index f69afcdb9..60ee6ed13 100644 --- a/.evergreen/generated_configs/tasks.yml +++ b/.evergreen/generated_configs/tasks.yml @@ -102,7 +102,7 @@ tasks: AWS_ROLE_SESSION_NAME: test TOOLCHAIN_VERSION: "3.14" tags: [auth-aws, auth-aws-web-identity] - - name: test-auth-aws-latest-ecs-python3.10 + - name: test-auth-aws-latest-regular-python3.10-min-deps commands: - func: run server vars: @@ -112,9 +112,21 @@ tasks: - func: run tests vars: TEST_NAME: auth_aws - SUB_TEST_NAME: ecs + SUB_TEST_NAME: regular TOOLCHAIN_VERSION: "3.10" - tags: [auth-aws, auth-aws-ecs] + TEST_MIN_DEPS: "1" + tags: [auth-aws, auth-aws-regular] + - name: test-auth-aws-ecs + commands: + - func: assume ec2 role + - func: run server + vars: + VERSION: "8.0" + - func: run tests + vars: + TEST_NAME: auth_aws + SUB_TEST_NAME: ecs + tags: [auth-aws-ecs] # Backport pr tests - name: backport-pr diff --git a/.evergreen/generated_configs/variants.yml b/.evergreen/generated_configs/variants.yml index 1b0c3fc73..42a677609 100644 --- a/.evergreen/generated_configs/variants.yml +++ b/.evergreen/generated_configs/variants.yml @@ -70,27 +70,34 @@ buildvariants: tags: [pr] # Aws auth tests - - name: auth-aws-ubuntu-20 + - name: auth-aws-rhel8 tasks: - name: .auth-aws - display_name: Auth AWS Ubuntu-20 + display_name: Auth AWS RHEL8 run_on: - - ubuntu2004-small + - rhel87-small tags: [] - name: auth-aws-win64 tasks: - - name: .auth-aws !.auth-aws-ecs + - name: .auth-aws display_name: Auth AWS Win64 run_on: - - windows-64-vsMulti-small + - windows-2022-latest-small tags: [] - name: auth-aws-macos tasks: - - name: .auth-aws !.auth-aws-web-identity !.auth-aws-ecs !.auth-aws-ec2 + - name: .auth-aws !.auth-aws-web-identity !.auth-aws-ec2 display_name: Auth AWS macOS run_on: - macos-14 tags: [pr] + - name: auth-aws-ecs-macos + tasks: + - name: .auth-aws-ecs + display_name: Auth AWS ECS macOS + run_on: + - ubuntu2404-small + tags: [pr] # Aws lambda tests - name: faas-lambda @@ -199,7 +206,7 @@ buildvariants: - name: .test-non-standard !.pypy display_name: Encryption Win64 run_on: - - windows-64-vsMulti-small + - windows-2022-latest-small batchtime: 1440 expansions: TEST_NAME: encryption @@ -231,7 +238,7 @@ buildvariants: - name: .test-non-standard !.pypy display_name: Encryption crypt_shared Win64 run_on: - - windows-64-vsMulti-small + - windows-2022-latest-small batchtime: 1440 expansions: TEST_NAME: encryption @@ -270,10 +277,10 @@ buildvariants: AUTH: auth - name: auth-enterprise-win64 tasks: - - name: .test-standard-auth !.pypy .auth + - name: .test-standard-auth !.pypy .auth !.free-threaded display_name: Auth Enterprise Win64 run_on: - - windows-64-vsMulti-small + - windows-2022-latest-small expansions: TEST_NAME: enterprise_auth AUTH: auth @@ -384,7 +391,7 @@ buildvariants: - name: .ocsp-rsa !.ocsp-staple .4.4 display_name: OCSP Win64 run_on: - - windows-64-vsMulti-small + - windows-2022-latest-small batchtime: 10080 - name: ocsp-macos tasks: @@ -423,7 +430,7 @@ buildvariants: - name: "!.auth_oidc_remote .auth_oidc" display_name: Auth OIDC Win64 run_on: - - windows-64-vsMulti-small + - windows-2022-latest-small batchtime: 1440 # Perf tests @@ -462,7 +469,7 @@ buildvariants: - name: .test-standard !.pypy .async .replica_set-noauth-ssl display_name: PyOpenSSL Win64 run_on: - - rhel87-small + - windows-2022-latest-small batchtime: 1440 expansions: SUB_TEST_NAME: pyopenssl @@ -606,7 +613,7 @@ buildvariants: - name: .test-standard !.pypy display_name: "* Test Win64" run_on: - - windows-64-vsMulti-small + - windows-2022-latest-small tags: [standard-non-linux] - name: test-win32 tasks: @@ -655,7 +662,7 @@ buildvariants: - name: .test-numpy display_name: Test Numpy Win64 run_on: - - windows-64-vsMulti-small + - windows-2022-latest-small tags: [binary, vector] - name: test-numpy-win32 tasks: diff --git a/.evergreen/scripts/generate_config.py b/.evergreen/scripts/generate_config.py index 05afc16db..04579c521 100644 --- a/.evergreen/scripts/generate_config.py +++ b/.evergreen/scripts/generate_config.py @@ -223,7 +223,8 @@ def create_enterprise_auth_variants(): if host == "macos": tasks = [".test-standard-auth !.pypy .auth !.free-threaded"] if host == "win64": - tasks = [".test-standard-auth !.pypy .auth"] + # https://jira.mongodb.org/browse/PYTHON-5704 + tasks = [".test-standard-auth !.pypy .auth !.free-threaded"] variant = create_variant(tasks, display_name, host=host, expansions=expansions) variants.append(variant) return variants @@ -481,15 +482,15 @@ def create_perf_variants(): def create_aws_auth_variants(): variants = [] - for host_name in ["ubuntu20", "win64", "macos"]: + for host_name in ["rhel8", "win64", "macos"]: expansions = dict() tasks = [".auth-aws"] tags = [] if host_name == "macos": - tasks = [".auth-aws !.auth-aws-web-identity !.auth-aws-ecs !.auth-aws-ec2"] + tasks = [".auth-aws !.auth-aws-web-identity !.auth-aws-ec2"] tags = ["pr"] elif host_name == "win64": - tasks = [".auth-aws !.auth-aws-ecs"] + tasks = [".auth-aws"] host = HOSTS[host_name] variant = create_variant( tasks, @@ -499,6 +500,16 @@ def create_aws_auth_variants(): expansions=expansions, ) variants.append(variant) + + # The ECS test must be run on Ubuntu 24 to match the Fargate Config. + variant = create_variant( + [".auth-aws-ecs"], + get_variant_name("Auth AWS ECS", host), + host=HOSTS["ubuntu24"], + tags=tags, + expansions=expansions, + ) + variants.append(variant) return variants @@ -788,19 +799,18 @@ def create_aws_tasks(): "env-creds", "session-creds", "web-identity", - "ecs", ] + assume_func = FunctionCall(func="assume ec2 role") for version, test_type, python in zip_cycle(get_versions_from("4.4"), aws_test_types, CPYTHONS): base_name = f"test-auth-aws-{version}" base_tags = ["auth-aws"] server_vars = dict(AUTH_AWS="1", VERSION=version) server_func = FunctionCall(func="run server", vars=server_vars) - assume_func = FunctionCall(func="assume ec2 role") tags = [*base_tags, f"auth-aws-{test_type}"] if "t" in python: tags.append("free-threaded") test_vars = dict(TEST_NAME="auth_aws", SUB_TEST_NAME=test_type, TOOLCHAIN_VERSION=python) - if python == ALL_PYTHONS[0] and test_type != "ecs": + if python == ALL_PYTHONS[0]: test_vars["TEST_MIN_DEPS"] = "1" name = get_task_name(f"{base_name}-{test_type}", **test_vars) test_func = FunctionCall(func="run tests", vars=test_vars) @@ -822,6 +832,16 @@ def create_aws_tasks(): funcs = [server_func, assume_func, test_func] tasks.append(EvgTask(name=name, tags=tags, commands=funcs)) + # Add the ECS task. This will run on Ubuntu 24 to match the + # Fargate environment. + tags = ["auth-aws-ecs"] + test_vars = dict(TEST_NAME="auth_aws", SUB_TEST_NAME="ecs") + name = get_task_name("test-auth-aws-ecs", **test_vars) + test_func = FunctionCall(func="run tests", vars=test_vars) + server_func = FunctionCall(func="run server", vars=dict(VERSION="8.0")) + funcs = [assume_func, server_func, test_func] + tasks.append(EvgTask(name=name, tags=tags, commands=funcs)) + return tasks diff --git a/.evergreen/scripts/generate_config_utils.py b/.evergreen/scripts/generate_config_utils.py index 573afa36e..ee5da4bcd 100644 --- a/.evergreen/scripts/generate_config_utils.py +++ b/.evergreen/scripts/generate_config_utils.py @@ -59,11 +59,12 @@ class Host: # Hosts with toolchains. HOSTS["rhel8"] = Host("rhel8", "rhel87-small", "RHEL8", dict()) HOSTS["win64"] = Host("win64", "windows-64-vsMulti-small", "Win64", dict()) +HOSTS["win-latest"] = Host("win-latest", "windows-2022-latest-small", "WinLatest", dict()) HOSTS["win32"] = Host("win32", "windows-64-vsMulti-small", "Win32", dict()) HOSTS["macos"] = Host("macos", "macos-14", "macOS", dict()) HOSTS["macos-arm64"] = Host("macos-arm64", "macos-14-arm64", "macOS Arm64", dict()) -HOSTS["ubuntu20"] = Host("ubuntu20", "ubuntu2004-small", "Ubuntu-20", dict()) HOSTS["ubuntu22"] = Host("ubuntu22", "ubuntu2204-small", "Ubuntu-22", dict()) +HOSTS["ubuntu24"] = Host("ubuntu24", "ubuntu2404-small", "Ubuntu-24", dict()) HOSTS["perf"] = Host("perf", "rhel90-dbx-perf-large", "", dict()) HOSTS["debian11"] = Host("debian11", "debian11-small", "Debian11", dict()) DEFAULT_HOST = HOSTS["rhel8"] @@ -139,6 +140,14 @@ def create_variant( expansions = expansions and expansions.copy() or dict() if version: expansions["VERSION"] = version + # 8.0+ Windows builds must run on win-latest + if ( + "win64" in display_name.lower() + or "win32" in display_name.lower() + and version + and version >= "8.0" + ): + kwargs["run_on"] = HOSTS["win-latest"].run_on return create_variant_generic( tasks, display_name, version=version, host=host, expansions=expansions, **kwargs ) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 5bf500ba1..b810b64fc 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -5,6 +5,8 @@ updates: directory: "/" schedule: interval: "weekly" + cooldown: + default-days: 7 groups: actions: patterns: diff --git a/.github/workflows/sbom.yml b/.github/workflows/sbom.yml index c5d055bdc..69a07c8be 100644 --- a/.github/workflows/sbom.yml +++ b/.github/workflows/sbom.yml @@ -74,7 +74,7 @@ jobs: if-no-files-found: error - name: Create Pull Request - uses: peter-evans/create-pull-request@98357b18bf14b5342f975ff684046ec3b2a07725 + uses: peter-evans/create-pull-request@c0f553fe549906ede9cf27b5156039d195d2ece0 # v8 with: token: ${{ secrets.GITHUB_TOKEN }} commit-message: 'chore: Update SBOM after dependency changes' diff --git a/.github/workflows/test-python.yml b/.github/workflows/test-python.yml index 3d32a852a..086e22fae 100644 --- a/.github/workflows/test-python.yml +++ b/.github/workflows/test-python.yml @@ -26,7 +26,7 @@ jobs: with: persist-credentials: false - name: Install uv - uses: astral-sh/setup-uv@681c641aba71e4a1c380be3ab5e12ad51f415867 # v7 + uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7 with: enable-cache: true python-version: "3.10" @@ -68,7 +68,7 @@ jobs: with: persist-credentials: false - name: Install uv - uses: astral-sh/setup-uv@681c641aba71e4a1c380be3ab5e12ad51f415867 # v7 + uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7 with: enable-cache: true python-version: ${{ matrix.python-version }} @@ -87,7 +87,7 @@ jobs: with: persist-credentials: false - name: Install uv - uses: astral-sh/setup-uv@681c641aba71e4a1c380be3ab5e12ad51f415867 # v7 + uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7 with: enable-cache: true python-version: "3.10" @@ -112,7 +112,7 @@ jobs: with: persist-credentials: false - name: Install uv - uses: astral-sh/setup-uv@681c641aba71e4a1c380be3ab5e12ad51f415867 # v7 + uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7 with: enable-cache: true python-version: "3.10" @@ -131,7 +131,7 @@ jobs: with: persist-credentials: false - name: Install uv - uses: astral-sh/setup-uv@681c641aba71e4a1c380be3ab5e12ad51f415867 # v7 + uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7 with: enable-cache: true python-version: "3.10" @@ -153,7 +153,7 @@ jobs: with: persist-credentials: false - name: Install uv - uses: astral-sh/setup-uv@681c641aba71e4a1c380be3ab5e12ad51f415867 # v7 + uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7 with: enable-cache: true python-version: "${{matrix.python}}" @@ -174,7 +174,7 @@ jobs: with: persist-credentials: false - name: Install uv - uses: astral-sh/setup-uv@681c641aba71e4a1c380be3ab5e12ad51f415867 # v7 + uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7 with: enable-cache: true python-version: "3.10" @@ -264,7 +264,7 @@ jobs: with: persist-credentials: false - name: Install uv - uses: astral-sh/setup-uv@681c641aba71e4a1c380be3ab5e12ad51f415867 # v7 + uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7 with: python-version: "3.9" - id: setup-mongodb diff --git a/.github/workflows/zizmor.yml b/.github/workflows/zizmor.yml index 2980210d7..26f75fa79 100644 --- a/.github/workflows/zizmor.yml +++ b/.github/workflows/zizmor.yml @@ -18,4 +18,4 @@ jobs: with: persist-credentials: false - name: Run zizmor 🌈 - uses: zizmorcore/zizmor-action@706c51b5bce7adb027de71ab36d865f5d3fcc7b7 + uses: zizmorcore/zizmor-action@135698455da5c3b3e55f73f4419e481ab68cdd95 # v0.4.1 diff --git a/doc/api/pymongo/asynchronous/command_cursor.rst b/doc/api/pymongo/asynchronous/command_cursor.rst index 1f94c6e52..73488d1da 100644 --- a/doc/api/pymongo/asynchronous/command_cursor.rst +++ b/doc/api/pymongo/asynchronous/command_cursor.rst @@ -5,3 +5,4 @@ .. automodule:: pymongo.asynchronous.command_cursor :synopsis: Tools for iterating over MongoDB command results :members: + :inherited-members: diff --git a/doc/api/pymongo/asynchronous/cursor.rst b/doc/api/pymongo/asynchronous/cursor.rst index f511734de..635805c2a 100644 --- a/doc/api/pymongo/asynchronous/cursor.rst +++ b/doc/api/pymongo/asynchronous/cursor.rst @@ -7,6 +7,8 @@ .. autoclass:: pymongo.asynchronous.cursor.AsyncCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, session=None, allow_disk_use=None) :members: + :inherited-members: + .. describe:: c[index] diff --git a/doc/api/pymongo/command_cursor.rst b/doc/api/pymongo/command_cursor.rst index 2d0597a00..b6cc78905 100644 --- a/doc/api/pymongo/command_cursor.rst +++ b/doc/api/pymongo/command_cursor.rst @@ -4,3 +4,4 @@ .. automodule:: pymongo.command_cursor :synopsis: Tools for iterating over MongoDB command results :members: + :inherited-members: diff --git a/doc/api/pymongo/cursor.rst b/doc/api/pymongo/cursor.rst index 513f051ab..2eb14c9cb 100644 --- a/doc/api/pymongo/cursor.rst +++ b/doc/api/pymongo/cursor.rst @@ -17,6 +17,7 @@ .. autoclass:: pymongo.cursor.Cursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, session=None, allow_disk_use=None) :members: + :inherited-members: .. describe:: c[index] diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index c74d91154..5967651b5 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -159,7 +159,7 @@ from bson.binary import Binary from bson.int64 import Int64 from bson.timestamp import Timestamp from pymongo import _csot -from pymongo.asynchronous.cursor import _ConnectionManager +from pymongo.asynchronous.cursor_base import _ConnectionManager from pymongo.errors import ( ConfigurationError, ConnectionFailure, diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index e18b3a330..5a59c67a1 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -20,7 +20,6 @@ from typing import ( TYPE_CHECKING, Any, AsyncIterator, - Generic, Mapping, NoReturn, Optional, @@ -29,17 +28,10 @@ from typing import ( ) from bson import CodecOptions, _convert_raw_document_lists_to_streams -from pymongo import _csot -from pymongo.asynchronous.cursor import _ConnectionManager +from pymongo.asynchronous.cursor_base import _AsyncCursorBase, _ConnectionManager from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure -from pymongo.message import ( - _CursorAddress, - _GetMore, - _OpMsg, - _OpReply, - _RawBatchGetMore, -) +from pymongo.message import _GetMore, _OpMsg, _OpReply, _RawBatchGetMore from pymongo.response import PinnedResponse from pymongo.typings import _Address, _DocumentOut, _DocumentType @@ -51,7 +43,7 @@ if TYPE_CHECKING: _IS_SYNC = False -class AsyncCommandCursor(Generic[_DocumentType]): +class AsyncCommandCursor(_AsyncCursorBase[_DocumentType]): """An asynchronous cursor / iterator over command cursors.""" _getmore_class = _GetMore @@ -98,8 +90,8 @@ class AsyncCommandCursor(Generic[_DocumentType]): f"max_await_time_ms must be an integer or None, not {type(max_await_time_ms)}" ) - def __del__(self) -> None: - self._die_no_lock() + def _get_namespace(self) -> str: + return self._ns def batch_size(self, batch_size: int) -> AsyncCommandCursor[_DocumentType]: """Limits the number of documents returned in one batch. Each batch @@ -161,94 +153,12 @@ class AsyncCommandCursor(Generic[_DocumentType]): ) -> Sequence[_DocumentOut]: return response.unpack_response(cursor_id, codec_options, user_fields, legacy_response) - @property - def alive(self) -> bool: - """Does this cursor have the potential to return more data? - - Even if :attr:`alive` is ``True``, :meth:`next` can raise - :exc:`StopIteration`. Best to use a for loop:: - - async for doc in collection.aggregate(pipeline): - print(doc) - - .. note:: :attr:`alive` can be True while iterating a cursor from - a failed server. In this case :attr:`alive` will return False after - :meth:`next` fails to retrieve the next batch of results from the - server. - """ - return bool(len(self._data) or (not self._killed)) - - @property - def cursor_id(self) -> int: - """Returns the id of the cursor.""" - return self._id - - @property - def address(self) -> Optional[_Address]: - """The (host, port) of the server used, or None. - - .. versionadded:: 3.0 - """ - return self._address - - @property - def session(self) -> Optional[AsyncClientSession]: - """The cursor's :class:`~pymongo.asynchronous.client_session.AsyncClientSession`, or None. - - .. versionadded:: 3.6 - """ - if self._session and not self._session._implicit: - return self._session - return None - - def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]: - already_killed = self._killed - self._killed = True - if self._id and not already_killed: - cursor_id = self._id - assert self._address is not None - address = _CursorAddress(self._address, self._ns) - else: - # Skip killCursors. - cursor_id = 0 - address = None - return cursor_id, address - - def _die_no_lock(self) -> None: - """Closes this cursor without acquiring a lock.""" - cursor_id, address = self._prepare_to_die() - self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session - ) - if self._session and self._session._implicit: - self._session._attached_to_cursor = False - self._session = None - self._sock_mgr = None - - async def _die_lock(self) -> None: - """Closes this cursor.""" - cursor_id, address = self._prepare_to_die() - await self._collection.database.client._cleanup_cursor_lock( - cursor_id, - address, - self._sock_mgr, - self._session, - ) - if self._session and self._session._implicit: - self._session._attached_to_cursor = False - self._session = None - self._sock_mgr = None - def _end_session(self) -> None: if self._session and self._session._implicit: self._session._attached_to_cursor = False self._session._end_implicit_session() self._session = None - async def close(self) -> None: - """Explicitly close / kill this cursor.""" - await self._die_lock() - async def _send_message(self, operation: _GetMore) -> None: """Send a getmore message and handle the response.""" client = self._collection.database.client @@ -330,6 +240,9 @@ class AsyncCommandCursor(Generic[_DocumentType]): def __aiter__(self) -> AsyncIterator[_DocumentType]: return self + async def __aenter__(self) -> AsyncCommandCursor[_DocumentType]: + return self + async def next(self) -> _DocumentType: """Advance the cursor.""" # Block until a document is returnable. @@ -385,41 +298,6 @@ class AsyncCommandCursor(Generic[_DocumentType]): """ return await self._try_next(get_more_allowed=True) - async def __aenter__(self) -> AsyncCommandCursor[_DocumentType]: - return self - - async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - await self.close() - - @_csot.apply - async def to_list(self, length: Optional[int] = None) -> list[_DocumentType]: - """Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``. - - To use:: - - >>> await cursor.to_list() - - Or, so read at most n items from the cursor:: - - >>> await cursor.to_list(n) - - If the cursor is empty or has no more results, an empty list will be returned. - - .. versionadded:: 4.9 - """ - res: list[_DocumentType] = [] - remaining = length - if isinstance(length, int) and length < 1: - raise ValueError("to_list() length must be greater than 0") - while self.alive: - if not await self._next_batch(res, remaining): - break - if length is not None: - remaining = length - len(res) - if remaining == 0: - break - return res - class AsyncRawBatchCommandCursor(AsyncCommandCursor[_DocumentType]): _getmore_class = _RawBatchGetMore diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index 67494192f..a60c082ad 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -21,7 +21,6 @@ from collections import deque from typing import ( TYPE_CHECKING, Any, - Generic, Iterable, List, Mapping, @@ -36,7 +35,8 @@ from typing import ( from bson import RE_TYPE, _convert_raw_document_lists_to_streams from bson.code import Code from bson.son import SON -from pymongo import _csot, helpers_shared +from pymongo import helpers_shared +from pymongo.asynchronous.cursor_base import _AsyncCursorBase, _ConnectionManager from pymongo.asynchronous.helpers import anext from pymongo.collation import validate_collation_or_none from pymongo.common import ( @@ -45,9 +45,7 @@ from pymongo.common import ( ) from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS, _QUERY_OPTIONS, CursorType, _Hint, _Sort from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure -from pymongo.lock import _async_create_lock from pymongo.message import ( - _CursorAddress, _GetMore, _OpMsg, _OpReply, @@ -65,31 +63,12 @@ if TYPE_CHECKING: from bson.codec_options import CodecOptions from pymongo.asynchronous.client_session import AsyncClientSession from pymongo.asynchronous.collection import AsyncCollection - from pymongo.asynchronous.pool import AsyncConnection from pymongo.read_preferences import _ServerMode _IS_SYNC = False -class _ConnectionManager: - """Used with exhaust cursors to ensure the connection is returned.""" - - def __init__(self, conn: AsyncConnection, more_to_come: bool): - self.conn: Optional[AsyncConnection] = conn - self.more_to_come = more_to_come - self._lock = _async_create_lock() - - def update_exhaust(self, more_to_come: bool) -> None: - self.more_to_come = more_to_come - - async def close(self) -> None: - """Return this instance's connection to the connection pool.""" - if self.conn: - await self.conn.unpin() - self.conn = None - - -class AsyncCursor(Generic[_DocumentType]): +class AsyncCursor(_AsyncCursorBase[_DocumentType]): _query_class = _Query _getmore_class = _GetMore @@ -266,8 +245,8 @@ class AsyncCursor(Generic[_DocumentType]): """The number of documents retrieved so far.""" return self._retrieved - def __del__(self) -> None: - self._die_no_lock() + def _get_namespace(self) -> str: + return f"{self._dbname}.{self._collname}" def clone(self) -> AsyncCursor[_DocumentType]: """Get a clone of this cursor. @@ -899,55 +878,6 @@ class AsyncCursor(Generic[_DocumentType]): self._read_preference = self._collection._read_preference_for(self.session) return self._read_preference - @property - def alive(self) -> bool: - """Does this cursor have the potential to return more data? - - This is mostly useful with `tailable cursors - `_ - since they will stop iterating even though they *may* return more - results in the future. - - With regular cursors, simply use an asynchronous for loop instead of :attr:`alive`:: - - async for doc in collection.find(): - print(doc) - - .. note:: Even if :attr:`alive` is True, :meth:`next` can raise - :exc:`StopIteration`. :attr:`alive` can also be True while iterating - a cursor from a failed server. In this case :attr:`alive` will - return False after :meth:`next` fails to retrieve the next batch - of results from the server. - """ - return bool(len(self._data) or (not self._killed)) - - @property - def cursor_id(self) -> Optional[int]: - """Returns the id of the cursor - - .. versionadded:: 2.2 - """ - return self._id - - @property - def address(self) -> Optional[tuple[str, Any]]: - """The (host, port) of the server used, or None. - - .. versionchanged:: 3.0 - Renamed from "conn_id". - """ - return self._address - - @property - def session(self) -> Optional[AsyncClientSession]: - """The cursor's :class:`~pymongo.asynchronous.client_session.AsyncClientSession`, or None. - - .. versionadded:: 3.6 - """ - if self._session and not self._session._implicit: - return self._session - return None - def __copy__(self) -> AsyncCursor[_DocumentType]: """Support function for `copy.copy()`. @@ -1011,59 +941,6 @@ class AsyncCursor(Generic[_DocumentType]): y[key] = value # type:ignore[index] return y - def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]: - self._killed = True - if self._id and not already_killed: - cursor_id = self._id - assert self._address is not None - address = _CursorAddress(self._address, f"{self._dbname}.{self._collname}") - else: - # Skip killCursors. - cursor_id = 0 - address = None - return cursor_id, address - - def _die_no_lock(self) -> None: - """Closes this cursor without acquiring a lock.""" - try: - already_killed = self._killed - except AttributeError: - # ___init__ did not run to completion (or at all). - return - - cursor_id, address = self._prepare_to_die(already_killed) - self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session - ) - if self._session and self._session._implicit: - self._session._attached_to_cursor = False - self._session = None - self._sock_mgr = None - - async def _die_lock(self) -> None: - """Closes this cursor.""" - try: - already_killed = self._killed - except AttributeError: - # ___init__ did not run to completion (or at all). - return - - cursor_id, address = self._prepare_to_die(already_killed) - await self._collection.database.client._cleanup_cursor_lock( - cursor_id, - address, - self._sock_mgr, - self._session, - ) - if self._session and self._session._implicit: - self._session._attached_to_cursor = False - self._session = None - self._sock_mgr = None - - async def close(self) -> None: - """Explicitly close / kill this cursor.""" - await self._die_lock() - async def distinct(self, key: str) -> list[Any]: """Get a list of distinct values for `key` among all documents in the result set of this query. @@ -1296,40 +1173,8 @@ class AsyncCursor(Generic[_DocumentType]): async def __aenter__(self) -> AsyncCursor[_DocumentType]: return self - async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - await self.close() - @_csot.apply - async def to_list(self, length: Optional[int] = None) -> list[_DocumentType]: - """Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``. - - To use:: - - >>> await cursor.to_list() - - Or, to read at most n items from the cursor:: - - >>> await cursor.to_list(n) - - If the cursor is empty or has no more results, an empty list will be returned. - - .. versionadded:: 4.9 - """ - res: list[_DocumentType] = [] - remaining = length - if isinstance(length, int) and length < 1: - raise ValueError("to_list() length must be greater than 0") - while self.alive: - if not await self._next_batch(res, remaining): - break - if length is not None: - remaining = length - len(res) - if remaining == 0: - break - return res - - -class AsyncRawBatchCursor(AsyncCursor, Generic[_DocumentType]): # type: ignore[type-arg] +class AsyncRawBatchCursor(AsyncCursor[_DocumentType]): """An asynchronous cursor / iterator over raw batches of BSON data from a query result.""" _query_class = _RawBatchQuery diff --git a/pymongo/asynchronous/cursor_base.py b/pymongo/asynchronous/cursor_base.py new file mode 100644 index 000000000..1e293a181 --- /dev/null +++ b/pymongo/asynchronous/cursor_base.py @@ -0,0 +1,122 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Asynchronous cursor base extending the shared agnostic cursor base.""" +from __future__ import annotations + +from abc import abstractmethod +from typing import TYPE_CHECKING, Any, Optional + +from pymongo import _csot +from pymongo.cursor_shared import _AgnosticCursorBase +from pymongo.lock import _async_create_lock +from pymongo.typings import _DocumentType + +if TYPE_CHECKING: + from pymongo.asynchronous.client_session import AsyncClientSession + from pymongo.asynchronous.pool import AsyncConnection + +_IS_SYNC = False + + +class _ConnectionManager: + """Used with exhaust cursors to ensure the connection is returned.""" + + def __init__(self, conn: AsyncConnection, more_to_come: bool): + self.conn: Optional[AsyncConnection] = conn + self.more_to_come = more_to_come + self._lock = _async_create_lock() + + def update_exhaust(self, more_to_come: bool) -> None: + self.more_to_come = more_to_come + + async def close(self) -> None: + """Return this instance's connection to the connection pool.""" + if self.conn: + await self.conn.unpin() + self.conn = None + + +class _AsyncCursorBase(_AgnosticCursorBase[_DocumentType]): + """Asynchronous cursor base class.""" + + @property + def session(self) -> Optional[AsyncClientSession]: + """The cursor's :class:`~pymongo.asynchronous.client_session.AsyncClientSession`, or None. + + .. versionadded:: 3.6 + """ + if self._session and not self._session._implicit: + return self._session + return None + + @abstractmethod + async def _next_batch(self, result: list, total: Optional[int] = None) -> bool: # type: ignore[type-arg] + ... + + async def _die_lock(self) -> None: + """Closes this cursor.""" + try: + already_killed = self._killed + except AttributeError: + # ___init__ did not run to completion (or at all). + return + + cursor_id, address = self._prepare_to_die(already_killed) + await self._collection.database.client._cleanup_cursor_lock( + cursor_id, + address, + self._sock_mgr, + self._session, + ) + if self._session and self._session._implicit: + self._session._attached_to_cursor = False + self._session = None + self._sock_mgr = None + + async def close(self) -> None: + """Explicitly close / kill this cursor.""" + await self._die_lock() + + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + await self.close() + + @_csot.apply + async def to_list(self, length: Optional[int] = None) -> list[_DocumentType]: + """Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``. + + To use:: + + >>> await cursor.to_list() + + Or, to read at most n items from the cursor:: + + >>> await cursor.to_list(n) + + If the cursor is empty or has no more results, an empty list will be returned. + + .. versionadded:: 4.9 + """ + res: list[_DocumentType] = [] + remaining = length + if isinstance(length, int) and length < 1: + raise ValueError("to_list() length must be greater than 0") + while self.alive: + if not await self._next_batch(res, remaining): + break + if length is not None: + remaining = length - len(res) + if remaining == 0: + break + return res diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 4a8d41aca..adab640fd 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -144,7 +144,7 @@ if TYPE_CHECKING: from bson.objectid import ObjectId from pymongo.asynchronous.bulk import _AsyncBulk from pymongo.asynchronous.client_session import AsyncClientSession, _ServerSession - from pymongo.asynchronous.cursor import _ConnectionManager + from pymongo.asynchronous.cursor_base import _ConnectionManager from pymongo.asynchronous.encryption import _Encrypter from pymongo.asynchronous.pool import AsyncConnection from pymongo.asynchronous.server import Server diff --git a/pymongo/cursor_shared.py b/pymongo/cursor_shared.py index de6126c4f..e5b76a404 100644 --- a/pymongo/cursor_shared.py +++ b/pymongo/cursor_shared.py @@ -16,7 +16,104 @@ """Constants and types shared across all cursor classes.""" from __future__ import annotations -from typing import Any, Mapping, Sequence, Tuple, Union +from abc import ABC, abstractmethod +from typing import Any, Generic, Mapping, Optional, Sequence, Tuple, Union + +from pymongo.message import _CursorAddress +from pymongo.typings import _Address, _DocumentType + + +class _AgnosticCursorBase(Generic[_DocumentType], ABC): + """ + Shared IO-agnostic cursor base used by both async and sync cursor classes. + All IO-specific behavior is implemented in subclasses. + """ + + # These are all typed more accurately in subclasses. + _collection: Any + _id: Optional[int] + _data: Any + _address: Optional[_Address] + _sock_mgr: Any + _session: Optional[Any] + _killed: bool + + @abstractmethod + def _get_namespace(self) -> str: + """Return the full namespace (dbname.collname) for this cursor.""" + ... + + def __del__(self) -> None: + self._die_no_lock() + + @property + def alive(self) -> bool: + """Does this cursor have the potential to return more data? + + This is mostly useful with `tailable cursors + `_ + since they will stop iterating even though they *may* return more + results in the future. + + With regular cursors, simply use an asynchronous for loop instead of :attr:`alive`:: + + async for doc in collection.find(): + print(doc) + + .. note:: Even if :attr:`alive` is True, :meth:`next` can raise + :exc:`StopIteration`. :attr:`alive` can also be True while iterating + a cursor from a failed server. In this case :attr:`alive` will + return False after :meth:`next` fails to retrieve the next batch + of results from the server. + """ + return bool(len(self._data) or (not self._killed)) + + @property + def cursor_id(self) -> Optional[int]: + """Returns the id of the cursor. + + .. versionadded:: 2.2 + """ + return self._id + + @property + def address(self) -> Optional[_Address]: + """The (host, port) of the server used, or None. + + .. versionchanged:: 3.0 + Renamed from "conn_id". + """ + return self._address + + def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]: + self._killed = True + if self._id and not already_killed: + cursor_id = self._id + assert self._address is not None + address = _CursorAddress(self._address, self._get_namespace()) + else: + # Skip killCursors. + cursor_id = 0 + address = None + return cursor_id, address + + def _die_no_lock(self) -> None: + """Closes this cursor without acquiring a lock.""" + try: + already_killed = self._killed + except AttributeError: + # ___init__ did not run to completion (or at all). + return + + cursor_id, address = self._prepare_to_die(already_killed) + self._collection.database.client._cleanup_cursor_no_lock( + cursor_id, address, self._sock_mgr, self._session + ) + if self._session and self._session._implicit: + self._session._attached_to_cursor = False + self._session = None + self._sock_mgr = None + # These errors mean that the server has already killed the cursor so there is # no need to send killCursors. diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index d67bf4424..dcda05dc4 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -169,7 +169,7 @@ from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES from pymongo.read_concern import ReadConcern from pymongo.read_preferences import ReadPreference, _ServerMode from pymongo.server_type import SERVER_TYPE -from pymongo.synchronous.cursor import _ConnectionManager +from pymongo.synchronous.cursor_base import _ConnectionManager from pymongo.write_concern import WriteConcern if TYPE_CHECKING: diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index a09a67efc..34f60c654 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -19,7 +19,6 @@ from collections import deque from typing import ( TYPE_CHECKING, Any, - Generic, Iterator, Mapping, NoReturn, @@ -29,18 +28,11 @@ from typing import ( ) from bson import CodecOptions, _convert_raw_document_lists_to_streams -from pymongo import _csot from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure -from pymongo.message import ( - _CursorAddress, - _GetMore, - _OpMsg, - _OpReply, - _RawBatchGetMore, -) +from pymongo.message import _GetMore, _OpMsg, _OpReply, _RawBatchGetMore from pymongo.response import PinnedResponse -from pymongo.synchronous.cursor import _ConnectionManager +from pymongo.synchronous.cursor_base import _ConnectionManager, _CursorBase from pymongo.typings import _Address, _DocumentOut, _DocumentType if TYPE_CHECKING: @@ -51,7 +43,7 @@ if TYPE_CHECKING: _IS_SYNC = True -class CommandCursor(Generic[_DocumentType]): +class CommandCursor(_CursorBase[_DocumentType]): """A cursor / iterator over command cursors.""" _getmore_class = _GetMore @@ -98,8 +90,8 @@ class CommandCursor(Generic[_DocumentType]): f"max_await_time_ms must be an integer or None, not {type(max_await_time_ms)}" ) - def __del__(self) -> None: - self._die_no_lock() + def _get_namespace(self) -> str: + return self._ns def batch_size(self, batch_size: int) -> CommandCursor[_DocumentType]: """Limits the number of documents returned in one batch. Each batch @@ -161,94 +153,12 @@ class CommandCursor(Generic[_DocumentType]): ) -> Sequence[_DocumentOut]: return response.unpack_response(cursor_id, codec_options, user_fields, legacy_response) - @property - def alive(self) -> bool: - """Does this cursor have the potential to return more data? - - Even if :attr:`alive` is ``True``, :meth:`next` can raise - :exc:`StopIteration`. Best to use a for loop:: - - for doc in collection.aggregate(pipeline): - print(doc) - - .. note:: :attr:`alive` can be True while iterating a cursor from - a failed server. In this case :attr:`alive` will return False after - :meth:`next` fails to retrieve the next batch of results from the - server. - """ - return bool(len(self._data) or (not self._killed)) - - @property - def cursor_id(self) -> int: - """Returns the id of the cursor.""" - return self._id - - @property - def address(self) -> Optional[_Address]: - """The (host, port) of the server used, or None. - - .. versionadded:: 3.0 - """ - return self._address - - @property - def session(self) -> Optional[ClientSession]: - """The cursor's :class:`~pymongo.client_session.ClientSession`, or None. - - .. versionadded:: 3.6 - """ - if self._session and not self._session._implicit: - return self._session - return None - - def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]: - already_killed = self._killed - self._killed = True - if self._id and not already_killed: - cursor_id = self._id - assert self._address is not None - address = _CursorAddress(self._address, self._ns) - else: - # Skip killCursors. - cursor_id = 0 - address = None - return cursor_id, address - - def _die_no_lock(self) -> None: - """Closes this cursor without acquiring a lock.""" - cursor_id, address = self._prepare_to_die() - self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session - ) - if self._session and self._session._implicit: - self._session._attached_to_cursor = False - self._session = None - self._sock_mgr = None - - def _die_lock(self) -> None: - """Closes this cursor.""" - cursor_id, address = self._prepare_to_die() - self._collection.database.client._cleanup_cursor_lock( - cursor_id, - address, - self._sock_mgr, - self._session, - ) - if self._session and self._session._implicit: - self._session._attached_to_cursor = False - self._session = None - self._sock_mgr = None - def _end_session(self) -> None: if self._session and self._session._implicit: self._session._attached_to_cursor = False self._session._end_implicit_session() self._session = None - def close(self) -> None: - """Explicitly close / kill this cursor.""" - self._die_lock() - def _send_message(self, operation: _GetMore) -> None: """Send a getmore message and handle the response.""" client = self._collection.database.client @@ -330,6 +240,9 @@ class CommandCursor(Generic[_DocumentType]): def __iter__(self) -> Iterator[_DocumentType]: return self + def __enter__(self) -> CommandCursor[_DocumentType]: + return self + def next(self) -> _DocumentType: """Advance the cursor.""" # Block until a document is returnable. @@ -385,41 +298,6 @@ class CommandCursor(Generic[_DocumentType]): """ return self._try_next(get_more_allowed=True) - def __enter__(self) -> CommandCursor[_DocumentType]: - return self - - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - self.close() - - @_csot.apply - def to_list(self, length: Optional[int] = None) -> list[_DocumentType]: - """Converts the contents of this cursor to a list more efficiently than ``[doc for doc in cursor]``. - - To use:: - - >>> cursor.to_list() - - Or, so read at most n items from the cursor:: - - >>> cursor.to_list(n) - - If the cursor is empty or has no more results, an empty list will be returned. - - .. versionadded:: 4.9 - """ - res: list[_DocumentType] = [] - remaining = length - if isinstance(length, int) and length < 1: - raise ValueError("to_list() length must be greater than 0") - while self.alive: - if not self._next_batch(res, remaining): - break - if length is not None: - remaining = length - len(res) - if remaining == 0: - break - return res - class RawBatchCommandCursor(CommandCursor[_DocumentType]): _getmore_class = _RawBatchGetMore diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index d1e9731d9..5a721d8e0 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -21,7 +21,6 @@ from collections import deque from typing import ( TYPE_CHECKING, Any, - Generic, Iterable, List, Mapping, @@ -36,7 +35,7 @@ from typing import ( from bson import RE_TYPE, _convert_raw_document_lists_to_streams from bson.code import Code from bson.son import SON -from pymongo import _csot, helpers_shared +from pymongo import helpers_shared from pymongo.collation import validate_collation_or_none from pymongo.common import ( validate_is_document_type, @@ -44,9 +43,7 @@ from pymongo.common import ( ) from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS, _QUERY_OPTIONS, CursorType, _Hint, _Sort from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure -from pymongo.lock import _create_lock from pymongo.message import ( - _CursorAddress, _GetMore, _OpMsg, _OpReply, @@ -55,6 +52,7 @@ from pymongo.message import ( _RawBatchQuery, ) from pymongo.response import PinnedResponse +from pymongo.synchronous.cursor_base import _ConnectionManager, _CursorBase from pymongo.synchronous.helpers import next from pymongo.typings import _Address, _CollationIn, _DocumentOut, _DocumentType from pymongo.write_concern import validate_boolean @@ -66,30 +64,11 @@ if TYPE_CHECKING: from pymongo.read_preferences import _ServerMode from pymongo.synchronous.client_session import ClientSession from pymongo.synchronous.collection import Collection - from pymongo.synchronous.pool import Connection _IS_SYNC = True -class _ConnectionManager: - """Used with exhaust cursors to ensure the connection is returned.""" - - def __init__(self, conn: Connection, more_to_come: bool): - self.conn: Optional[Connection] = conn - self.more_to_come = more_to_come - self._lock = _create_lock() - - def update_exhaust(self, more_to_come: bool) -> None: - self.more_to_come = more_to_come - - def close(self) -> None: - """Return this instance's connection to the connection pool.""" - if self.conn: - self.conn.unpin() - self.conn = None - - -class Cursor(Generic[_DocumentType]): +class Cursor(_CursorBase[_DocumentType]): _query_class = _Query _getmore_class = _GetMore @@ -266,8 +245,8 @@ class Cursor(Generic[_DocumentType]): """The number of documents retrieved so far.""" return self._retrieved - def __del__(self) -> None: - self._die_no_lock() + def _get_namespace(self) -> str: + return f"{self._dbname}.{self._collname}" def clone(self) -> Cursor[_DocumentType]: """Get a clone of this cursor. @@ -897,55 +876,6 @@ class Cursor(Generic[_DocumentType]): self._read_preference = self._collection._read_preference_for(self.session) return self._read_preference - @property - def alive(self) -> bool: - """Does this cursor have the potential to return more data? - - This is mostly useful with `tailable cursors - `_ - since they will stop iterating even though they *may* return more - results in the future. - - With regular cursors, simply use a for loop instead of :attr:`alive`:: - - for doc in collection.find(): - print(doc) - - .. note:: Even if :attr:`alive` is True, :meth:`next` can raise - :exc:`StopIteration`. :attr:`alive` can also be True while iterating - a cursor from a failed server. In this case :attr:`alive` will - return False after :meth:`next` fails to retrieve the next batch - of results from the server. - """ - return bool(len(self._data) or (not self._killed)) - - @property - def cursor_id(self) -> Optional[int]: - """Returns the id of the cursor - - .. versionadded:: 2.2 - """ - return self._id - - @property - def address(self) -> Optional[tuple[str, Any]]: - """The (host, port) of the server used, or None. - - .. versionchanged:: 3.0 - Renamed from "conn_id". - """ - return self._address - - @property - def session(self) -> Optional[ClientSession]: - """The cursor's :class:`~pymongo.client_session.ClientSession`, or None. - - .. versionadded:: 3.6 - """ - if self._session and not self._session._implicit: - return self._session - return None - def __copy__(self) -> Cursor[_DocumentType]: """Support function for `copy.copy()`. @@ -1009,59 +939,6 @@ class Cursor(Generic[_DocumentType]): y[key] = value # type:ignore[index] return y - def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]: - self._killed = True - if self._id and not already_killed: - cursor_id = self._id - assert self._address is not None - address = _CursorAddress(self._address, f"{self._dbname}.{self._collname}") - else: - # Skip killCursors. - cursor_id = 0 - address = None - return cursor_id, address - - def _die_no_lock(self) -> None: - """Closes this cursor without acquiring a lock.""" - try: - already_killed = self._killed - except AttributeError: - # ___init__ did not run to completion (or at all). - return - - cursor_id, address = self._prepare_to_die(already_killed) - self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session - ) - if self._session and self._session._implicit: - self._session._attached_to_cursor = False - self._session = None - self._sock_mgr = None - - def _die_lock(self) -> None: - """Closes this cursor.""" - try: - already_killed = self._killed - except AttributeError: - # ___init__ did not run to completion (or at all). - return - - cursor_id, address = self._prepare_to_die(already_killed) - self._collection.database.client._cleanup_cursor_lock( - cursor_id, - address, - self._sock_mgr, - self._session, - ) - if self._session and self._session._implicit: - self._session._attached_to_cursor = False - self._session = None - self._sock_mgr = None - - def close(self) -> None: - """Explicitly close / kill this cursor.""" - self._die_lock() - def distinct(self, key: str) -> list[Any]: """Get a list of distinct values for `key` among all documents in the result set of this query. @@ -1294,40 +1171,8 @@ class Cursor(Generic[_DocumentType]): def __enter__(self) -> Cursor[_DocumentType]: return self - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - self.close() - @_csot.apply - def to_list(self, length: Optional[int] = None) -> list[_DocumentType]: - """Converts the contents of this cursor to a list more efficiently than ``[doc for doc in cursor]``. - - To use:: - - >>> cursor.to_list() - - Or, to read at most n items from the cursor:: - - >>> cursor.to_list(n) - - If the cursor is empty or has no more results, an empty list will be returned. - - .. versionadded:: 4.9 - """ - res: list[_DocumentType] = [] - remaining = length - if isinstance(length, int) and length < 1: - raise ValueError("to_list() length must be greater than 0") - while self.alive: - if not self._next_batch(res, remaining): - break - if length is not None: - remaining = length - len(res) - if remaining == 0: - break - return res - - -class RawBatchCursor(Cursor, Generic[_DocumentType]): # type: ignore[type-arg] +class RawBatchCursor(Cursor[_DocumentType]): """A cursor / iterator over raw batches of BSON data from a query result.""" _query_class = _RawBatchQuery diff --git a/pymongo/synchronous/cursor_base.py b/pymongo/synchronous/cursor_base.py new file mode 100644 index 000000000..52ada613c --- /dev/null +++ b/pymongo/synchronous/cursor_base.py @@ -0,0 +1,122 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Synchronous cursor base extending the shared agnostic cursor base.""" +from __future__ import annotations + +from abc import abstractmethod +from typing import TYPE_CHECKING, Any, Optional + +from pymongo import _csot +from pymongo.cursor_shared import _AgnosticCursorBase +from pymongo.lock import _create_lock +from pymongo.typings import _DocumentType + +if TYPE_CHECKING: + from pymongo.synchronous.client_session import ClientSession + from pymongo.synchronous.pool import Connection + +_IS_SYNC = True + + +class _ConnectionManager: + """Used with exhaust cursors to ensure the connection is returned.""" + + def __init__(self, conn: Connection, more_to_come: bool): + self.conn: Optional[Connection] = conn + self.more_to_come = more_to_come + self._lock = _create_lock() + + def update_exhaust(self, more_to_come: bool) -> None: + self.more_to_come = more_to_come + + def close(self) -> None: + """Return this instance's connection to the connection pool.""" + if self.conn: + self.conn.unpin() + self.conn = None + + +class _CursorBase(_AgnosticCursorBase[_DocumentType]): + """Synchronous cursor base class.""" + + @property + def session(self) -> Optional[ClientSession]: + """The cursor's :class:`~pymongo.client_session.ClientSession`, or None. + + .. versionadded:: 3.6 + """ + if self._session and not self._session._implicit: + return self._session + return None + + @abstractmethod + def _next_batch(self, result: list, total: Optional[int] = None) -> bool: # type: ignore[type-arg] + ... + + def _die_lock(self) -> None: + """Closes this cursor.""" + try: + already_killed = self._killed + except AttributeError: + # ___init__ did not run to completion (or at all). + return + + cursor_id, address = self._prepare_to_die(already_killed) + self._collection.database.client._cleanup_cursor_lock( + cursor_id, + address, + self._sock_mgr, + self._session, + ) + if self._session and self._session._implicit: + self._session._attached_to_cursor = False + self._session = None + self._sock_mgr = None + + def close(self) -> None: + """Explicitly close / kill this cursor.""" + self._die_lock() + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + self.close() + + @_csot.apply + def to_list(self, length: Optional[int] = None) -> list[_DocumentType]: + """Converts the contents of this cursor to a list more efficiently than ``[doc for doc in cursor]``. + + To use:: + + >>> cursor.to_list() + + Or, to read at most n items from the cursor:: + + >>> cursor.to_list(n) + + If the cursor is empty or has no more results, an empty list will be returned. + + .. versionadded:: 4.9 + """ + res: list[_DocumentType] = [] + remaining = length + if isinstance(length, int) and length < 1: + raise ValueError("to_list() length must be greater than 0") + while self.alive: + if not self._next_batch(res, remaining): + break + if length is not None: + remaining = length - len(res) + if remaining == 0: + break + return res diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index a6a07a6c9..4e3d178f8 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -146,7 +146,7 @@ if TYPE_CHECKING: from pymongo.server_selectors import Selection from pymongo.synchronous.bulk import _Bulk from pymongo.synchronous.client_session import ClientSession, _ServerSession - from pymongo.synchronous.cursor import _ConnectionManager + from pymongo.synchronous.cursor_base import _ConnectionManager from pymongo.synchronous.encryption import _Encrypter from pymongo.synchronous.pool import Connection from pymongo.synchronous.server import Server diff --git a/pyproject.toml b/pyproject.toml index 12af7c6f6..65cbeca8b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,7 +60,7 @@ mockupdb = [ perf = ["simplejson>=3.17.0"] typing = [ "mypy==1.19.1", - "pyright==1.1.407", + "pyright==1.1.408", "typing_extensions>=3.7.4.2", "pip>=20.2" ] diff --git a/test/command_logging/command.json b/test/command_logging/command.json index d2970df69..57e18b365 100644 --- a/test/command_logging/command.json +++ b/test/command_logging/command.json @@ -23,6 +23,12 @@ "database": "database", "collectionName": "logging-tests-collection" } + }, + { + "session": { + "id": "session", + "client": "client" + } } ], "initialData": [ @@ -210,6 +216,328 @@ ] } ] + }, + { + "description": "A successful commitTransaction command", + "runOnRequirements": [ + { + "topologies": [ + "replicaset", + "sharded" + ] + } + ], + "operations": [ + { + "name": "startTransaction", + "object": "session" + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "session": "session", + "document": { + "_id": 2, + "x": 22 + } + } + }, + { + "name": "commitTransaction", + "object": "session" + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "command", + "data": { + "message": "Command started", + "databaseName": "logging-tests", + "commandName": "insert", + "command": { + "$$type": "string" + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "command", + "data": { + "message": "Command succeeded", + "databaseName": "logging-tests", + "commandName": "insert", + "reply": { + "$$type": "string" + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "command", + "data": { + "message": "Command started", + "databaseName": "admin", + "commandName": "commitTransaction", + "command": { + "$$type": "string" + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "command", + "data": { + "message": "Command succeeded", + "databaseName": "admin", + "commandName": "commitTransaction", + "reply": { + "$$type": "string" + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "A successful abortTransaction command", + "runOnRequirements": [ + { + "topologies": [ + "replicaset", + "sharded" + ] + } + ], + "operations": [ + { + "name": "startTransaction", + "object": "session" + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "session": "session", + "document": { + "_id": 3, + "x": 33 + } + } + }, + { + "name": "abortTransaction", + "object": "session" + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "command", + "data": { + "message": "Command started", + "databaseName": "logging-tests", + "commandName": "insert", + "command": { + "$$type": "string" + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "command", + "data": { + "message": "Command succeeded", + "databaseName": "logging-tests", + "commandName": "insert", + "reply": { + "$$type": "string" + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "command", + "data": { + "message": "Command started", + "databaseName": "admin", + "commandName": "abortTransaction", + "command": { + "$$type": "string" + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "command", + "data": { + "message": "Command succeeded", + "databaseName": "admin", + "commandName": "abortTransaction", + "reply": { + "$$type": "string" + }, + "requestId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + } + ] + } + ] } ] } diff --git a/test/csot/sessions-inherit-timeoutMS.json b/test/csot/sessions-inherit-timeoutMS.json index dbf163e48..50feabf60 100644 --- a/test/csot/sessions-inherit-timeoutMS.json +++ b/test/csot/sessions-inherit-timeoutMS.json @@ -244,11 +244,12 @@ "failPoint": { "configureFailPoint": "failCommand", "mode": { - "times": 1 + "times": 2 }, "data": { "failCommands": [ - "insert" + "insert", + "abortTransaction" ], "blockConnection": true, "blockTimeMS": 600 diff --git a/test/csot/sessions-override-operation-timeoutMS.json b/test/csot/sessions-override-operation-timeoutMS.json index 441c69832..78e873f94 100644 --- a/test/csot/sessions-override-operation-timeoutMS.json +++ b/test/csot/sessions-override-operation-timeoutMS.json @@ -245,11 +245,12 @@ "failPoint": { "configureFailPoint": "failCommand", "mode": { - "times": 1 + "times": 2 }, "data": { "failCommands": [ - "insert" + "insert", + "abortTransaction" ], "blockConnection": true, "blockTimeMS": 600 diff --git a/test/csot/sessions-override-timeoutMS.json b/test/csot/sessions-override-timeoutMS.json index d90152e90..c4a446c53 100644 --- a/test/csot/sessions-override-timeoutMS.json +++ b/test/csot/sessions-override-timeoutMS.json @@ -242,11 +242,12 @@ "failPoint": { "configureFailPoint": "failCommand", "mode": { - "times": 1 + "times": 2 }, "data": { "failCommands": [ - "insert" + "insert", + "abortTransaction" ], "blockConnection": true, "blockTimeMS": 600 diff --git a/test/handshake/unified/metadata-not-propagated.json b/test/handshake/unified/metadata-not-propagated.json index 500b579b8..7f70dd148 100644 --- a/test/handshake/unified/metadata-not-propagated.json +++ b/test/handshake/unified/metadata-not-propagated.json @@ -10,6 +10,7 @@ { "client": { "id": "client", + "useMultipleMongoses": false, "observeEvents": [ "commandSucceededEvent", "commandFailedEvent", diff --git a/test/server_selection/server_selection/ReplicaSetWithPrimary/read/DeprioritizedNearestStateChange.json b/test/server_selection/server_selection/ReplicaSetWithPrimary/read/DeprioritizedNearestStateChange.json new file mode 100644 index 000000000..f1560bc11 --- /dev/null +++ b/test/server_selection/server_selection/ReplicaSetWithPrimary/read/DeprioritizedNearestStateChange.json @@ -0,0 +1,78 @@ +{ + "topology_description": { + "type": "ReplicaSetWithPrimary", + "servers": [ + { + "address": "b:27017", + "avg_rtt_ms": 5, + "type": "RSSecondary", + "tags": { + "data_center": "nyc" + } + }, + { + "address": "c:27017", + "avg_rtt_ms": 100, + "type": "RSSecondary", + "tags": { + "data_center": "nyc" + } + }, + { + "address": "a:27017", + "avg_rtt_ms": 25, + "type": "RSPrimary", + "tags": { + "data_center": "nyc" + } + } + ] + }, + "operation": "read", + "read_preference": { + "mode": "Nearest", + "tag_sets": [ + { + "data_center": "nyc" + } + ] + }, + "deprioritized_servers": [ + { + "address": "b:27017", + "avg_rtt_ms": 50, + "type": "RSPrimary", + "tags": { + "data_center": "nyc" + } + } + ], + "suitable_servers": [ + { + "address": "c:27017", + "avg_rtt_ms": 100, + "type": "RSSecondary", + "tags": { + "data_center": "nyc" + } + }, + { + "address": "a:27017", + "avg_rtt_ms": 25, + "type": "RSPrimary", + "tags": { + "data_center": "nyc" + } + } + ], + "in_latency_window": [ + { + "address": "a:27017", + "avg_rtt_ms": 25, + "type": "RSPrimary", + "tags": { + "data_center": "nyc" + } + } + ] +} diff --git a/tools/synchro.py b/tools/synchro.py index 91820644e..3e326a108 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -30,6 +30,7 @@ from unasync import Rule, unasync_files # type: ignore[import-not-found] replacements = { "AsyncCollection": "Collection", "AsyncDatabase": "Database", + "_AsyncCursorBase": "_CursorBase", "AsyncCursor": "Cursor", "AsyncMongoClient": "MongoClient", "AsyncCommandCursor": "CommandCursor", diff --git a/uv.lock b/uv.lock index 0ede4d796..7d0ff9fb0 100644 --- a/uv.lock +++ b/uv.lock @@ -1624,7 +1624,7 @@ pip = [{ name = "pip", specifier = ">=20.2" }] typing = [ { name = "mypy", specifier = "==1.19.1" }, { name = "pip", specifier = ">=20.2" }, - { name = "pyright", specifier = "==1.1.407" }, + { name = "pyright", specifier = "==1.1.408" }, { name = "typing-extensions", specifier = ">=3.7.4.2" }, ] @@ -1674,15 +1674,15 @@ wheels = [ [[package]] name = "pyright" -version = "1.1.407" +version = "1.1.408" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "nodeenv" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/a6/1b/0aa08ee42948b61745ac5b5b5ccaec4669e8884b53d31c8ec20b2fcd6b6f/pyright-1.1.407.tar.gz", hash = "sha256:099674dba5c10489832d4a4b2d302636152a9a42d317986c38474c76fe562262", size = 4122872, upload-time = "2025-10-24T23:17:15.145Z" } +sdist = { url = "https://files.pythonhosted.org/packages/74/b2/5db700e52554b8f025faa9c3c624c59f1f6c8841ba81ab97641b54322f16/pyright-1.1.408.tar.gz", hash = "sha256:f28f2321f96852fa50b5829ea492f6adb0e6954568d1caa3f3af3a5f555eb684", size = 4400578, upload-time = "2026-01-08T08:07:38.795Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/dc/93/b69052907d032b00c40cb656d21438ec00b3a471733de137a3f65a49a0a0/pyright-1.1.407-py3-none-any.whl", hash = "sha256:6dd419f54fcc13f03b52285796d65e639786373f433e243f8b94cf93a7444d21", size = 5997008, upload-time = "2025-10-24T23:17:13.159Z" }, + { url = "https://files.pythonhosted.org/packages/0c/82/a2c93e32800940d9573fb28c346772a14778b84ba7524e691b324620ab89/pyright-1.1.408-py3-none-any.whl", hash = "sha256:090b32865f4fdb1e0e6cd82bf5618480d48eecd2eb2e70f960982a3d9a4c17c1", size = 6399144, upload-time = "2026-01-08T08:07:37.082Z" }, ] [[package]]