PYTHON-1357 - Refactor Cursor and CommandCursor (#2691)
This commit is contained in:
parent
182d8e2ea0
commit
543c4e532c
@ -5,3 +5,4 @@
|
|||||||
.. automodule:: pymongo.asynchronous.command_cursor
|
.. automodule:: pymongo.asynchronous.command_cursor
|
||||||
:synopsis: Tools for iterating over MongoDB command results
|
:synopsis: Tools for iterating over MongoDB command results
|
||||||
:members:
|
:members:
|
||||||
|
:inherited-members:
|
||||||
|
|||||||
@ -7,6 +7,8 @@
|
|||||||
|
|
||||||
.. autoclass:: pymongo.asynchronous.cursor.AsyncCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, session=None, allow_disk_use=None)
|
.. autoclass:: pymongo.asynchronous.cursor.AsyncCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, session=None, allow_disk_use=None)
|
||||||
:members:
|
:members:
|
||||||
|
:inherited-members:
|
||||||
|
|
||||||
|
|
||||||
.. describe:: c[index]
|
.. describe:: c[index]
|
||||||
|
|
||||||
|
|||||||
@ -4,3 +4,4 @@
|
|||||||
.. automodule:: pymongo.command_cursor
|
.. automodule:: pymongo.command_cursor
|
||||||
:synopsis: Tools for iterating over MongoDB command results
|
:synopsis: Tools for iterating over MongoDB command results
|
||||||
:members:
|
:members:
|
||||||
|
:inherited-members:
|
||||||
|
|||||||
@ -17,6 +17,7 @@
|
|||||||
|
|
||||||
.. autoclass:: pymongo.cursor.Cursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, session=None, allow_disk_use=None)
|
.. autoclass:: pymongo.cursor.Cursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, session=None, allow_disk_use=None)
|
||||||
:members:
|
:members:
|
||||||
|
:inherited-members:
|
||||||
|
|
||||||
.. describe:: c[index]
|
.. describe:: c[index]
|
||||||
|
|
||||||
|
|||||||
@ -157,7 +157,7 @@ from bson.binary import Binary
|
|||||||
from bson.int64 import Int64
|
from bson.int64 import Int64
|
||||||
from bson.timestamp import Timestamp
|
from bson.timestamp import Timestamp
|
||||||
from pymongo import _csot
|
from pymongo import _csot
|
||||||
from pymongo.asynchronous.cursor import _ConnectionManager
|
from pymongo.asynchronous.cursor_base import _ConnectionManager
|
||||||
from pymongo.errors import (
|
from pymongo.errors import (
|
||||||
ConfigurationError,
|
ConfigurationError,
|
||||||
ConnectionFailure,
|
ConnectionFailure,
|
||||||
|
|||||||
@ -20,7 +20,6 @@ from typing import (
|
|||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
Generic,
|
|
||||||
Mapping,
|
Mapping,
|
||||||
NoReturn,
|
NoReturn,
|
||||||
Optional,
|
Optional,
|
||||||
@ -29,17 +28,10 @@ from typing import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
from bson import CodecOptions, _convert_raw_document_lists_to_streams
|
from bson import CodecOptions, _convert_raw_document_lists_to_streams
|
||||||
from pymongo import _csot
|
from pymongo.asynchronous.cursor_base import _AsyncCursorBase, _ConnectionManager
|
||||||
from pymongo.asynchronous.cursor import _ConnectionManager
|
|
||||||
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS
|
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS
|
||||||
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
|
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
|
||||||
from pymongo.message import (
|
from pymongo.message import _GetMore, _OpMsg, _OpReply, _RawBatchGetMore
|
||||||
_CursorAddress,
|
|
||||||
_GetMore,
|
|
||||||
_OpMsg,
|
|
||||||
_OpReply,
|
|
||||||
_RawBatchGetMore,
|
|
||||||
)
|
|
||||||
from pymongo.response import PinnedResponse
|
from pymongo.response import PinnedResponse
|
||||||
from pymongo.typings import _Address, _DocumentOut, _DocumentType
|
from pymongo.typings import _Address, _DocumentOut, _DocumentType
|
||||||
|
|
||||||
@ -51,7 +43,7 @@ if TYPE_CHECKING:
|
|||||||
_IS_SYNC = False
|
_IS_SYNC = False
|
||||||
|
|
||||||
|
|
||||||
class AsyncCommandCursor(Generic[_DocumentType]):
|
class AsyncCommandCursor(_AsyncCursorBase[_DocumentType]):
|
||||||
"""An asynchronous cursor / iterator over command cursors."""
|
"""An asynchronous cursor / iterator over command cursors."""
|
||||||
|
|
||||||
_getmore_class = _GetMore
|
_getmore_class = _GetMore
|
||||||
@ -98,8 +90,8 @@ class AsyncCommandCursor(Generic[_DocumentType]):
|
|||||||
f"max_await_time_ms must be an integer or None, not {type(max_await_time_ms)}"
|
f"max_await_time_ms must be an integer or None, not {type(max_await_time_ms)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
def __del__(self) -> None:
|
def _get_namespace(self) -> str:
|
||||||
self._die_no_lock()
|
return self._ns
|
||||||
|
|
||||||
def batch_size(self, batch_size: int) -> AsyncCommandCursor[_DocumentType]:
|
def batch_size(self, batch_size: int) -> AsyncCommandCursor[_DocumentType]:
|
||||||
"""Limits the number of documents returned in one batch. Each batch
|
"""Limits the number of documents returned in one batch. Each batch
|
||||||
@ -161,94 +153,12 @@ class AsyncCommandCursor(Generic[_DocumentType]):
|
|||||||
) -> Sequence[_DocumentOut]:
|
) -> Sequence[_DocumentOut]:
|
||||||
return response.unpack_response(cursor_id, codec_options, user_fields, legacy_response)
|
return response.unpack_response(cursor_id, codec_options, user_fields, legacy_response)
|
||||||
|
|
||||||
@property
|
|
||||||
def alive(self) -> bool:
|
|
||||||
"""Does this cursor have the potential to return more data?
|
|
||||||
|
|
||||||
Even if :attr:`alive` is ``True``, :meth:`next` can raise
|
|
||||||
:exc:`StopIteration`. Best to use a for loop::
|
|
||||||
|
|
||||||
async for doc in collection.aggregate(pipeline):
|
|
||||||
print(doc)
|
|
||||||
|
|
||||||
.. note:: :attr:`alive` can be True while iterating a cursor from
|
|
||||||
a failed server. In this case :attr:`alive` will return False after
|
|
||||||
:meth:`next` fails to retrieve the next batch of results from the
|
|
||||||
server.
|
|
||||||
"""
|
|
||||||
return bool(len(self._data) or (not self._killed))
|
|
||||||
|
|
||||||
@property
|
|
||||||
def cursor_id(self) -> int:
|
|
||||||
"""Returns the id of the cursor."""
|
|
||||||
return self._id
|
|
||||||
|
|
||||||
@property
|
|
||||||
def address(self) -> Optional[_Address]:
|
|
||||||
"""The (host, port) of the server used, or None.
|
|
||||||
|
|
||||||
.. versionadded:: 3.0
|
|
||||||
"""
|
|
||||||
return self._address
|
|
||||||
|
|
||||||
@property
|
|
||||||
def session(self) -> Optional[AsyncClientSession]:
|
|
||||||
"""The cursor's :class:`~pymongo.asynchronous.client_session.AsyncClientSession`, or None.
|
|
||||||
|
|
||||||
.. versionadded:: 3.6
|
|
||||||
"""
|
|
||||||
if self._session and not self._session._implicit:
|
|
||||||
return self._session
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]:
|
|
||||||
already_killed = self._killed
|
|
||||||
self._killed = True
|
|
||||||
if self._id and not already_killed:
|
|
||||||
cursor_id = self._id
|
|
||||||
assert self._address is not None
|
|
||||||
address = _CursorAddress(self._address, self._ns)
|
|
||||||
else:
|
|
||||||
# Skip killCursors.
|
|
||||||
cursor_id = 0
|
|
||||||
address = None
|
|
||||||
return cursor_id, address
|
|
||||||
|
|
||||||
def _die_no_lock(self) -> None:
|
|
||||||
"""Closes this cursor without acquiring a lock."""
|
|
||||||
cursor_id, address = self._prepare_to_die()
|
|
||||||
self._collection.database.client._cleanup_cursor_no_lock(
|
|
||||||
cursor_id, address, self._sock_mgr, self._session
|
|
||||||
)
|
|
||||||
if self._session and self._session._implicit:
|
|
||||||
self._session._attached_to_cursor = False
|
|
||||||
self._session = None
|
|
||||||
self._sock_mgr = None
|
|
||||||
|
|
||||||
async def _die_lock(self) -> None:
|
|
||||||
"""Closes this cursor."""
|
|
||||||
cursor_id, address = self._prepare_to_die()
|
|
||||||
await self._collection.database.client._cleanup_cursor_lock(
|
|
||||||
cursor_id,
|
|
||||||
address,
|
|
||||||
self._sock_mgr,
|
|
||||||
self._session,
|
|
||||||
)
|
|
||||||
if self._session and self._session._implicit:
|
|
||||||
self._session._attached_to_cursor = False
|
|
||||||
self._session = None
|
|
||||||
self._sock_mgr = None
|
|
||||||
|
|
||||||
def _end_session(self) -> None:
|
def _end_session(self) -> None:
|
||||||
if self._session and self._session._implicit:
|
if self._session and self._session._implicit:
|
||||||
self._session._attached_to_cursor = False
|
self._session._attached_to_cursor = False
|
||||||
self._session._end_implicit_session()
|
self._session._end_implicit_session()
|
||||||
self._session = None
|
self._session = None
|
||||||
|
|
||||||
async def close(self) -> None:
|
|
||||||
"""Explicitly close / kill this cursor."""
|
|
||||||
await self._die_lock()
|
|
||||||
|
|
||||||
async def _send_message(self, operation: _GetMore) -> None:
|
async def _send_message(self, operation: _GetMore) -> None:
|
||||||
"""Send a getmore message and handle the response."""
|
"""Send a getmore message and handle the response."""
|
||||||
client = self._collection.database.client
|
client = self._collection.database.client
|
||||||
@ -330,6 +240,9 @@ class AsyncCommandCursor(Generic[_DocumentType]):
|
|||||||
def __aiter__(self) -> AsyncIterator[_DocumentType]:
|
def __aiter__(self) -> AsyncIterator[_DocumentType]:
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
async def __aenter__(self) -> AsyncCommandCursor[_DocumentType]:
|
||||||
|
return self
|
||||||
|
|
||||||
async def next(self) -> _DocumentType:
|
async def next(self) -> _DocumentType:
|
||||||
"""Advance the cursor."""
|
"""Advance the cursor."""
|
||||||
# Block until a document is returnable.
|
# Block until a document is returnable.
|
||||||
@ -385,41 +298,6 @@ class AsyncCommandCursor(Generic[_DocumentType]):
|
|||||||
"""
|
"""
|
||||||
return await self._try_next(get_more_allowed=True)
|
return await self._try_next(get_more_allowed=True)
|
||||||
|
|
||||||
async def __aenter__(self) -> AsyncCommandCursor[_DocumentType]:
|
|
||||||
return self
|
|
||||||
|
|
||||||
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
|
||||||
await self.close()
|
|
||||||
|
|
||||||
@_csot.apply
|
|
||||||
async def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
|
|
||||||
"""Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``.
|
|
||||||
|
|
||||||
To use::
|
|
||||||
|
|
||||||
>>> await cursor.to_list()
|
|
||||||
|
|
||||||
Or, so read at most n items from the cursor::
|
|
||||||
|
|
||||||
>>> await cursor.to_list(n)
|
|
||||||
|
|
||||||
If the cursor is empty or has no more results, an empty list will be returned.
|
|
||||||
|
|
||||||
.. versionadded:: 4.9
|
|
||||||
"""
|
|
||||||
res: list[_DocumentType] = []
|
|
||||||
remaining = length
|
|
||||||
if isinstance(length, int) and length < 1:
|
|
||||||
raise ValueError("to_list() length must be greater than 0")
|
|
||||||
while self.alive:
|
|
||||||
if not await self._next_batch(res, remaining):
|
|
||||||
break
|
|
||||||
if length is not None:
|
|
||||||
remaining = length - len(res)
|
|
||||||
if remaining == 0:
|
|
||||||
break
|
|
||||||
return res
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncRawBatchCommandCursor(AsyncCommandCursor[_DocumentType]):
|
class AsyncRawBatchCommandCursor(AsyncCommandCursor[_DocumentType]):
|
||||||
_getmore_class = _RawBatchGetMore
|
_getmore_class = _RawBatchGetMore
|
||||||
|
|||||||
@ -21,7 +21,6 @@ from collections import deque
|
|||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
Generic,
|
|
||||||
Iterable,
|
Iterable,
|
||||||
List,
|
List,
|
||||||
Mapping,
|
Mapping,
|
||||||
@ -36,7 +35,8 @@ from typing import (
|
|||||||
from bson import RE_TYPE, _convert_raw_document_lists_to_streams
|
from bson import RE_TYPE, _convert_raw_document_lists_to_streams
|
||||||
from bson.code import Code
|
from bson.code import Code
|
||||||
from bson.son import SON
|
from bson.son import SON
|
||||||
from pymongo import _csot, helpers_shared
|
from pymongo import helpers_shared
|
||||||
|
from pymongo.asynchronous.cursor_base import _AsyncCursorBase, _ConnectionManager
|
||||||
from pymongo.asynchronous.helpers import anext
|
from pymongo.asynchronous.helpers import anext
|
||||||
from pymongo.collation import validate_collation_or_none
|
from pymongo.collation import validate_collation_or_none
|
||||||
from pymongo.common import (
|
from pymongo.common import (
|
||||||
@ -45,9 +45,7 @@ from pymongo.common import (
|
|||||||
)
|
)
|
||||||
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS, _QUERY_OPTIONS, CursorType, _Hint, _Sort
|
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS, _QUERY_OPTIONS, CursorType, _Hint, _Sort
|
||||||
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
|
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
|
||||||
from pymongo.lock import _async_create_lock
|
|
||||||
from pymongo.message import (
|
from pymongo.message import (
|
||||||
_CursorAddress,
|
|
||||||
_GetMore,
|
_GetMore,
|
||||||
_OpMsg,
|
_OpMsg,
|
||||||
_OpReply,
|
_OpReply,
|
||||||
@ -65,31 +63,12 @@ if TYPE_CHECKING:
|
|||||||
from bson.codec_options import CodecOptions
|
from bson.codec_options import CodecOptions
|
||||||
from pymongo.asynchronous.client_session import AsyncClientSession
|
from pymongo.asynchronous.client_session import AsyncClientSession
|
||||||
from pymongo.asynchronous.collection import AsyncCollection
|
from pymongo.asynchronous.collection import AsyncCollection
|
||||||
from pymongo.asynchronous.pool import AsyncConnection
|
|
||||||
from pymongo.read_preferences import _ServerMode
|
from pymongo.read_preferences import _ServerMode
|
||||||
|
|
||||||
_IS_SYNC = False
|
_IS_SYNC = False
|
||||||
|
|
||||||
|
|
||||||
class _ConnectionManager:
|
class AsyncCursor(_AsyncCursorBase[_DocumentType]):
|
||||||
"""Used with exhaust cursors to ensure the connection is returned."""
|
|
||||||
|
|
||||||
def __init__(self, conn: AsyncConnection, more_to_come: bool):
|
|
||||||
self.conn: Optional[AsyncConnection] = conn
|
|
||||||
self.more_to_come = more_to_come
|
|
||||||
self._lock = _async_create_lock()
|
|
||||||
|
|
||||||
def update_exhaust(self, more_to_come: bool) -> None:
|
|
||||||
self.more_to_come = more_to_come
|
|
||||||
|
|
||||||
async def close(self) -> None:
|
|
||||||
"""Return this instance's connection to the connection pool."""
|
|
||||||
if self.conn:
|
|
||||||
await self.conn.unpin()
|
|
||||||
self.conn = None
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncCursor(Generic[_DocumentType]):
|
|
||||||
_query_class = _Query
|
_query_class = _Query
|
||||||
_getmore_class = _GetMore
|
_getmore_class = _GetMore
|
||||||
|
|
||||||
@ -266,8 +245,8 @@ class AsyncCursor(Generic[_DocumentType]):
|
|||||||
"""The number of documents retrieved so far."""
|
"""The number of documents retrieved so far."""
|
||||||
return self._retrieved
|
return self._retrieved
|
||||||
|
|
||||||
def __del__(self) -> None:
|
def _get_namespace(self) -> str:
|
||||||
self._die_no_lock()
|
return f"{self._dbname}.{self._collname}"
|
||||||
|
|
||||||
def clone(self) -> AsyncCursor[_DocumentType]:
|
def clone(self) -> AsyncCursor[_DocumentType]:
|
||||||
"""Get a clone of this cursor.
|
"""Get a clone of this cursor.
|
||||||
@ -899,55 +878,6 @@ class AsyncCursor(Generic[_DocumentType]):
|
|||||||
self._read_preference = self._collection._read_preference_for(self.session)
|
self._read_preference = self._collection._read_preference_for(self.session)
|
||||||
return self._read_preference
|
return self._read_preference
|
||||||
|
|
||||||
@property
|
|
||||||
def alive(self) -> bool:
|
|
||||||
"""Does this cursor have the potential to return more data?
|
|
||||||
|
|
||||||
This is mostly useful with `tailable cursors
|
|
||||||
<https://www.mongodb.com/docs/manual/core/tailable-cursors/>`_
|
|
||||||
since they will stop iterating even though they *may* return more
|
|
||||||
results in the future.
|
|
||||||
|
|
||||||
With regular cursors, simply use an asynchronous for loop instead of :attr:`alive`::
|
|
||||||
|
|
||||||
async for doc in collection.find():
|
|
||||||
print(doc)
|
|
||||||
|
|
||||||
.. note:: Even if :attr:`alive` is True, :meth:`next` can raise
|
|
||||||
:exc:`StopIteration`. :attr:`alive` can also be True while iterating
|
|
||||||
a cursor from a failed server. In this case :attr:`alive` will
|
|
||||||
return False after :meth:`next` fails to retrieve the next batch
|
|
||||||
of results from the server.
|
|
||||||
"""
|
|
||||||
return bool(len(self._data) or (not self._killed))
|
|
||||||
|
|
||||||
@property
|
|
||||||
def cursor_id(self) -> Optional[int]:
|
|
||||||
"""Returns the id of the cursor
|
|
||||||
|
|
||||||
.. versionadded:: 2.2
|
|
||||||
"""
|
|
||||||
return self._id
|
|
||||||
|
|
||||||
@property
|
|
||||||
def address(self) -> Optional[tuple[str, Any]]:
|
|
||||||
"""The (host, port) of the server used, or None.
|
|
||||||
|
|
||||||
.. versionchanged:: 3.0
|
|
||||||
Renamed from "conn_id".
|
|
||||||
"""
|
|
||||||
return self._address
|
|
||||||
|
|
||||||
@property
|
|
||||||
def session(self) -> Optional[AsyncClientSession]:
|
|
||||||
"""The cursor's :class:`~pymongo.asynchronous.client_session.AsyncClientSession`, or None.
|
|
||||||
|
|
||||||
.. versionadded:: 3.6
|
|
||||||
"""
|
|
||||||
if self._session and not self._session._implicit:
|
|
||||||
return self._session
|
|
||||||
return None
|
|
||||||
|
|
||||||
def __copy__(self) -> AsyncCursor[_DocumentType]:
|
def __copy__(self) -> AsyncCursor[_DocumentType]:
|
||||||
"""Support function for `copy.copy()`.
|
"""Support function for `copy.copy()`.
|
||||||
|
|
||||||
@ -1011,59 +941,6 @@ class AsyncCursor(Generic[_DocumentType]):
|
|||||||
y[key] = value # type:ignore[index]
|
y[key] = value # type:ignore[index]
|
||||||
return y
|
return y
|
||||||
|
|
||||||
def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]:
|
|
||||||
self._killed = True
|
|
||||||
if self._id and not already_killed:
|
|
||||||
cursor_id = self._id
|
|
||||||
assert self._address is not None
|
|
||||||
address = _CursorAddress(self._address, f"{self._dbname}.{self._collname}")
|
|
||||||
else:
|
|
||||||
# Skip killCursors.
|
|
||||||
cursor_id = 0
|
|
||||||
address = None
|
|
||||||
return cursor_id, address
|
|
||||||
|
|
||||||
def _die_no_lock(self) -> None:
|
|
||||||
"""Closes this cursor without acquiring a lock."""
|
|
||||||
try:
|
|
||||||
already_killed = self._killed
|
|
||||||
except AttributeError:
|
|
||||||
# ___init__ did not run to completion (or at all).
|
|
||||||
return
|
|
||||||
|
|
||||||
cursor_id, address = self._prepare_to_die(already_killed)
|
|
||||||
self._collection.database.client._cleanup_cursor_no_lock(
|
|
||||||
cursor_id, address, self._sock_mgr, self._session
|
|
||||||
)
|
|
||||||
if self._session and self._session._implicit:
|
|
||||||
self._session._attached_to_cursor = False
|
|
||||||
self._session = None
|
|
||||||
self._sock_mgr = None
|
|
||||||
|
|
||||||
async def _die_lock(self) -> None:
|
|
||||||
"""Closes this cursor."""
|
|
||||||
try:
|
|
||||||
already_killed = self._killed
|
|
||||||
except AttributeError:
|
|
||||||
# ___init__ did not run to completion (or at all).
|
|
||||||
return
|
|
||||||
|
|
||||||
cursor_id, address = self._prepare_to_die(already_killed)
|
|
||||||
await self._collection.database.client._cleanup_cursor_lock(
|
|
||||||
cursor_id,
|
|
||||||
address,
|
|
||||||
self._sock_mgr,
|
|
||||||
self._session,
|
|
||||||
)
|
|
||||||
if self._session and self._session._implicit:
|
|
||||||
self._session._attached_to_cursor = False
|
|
||||||
self._session = None
|
|
||||||
self._sock_mgr = None
|
|
||||||
|
|
||||||
async def close(self) -> None:
|
|
||||||
"""Explicitly close / kill this cursor."""
|
|
||||||
await self._die_lock()
|
|
||||||
|
|
||||||
async def distinct(self, key: str) -> list[Any]:
|
async def distinct(self, key: str) -> list[Any]:
|
||||||
"""Get a list of distinct values for `key` among all documents
|
"""Get a list of distinct values for `key` among all documents
|
||||||
in the result set of this query.
|
in the result set of this query.
|
||||||
@ -1296,40 +1173,8 @@ class AsyncCursor(Generic[_DocumentType]):
|
|||||||
async def __aenter__(self) -> AsyncCursor[_DocumentType]:
|
async def __aenter__(self) -> AsyncCursor[_DocumentType]:
|
||||||
return self
|
return self
|
||||||
|
|
||||||
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
|
||||||
await self.close()
|
|
||||||
|
|
||||||
@_csot.apply
|
class AsyncRawBatchCursor(AsyncCursor[_DocumentType]):
|
||||||
async def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
|
|
||||||
"""Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``.
|
|
||||||
|
|
||||||
To use::
|
|
||||||
|
|
||||||
>>> await cursor.to_list()
|
|
||||||
|
|
||||||
Or, to read at most n items from the cursor::
|
|
||||||
|
|
||||||
>>> await cursor.to_list(n)
|
|
||||||
|
|
||||||
If the cursor is empty or has no more results, an empty list will be returned.
|
|
||||||
|
|
||||||
.. versionadded:: 4.9
|
|
||||||
"""
|
|
||||||
res: list[_DocumentType] = []
|
|
||||||
remaining = length
|
|
||||||
if isinstance(length, int) and length < 1:
|
|
||||||
raise ValueError("to_list() length must be greater than 0")
|
|
||||||
while self.alive:
|
|
||||||
if not await self._next_batch(res, remaining):
|
|
||||||
break
|
|
||||||
if length is not None:
|
|
||||||
remaining = length - len(res)
|
|
||||||
if remaining == 0:
|
|
||||||
break
|
|
||||||
return res
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncRawBatchCursor(AsyncCursor, Generic[_DocumentType]): # type: ignore[type-arg]
|
|
||||||
"""An asynchronous cursor / iterator over raw batches of BSON data from a query result."""
|
"""An asynchronous cursor / iterator over raw batches of BSON data from a query result."""
|
||||||
|
|
||||||
_query_class = _RawBatchQuery
|
_query_class = _RawBatchQuery
|
||||||
|
|||||||
122
pymongo/asynchronous/cursor_base.py
Normal file
122
pymongo/asynchronous/cursor_base.py
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
# Copyright 2026-present MongoDB, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||||
|
# may not use this file except in compliance with the License. You
|
||||||
|
# may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied. See the License for the specific language governing
|
||||||
|
# permissions and limitations under the License.
|
||||||
|
|
||||||
|
"""Asynchronous cursor base extending the shared agnostic cursor base."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import abstractmethod
|
||||||
|
from typing import TYPE_CHECKING, Any, Optional
|
||||||
|
|
||||||
|
from pymongo import _csot
|
||||||
|
from pymongo.cursor_shared import _AgnosticCursorBase
|
||||||
|
from pymongo.lock import _async_create_lock
|
||||||
|
from pymongo.typings import _DocumentType
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from pymongo.asynchronous.client_session import AsyncClientSession
|
||||||
|
from pymongo.asynchronous.pool import AsyncConnection
|
||||||
|
|
||||||
|
_IS_SYNC = False
|
||||||
|
|
||||||
|
|
||||||
|
class _ConnectionManager:
|
||||||
|
"""Used with exhaust cursors to ensure the connection is returned."""
|
||||||
|
|
||||||
|
def __init__(self, conn: AsyncConnection, more_to_come: bool):
|
||||||
|
self.conn: Optional[AsyncConnection] = conn
|
||||||
|
self.more_to_come = more_to_come
|
||||||
|
self._lock = _async_create_lock()
|
||||||
|
|
||||||
|
def update_exhaust(self, more_to_come: bool) -> None:
|
||||||
|
self.more_to_come = more_to_come
|
||||||
|
|
||||||
|
async def close(self) -> None:
|
||||||
|
"""Return this instance's connection to the connection pool."""
|
||||||
|
if self.conn:
|
||||||
|
await self.conn.unpin()
|
||||||
|
self.conn = None
|
||||||
|
|
||||||
|
|
||||||
|
class _AsyncCursorBase(_AgnosticCursorBase[_DocumentType]):
|
||||||
|
"""Asynchronous cursor base class."""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def session(self) -> Optional[AsyncClientSession]:
|
||||||
|
"""The cursor's :class:`~pymongo.asynchronous.client_session.AsyncClientSession`, or None.
|
||||||
|
|
||||||
|
.. versionadded:: 3.6
|
||||||
|
"""
|
||||||
|
if self._session and not self._session._implicit:
|
||||||
|
return self._session
|
||||||
|
return None
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
async def _next_batch(self, result: list, total: Optional[int] = None) -> bool: # type: ignore[type-arg]
|
||||||
|
...
|
||||||
|
|
||||||
|
async def _die_lock(self) -> None:
|
||||||
|
"""Closes this cursor."""
|
||||||
|
try:
|
||||||
|
already_killed = self._killed
|
||||||
|
except AttributeError:
|
||||||
|
# ___init__ did not run to completion (or at all).
|
||||||
|
return
|
||||||
|
|
||||||
|
cursor_id, address = self._prepare_to_die(already_killed)
|
||||||
|
await self._collection.database.client._cleanup_cursor_lock(
|
||||||
|
cursor_id,
|
||||||
|
address,
|
||||||
|
self._sock_mgr,
|
||||||
|
self._session,
|
||||||
|
)
|
||||||
|
if self._session and self._session._implicit:
|
||||||
|
self._session._attached_to_cursor = False
|
||||||
|
self._session = None
|
||||||
|
self._sock_mgr = None
|
||||||
|
|
||||||
|
async def close(self) -> None:
|
||||||
|
"""Explicitly close / kill this cursor."""
|
||||||
|
await self._die_lock()
|
||||||
|
|
||||||
|
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
||||||
|
await self.close()
|
||||||
|
|
||||||
|
@_csot.apply
|
||||||
|
async def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
|
||||||
|
"""Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``.
|
||||||
|
|
||||||
|
To use::
|
||||||
|
|
||||||
|
>>> await cursor.to_list()
|
||||||
|
|
||||||
|
Or, to read at most n items from the cursor::
|
||||||
|
|
||||||
|
>>> await cursor.to_list(n)
|
||||||
|
|
||||||
|
If the cursor is empty or has no more results, an empty list will be returned.
|
||||||
|
|
||||||
|
.. versionadded:: 4.9
|
||||||
|
"""
|
||||||
|
res: list[_DocumentType] = []
|
||||||
|
remaining = length
|
||||||
|
if isinstance(length, int) and length < 1:
|
||||||
|
raise ValueError("to_list() length must be greater than 0")
|
||||||
|
while self.alive:
|
||||||
|
if not await self._next_batch(res, remaining):
|
||||||
|
break
|
||||||
|
if length is not None:
|
||||||
|
remaining = length - len(res)
|
||||||
|
if remaining == 0:
|
||||||
|
break
|
||||||
|
return res
|
||||||
@ -139,7 +139,7 @@ if TYPE_CHECKING:
|
|||||||
from bson.objectid import ObjectId
|
from bson.objectid import ObjectId
|
||||||
from pymongo.asynchronous.bulk import _AsyncBulk
|
from pymongo.asynchronous.bulk import _AsyncBulk
|
||||||
from pymongo.asynchronous.client_session import AsyncClientSession, _ServerSession
|
from pymongo.asynchronous.client_session import AsyncClientSession, _ServerSession
|
||||||
from pymongo.asynchronous.cursor import _ConnectionManager
|
from pymongo.asynchronous.cursor_base import _ConnectionManager
|
||||||
from pymongo.asynchronous.encryption import _Encrypter
|
from pymongo.asynchronous.encryption import _Encrypter
|
||||||
from pymongo.asynchronous.pool import AsyncConnection
|
from pymongo.asynchronous.pool import AsyncConnection
|
||||||
from pymongo.asynchronous.server import Server
|
from pymongo.asynchronous.server import Server
|
||||||
|
|||||||
@ -16,7 +16,104 @@
|
|||||||
"""Constants and types shared across all cursor classes."""
|
"""Constants and types shared across all cursor classes."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Any, Mapping, Sequence, Tuple, Union
|
from abc import ABC, abstractmethod
|
||||||
|
from typing import Any, Generic, Mapping, Optional, Sequence, Tuple, Union
|
||||||
|
|
||||||
|
from pymongo.message import _CursorAddress
|
||||||
|
from pymongo.typings import _Address, _DocumentType
|
||||||
|
|
||||||
|
|
||||||
|
class _AgnosticCursorBase(Generic[_DocumentType], ABC):
|
||||||
|
"""
|
||||||
|
Shared IO-agnostic cursor base used by both async and sync cursor classes.
|
||||||
|
All IO-specific behavior is implemented in subclasses.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# These are all typed more accurately in subclasses.
|
||||||
|
_collection: Any
|
||||||
|
_id: Optional[int]
|
||||||
|
_data: Any
|
||||||
|
_address: Optional[_Address]
|
||||||
|
_sock_mgr: Any
|
||||||
|
_session: Optional[Any]
|
||||||
|
_killed: bool
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _get_namespace(self) -> str:
|
||||||
|
"""Return the full namespace (dbname.collname) for this cursor."""
|
||||||
|
...
|
||||||
|
|
||||||
|
def __del__(self) -> None:
|
||||||
|
self._die_no_lock()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def alive(self) -> bool:
|
||||||
|
"""Does this cursor have the potential to return more data?
|
||||||
|
|
||||||
|
This is mostly useful with `tailable cursors
|
||||||
|
<https://www.mongodb.com/docs/manual/core/tailable-cursors/>`_
|
||||||
|
since they will stop iterating even though they *may* return more
|
||||||
|
results in the future.
|
||||||
|
|
||||||
|
With regular cursors, simply use an asynchronous for loop instead of :attr:`alive`::
|
||||||
|
|
||||||
|
async for doc in collection.find():
|
||||||
|
print(doc)
|
||||||
|
|
||||||
|
.. note:: Even if :attr:`alive` is True, :meth:`next` can raise
|
||||||
|
:exc:`StopIteration`. :attr:`alive` can also be True while iterating
|
||||||
|
a cursor from a failed server. In this case :attr:`alive` will
|
||||||
|
return False after :meth:`next` fails to retrieve the next batch
|
||||||
|
of results from the server.
|
||||||
|
"""
|
||||||
|
return bool(len(self._data) or (not self._killed))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cursor_id(self) -> Optional[int]:
|
||||||
|
"""Returns the id of the cursor.
|
||||||
|
|
||||||
|
.. versionadded:: 2.2
|
||||||
|
"""
|
||||||
|
return self._id
|
||||||
|
|
||||||
|
@property
|
||||||
|
def address(self) -> Optional[_Address]:
|
||||||
|
"""The (host, port) of the server used, or None.
|
||||||
|
|
||||||
|
.. versionchanged:: 3.0
|
||||||
|
Renamed from "conn_id".
|
||||||
|
"""
|
||||||
|
return self._address
|
||||||
|
|
||||||
|
def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]:
|
||||||
|
self._killed = True
|
||||||
|
if self._id and not already_killed:
|
||||||
|
cursor_id = self._id
|
||||||
|
assert self._address is not None
|
||||||
|
address = _CursorAddress(self._address, self._get_namespace())
|
||||||
|
else:
|
||||||
|
# Skip killCursors.
|
||||||
|
cursor_id = 0
|
||||||
|
address = None
|
||||||
|
return cursor_id, address
|
||||||
|
|
||||||
|
def _die_no_lock(self) -> None:
|
||||||
|
"""Closes this cursor without acquiring a lock."""
|
||||||
|
try:
|
||||||
|
already_killed = self._killed
|
||||||
|
except AttributeError:
|
||||||
|
# ___init__ did not run to completion (or at all).
|
||||||
|
return
|
||||||
|
|
||||||
|
cursor_id, address = self._prepare_to_die(already_killed)
|
||||||
|
self._collection.database.client._cleanup_cursor_no_lock(
|
||||||
|
cursor_id, address, self._sock_mgr, self._session
|
||||||
|
)
|
||||||
|
if self._session and self._session._implicit:
|
||||||
|
self._session._attached_to_cursor = False
|
||||||
|
self._session = None
|
||||||
|
self._sock_mgr = None
|
||||||
|
|
||||||
|
|
||||||
# These errors mean that the server has already killed the cursor so there is
|
# These errors mean that the server has already killed the cursor so there is
|
||||||
# no need to send killCursors.
|
# no need to send killCursors.
|
||||||
|
|||||||
@ -168,7 +168,7 @@ from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
|
|||||||
from pymongo.read_concern import ReadConcern
|
from pymongo.read_concern import ReadConcern
|
||||||
from pymongo.read_preferences import ReadPreference, _ServerMode
|
from pymongo.read_preferences import ReadPreference, _ServerMode
|
||||||
from pymongo.server_type import SERVER_TYPE
|
from pymongo.server_type import SERVER_TYPE
|
||||||
from pymongo.synchronous.cursor import _ConnectionManager
|
from pymongo.synchronous.cursor_base import _ConnectionManager
|
||||||
from pymongo.write_concern import WriteConcern
|
from pymongo.write_concern import WriteConcern
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
|||||||
@ -19,7 +19,6 @@ from collections import deque
|
|||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
Generic,
|
|
||||||
Iterator,
|
Iterator,
|
||||||
Mapping,
|
Mapping,
|
||||||
NoReturn,
|
NoReturn,
|
||||||
@ -29,18 +28,11 @@ from typing import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
from bson import CodecOptions, _convert_raw_document_lists_to_streams
|
from bson import CodecOptions, _convert_raw_document_lists_to_streams
|
||||||
from pymongo import _csot
|
|
||||||
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS
|
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS
|
||||||
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
|
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
|
||||||
from pymongo.message import (
|
from pymongo.message import _GetMore, _OpMsg, _OpReply, _RawBatchGetMore
|
||||||
_CursorAddress,
|
|
||||||
_GetMore,
|
|
||||||
_OpMsg,
|
|
||||||
_OpReply,
|
|
||||||
_RawBatchGetMore,
|
|
||||||
)
|
|
||||||
from pymongo.response import PinnedResponse
|
from pymongo.response import PinnedResponse
|
||||||
from pymongo.synchronous.cursor import _ConnectionManager
|
from pymongo.synchronous.cursor_base import _ConnectionManager, _CursorBase
|
||||||
from pymongo.typings import _Address, _DocumentOut, _DocumentType
|
from pymongo.typings import _Address, _DocumentOut, _DocumentType
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@ -51,7 +43,7 @@ if TYPE_CHECKING:
|
|||||||
_IS_SYNC = True
|
_IS_SYNC = True
|
||||||
|
|
||||||
|
|
||||||
class CommandCursor(Generic[_DocumentType]):
|
class CommandCursor(_CursorBase[_DocumentType]):
|
||||||
"""A cursor / iterator over command cursors."""
|
"""A cursor / iterator over command cursors."""
|
||||||
|
|
||||||
_getmore_class = _GetMore
|
_getmore_class = _GetMore
|
||||||
@ -98,8 +90,8 @@ class CommandCursor(Generic[_DocumentType]):
|
|||||||
f"max_await_time_ms must be an integer or None, not {type(max_await_time_ms)}"
|
f"max_await_time_ms must be an integer or None, not {type(max_await_time_ms)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
def __del__(self) -> None:
|
def _get_namespace(self) -> str:
|
||||||
self._die_no_lock()
|
return self._ns
|
||||||
|
|
||||||
def batch_size(self, batch_size: int) -> CommandCursor[_DocumentType]:
|
def batch_size(self, batch_size: int) -> CommandCursor[_DocumentType]:
|
||||||
"""Limits the number of documents returned in one batch. Each batch
|
"""Limits the number of documents returned in one batch. Each batch
|
||||||
@ -161,94 +153,12 @@ class CommandCursor(Generic[_DocumentType]):
|
|||||||
) -> Sequence[_DocumentOut]:
|
) -> Sequence[_DocumentOut]:
|
||||||
return response.unpack_response(cursor_id, codec_options, user_fields, legacy_response)
|
return response.unpack_response(cursor_id, codec_options, user_fields, legacy_response)
|
||||||
|
|
||||||
@property
|
|
||||||
def alive(self) -> bool:
|
|
||||||
"""Does this cursor have the potential to return more data?
|
|
||||||
|
|
||||||
Even if :attr:`alive` is ``True``, :meth:`next` can raise
|
|
||||||
:exc:`StopIteration`. Best to use a for loop::
|
|
||||||
|
|
||||||
for doc in collection.aggregate(pipeline):
|
|
||||||
print(doc)
|
|
||||||
|
|
||||||
.. note:: :attr:`alive` can be True while iterating a cursor from
|
|
||||||
a failed server. In this case :attr:`alive` will return False after
|
|
||||||
:meth:`next` fails to retrieve the next batch of results from the
|
|
||||||
server.
|
|
||||||
"""
|
|
||||||
return bool(len(self._data) or (not self._killed))
|
|
||||||
|
|
||||||
@property
|
|
||||||
def cursor_id(self) -> int:
|
|
||||||
"""Returns the id of the cursor."""
|
|
||||||
return self._id
|
|
||||||
|
|
||||||
@property
|
|
||||||
def address(self) -> Optional[_Address]:
|
|
||||||
"""The (host, port) of the server used, or None.
|
|
||||||
|
|
||||||
.. versionadded:: 3.0
|
|
||||||
"""
|
|
||||||
return self._address
|
|
||||||
|
|
||||||
@property
|
|
||||||
def session(self) -> Optional[ClientSession]:
|
|
||||||
"""The cursor's :class:`~pymongo.client_session.ClientSession`, or None.
|
|
||||||
|
|
||||||
.. versionadded:: 3.6
|
|
||||||
"""
|
|
||||||
if self._session and not self._session._implicit:
|
|
||||||
return self._session
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]:
|
|
||||||
already_killed = self._killed
|
|
||||||
self._killed = True
|
|
||||||
if self._id and not already_killed:
|
|
||||||
cursor_id = self._id
|
|
||||||
assert self._address is not None
|
|
||||||
address = _CursorAddress(self._address, self._ns)
|
|
||||||
else:
|
|
||||||
# Skip killCursors.
|
|
||||||
cursor_id = 0
|
|
||||||
address = None
|
|
||||||
return cursor_id, address
|
|
||||||
|
|
||||||
def _die_no_lock(self) -> None:
|
|
||||||
"""Closes this cursor without acquiring a lock."""
|
|
||||||
cursor_id, address = self._prepare_to_die()
|
|
||||||
self._collection.database.client._cleanup_cursor_no_lock(
|
|
||||||
cursor_id, address, self._sock_mgr, self._session
|
|
||||||
)
|
|
||||||
if self._session and self._session._implicit:
|
|
||||||
self._session._attached_to_cursor = False
|
|
||||||
self._session = None
|
|
||||||
self._sock_mgr = None
|
|
||||||
|
|
||||||
def _die_lock(self) -> None:
|
|
||||||
"""Closes this cursor."""
|
|
||||||
cursor_id, address = self._prepare_to_die()
|
|
||||||
self._collection.database.client._cleanup_cursor_lock(
|
|
||||||
cursor_id,
|
|
||||||
address,
|
|
||||||
self._sock_mgr,
|
|
||||||
self._session,
|
|
||||||
)
|
|
||||||
if self._session and self._session._implicit:
|
|
||||||
self._session._attached_to_cursor = False
|
|
||||||
self._session = None
|
|
||||||
self._sock_mgr = None
|
|
||||||
|
|
||||||
def _end_session(self) -> None:
|
def _end_session(self) -> None:
|
||||||
if self._session and self._session._implicit:
|
if self._session and self._session._implicit:
|
||||||
self._session._attached_to_cursor = False
|
self._session._attached_to_cursor = False
|
||||||
self._session._end_implicit_session()
|
self._session._end_implicit_session()
|
||||||
self._session = None
|
self._session = None
|
||||||
|
|
||||||
def close(self) -> None:
|
|
||||||
"""Explicitly close / kill this cursor."""
|
|
||||||
self._die_lock()
|
|
||||||
|
|
||||||
def _send_message(self, operation: _GetMore) -> None:
|
def _send_message(self, operation: _GetMore) -> None:
|
||||||
"""Send a getmore message and handle the response."""
|
"""Send a getmore message and handle the response."""
|
||||||
client = self._collection.database.client
|
client = self._collection.database.client
|
||||||
@ -330,6 +240,9 @@ class CommandCursor(Generic[_DocumentType]):
|
|||||||
def __iter__(self) -> Iterator[_DocumentType]:
|
def __iter__(self) -> Iterator[_DocumentType]:
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def __enter__(self) -> CommandCursor[_DocumentType]:
|
||||||
|
return self
|
||||||
|
|
||||||
def next(self) -> _DocumentType:
|
def next(self) -> _DocumentType:
|
||||||
"""Advance the cursor."""
|
"""Advance the cursor."""
|
||||||
# Block until a document is returnable.
|
# Block until a document is returnable.
|
||||||
@ -385,41 +298,6 @@ class CommandCursor(Generic[_DocumentType]):
|
|||||||
"""
|
"""
|
||||||
return self._try_next(get_more_allowed=True)
|
return self._try_next(get_more_allowed=True)
|
||||||
|
|
||||||
def __enter__(self) -> CommandCursor[_DocumentType]:
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
|
||||||
self.close()
|
|
||||||
|
|
||||||
@_csot.apply
|
|
||||||
def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
|
|
||||||
"""Converts the contents of this cursor to a list more efficiently than ``[doc for doc in cursor]``.
|
|
||||||
|
|
||||||
To use::
|
|
||||||
|
|
||||||
>>> cursor.to_list()
|
|
||||||
|
|
||||||
Or, so read at most n items from the cursor::
|
|
||||||
|
|
||||||
>>> cursor.to_list(n)
|
|
||||||
|
|
||||||
If the cursor is empty or has no more results, an empty list will be returned.
|
|
||||||
|
|
||||||
.. versionadded:: 4.9
|
|
||||||
"""
|
|
||||||
res: list[_DocumentType] = []
|
|
||||||
remaining = length
|
|
||||||
if isinstance(length, int) and length < 1:
|
|
||||||
raise ValueError("to_list() length must be greater than 0")
|
|
||||||
while self.alive:
|
|
||||||
if not self._next_batch(res, remaining):
|
|
||||||
break
|
|
||||||
if length is not None:
|
|
||||||
remaining = length - len(res)
|
|
||||||
if remaining == 0:
|
|
||||||
break
|
|
||||||
return res
|
|
||||||
|
|
||||||
|
|
||||||
class RawBatchCommandCursor(CommandCursor[_DocumentType]):
|
class RawBatchCommandCursor(CommandCursor[_DocumentType]):
|
||||||
_getmore_class = _RawBatchGetMore
|
_getmore_class = _RawBatchGetMore
|
||||||
|
|||||||
@ -21,7 +21,6 @@ from collections import deque
|
|||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
Generic,
|
|
||||||
Iterable,
|
Iterable,
|
||||||
List,
|
List,
|
||||||
Mapping,
|
Mapping,
|
||||||
@ -36,7 +35,7 @@ from typing import (
|
|||||||
from bson import RE_TYPE, _convert_raw_document_lists_to_streams
|
from bson import RE_TYPE, _convert_raw_document_lists_to_streams
|
||||||
from bson.code import Code
|
from bson.code import Code
|
||||||
from bson.son import SON
|
from bson.son import SON
|
||||||
from pymongo import _csot, helpers_shared
|
from pymongo import helpers_shared
|
||||||
from pymongo.collation import validate_collation_or_none
|
from pymongo.collation import validate_collation_or_none
|
||||||
from pymongo.common import (
|
from pymongo.common import (
|
||||||
validate_is_document_type,
|
validate_is_document_type,
|
||||||
@ -44,9 +43,7 @@ from pymongo.common import (
|
|||||||
)
|
)
|
||||||
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS, _QUERY_OPTIONS, CursorType, _Hint, _Sort
|
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS, _QUERY_OPTIONS, CursorType, _Hint, _Sort
|
||||||
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
|
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
|
||||||
from pymongo.lock import _create_lock
|
|
||||||
from pymongo.message import (
|
from pymongo.message import (
|
||||||
_CursorAddress,
|
|
||||||
_GetMore,
|
_GetMore,
|
||||||
_OpMsg,
|
_OpMsg,
|
||||||
_OpReply,
|
_OpReply,
|
||||||
@ -55,6 +52,7 @@ from pymongo.message import (
|
|||||||
_RawBatchQuery,
|
_RawBatchQuery,
|
||||||
)
|
)
|
||||||
from pymongo.response import PinnedResponse
|
from pymongo.response import PinnedResponse
|
||||||
|
from pymongo.synchronous.cursor_base import _ConnectionManager, _CursorBase
|
||||||
from pymongo.synchronous.helpers import next
|
from pymongo.synchronous.helpers import next
|
||||||
from pymongo.typings import _Address, _CollationIn, _DocumentOut, _DocumentType
|
from pymongo.typings import _Address, _CollationIn, _DocumentOut, _DocumentType
|
||||||
from pymongo.write_concern import validate_boolean
|
from pymongo.write_concern import validate_boolean
|
||||||
@ -66,30 +64,11 @@ if TYPE_CHECKING:
|
|||||||
from pymongo.read_preferences import _ServerMode
|
from pymongo.read_preferences import _ServerMode
|
||||||
from pymongo.synchronous.client_session import ClientSession
|
from pymongo.synchronous.client_session import ClientSession
|
||||||
from pymongo.synchronous.collection import Collection
|
from pymongo.synchronous.collection import Collection
|
||||||
from pymongo.synchronous.pool import Connection
|
|
||||||
|
|
||||||
_IS_SYNC = True
|
_IS_SYNC = True
|
||||||
|
|
||||||
|
|
||||||
class _ConnectionManager:
|
class Cursor(_CursorBase[_DocumentType]):
|
||||||
"""Used with exhaust cursors to ensure the connection is returned."""
|
|
||||||
|
|
||||||
def __init__(self, conn: Connection, more_to_come: bool):
|
|
||||||
self.conn: Optional[Connection] = conn
|
|
||||||
self.more_to_come = more_to_come
|
|
||||||
self._lock = _create_lock()
|
|
||||||
|
|
||||||
def update_exhaust(self, more_to_come: bool) -> None:
|
|
||||||
self.more_to_come = more_to_come
|
|
||||||
|
|
||||||
def close(self) -> None:
|
|
||||||
"""Return this instance's connection to the connection pool."""
|
|
||||||
if self.conn:
|
|
||||||
self.conn.unpin()
|
|
||||||
self.conn = None
|
|
||||||
|
|
||||||
|
|
||||||
class Cursor(Generic[_DocumentType]):
|
|
||||||
_query_class = _Query
|
_query_class = _Query
|
||||||
_getmore_class = _GetMore
|
_getmore_class = _GetMore
|
||||||
|
|
||||||
@ -266,8 +245,8 @@ class Cursor(Generic[_DocumentType]):
|
|||||||
"""The number of documents retrieved so far."""
|
"""The number of documents retrieved so far."""
|
||||||
return self._retrieved
|
return self._retrieved
|
||||||
|
|
||||||
def __del__(self) -> None:
|
def _get_namespace(self) -> str:
|
||||||
self._die_no_lock()
|
return f"{self._dbname}.{self._collname}"
|
||||||
|
|
||||||
def clone(self) -> Cursor[_DocumentType]:
|
def clone(self) -> Cursor[_DocumentType]:
|
||||||
"""Get a clone of this cursor.
|
"""Get a clone of this cursor.
|
||||||
@ -897,55 +876,6 @@ class Cursor(Generic[_DocumentType]):
|
|||||||
self._read_preference = self._collection._read_preference_for(self.session)
|
self._read_preference = self._collection._read_preference_for(self.session)
|
||||||
return self._read_preference
|
return self._read_preference
|
||||||
|
|
||||||
@property
|
|
||||||
def alive(self) -> bool:
|
|
||||||
"""Does this cursor have the potential to return more data?
|
|
||||||
|
|
||||||
This is mostly useful with `tailable cursors
|
|
||||||
<https://www.mongodb.com/docs/manual/core/tailable-cursors/>`_
|
|
||||||
since they will stop iterating even though they *may* return more
|
|
||||||
results in the future.
|
|
||||||
|
|
||||||
With regular cursors, simply use a for loop instead of :attr:`alive`::
|
|
||||||
|
|
||||||
for doc in collection.find():
|
|
||||||
print(doc)
|
|
||||||
|
|
||||||
.. note:: Even if :attr:`alive` is True, :meth:`next` can raise
|
|
||||||
:exc:`StopIteration`. :attr:`alive` can also be True while iterating
|
|
||||||
a cursor from a failed server. In this case :attr:`alive` will
|
|
||||||
return False after :meth:`next` fails to retrieve the next batch
|
|
||||||
of results from the server.
|
|
||||||
"""
|
|
||||||
return bool(len(self._data) or (not self._killed))
|
|
||||||
|
|
||||||
@property
|
|
||||||
def cursor_id(self) -> Optional[int]:
|
|
||||||
"""Returns the id of the cursor
|
|
||||||
|
|
||||||
.. versionadded:: 2.2
|
|
||||||
"""
|
|
||||||
return self._id
|
|
||||||
|
|
||||||
@property
|
|
||||||
def address(self) -> Optional[tuple[str, Any]]:
|
|
||||||
"""The (host, port) of the server used, or None.
|
|
||||||
|
|
||||||
.. versionchanged:: 3.0
|
|
||||||
Renamed from "conn_id".
|
|
||||||
"""
|
|
||||||
return self._address
|
|
||||||
|
|
||||||
@property
|
|
||||||
def session(self) -> Optional[ClientSession]:
|
|
||||||
"""The cursor's :class:`~pymongo.client_session.ClientSession`, or None.
|
|
||||||
|
|
||||||
.. versionadded:: 3.6
|
|
||||||
"""
|
|
||||||
if self._session and not self._session._implicit:
|
|
||||||
return self._session
|
|
||||||
return None
|
|
||||||
|
|
||||||
def __copy__(self) -> Cursor[_DocumentType]:
|
def __copy__(self) -> Cursor[_DocumentType]:
|
||||||
"""Support function for `copy.copy()`.
|
"""Support function for `copy.copy()`.
|
||||||
|
|
||||||
@ -1009,59 +939,6 @@ class Cursor(Generic[_DocumentType]):
|
|||||||
y[key] = value # type:ignore[index]
|
y[key] = value # type:ignore[index]
|
||||||
return y
|
return y
|
||||||
|
|
||||||
def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]:
|
|
||||||
self._killed = True
|
|
||||||
if self._id and not already_killed:
|
|
||||||
cursor_id = self._id
|
|
||||||
assert self._address is not None
|
|
||||||
address = _CursorAddress(self._address, f"{self._dbname}.{self._collname}")
|
|
||||||
else:
|
|
||||||
# Skip killCursors.
|
|
||||||
cursor_id = 0
|
|
||||||
address = None
|
|
||||||
return cursor_id, address
|
|
||||||
|
|
||||||
def _die_no_lock(self) -> None:
|
|
||||||
"""Closes this cursor without acquiring a lock."""
|
|
||||||
try:
|
|
||||||
already_killed = self._killed
|
|
||||||
except AttributeError:
|
|
||||||
# ___init__ did not run to completion (or at all).
|
|
||||||
return
|
|
||||||
|
|
||||||
cursor_id, address = self._prepare_to_die(already_killed)
|
|
||||||
self._collection.database.client._cleanup_cursor_no_lock(
|
|
||||||
cursor_id, address, self._sock_mgr, self._session
|
|
||||||
)
|
|
||||||
if self._session and self._session._implicit:
|
|
||||||
self._session._attached_to_cursor = False
|
|
||||||
self._session = None
|
|
||||||
self._sock_mgr = None
|
|
||||||
|
|
||||||
def _die_lock(self) -> None:
|
|
||||||
"""Closes this cursor."""
|
|
||||||
try:
|
|
||||||
already_killed = self._killed
|
|
||||||
except AttributeError:
|
|
||||||
# ___init__ did not run to completion (or at all).
|
|
||||||
return
|
|
||||||
|
|
||||||
cursor_id, address = self._prepare_to_die(already_killed)
|
|
||||||
self._collection.database.client._cleanup_cursor_lock(
|
|
||||||
cursor_id,
|
|
||||||
address,
|
|
||||||
self._sock_mgr,
|
|
||||||
self._session,
|
|
||||||
)
|
|
||||||
if self._session and self._session._implicit:
|
|
||||||
self._session._attached_to_cursor = False
|
|
||||||
self._session = None
|
|
||||||
self._sock_mgr = None
|
|
||||||
|
|
||||||
def close(self) -> None:
|
|
||||||
"""Explicitly close / kill this cursor."""
|
|
||||||
self._die_lock()
|
|
||||||
|
|
||||||
def distinct(self, key: str) -> list[Any]:
|
def distinct(self, key: str) -> list[Any]:
|
||||||
"""Get a list of distinct values for `key` among all documents
|
"""Get a list of distinct values for `key` among all documents
|
||||||
in the result set of this query.
|
in the result set of this query.
|
||||||
@ -1294,40 +1171,8 @@ class Cursor(Generic[_DocumentType]):
|
|||||||
def __enter__(self) -> Cursor[_DocumentType]:
|
def __enter__(self) -> Cursor[_DocumentType]:
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
|
||||||
self.close()
|
|
||||||
|
|
||||||
@_csot.apply
|
class RawBatchCursor(Cursor[_DocumentType]):
|
||||||
def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
|
|
||||||
"""Converts the contents of this cursor to a list more efficiently than ``[doc for doc in cursor]``.
|
|
||||||
|
|
||||||
To use::
|
|
||||||
|
|
||||||
>>> cursor.to_list()
|
|
||||||
|
|
||||||
Or, to read at most n items from the cursor::
|
|
||||||
|
|
||||||
>>> cursor.to_list(n)
|
|
||||||
|
|
||||||
If the cursor is empty or has no more results, an empty list will be returned.
|
|
||||||
|
|
||||||
.. versionadded:: 4.9
|
|
||||||
"""
|
|
||||||
res: list[_DocumentType] = []
|
|
||||||
remaining = length
|
|
||||||
if isinstance(length, int) and length < 1:
|
|
||||||
raise ValueError("to_list() length must be greater than 0")
|
|
||||||
while self.alive:
|
|
||||||
if not self._next_batch(res, remaining):
|
|
||||||
break
|
|
||||||
if length is not None:
|
|
||||||
remaining = length - len(res)
|
|
||||||
if remaining == 0:
|
|
||||||
break
|
|
||||||
return res
|
|
||||||
|
|
||||||
|
|
||||||
class RawBatchCursor(Cursor, Generic[_DocumentType]): # type: ignore[type-arg]
|
|
||||||
"""A cursor / iterator over raw batches of BSON data from a query result."""
|
"""A cursor / iterator over raw batches of BSON data from a query result."""
|
||||||
|
|
||||||
_query_class = _RawBatchQuery
|
_query_class = _RawBatchQuery
|
||||||
|
|||||||
122
pymongo/synchronous/cursor_base.py
Normal file
122
pymongo/synchronous/cursor_base.py
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
# Copyright 2026-present MongoDB, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you
|
||||||
|
# may not use this file except in compliance with the License. You
|
||||||
|
# may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied. See the License for the specific language governing
|
||||||
|
# permissions and limitations under the License.
|
||||||
|
|
||||||
|
"""Synchronous cursor base extending the shared agnostic cursor base."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import abstractmethod
|
||||||
|
from typing import TYPE_CHECKING, Any, Optional
|
||||||
|
|
||||||
|
from pymongo import _csot
|
||||||
|
from pymongo.cursor_shared import _AgnosticCursorBase
|
||||||
|
from pymongo.lock import _create_lock
|
||||||
|
from pymongo.typings import _DocumentType
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from pymongo.synchronous.client_session import ClientSession
|
||||||
|
from pymongo.synchronous.pool import Connection
|
||||||
|
|
||||||
|
_IS_SYNC = True
|
||||||
|
|
||||||
|
|
||||||
|
class _ConnectionManager:
|
||||||
|
"""Used with exhaust cursors to ensure the connection is returned."""
|
||||||
|
|
||||||
|
def __init__(self, conn: Connection, more_to_come: bool):
|
||||||
|
self.conn: Optional[Connection] = conn
|
||||||
|
self.more_to_come = more_to_come
|
||||||
|
self._lock = _create_lock()
|
||||||
|
|
||||||
|
def update_exhaust(self, more_to_come: bool) -> None:
|
||||||
|
self.more_to_come = more_to_come
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
"""Return this instance's connection to the connection pool."""
|
||||||
|
if self.conn:
|
||||||
|
self.conn.unpin()
|
||||||
|
self.conn = None
|
||||||
|
|
||||||
|
|
||||||
|
class _CursorBase(_AgnosticCursorBase[_DocumentType]):
|
||||||
|
"""Synchronous cursor base class."""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def session(self) -> Optional[ClientSession]:
|
||||||
|
"""The cursor's :class:`~pymongo.client_session.ClientSession`, or None.
|
||||||
|
|
||||||
|
.. versionadded:: 3.6
|
||||||
|
"""
|
||||||
|
if self._session and not self._session._implicit:
|
||||||
|
return self._session
|
||||||
|
return None
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _next_batch(self, result: list, total: Optional[int] = None) -> bool: # type: ignore[type-arg]
|
||||||
|
...
|
||||||
|
|
||||||
|
def _die_lock(self) -> None:
|
||||||
|
"""Closes this cursor."""
|
||||||
|
try:
|
||||||
|
already_killed = self._killed
|
||||||
|
except AttributeError:
|
||||||
|
# ___init__ did not run to completion (or at all).
|
||||||
|
return
|
||||||
|
|
||||||
|
cursor_id, address = self._prepare_to_die(already_killed)
|
||||||
|
self._collection.database.client._cleanup_cursor_lock(
|
||||||
|
cursor_id,
|
||||||
|
address,
|
||||||
|
self._sock_mgr,
|
||||||
|
self._session,
|
||||||
|
)
|
||||||
|
if self._session and self._session._implicit:
|
||||||
|
self._session._attached_to_cursor = False
|
||||||
|
self._session = None
|
||||||
|
self._sock_mgr = None
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
"""Explicitly close / kill this cursor."""
|
||||||
|
self._die_lock()
|
||||||
|
|
||||||
|
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
@_csot.apply
|
||||||
|
def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
|
||||||
|
"""Converts the contents of this cursor to a list more efficiently than ``[doc for doc in cursor]``.
|
||||||
|
|
||||||
|
To use::
|
||||||
|
|
||||||
|
>>> cursor.to_list()
|
||||||
|
|
||||||
|
Or, to read at most n items from the cursor::
|
||||||
|
|
||||||
|
>>> cursor.to_list(n)
|
||||||
|
|
||||||
|
If the cursor is empty or has no more results, an empty list will be returned.
|
||||||
|
|
||||||
|
.. versionadded:: 4.9
|
||||||
|
"""
|
||||||
|
res: list[_DocumentType] = []
|
||||||
|
remaining = length
|
||||||
|
if isinstance(length, int) and length < 1:
|
||||||
|
raise ValueError("to_list() length must be greater than 0")
|
||||||
|
while self.alive:
|
||||||
|
if not self._next_batch(res, remaining):
|
||||||
|
break
|
||||||
|
if length is not None:
|
||||||
|
remaining = length - len(res)
|
||||||
|
if remaining == 0:
|
||||||
|
break
|
||||||
|
return res
|
||||||
@ -141,7 +141,7 @@ if TYPE_CHECKING:
|
|||||||
from pymongo.server_selectors import Selection
|
from pymongo.server_selectors import Selection
|
||||||
from pymongo.synchronous.bulk import _Bulk
|
from pymongo.synchronous.bulk import _Bulk
|
||||||
from pymongo.synchronous.client_session import ClientSession, _ServerSession
|
from pymongo.synchronous.client_session import ClientSession, _ServerSession
|
||||||
from pymongo.synchronous.cursor import _ConnectionManager
|
from pymongo.synchronous.cursor_base import _ConnectionManager
|
||||||
from pymongo.synchronous.encryption import _Encrypter
|
from pymongo.synchronous.encryption import _Encrypter
|
||||||
from pymongo.synchronous.pool import Connection
|
from pymongo.synchronous.pool import Connection
|
||||||
from pymongo.synchronous.server import Server
|
from pymongo.synchronous.server import Server
|
||||||
|
|||||||
@ -30,6 +30,7 @@ from unasync import Rule, unasync_files # type: ignore[import-not-found]
|
|||||||
replacements = {
|
replacements = {
|
||||||
"AsyncCollection": "Collection",
|
"AsyncCollection": "Collection",
|
||||||
"AsyncDatabase": "Database",
|
"AsyncDatabase": "Database",
|
||||||
|
"_AsyncCursorBase": "_CursorBase",
|
||||||
"AsyncCursor": "Cursor",
|
"AsyncCursor": "Cursor",
|
||||||
"AsyncMongoClient": "MongoClient",
|
"AsyncMongoClient": "MongoClient",
|
||||||
"AsyncCommandCursor": "CommandCursor",
|
"AsyncCommandCursor": "CommandCursor",
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user