PYTHON-3932 Support mypy --strict testing in bson package (#1362)

This commit is contained in:
Steven Silvester 2023-08-29 13:49:11 -05:00 committed by GitHub
parent 3e1a4ab56e
commit 83ab612aa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 222 additions and 147 deletions

View File

@ -53,6 +53,8 @@ bytes [#bytes]_ binary both
.. [#bytes] The bytes type is encoded as BSON binary with
subtype 0. It will be decoded back to bytes.
"""
from __future__ import annotations
import datetime
import itertools
import os
@ -69,10 +71,8 @@ from typing import (
Any,
BinaryIO,
Callable,
Dict,
Generator,
Iterator,
List,
Mapping,
MutableMapping,
NoReturn,
@ -109,7 +109,6 @@ from bson.datetime_ms import (
DatetimeMS,
_datetime_to_millis,
_millis_to_datetime,
utc,
)
from bson.dbref import DBRef
from bson.decimal128 import Decimal128
@ -121,9 +120,11 @@ from bson.objectid import ObjectId
from bson.regex import Regex
from bson.son import RE_TYPE, SON
from bson.timestamp import Timestamp
from bson.tz_util import utc
# Import some modules for type-checking only.
if TYPE_CHECKING:
from bson.raw_bson import RawBSONDocument
from bson.typings import _DocumentType, _ReadableBuffer
try:
@ -249,7 +250,7 @@ def _get_int(
return _UNPACK_INT_FROM(data, position)[0], position + 4
def _get_c_string(data: Any, view: Any, position: int, opts: CodecOptions) -> Tuple[str, int]:
def _get_c_string(data: Any, view: Any, position: int, opts: CodecOptions[Any]) -> Tuple[str, int]:
"""Decode a BSON 'C' string to python str."""
end = data.index(b"\x00", position)
return _utf_8_decode(view[position:end], opts.unicode_decode_error_handler, True)[0], end + 1
@ -263,7 +264,7 @@ def _get_float(
def _get_string(
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions, dummy: Any
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions[Any], dummy: Any
) -> Tuple[str, int]:
"""Decode a BSON string to python str."""
length = _UNPACK_INT_FROM(data, position)[0]
@ -294,7 +295,7 @@ def _get_object_size(data: Any, position: int, obj_end: int) -> Tuple[int, int]:
def _get_object(
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions, dummy: Any
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions[Any], dummy: Any
) -> Tuple[Any, int]:
"""Decode a BSON subdocument to opts.document_class or bson.dbref.DBRef."""
obj_size, end = _get_object_size(data, position, obj_end)
@ -315,7 +316,7 @@ def _get_object(
def _get_array(
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions, element_name: str
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions[Any], element_name: str
) -> Tuple[Any, int]:
"""Decode a BSON array to python list."""
size = _UNPACK_INT_FROM(data, position)[0]
@ -325,7 +326,7 @@ def _get_array(
position += 4
end -= 1
result: List[Any] = []
result: list[Any] = []
# Avoid doing global and attribute lookups in the loop.
append = result.append
@ -357,7 +358,7 @@ def _get_array(
def _get_binary(
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions, dummy1: Any
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions[Any], dummy1: Any
) -> Tuple[Union[Binary, uuid.UUID], int]:
"""Decode a BSON binary to bson.binary.Binary or python UUID."""
length, subtype = _UNPACK_LENGTH_SUBTYPE_FROM(data, position)
@ -415,14 +416,14 @@ def _get_boolean(
def _get_date(
data: Any, view: Any, position: int, dummy0: int, opts: CodecOptions, dummy1: Any
data: Any, view: Any, position: int, dummy0: int, opts: CodecOptions[Any], dummy1: Any
) -> Tuple[Union[datetime.datetime, DatetimeMS], int]:
"""Decode a BSON datetime to python datetime.datetime."""
return _millis_to_datetime(_UNPACK_LONG_FROM(data, position)[0], opts), position + 8
def _get_code(
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions, element_name: str
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions[Any], element_name: str
) -> Tuple[Code, int]:
"""Decode a BSON code to bson.code.Code."""
code, position = _get_string(data, view, position, obj_end, opts, element_name)
@ -430,7 +431,7 @@ def _get_code(
def _get_code_w_scope(
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions, element_name: str
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions[Any], element_name: str
) -> Tuple[Code, int]:
"""Decode a BSON code_w_scope to bson.code.Code."""
code_end = position + _UNPACK_INT_FROM(data, position)[0]
@ -442,8 +443,8 @@ def _get_code_w_scope(
def _get_regex(
data: Any, view: Any, position: int, dummy0: Any, opts: CodecOptions, dummy1: Any
) -> Tuple[Regex, int]:
data: Any, view: Any, position: int, dummy0: Any, opts: CodecOptions[Any], dummy1: Any
) -> Tuple[Regex[Any], int]:
"""Decode a BSON regex to bson.regex.Regex or a python pattern object."""
pattern, position = _get_c_string(data, view, position, opts)
bson_flags, position = _get_c_string(data, view, position, opts)
@ -452,7 +453,7 @@ def _get_regex(
def _get_ref(
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions, element_name: str
data: Any, view: Any, position: int, obj_end: int, opts: CodecOptions[Any], element_name: str
) -> Tuple[DBRef, int]:
"""Decode (deprecated) BSON DBPointer to bson.dbref.DBRef."""
collection, position = _get_string(data, view, position, obj_end, opts, element_name)
@ -489,7 +490,7 @@ def _get_decimal128(
# - position: int, beginning of object in 'data' to decode
# - obj_end: int, end of object to decode in 'data' if variable-length type
# - opts: a CodecOptions
_ELEMENT_GETTER: Dict[int, Callable[..., Tuple[Any, int]]] = {
_ELEMENT_GETTER: dict[int, Callable[..., Tuple[Any, int]]] = {
ord(BSONNUM): _get_float,
ord(BSONSTR): _get_string,
ord(BSONOBJ): _get_object,
@ -521,10 +522,13 @@ if _USE_C:
view: Any,
position: int,
obj_end: int,
opts: CodecOptions,
opts: CodecOptions[Any],
raw_array: bool = False,
) -> Any:
return _cbson._element_to_dict(data, position, obj_end, opts, raw_array)
) -> Tuple[str, Any, int]:
return cast(
"Tuple[str, Any, int]",
_cbson._element_to_dict(data, position, obj_end, opts, raw_array),
)
else:
@ -533,9 +537,9 @@ else:
view: Any,
position: int,
obj_end: int,
opts: CodecOptions,
opts: CodecOptions[Any],
raw_array: bool = False,
) -> Any:
) -> Tuple[str, Any, int]:
"""Decode a single key, value pair."""
element_type = data[position]
position += 1
@ -558,14 +562,21 @@ else:
return element_name, value, position
_T = TypeVar("_T", bound=MutableMapping[Any, Any])
_T = TypeVar("_T", bound=MutableMapping[str, Any])
def _raw_to_dict(
data: Any, position: int, obj_end: int, opts: CodecOptions, result: _T, raw_array: bool = False
data: Any,
position: int,
obj_end: int,
opts: CodecOptions[RawBSONDocument],
result: _T,
raw_array: bool = False,
) -> _T:
data, view = get_data_and_view(data)
return _elements_to_dict(data, view, position, obj_end, opts, result, raw_array=raw_array)
return cast(
_T, _elements_to_dict(data, view, position, obj_end, opts, result, raw_array=raw_array)
)
def _elements_to_dict(
@ -573,7 +584,7 @@ def _elements_to_dict(
view: Any,
position: int,
obj_end: int,
opts: CodecOptions,
opts: CodecOptions[Any],
result: Any = None,
raw_array: bool = False,
) -> Any:
@ -591,14 +602,14 @@ def _elements_to_dict(
return result
def _bson_to_dict(data: Any, opts: CodecOptions) -> Any:
def _bson_to_dict(data: Any, opts: CodecOptions[_DocumentType]) -> _DocumentType:
"""Decode a BSON string to document_class."""
data, view = get_data_and_view(data)
try:
if _raw_document_class(opts.document_class):
return opts.document_class(data, opts)
return opts.document_class(data, opts) # type:ignore[call-arg]
_, end = _get_object_size(data, 0, len(data))
return _elements_to_dict(data, view, 4, end, opts)
return cast("_DocumentType", _elements_to_dict(data, view, 4, end, opts))
except InvalidBSON:
raise
except Exception:
@ -679,15 +690,15 @@ def _encode_bytes(name: bytes, value: bytes, dummy0: Any, dummy1: Any) -> bytes:
return b"\x05" + name + _PACK_INT(len(value)) + b"\x00" + value
def _encode_mapping(name: bytes, value: Any, check_keys: bool, opts: CodecOptions) -> bytes:
def _encode_mapping(name: bytes, value: Any, check_keys: bool, opts: CodecOptions[Any]) -> bytes:
"""Encode a mapping type."""
if _raw_document_class(value):
return b"\x03" + name + value.raw
return b"\x03" + name + cast(bytes, value.raw)
data = b"".join([_element_to_bson(key, val, check_keys, opts) for key, val in value.items()])
return b"\x03" + name + _PACK_INT(len(data) + 5) + data + b"\x00"
def _encode_dbref(name: bytes, value: DBRef, check_keys: bool, opts: CodecOptions) -> bytes:
def _encode_dbref(name: bytes, value: DBRef, check_keys: bool, opts: CodecOptions[Any]) -> bytes:
"""Encode bson.dbref.DBRef."""
buf = bytearray(b"\x03" + name + b"\x00\x00\x00\x00")
begin = len(buf) - 4
@ -704,7 +715,9 @@ def _encode_dbref(name: bytes, value: DBRef, check_keys: bool, opts: CodecOption
return bytes(buf)
def _encode_list(name: bytes, value: Sequence[Any], check_keys: bool, opts: CodecOptions) -> bytes:
def _encode_list(
name: bytes, value: Sequence[Any], check_keys: bool, opts: CodecOptions[Any]
) -> bytes:
"""Encode a list/tuple."""
lname = gen_list_name()
data = b"".join([_name_value_to_bson(next(lname), item, check_keys, opts) for item in value])
@ -725,7 +738,7 @@ def _encode_binary(name: bytes, value: Binary, dummy0: Any, dummy1: Any) -> byte
return b"\x05" + name + _PACK_LENGTH_SUBTYPE(len(value), subtype) + value
def _encode_uuid(name: bytes, value: uuid.UUID, dummy: Any, opts: CodecOptions) -> bytes:
def _encode_uuid(name: bytes, value: uuid.UUID, dummy: Any, opts: CodecOptions[Any]) -> bytes:
"""Encode uuid.UUID."""
uuid_representation = opts.uuid_representation
binval = Binary.from_uuid(value, uuid_representation=uuid_representation)
@ -759,7 +772,7 @@ def _encode_none(name: bytes, dummy0: Any, dummy1: Any, dummy2: Any) -> bytes:
return b"\x0A" + name
def _encode_regex(name: bytes, value: Regex, dummy0: Any, dummy1: Any) -> bytes:
def _encode_regex(name: bytes, value: Regex[Any], dummy0: Any, dummy1: Any) -> bytes:
"""Encode a python regex or bson.regex.Regex."""
flags = value.flags
# Python 3 common case
@ -785,7 +798,7 @@ def _encode_regex(name: bytes, value: Regex, dummy0: Any, dummy1: Any) -> bytes:
return b"\x0B" + name + _make_c_string_check(value.pattern) + sflags
def _encode_code(name: bytes, value: Code, dummy: Any, opts: CodecOptions) -> bytes:
def _encode_code(name: bytes, value: Code, dummy: Any, opts: CodecOptions[Any]) -> bytes:
"""Encode bson.code.Code."""
cstring = _make_c_string(value)
cstrlen = len(cstring)
@ -890,7 +903,7 @@ def _name_value_to_bson(
name: bytes,
value: Any,
check_keys: bool,
opts: CodecOptions,
opts: CodecOptions[Any],
in_custom_call: bool = False,
in_fallback_call: bool = False,
) -> bytes:
@ -954,7 +967,7 @@ def _name_value_to_bson(
raise InvalidDocument(f"cannot encode object: {value!r}, of type: {type(value)!r}")
def _element_to_bson(key: Any, value: Any, check_keys: bool, opts: CodecOptions) -> bytes:
def _element_to_bson(key: Any, value: Any, check_keys: bool, opts: CodecOptions[Any]) -> bytes:
"""Encode a single key, value pair."""
if not isinstance(key, str):
raise InvalidDocument(f"documents must have only string keys, key was {key!r}")
@ -968,7 +981,9 @@ def _element_to_bson(key: Any, value: Any, check_keys: bool, opts: CodecOptions)
return _name_value_to_bson(name, value, check_keys, opts)
def _dict_to_bson(doc: Any, check_keys: bool, opts: CodecOptions, top_level: bool = True) -> bytes:
def _dict_to_bson(
doc: Any, check_keys: bool, opts: CodecOptions[Any], top_level: bool = True
) -> bytes:
"""Encode a document to BSON."""
if _raw_document_class(doc):
return cast(bytes, doc.raw)
@ -996,7 +1011,7 @@ _CODEC_OPTIONS_TYPE_ERROR = TypeError("codec_options must be an instance of Code
def encode(
document: Mapping[str, Any],
check_keys: bool = False,
codec_options: CodecOptions = DEFAULT_CODEC_OPTIONS,
codec_options: CodecOptions[Any] = DEFAULT_CODEC_OPTIONS,
) -> bytes:
"""Encode a document to BSON.
@ -1024,20 +1039,18 @@ def encode(
@overload
def decode(data: "_ReadableBuffer", codec_options: None = None) -> Dict[str, Any]:
def decode(data: _ReadableBuffer, codec_options: None = None) -> dict[str, Any]:
...
@overload
def decode(
data: "_ReadableBuffer", codec_options: "CodecOptions[_DocumentType]"
) -> "_DocumentType":
def decode(data: _ReadableBuffer, codec_options: CodecOptions[_DocumentType]) -> _DocumentType:
...
def decode(
data: "_ReadableBuffer", codec_options: "Optional[CodecOptions[_DocumentType]]" = None
) -> Union[Dict[str, Any], "_DocumentType"]:
data: _ReadableBuffer, codec_options: Optional[CodecOptions[_DocumentType]] = None
) -> Union[dict[str, Any], _DocumentType]:
"""Decode BSON to a document.
By default, returns a BSON document represented as a Python
@ -1063,20 +1076,18 @@ def decode(
.. versionadded:: 3.9
"""
opts: CodecOptions = codec_options or DEFAULT_CODEC_OPTIONS
opts: CodecOptions[Any] = codec_options or DEFAULT_CODEC_OPTIONS
if not isinstance(opts, CodecOptions):
raise _CODEC_OPTIONS_TYPE_ERROR
return _bson_to_dict(data, opts)
return cast("Union[dict[str, Any], _DocumentType]", _bson_to_dict(data, opts))
def _decode_all(
data: "_ReadableBuffer", opts: "CodecOptions[_DocumentType]"
) -> "List[_DocumentType]":
def _decode_all(data: _ReadableBuffer, opts: CodecOptions[_DocumentType]) -> list[_DocumentType]:
"""Decode a BSON data to multiple documents."""
data, view = get_data_and_view(data)
data_len = len(data)
docs: "List[_DocumentType]" = []
docs: list[_DocumentType] = []
position = 0
end = data_len - 1
use_raw = _raw_document_class(opts.document_class)
@ -1107,20 +1118,20 @@ if _USE_C:
@overload
def decode_all(data: "_ReadableBuffer", codec_options: None = None) -> "List[Dict[str, Any]]":
def decode_all(data: _ReadableBuffer, codec_options: None = None) -> list[dict[str, Any]]:
...
@overload
def decode_all(
data: "_ReadableBuffer", codec_options: "CodecOptions[_DocumentType]"
) -> "List[_DocumentType]":
data: _ReadableBuffer, codec_options: CodecOptions[_DocumentType]
) -> list[_DocumentType]:
...
def decode_all(
data: "_ReadableBuffer", codec_options: "Optional[CodecOptions[_DocumentType]]" = None
) -> "Union[List[Dict[str, Any]], List[_DocumentType]]":
data: _ReadableBuffer, codec_options: Optional[CodecOptions[_DocumentType]] = None
) -> Union[list[dict[str, Any]], list[_DocumentType]]:
"""Decode BSON data to multiple documents.
`data` must be a bytes-like object implementing the buffer protocol that
@ -1152,22 +1163,26 @@ def decode_all(
return _decode_all(data, codec_options)
def _decode_selective(rawdoc: Any, fields: Any, codec_options: Any) -> Mapping[Any, Any]:
def _decode_selective(
rawdoc: Any, fields: Any, codec_options: CodecOptions[_DocumentType]
) -> _DocumentType:
if _raw_document_class(codec_options.document_class):
# If document_class is RawBSONDocument, use vanilla dictionary for
# decoding command response.
doc = {}
doc: _DocumentType = {} # type:ignore[assignment]
else:
# Else, use the specified document_class.
doc = codec_options.document_class()
for key, value in rawdoc.items():
if key in fields:
if fields[key] == 1:
doc[key] = _bson_to_dict(rawdoc.raw, codec_options)[key]
doc[key] = _bson_to_dict(rawdoc.raw, codec_options)[key] # type:ignore[index]
else:
doc[key] = _decode_selective(value, fields[key], codec_options)
doc[key] = _decode_selective( # type:ignore[index]
value, fields[key], codec_options
)
else:
doc[key] = value
doc[key] = value # type:ignore[index]
return doc
@ -1176,7 +1191,7 @@ def _array_of_documents_to_buffer(view: memoryview) -> bytes:
position = 0
_, end = _get_object_size(view, position, len(view))
position += 4
buffers: List[memoryview] = []
buffers: list[memoryview] = []
append = buffers.append
while position < end - 1:
# Just skip the keys.
@ -1211,7 +1226,9 @@ def _convert_raw_document_lists_to_streams(document: Any) -> None:
cursor[key] = []
def _decode_all_selective(data: Any, codec_options: CodecOptions, fields: Any) -> List[Any]:
def _decode_all_selective(
data: Any, codec_options: CodecOptions[_DocumentType], fields: Any
) -> list[_DocumentType]:
"""Decode BSON data to a single document while using user-provided
custom decoding logic.
@ -1257,20 +1274,18 @@ def _decode_all_selective(data: Any, codec_options: CodecOptions, fields: Any) -
@overload
def decode_iter(data: bytes, codec_options: None = None) -> "Iterator[Dict[str, Any]]":
def decode_iter(data: bytes, codec_options: None = None) -> Iterator[dict[str, Any]]:
...
@overload
def decode_iter(
data: bytes, codec_options: "CodecOptions[_DocumentType]"
) -> "Iterator[_DocumentType]":
def decode_iter(data: bytes, codec_options: CodecOptions[_DocumentType]) -> Iterator[_DocumentType]:
...
def decode_iter(
data: bytes, codec_options: "Optional[CodecOptions[_DocumentType]]" = None
) -> "Union[Iterator[Dict[str, Any]], Iterator[_DocumentType]]":
data: bytes, codec_options: Optional[CodecOptions[_DocumentType]] = None
) -> Union[Iterator[dict[str, Any]], Iterator[_DocumentType]]:
"""Decode BSON data to multiple documents as a generator.
Works similarly to the decode_all function, but yields one document at a
@ -1301,26 +1316,27 @@ def decode_iter(
elements = data[position : position + obj_size]
position += obj_size
yield _bson_to_dict(elements, opts)
yield _bson_to_dict(elements, opts) # type:ignore[misc, type-var]
@overload
def decode_file_iter(
file_obj: Union[BinaryIO, IO], codec_options: None = None
) -> "Iterator[Dict[str, Any]]":
file_obj: Union[BinaryIO, IO[bytes]], codec_options: None = None
) -> Iterator[dict[str, Any]]:
...
@overload
def decode_file_iter(
file_obj: Union[BinaryIO, IO], codec_options: "CodecOptions[_DocumentType]"
) -> "Iterator[_DocumentType]":
file_obj: Union[BinaryIO, IO[bytes]], codec_options: CodecOptions[_DocumentType]
) -> Iterator[_DocumentType]:
...
def decode_file_iter(
file_obj: Union[BinaryIO, IO], codec_options: "Optional[CodecOptions[_DocumentType]]" = None
) -> "Union[Iterator[Dict[str, Any]], Iterator[_DocumentType]]":
file_obj: Union[BinaryIO, IO[bytes]],
codec_options: Optional[CodecOptions[_DocumentType]] = None,
) -> Union[Iterator[dict[str, Any]], Iterator[_DocumentType]]:
"""Decode bson data from a file to multiple documents as a generator.
Works similarly to the decode_all function, but reads from the file object
@ -1340,14 +1356,14 @@ def decode_file_iter(
opts = codec_options or DEFAULT_CODEC_OPTIONS
while True:
# Read size of next object.
size_data = file_obj.read(4)
size_data: Any = file_obj.read(4)
if not size_data:
break # Finished with file normally.
elif len(size_data) != 4:
raise InvalidBSON("cut off in middle of objsize")
obj_size = _UNPACK_INT_FROM(size_data, 0)[0] - 4
elements = size_data + file_obj.read(max(0, obj_size))
yield _bson_to_dict(elements, opts)
yield _bson_to_dict(elements, opts) # type:ignore[type-var, arg-type, misc]
def is_valid(bson: bytes) -> bool:
@ -1380,11 +1396,11 @@ class BSON(bytes):
@classmethod
def encode(
cls: Type["BSON"],
cls: Type[BSON],
document: Mapping[str, Any],
check_keys: bool = False,
codec_options: CodecOptions = DEFAULT_CODEC_OPTIONS,
) -> "BSON":
codec_options: CodecOptions[Any] = DEFAULT_CODEC_OPTIONS,
) -> BSON:
"""Encode a document to a new :class:`BSON` instance.
A document can be any mapping type (like :class:`dict`).
@ -1407,7 +1423,9 @@ class BSON(bytes):
"""
return cls(encode(document, check_keys, codec_options))
def decode(self, codec_options: "CodecOptions[_DocumentType]" = DEFAULT_CODEC_OPTIONS) -> "_DocumentType": # type: ignore[override,assignment]
def decode( # type:ignore[override]
self, codec_options: CodecOptions[Any] = DEFAULT_CODEC_OPTIONS
) -> dict[str, Any]:
"""Decode this BSON data.
By default, returns a BSON document represented as a Python

View File

@ -15,6 +15,8 @@
"""Setstate and getstate functions for objects with __slots__, allowing
compatibility with default pickling protocol
"""
from __future__ import annotations
from typing import Any, Mapping

View File

@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Tuple, Type, Union
from uuid import UUID
@ -225,10 +226,10 @@ class Binary(bytes):
__subtype: int
def __new__(
cls: Type["Binary"],
data: Union[memoryview, bytes, "_mmap", "_array"],
cls: Type[Binary],
data: Union[memoryview, bytes, _mmap, _array[Any]],
subtype: int = BINARY_SUBTYPE,
) -> "Binary":
) -> Binary:
if not isinstance(subtype, int):
raise TypeError("subtype must be an instance of int")
if subtype >= 256 or subtype < 0:
@ -240,7 +241,7 @@ class Binary(bytes):
@classmethod
def from_uuid(
cls: Type["Binary"], uuid: UUID, uuid_representation: int = UuidRepresentation.STANDARD
cls: Type[Binary], uuid: UUID, uuid_representation: int = UuidRepresentation.STANDARD
) -> "Binary":
"""Create a BSON Binary object from a Python UUID.

