PYTHON-3780 add types to cursor.py (#1290)

This commit is contained in:
Iris 2023-07-13 12:40:30 -07:00 committed by GitHub
parent dbb196fdfc
commit c33b9d6b4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 34 deletions

View File

@ -903,6 +903,6 @@ class GridOutCursor(Cursor):
def remove_option(self, *args: Any, **kwargs: Any) -> NoReturn:
raise NotImplementedError("Method does not exist for GridOutCursor")
def _clone_base(self, session: ClientSession) -> "GridOutCursor":
def _clone_base(self, session: Optional[ClientSession]) -> "GridOutCursor":
"""Creates an empty GridOutCursor for information to be copied into."""
return GridOutCursor(self.__root_collection, session=session)

View File

@ -57,6 +57,17 @@ from pymongo.message import (
from pymongo.response import PinnedResponse
from pymongo.typings import _CollationIn, _DocumentType
if TYPE_CHECKING:
from _typeshed import SupportsItems
from bson.codec_options import CodecOptions
from pymongo.client_session import ClientSession
from pymongo.collection import Collection
from pymongo.message import _OpMsg, _OpReply
from pymongo.pool import SocketInfo
from pymongo.read_preferences import _ServerMode
# These errors mean that the server has already killed the cursor so there is
# no need to send killCursors.
_CURSOR_CLOSED_ERRORS = frozenset(
@ -131,19 +142,17 @@ class CursorType:
class _SocketManager:
"""Used with exhaust cursors to ensure the socket is returned."""
def __init__(self, sock, more_to_come):
self.sock = sock
def __init__(self, sock: SocketInfo, more_to_come: bool):
self.sock: Optional[SocketInfo] = sock
self.more_to_come = more_to_come
self.closed = False
self.lock = _create_lock()
def update_exhaust(self, more_to_come):
def update_exhaust(self, more_to_come: bool) -> None:
self.more_to_come = more_to_come
def close(self):
def close(self) -> None:
"""Return this instance's socket to the connection pool."""
if not self.closed:
self.closed = True
if self.sock:
self.sock.unpin()
self.sock = None
@ -152,12 +161,6 @@ _Sort = Sequence[Union[str, Tuple[str, Union[int, str, Mapping[str, Any]]]]]
_Hint = Union[str, _Sort]
if TYPE_CHECKING:
from pymongo.client_session import ClientSession
from pymongo.collection import Collection
from pymongo.read_preferences import _ServerMode
class Cursor(Generic[_DocumentType]):
"""A cursor / iterator over Mongo query results."""
@ -358,7 +361,7 @@ class Cursor(Generic[_DocumentType]):
"""
return self._clone(True)
def _clone(self, deepcopy=True, base=None):
def _clone(self, deepcopy: bool = True, base: Optional[Cursor] = None) -> Cursor:
"""Internal clone helper."""
if not base:
if self.__explicit_session:
@ -401,11 +404,11 @@ class Cursor(Generic[_DocumentType]):
base.__dict__.update(data)
return base
def _clone_base(self, session):
def _clone_base(self, session: Optional[ClientSession]) -> Cursor:
"""Creates an empty Cursor object for information to be copied into."""
return self.__class__(self.__collection, session=session)
def __die(self, synchronous=False):
def __die(self, synchronous: bool = False) -> None:
"""Closes this cursor."""
try:
already_killed = self.__killed
@ -437,7 +440,7 @@ class Cursor(Generic[_DocumentType]):
"""Explicitly close / kill this cursor."""
self.__die(True)
def __query_spec(self):
def __query_spec(self) -> Mapping[str, Any]:
"""Get the spec to use for a query."""
operators: Dict[str, Any] = {}
if self.__ordering:
@ -494,7 +497,7 @@ class Cursor(Generic[_DocumentType]):
return self.__spec
def __check_okay_to_chain(self):
def __check_okay_to_chain(self) -> None:
"""Check if it is okay to chain more options onto this cursor."""
if self.__retrieved or self.__id is not None:
raise InvalidOperation("cannot set options after executing query")
@ -684,7 +687,7 @@ class Cursor(Generic[_DocumentType]):
def __getitem__(self, index: slice) -> "Cursor[_DocumentType]":
...
def __getitem__(self, index):
def __getitem__(self, index: Union[int, slice]) -> Union[_DocumentType, Cursor[_DocumentType]]:
"""Get a single document or a slice of documents from this cursor.
.. warning:: A :class:`~Cursor` is not a Python :class:`list`. Each
@ -928,7 +931,7 @@ class Cursor(Generic[_DocumentType]):
c.__limit = -abs(c.__limit)
return next(c)
def __set_hint(self, index):
def __set_hint(self, index: Optional[_Hint]) -> None:
if index is None:
self.__hint = None
return
@ -1039,7 +1042,7 @@ class Cursor(Generic[_DocumentType]):
self.__collation = validate_collation_or_none(collation)
return self
def __send_message(self, operation):
def __send_message(self, operation: Union[_Query, _GetMore]) -> None:
"""Send a query or getmore operation and handles the response.
If operation is ``None`` this is an exhaust cursor, which reads
@ -1120,17 +1123,22 @@ class Cursor(Generic[_DocumentType]):
self.close()
def _unpack_response(
self, response, cursor_id, codec_options, user_fields=None, legacy_response=False
):
self,
response: Union[_OpReply, _OpMsg],
cursor_id: Optional[int],
codec_options: CodecOptions,
user_fields: Optional[Mapping[str, Any]] = None,
legacy_response: bool = False,
) -> List[Mapping[str, Any]]:
return response.unpack_response(cursor_id, codec_options, user_fields, legacy_response)
def _read_preference(self):
def _read_preference(self) -> _ServerMode:
if self.__read_preference is None:
# Save the read preference for getMore commands.
self.__read_preference = self.__collection._read_preference_for(self.session)
return self.__read_preference
def _refresh(self):
def _refresh(self) -> int:
"""Refreshes the cursor with more data from Mongo.
Returns the length of self.__data after refresh. Will exit early if
@ -1277,23 +1285,35 @@ class Cursor(Generic[_DocumentType]):
"""
return self._clone(deepcopy=True)
def _deepcopy(self, x, memo=None):
@overload
def _deepcopy(self, x: Iterable, memo: Optional[Dict[int, Union[List, Dict]]] = None) -> List:
...
@overload
def _deepcopy(
self, x: SupportsItems, memo: Optional[Dict[int, Union[List, Dict]]] = None
) -> Dict:
...
def _deepcopy(
self, x: Union[Iterable, SupportsItems], memo: Optional[Dict[int, Union[List, Dict]]] = None
) -> Union[List, Dict]:
"""Deepcopy helper for the data dictionary or list.
Regular expressions cannot be deep copied but as they are immutable we
don't have to copy them when cloning.
"""
y: Any
y: Union[List, Dict]
iterator: Iterable[Tuple[Any, Any]]
if not hasattr(x, "items"):
y, is_list, iterator = [], True, enumerate(x)
else:
y, is_list, iterator = {}, False, x.items()
y, is_list, iterator = {}, False, cast("SupportsItems", x).items()
if memo is None:
memo = {}
val_id = id(x)
if val_id in memo:
return memo.get(val_id)
return memo[val_id]
memo[val_id] = y
for key, value in iterator:
@ -1303,7 +1323,7 @@ class Cursor(Generic[_DocumentType]):
value = copy.deepcopy(value, memo)
if is_list:
y.append(value)
y.append(value) # type: ignore[union-attr]
else:
if not isinstance(key, RE_TYPE):
key = copy.deepcopy(key, memo)
@ -1329,8 +1349,13 @@ class RawBatchCursor(Cursor, Generic[_DocumentType]):
super().__init__(collection, *args, **kwargs)
def _unpack_response(
self, response, cursor_id, codec_options, user_fields=None, legacy_response=False
):
self,
response: Union[_OpReply, _OpMsg],
cursor_id: Optional[int],
codec_options: CodecOptions[Mapping[str, Any]],
user_fields: Optional[Mapping[str, Any]] = None,
legacy_response: bool = False,
) -> List[Mapping[str, Any]]:
raw_response = response.raw_response(cursor_id, user_fields=user_fields)
if not legacy_response:
# OP_MSG returns firstBatch/nextBatch documents as a BSON array