View File

@ -13,6 +13,7 @@
# limitations under the License.
"""Tools for representing JavaScript code in BSON."""
from __future__ import annotations
from collections.abc import Mapping as _Mapping
from typing import Any, Mapping, Optional, Type, Union
@ -50,11 +51,11 @@ class Code(str):
__scope: Union[Mapping[str, Any], None]
def __new__(
cls: Type["Code"],
code: Union[str, "Code"],
cls: Type[Code],
code: Union[str, Code],
scope: Optional[Mapping[str, Any]] = None,
**kwargs: Any,
) -> "Code":
) -> Code:
if not isinstance(code, str):
raise TypeError("code must be an instance of str")

View File

@ -13,6 +13,7 @@
# limitations under the License.
"""Tools for specifying BSON codec options."""
from __future__ import annotations
import abc
import datetime
@ -22,7 +23,6 @@ from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
Iterable,
Mapping,
@ -140,8 +140,8 @@ class TypeRegistry:
) -> None:
self.__type_codecs = list(type_codecs or [])
self._fallback_encoder = fallback_encoder
self._encoder_map: Dict[Any, Any] = {}
self._decoder_map: Dict[Any, Any] = {}
self._encoder_map: dict[Any, Any] = {}
self._decoder_map: dict[Any, Any] = {}
if self._fallback_encoder is not None:
if not callable(fallback_encoder):
@ -236,7 +236,7 @@ class _BaseCodecOptions(NamedTuple):
if TYPE_CHECKING:
class CodecOptions(Tuple, Generic[_DocumentType]):
class CodecOptions(Tuple[_DocumentType], Generic[_DocumentType]):
document_class: Type[_DocumentType]
tz_aware: bool
uuid_representation: int
@ -246,7 +246,7 @@ if TYPE_CHECKING:
datetime_conversion: Optional[int]
def __new__(
cls: Type["CodecOptions"],
cls: Type[CodecOptions[_DocumentType]],
document_class: Optional[Type[_DocumentType]] = ...,
tz_aware: bool = ...,
uuid_representation: Optional[int] = ...,
@ -254,28 +254,28 @@ if TYPE_CHECKING:
tzinfo: Optional[datetime.tzinfo] = ...,
type_registry: Optional[TypeRegistry] = ...,
datetime_conversion: Optional[int] = ...,
) -> "CodecOptions[_DocumentType]":
) -> CodecOptions[_DocumentType]:
...
# CodecOptions API
def with_options(self, **kwargs: Any) -> "CodecOptions[_DocumentType]":
def with_options(self, **kwargs: Any) -> CodecOptions[Any]:
...
def _arguments_repr(self) -> str:
...
def _options_dict(self) -> Dict[Any, Any]:
def _options_dict(self) -> dict[Any, Any]:
...
# NamedTuple API
@classmethod
def _make(cls, obj: Iterable) -> "CodecOptions[_DocumentType]":
def _make(cls, obj: Iterable[Any]) -> CodecOptions[_DocumentType]:
...
def _asdict(self) -> Dict[str, Any]:
def _asdict(self) -> dict[str, Any]:
...
def _replace(self, **kwargs: Any) -> "CodecOptions[_DocumentType]":
def _replace(self, **kwargs: Any) -> CodecOptions[_DocumentType]:
...
_source: str
@ -372,7 +372,7 @@ else:
super().__init__()
def __new__(
cls: Type["CodecOptions"],
cls: Type[CodecOptions],
document_class: Optional[Type[Mapping[str, Any]]] = None,
tz_aware: bool = False,
uuid_representation: Optional[int] = UuidRepresentation.UNSPECIFIED,
@ -380,7 +380,7 @@ else:
tzinfo: Optional[datetime.tzinfo] = None,
type_registry: Optional[TypeRegistry] = None,
datetime_conversion: Optional[DatetimeConversion] = DatetimeConversion.DATETIME,
) -> "CodecOptions":
) -> CodecOptions:
doc_class = document_class or dict
# issubclass can raise TypeError for generic aliases like SON[str, Any].
# In that case we can use the base class for the comparison.
@ -452,7 +452,7 @@ else:
)
)
def _options_dict(self) -> Dict[str, Any]:
def _options_dict(self) -> dict[str, Any]:
"""Dictionary of the arguments used to create this object."""
# TODO: PYTHON-2442 use _asdict() instead
return {
@ -468,7 +468,7 @@ else:
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._arguments_repr()})"
def with_options(self, **kwargs: Any) -> "CodecOptions":
def with_options(self, **kwargs: Any) -> CodecOptions:
"""Make a copy of this CodecOptions, overriding some options::
>>> from bson.codec_options import DEFAULT_CODEC_OPTIONS
@ -485,10 +485,10 @@ else:
return CodecOptions(**opts)
DEFAULT_CODEC_OPTIONS: "CodecOptions[Dict[str, Any]]" = CodecOptions()
DEFAULT_CODEC_OPTIONS: CodecOptions[dict[str, Any]] = CodecOptions()
def _parse_codec_options(options: Any) -> CodecOptions:
def _parse_codec_options(options: Any) -> CodecOptions[Any]:
"""Parse BSON codec options."""
kwargs = {}
for k in set(options) & {

View File

@ -16,6 +16,7 @@
.. versionadded:: 4.3
"""
from __future__ import annotations
import calendar
import datetime
@ -98,7 +99,9 @@ class DatetimeMS:
_type_marker = 9
def as_datetime(self, codec_options: CodecOptions = DEFAULT_CODEC_OPTIONS) -> datetime.datetime:
def as_datetime(
self, codec_options: CodecOptions[Any] = DEFAULT_CODEC_OPTIONS
) -> datetime.datetime:
"""Create a Python :class:`~datetime.datetime` from this DatetimeMS object.
:Parameters:
@ -126,7 +129,9 @@ def _max_datetime_ms(tz: datetime.timezone = datetime.timezone.utc) -> int:
return _datetime_to_millis(datetime.datetime.max.replace(tzinfo=tz))
def _millis_to_datetime(millis: int, opts: CodecOptions) -> Union[datetime.datetime, DatetimeMS]:
def _millis_to_datetime(
millis: int, opts: CodecOptions[Any]
) -> Union[datetime.datetime, DatetimeMS]:
"""Convert milliseconds since epoch UTC to datetime."""
if (
opts.datetime_conversion == DatetimeConversion.DATETIME

View File

@ -13,6 +13,7 @@
# limitations under the License.
"""Tools for manipulating DBRefs (references to MongoDB documents)."""
from __future__ import annotations
from copy import deepcopy
from typing import Any, Mapping, Optional
@ -123,7 +124,7 @@ class DBRef:
(self.__collection, self.__id, self.__database, tuple(sorted(self.__kwargs.items())))
)
def __deepcopy__(self, memo: Any) -> "DBRef":
def __deepcopy__(self, memo: Any) -> DBRef:
"""Support function for `copy.deepcopy()`."""
return DBRef(
deepcopy(self.__collection, memo),

View File

@ -16,6 +16,7 @@
.. versionadded:: 3.4
"""
from __future__ import annotations
import decimal
import struct
@ -270,7 +271,7 @@ class Decimal128:
return ctx.create_decimal((sign, digits, exponent))
@classmethod
def from_bid(cls: Type["Decimal128"], value: bytes) -> "Decimal128":
def from_bid(cls: Type[Decimal128], value: bytes) -> Decimal128:
"""Create an instance of :class:`Decimal128` from Binary Integer
Decimal string.

View File

@ -13,6 +13,7 @@
# limitations under the License.
"""A BSON wrapper for long (int in python3)"""
from __future__ import annotations
from typing import Any

View File

@ -99,6 +99,7 @@ but it will be faster as there is less recursion.
`libbson <https://github.com/mongodb/libbson>`_. `python-bsonjs` works best
with PyMongo when using :class:`~bson.raw_bson.RawBSONDocument`.
"""
from __future__ import annotations
import base64
import datetime
@ -106,7 +107,18 @@ import json
import math
import re
import uuid
from typing import Any, Dict, Mapping, Optional, Sequence, Tuple, Type, Union, cast
from typing import (
TYPE_CHECKING,
Any,
Mapping,
MutableMapping,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)
from bson.binary import ALL_UUID_SUBTYPES, UUID_SUBTYPE, Binary, UuidRepresentation
from bson.code import Code
@ -218,11 +230,18 @@ class JSONMode:
"""
class JSONOptions(CodecOptions):
if TYPE_CHECKING:
_BASE_CLASS = CodecOptions[MutableMapping[str, Any]]
else:
_BASE_CLASS = CodecOptions
class JSONOptions(_BASE_CLASS):
json_mode: int
strict_number_long: bool
datetime_representation: int
strict_uuid: bool
document_class: Type[MutableMapping[str, Any]]
def __init__(self, *args: Any, **kwargs: Any):
"""Encapsulates JSON options for :func:`dumps` and :func:`loads`.
@ -282,14 +301,14 @@ class JSONOptions(CodecOptions):
super().__init__()
def __new__(
cls: Type["JSONOptions"],
cls: Type[JSONOptions],
strict_number_long: Optional[bool] = None,
datetime_representation: Optional[int] = None,
strict_uuid: Optional[bool] = None,
json_mode: int = JSONMode.RELAXED,
*args: Any,
**kwargs: Any,
) -> "JSONOptions":
) -> JSONOptions:
kwargs["tz_aware"] = kwargs.get("tz_aware", False)
if kwargs["tz_aware"]:
kwargs["tzinfo"] = kwargs.get("tzinfo", utc)
@ -303,7 +322,7 @@ class JSONOptions(CodecOptions):
"JSONOptions.datetime_representation must be one of LEGACY, "
"NUMBERLONG, or ISO8601 from DatetimeRepresentation."
)
self = cast(JSONOptions, super().__new__(cls, *args, **kwargs))
self = cast(JSONOptions, super().__new__(cls, *args, **kwargs)) # type:ignore[arg-type]
if json_mode not in (JSONMode.LEGACY, JSONMode.RELAXED, JSONMode.CANONICAL):
raise ValueError(
"JSONOptions.json_mode must be one of LEGACY, RELAXED, "
@ -361,7 +380,7 @@ class JSONOptions(CodecOptions):
)
)
def _options_dict(self) -> Dict[Any, Any]:
def _options_dict(self) -> dict[Any, Any]:
# TODO: PYTHON-2442 use _asdict() instead
options_dict = super()._options_dict()
options_dict.update(
@ -374,7 +393,7 @@ class JSONOptions(CodecOptions):
)
return options_dict
def with_options(self, **kwargs: Any) -> "JSONOptions":
def with_options(self, **kwargs: Any) -> JSONOptions:
"""
Make a copy of this JSONOptions, overriding some options::
@ -501,7 +520,7 @@ def _json_convert(obj: Any, json_options: JSONOptions = DEFAULT_JSON_OPTIONS) ->
def object_pairs_hook(
pairs: Sequence[Tuple[str, Any]], json_options: JSONOptions = DEFAULT_JSON_OPTIONS
) -> Any:
return object_hook(json_options.document_class(pairs), json_options)
return object_hook(json_options.document_class(pairs), json_options) # type:ignore[call-arg]
def object_hook(dct: Mapping[str, Any], json_options: JSONOptions = DEFAULT_JSON_OPTIONS) -> Any:
@ -685,7 +704,7 @@ def _parse_canonical_datetime(
if json_options.datetime_conversion == DatetimeConversion.DATETIME_MS:
return DatetimeMS(aware_tzinfo_none)
return aware_tzinfo_none
return _millis_to_datetime(int(dtm), json_options)
return _millis_to_datetime(int(dtm), cast("CodecOptions[Any]", json_options))
def _parse_canonical_oid(doc: Any) -> ObjectId:
@ -711,7 +730,7 @@ def _parse_canonical_code(doc: Any) -> Code:
return Code(doc["$code"], scope=doc.get("$scope"))
def _parse_canonical_regex(doc: Any) -> Regex:
def _parse_canonical_regex(doc: Any) -> Regex[str]:
"""Decode a JSON regex to bson.regex.Regex."""
regex = doc["$regularExpression"]
if len(doc) != 1:

View File

@ -13,6 +13,8 @@
# limitations under the License.
"""Representation for the MongoDB internal MaxKey type."""
from __future__ import annotations
from typing import Any

View File

@ -13,6 +13,8 @@
# limitations under the License.
"""Representation for the MongoDB internal MinKey type."""
from __future__ import annotations
from typing import Any

View File

@ -13,6 +13,7 @@
# limitations under the License.
"""Tools for working with MongoDB ObjectIds."""
from __future__ import annotations
import binascii
import calendar

View File

@ -50,8 +50,9 @@ For use cases like moving documents across different databases or writing binary
blobs to disk, using raw BSON documents provides better speed and avoids the
overhead of decoding or encoding BSON.
"""
from __future__ import annotations
from typing import Any, Dict, ItemsView, Iterator, Mapping, Optional
from typing import Any, ItemsView, Iterator, Mapping, MutableMapping, Optional
from bson import _get_object_size, _raw_to_dict
from bson.codec_options import _RAW_BSON_DOCUMENT_MARKER
@ -61,8 +62,8 @@ from bson.son import SON
def _inflate_bson(
bson_bytes: bytes, codec_options: CodecOptions, raw_array: bool = False
) -> Dict[Any, Any]:
bson_bytes: bytes, codec_options: CodecOptions[RawBSONDocument], raw_array: bool = False
) -> MutableMapping[str, Any]:
"""Inflates the top level fields of a BSON document.
:Parameters:
@ -87,8 +88,11 @@ class RawBSONDocument(Mapping[str, Any]):
__slots__ = ("__raw", "__inflated_doc", "__codec_options")
_type_marker = _RAW_BSON_DOCUMENT_MARKER
__codec_options: CodecOptions[RawBSONDocument]
def __init__(self, bson_bytes: bytes, codec_options: Optional[CodecOptions] = None) -> None:
def __init__(
self, bson_bytes: bytes, codec_options: Optional[CodecOptions[RawBSONDocument]] = None
) -> None:
"""Create a new :class:`RawBSONDocument`
:class:`RawBSONDocument` is a representation of a BSON document that
@ -156,7 +160,9 @@ class RawBSONDocument(Mapping[str, Any]):
return self.__inflated_doc
@staticmethod
def _inflate_bson(bson_bytes: bytes, codec_options: CodecOptions) -> Mapping[Any, Any]:
def _inflate_bson(
bson_bytes: bytes, codec_options: CodecOptions[RawBSONDocument]
) -> Mapping[str, Any]:
return _inflate_bson(bson_bytes, codec_options)
def __getitem__(self, item: str) -> Any:
@ -185,12 +191,18 @@ class _RawArrayBSONDocument(RawBSONDocument):
"""A RawBSONDocument that only expands sub-documents and arrays when accessed."""
@staticmethod
def _inflate_bson(bson_bytes: bytes, codec_options: CodecOptions) -> Mapping[Any, Any]:
def _inflate_bson(
bson_bytes: bytes, codec_options: CodecOptions[RawBSONDocument]
) -> Mapping[str, Any]:
return _inflate_bson(bson_bytes, codec_options, raw_array=True)
DEFAULT_RAW_BSON_OPTIONS: CodecOptions = DEFAULT.with_options(document_class=RawBSONDocument)
_RAW_ARRAY_BSON_OPTIONS: CodecOptions = DEFAULT.with_options(document_class=_RawArrayBSONDocument)
DEFAULT_RAW_BSON_OPTIONS: CodecOptions[RawBSONDocument] = DEFAULT.with_options(
document_class=RawBSONDocument
)
_RAW_ARRAY_BSON_OPTIONS: CodecOptions[_RawArrayBSONDocument] = DEFAULT.with_options(
document_class=_RawArrayBSONDocument
)
"""The default :class:`~bson.codec_options.CodecOptions` for
:class:`RawBSONDocument`.
"""

View File

@ -13,6 +13,7 @@
# limitations under the License.
"""Tools for representing MongoDB regular expressions."""
from __future__ import annotations
import re
from typing import Any, Generic, Pattern, Type, TypeVar, Union
@ -53,7 +54,7 @@ class Regex(Generic[_T]):
_type_marker = 11
@classmethod
def from_native(cls: Type["Regex"], regex: "Pattern[_T]") -> "Regex[_T]":
def from_native(cls: Type[Regex[Any]], regex: Pattern[_T]) -> Regex[_T]:
"""Convert a Python regular expression into a ``Regex`` instance.
Note that in Python 3, a regular expression compiled from a
@ -118,7 +119,7 @@ class Regex(Generic[_T]):
def __repr__(self) -> str:
return f"Regex({self.pattern!r}, {self.flags!r})"
def try_compile(self) -> "Pattern[_T]":
def try_compile(self) -> Pattern[_T]:
"""Compile this :class:`Regex` as a Python regular expression.
.. warning::

View File

@ -18,6 +18,7 @@ Regular dictionaries can be used instead of SON objects, but not when the order
of keys is important. A SON object can be used just like a normal Python
dictionary.
"""
from __future__ import annotations
import copy
import re
@ -27,7 +28,6 @@ from typing import (
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Pattern,
@ -35,6 +35,7 @@ from typing import (
Type,
TypeVar,
Union,
cast,
)
# This sort of sucks, but seems to be as good as it gets...
@ -54,7 +55,7 @@ class SON(Dict[_Key, _Value]):
similar to collections.OrderedDict.
"""
__keys: List[Any]
__keys: list[Any]
def __init__(
self,
@ -66,7 +67,7 @@ class SON(Dict[_Key, _Value]):
self.update(data)
self.update(kwargs)
def __new__(cls: Type["SON[_Key, _Value]"], *args: Any, **kwargs: Any) -> "SON[_Key, _Value]":
def __new__(cls: Type[SON[_Key, _Value]], *args: Any, **kwargs: Any) -> SON[_Key, _Value]:
instance = super().__new__(cls, *args, **kwargs) # type: ignore[type-var]
instance.__keys = []
return instance
@ -86,7 +87,7 @@ class SON(Dict[_Key, _Value]):
self.__keys.remove(key)
dict.__delitem__(self, key)
def copy(self) -> "SON[_Key, _Value]":
def copy(self) -> SON[_Key, _Value]:
other: SON[_Key, _Value] = SON()
other.update(self)
return other
@ -108,7 +109,7 @@ class SON(Dict[_Key, _Value]):
for _, v in self.items():
yield v
def values(self) -> List[_Value]: # type: ignore[override]
def values(self) -> list[_Value]: # type: ignore[override]
return [v for _, v in self.items()]
def clear(self) -> None:
@ -170,7 +171,7 @@ class SON(Dict[_Key, _Value]):
"""
if isinstance(other, SON):
return len(self) == len(other) and list(self.items()) == list(other.items())
return self.to_dict() == other
return cast(bool, self.to_dict() == other)
def __ne__(self, other: Any) -> bool:
return not self == other
@ -178,7 +179,7 @@ class SON(Dict[_Key, _Value]):
def __len__(self) -> int:
return len(self.__keys)
def to_dict(self) -> Dict[_Key, _Value]:
def to_dict(self) -> dict[_Key, _Value]:
"""Convert a SON document to a normal Python dictionary instance.
This is trickier than just *dict(...)* because it needs to be
@ -193,9 +194,9 @@ class SON(Dict[_Key, _Value]):
else:
return value
return transform_value(dict(self))
return cast("dict[_Key, _Value]", transform_value(dict(self)))
def __deepcopy__(self, memo: Dict[int, "SON[_Key, _Value]"]) -> "SON[_Key, _Value]":
def __deepcopy__(self, memo: dict[int, SON[_Key, _Value]]) -> SON[_Key, _Value]:
out: SON[_Key, _Value] = SON()
val_id = id(self)
if val_id in memo:

View File

@ -13,6 +13,7 @@
# limitations under the License.
"""Tools for representing MongoDB internal Timestamps."""
from __future__ import annotations
import calendar
import datetime

View File

@ -13,6 +13,8 @@
# limitations under the License.
"""Type aliases used by bson"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Mapping, MutableMapping, TypeVar, Union
if TYPE_CHECKING:

View File

@ -13,6 +13,7 @@
# limitations under the License.
"""Timezone related utilities for BSON."""
from __future__ import annotations
from datetime import datetime, timedelta, tzinfo
from typing import Optional, Tuple, Union

View File

@ -23,7 +23,6 @@ from copy import deepcopy
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generic,
Iterator,
Mapping,
@ -388,7 +387,7 @@ class _Encrypter:
def encrypt(
self, database: str, cmd: Mapping[str, Any], codec_options: CodecOptions
) -> Dict[Any, Any]:
) -> MutableMapping[Any, Any]:
"""Encrypt a MongoDB command.
:Parameters:

View File

@ -1127,7 +1127,7 @@ class _EncryptedBulkWriteContext(_BulkWriteContext):
def __batch_command(
self, cmd: MutableMapping[str, Any], docs: List[Mapping[str, Any]]
) -> Tuple[Dict[str, Any], List[Mapping[str, Any]]]:
) -> Tuple[MutableMapping[str, Any], List[Mapping[str, Any]]]:
namespace = self.db_name + ".$cmd"
msg, to_send = _encode_batched_write_command(
namespace, self.op_type, cmd, docs, self.codec, self
@ -1137,8 +1137,8 @@ class _EncryptedBulkWriteContext(_BulkWriteContext):
# Chop off the OP_QUERY header to get a properly batched write command.
cmd_start = msg.index(b"\x00", 4) + 9
cmd = _inflate_bson(memoryview(msg)[cmd_start:], DEFAULT_RAW_BSON_OPTIONS)
return cmd, to_send
outgoing = _inflate_bson(memoryview(msg)[cmd_start:], DEFAULT_RAW_BSON_OPTIONS)
return outgoing, to_send
def execute(
self, cmd: MutableMapping[str, Any], docs: List[Mapping[str, Any]], client: MongoClient

View File

@ -28,6 +28,7 @@ from typing import (
Optional,
Sequence,
Union,
cast,
)
from bson import _decode_all_selective
@ -223,7 +224,9 @@ def command(
if client and client._encrypter and reply:
decrypted = client._encrypter.decrypt(reply.raw_command_response())
response_doc = _decode_all_selective(decrypted, codec_options, user_fields)[0]
response_doc = cast(
"_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0]
)
return response_doc # type: ignore[return-value]

View File

@ -33,7 +33,7 @@ GRIDFS_IGNORE = [
"WriteConcern",
]
PYMONGO_IGNORE = []
GLOBAL_INGORE = ["TYPE_CHECKING"]
GLOBAL_INGORE = ["TYPE_CHECKING", "annotations"]
class TestDefaultExports(unittest.TestCase):

View File

@ -86,7 +86,8 @@ deps =
certifi; platform_system == "win32" or platform_system == "Darwin"
typing_extensions
commands =
mypy --install-types --non-interactive --disallow-untyped-defs bson gridfs tools pymongo
mypy --install-types --non-interactive --strict bson
mypy --install-types --non-interactive --disallow-untyped-defs gridfs tools pymongo
mypy --install-types --non-interactive --disable-error-code var-annotated --disable-error-code attr-defined --disable-error-code union-attr --disable-error-code assignment --disable-error-code no-redef --disable-error-code index --allow-redefinition --allow-untyped-globals --exclude "test/mypy_fails/*.*" --exclude "test/conftest.py" test
mypy --install-types --non-interactive test/test_typing.py test/test_typing_strict.py