MOTOR-331 Add python 3 type annotations (#221)

This commit is contained in:
Noah Stapp 2023-08-18 11:57:56 -07:00 committed by GitHub
parent 9fdb62a350
commit 70136ad1f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1456 additions and 12 deletions

View File

@ -65,3 +65,25 @@ jobs:
tox -e py3-sphinx-docs
tox -e py3-sphinx-doctest
tox -e py3-sphinx-linkcheck
typing:
name: Typing Tests
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.7', '3.11']
fail-fast: false
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
cache-dependency-path: 'setup.py'
- name: Install dependencies
run: |
python -m pip install -U pip tox
- name: Run mypy
run: |
tox -e typecheck-mypy

View File

@ -20,14 +20,14 @@ repos:
rev: 22.3.0
hooks:
- id: black
files: \.py$
files: \.(py|pyi)$
args: [--line-length=100]
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
hooks:
- id: isort
files: \.py$
files: \.(py|pyi)$
args: [--profile=black]
- repo: https://github.com/pycqa/flake8

View File

@ -1,6 +1,7 @@
include README.rst
include LICENSE
include tox.ini
include mypy.ini
include doc/Makefile
include doc/examples/tornado_change_stream_templates/index.html
recursive-include doc *.rst
@ -14,6 +15,8 @@ recursive-include doc *.js
recursive-include doc *.txt
recursive-include doc *.bat
recursive-include synchro *.py
recursive-include motor *.pyi
recursive-include motor *.typed
exclude .readthedocs.yaml
exclude .git-blame-ignore-revs

View File

@ -20,7 +20,7 @@ from ._version import get_version_string, version, version_tuple # noqa
try:
import tornado
import tornado # type: ignore
except ImportError:
tornado = None
else:

View File

@ -544,7 +544,7 @@ class AgnosticDatabase(AgnosticBaseProperties):
drop_collection = AsyncCommand().unwrap("MotorCollection")
get_collection = DelegateMethod().wrap(Collection)
list_collection_names = AsyncRead(doc=docstrings.list_collection_names_doc)
list_collections = AsyncRead()
list_collections = AsyncRead().wrap(CommandCursor)
name = ReadOnlyProperty()
validate_collection = AsyncRead().unwrap("MotorCollection")
with_options = DelegateMethod().wrap(Database)
@ -853,6 +853,12 @@ class AgnosticDatabase(AgnosticBaseProperties):
return klass(self, obj.name, _delegate=obj)
elif obj.__class__ is Database:
return self.__class__(self._client, obj.name, _delegate=obj)
elif obj.__class__ is CommandCursor:
command_cursor_class = create_class_with_framework(
AgnosticCommandCursor, self._framework, self.__module__
)
return command_cursor_class(obj, self)
else:
return obj

837
motor/core.pyi Normal file
View File

@ -0,0 +1,837 @@
# Copyright 2023-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Framework-agnostic type stubs for Motor, an asynchronous driver for MongoDB."""
from __future__ import annotations
from typing import (
Any,
Awaitable,
Callable,
Collection,
Coroutine,
Dict,
FrozenSet,
Iterable,
List,
Mapping,
MutableMapping,
NoReturn,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
overload,
)
import pymongo.common
import pymongo.database
import pymongo.errors
import pymongo.mongo_client
from bson import Binary, Code, CodecOptions, DBRef, Timestamp
from bson.raw_bson import RawBSONDocument
from pymongo import IndexModel, ReadPreference, WriteConcern
from pymongo.change_stream import ChangeStream
from pymongo.client_options import ClientOptions
from pymongo.client_session import _T, ClientSession, SessionOptions, TransactionOptions
from pymongo.collection import ReturnDocument, _IndexKeyHint, _IndexList, _WriteOp
from pymongo.command_cursor import CommandCursor, RawBatchCommandCursor
from pymongo.cursor import Cursor, RawBatchCursor, _Hint, _Sort
from pymongo.database import Database
from pymongo.encryption import ClientEncryption, RewrapManyDataKeyResult
from pymongo.encryption_options import RangeOpts
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import _ServerMode
from pymongo.results import (
BulkWriteResult,
DeleteResult,
InsertManyResult,
InsertOneResult,
UpdateResult,
)
from pymongo.topology_description import TopologyDescription
from pymongo.typings import (
_Address,
_CollationIn,
_DocumentType,
_DocumentTypeArg,
_Pipeline,
)
try:
from pymongo import SearchIndexModel
except ImportError:
SearchIndexModel = Any
_WITH_TRANSACTION_RETRY_TIME_LIMIT: int
_CodecDocumentType = TypeVar("_CodecDocumentType", bound=Mapping[str, Any])
def _within_time_limit(start_time: float) -> bool: ...
def _max_time_expired_error(exc: Exception) -> bool: ...
class AgnosticBase(object):
delegate: Any
def __eq__(self, other: Any) -> bool: ...
def __init__(self, delegate: Any) -> None: ...
def __repr__(self) -> str: ...
class AgnosticBaseProperties(AgnosticBase):
codec_options: CodecOptions
read_preference: _ServerMode
read_concern: ReadConcern
write_concern: WriteConcern
class AgnosticClient(AgnosticBaseProperties):
__motor_class_name__: str
__delegate_class__: Type[pymongo.MongoClient]
def address(self) -> Optional[Tuple[str, int]]: ...
def arbiters(self) -> Set[Tuple[str, int]]: ...
def close(self) -> None: ...
def __hash__(self) -> int: ...
async def drop_database(
self,
name_or_database: Union[str, AgnosticDatabase],
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
) -> None: ...
def options(self) -> ClientOptions: ...
def get_database(
self,
name: Optional[str] = None,
codec_options: Optional[CodecOptions[_DocumentTypeArg]] = None,
read_preference: Optional[_ServerMode] = None,
write_concern: Optional[WriteConcern] = None,
read_concern: Optional[ReadConcern] = None,
) -> Database[_DocumentType]: ...
def get_default_database(
self,
default: Optional[str] = None,
codec_options: Optional[CodecOptions[_DocumentTypeArg]] = None,
read_preference: Optional[_ServerMode] = None,
write_concern: Optional[WriteConcern] = None,
read_concern: Optional[ReadConcern] = None,
) -> Database[_DocumentType]: ...
HOST: str
def is_mongos(self) -> bool: ...
def is_primary(self) -> bool: ...
async def list_databases(
self,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> AgnosticCommandCursor: ...
async def list_database_names(
self,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
) -> List[str]: ...
def nodes(self) -> FrozenSet[_Address]: ...
PORT: int
def primary(self) -> Optional[Tuple[str, int]]: ...
read_concern: ReadConcern
def secondaries(self) -> Set[Tuple[str, int]]: ...
async def server_info(
self, session: Optional[AgnosticClientSession] = None
) -> Dict[str, Any]: ...
def topology_description(self) -> TopologyDescription: ...
async def start_session(
self,
causal_consistency: Optional[bool] = None,
default_transaction_options: Optional[TransactionOptions] = None,
snapshot: Optional[bool] = False,
) -> AgnosticClientSession: ...
_io_loop: Optional[Any]
_framework: Any
def __init__(self, *args: Any, **kwargs: Any) -> None: ...
@property
def io_loop(self) -> Any: ...
def get_io_loop(self) -> Any: ...
def watch(
self,
pipeline: Optional[_Pipeline] = None,
full_document: Optional[str] = None,
resume_after: Optional[Mapping[str, Any]] = None,
max_await_time_ms: Optional[int] = None,
batch_size: Optional[int] = None,
collation: Optional[_CollationIn] = None,
start_at_operation_time: Optional[Timestamp] = None,
session: Optional[AgnosticClientSession] = None,
start_after: Optional[Mapping[str, Any]] = None,
comment: Optional[str] = None,
full_document_before_change: Optional[str] = None,
show_expanded_events: Optional[bool] = None,
) -> AgnosticChangeStream: ...
def __getattr__(self, name: str) -> AgnosticDatabase: ...
def __getitem__(self, name: str) -> AgnosticDatabase: ...
def wrap(self, obj: Any) -> Any: ...
class _MotorTransactionContext:
_session: AgnosticClientSession
def __init__(self, session: AgnosticClientSession): ...
async def __aenter__(self) -> _MotorTransactionContext: ...
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
class AgnosticClientSession(AgnosticBase):
__motor_class_name__: str
__delegate_class__: Type[ClientSession]
async def commit_transaction(self) -> None: ...
async def abort_transaction(self) -> None: ...
async def end_session(self) -> None: ...
def cluster_time(self) -> Optional[Mapping[str, Any]]: ...
def has_ended(self) -> bool: ...
def in_transaction(self) -> bool: ...
def options(self) -> SessionOptions: ...
def operation_time(self) -> Optional[Timestamp]: ...
def session_id(self) -> Mapping[str, Any]: ...
def advance_cluster_time(self, cluster_time: Mapping[str, Any]) -> None: ...
def advance_operation_time(self, operation_time: Timestamp) -> None: ...
def __init__(self, delegate: ClientSession, motor_client: AgnosticClient): ...
def get_io_loop(self) -> Any: ...
async def with_transaction(
self,
coro: Callable[..., Coroutine[Any, Any, Any]],
read_concern: Optional[ReadConcern] = None,
write_concern: Optional[WriteConcern] = None,
read_preference: Optional[_ServerMode] = None,
max_commit_time_ms: Optional[int] = None,
) -> _T: ...
def start_transaction(
self,
read_concern: Optional[ReadConcern] = None,
write_concern: Optional[WriteConcern] = None,
read_preference: Optional[_ServerMode] = None,
max_commit_time_ms: Optional[int] = None,
) -> _MotorTransactionContext: ...
@property
def client(self) -> AgnosticClient: ...
async def __aenter__(self) -> AgnosticClientSession: ...
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
def __enter__(self) -> None: ...
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
class AgnosticDatabase(AgnosticBaseProperties):
__motor_class_name__: str
__delegate_class__: Type[Database]
def __hash__(self) -> int: ...
def __bool__(self) -> int: ...
async def cursor_command(
self,
command: Union[str, MutableMapping[str, Any]],
value: Any = 1,
read_preference: Optional[_ServerMode] = None,
codec_options: Optional[CodecOptions[_CodecDocumentType]] = None,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
max_await_time_ms: Optional[int] = None,
**kwargs: Any,
) -> AgnosticCommandCursor: ...
async def command(
self,
command: Union[str, MutableMapping[str, Any]],
value: Any = 1,
check: bool = True,
allowable_errors: Optional[Sequence[Union[str, int]]] = None,
read_preference: Optional[_ServerMode] = None,
codec_options: Optional[CodecOptions[_CodecDocumentType]] = None,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> Union[Dict[str, Any], _CodecDocumentType]: ...
async def create_collection(
self,
name: str,
codec_options: Optional[CodecOptions[_DocumentTypeArg]] = None,
read_preference: Optional[_ServerMode] = None,
write_concern: Optional[WriteConcern] = None,
read_concern: Optional[ReadConcern] = None,
session: Optional[AgnosticClientSession] = None,
check_exists: Optional[bool] = True,
**kwargs: Any,
) -> AgnosticCollection: ...
async def dereference(
self,
dbref: DBRef,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> Optional[_DocumentType]: ...
async def drop_collection(
self,
name_or_collection: Union[str, AgnosticCollection],
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
encrypted_fields: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]: ...
async def get_collection(
self,
name: str,
codec_options: Optional[CodecOptions[_DocumentTypeArg]] = None,
read_preference: Optional[_ServerMode] = None,
write_concern: Optional[WriteConcern] = None,
read_concern: Optional[ReadConcern] = None,
) -> AgnosticCollection: ...
async def list_collection_names(
self,
session: Optional[AgnosticClientSession] = None,
filter: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> List[str]: ...
async def list_collections(
self,
session: Optional[AgnosticClientSession] = None,
filter: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> AgnosticCommandCursor: ...
def name(self) -> str: ...
async def validate_collection(
self,
name_or_collection: Union[str, AgnosticCollection],
scandata: bool = False,
full: bool = False,
session: Optional[AgnosticClientSession] = None,
background: Optional[bool] = None,
comment: Optional[Any] = None,
) -> Dict[str, Any]: ...
def with_options(
self,
codec_options: Optional[CodecOptions[_DocumentTypeArg]] = None,
read_preference: Optional[_ServerMode] = None,
write_concern: Optional[WriteConcern] = None,
read_concern: Optional[ReadConcern] = None,
) -> AgnosticDatabase: ...
async def _async_aggregate(
self, pipeline: _Pipeline, session: Optional[AgnosticClientSession] = None, **kwargs: Any
) -> AgnosticCommandCursor: ...
def __init__(self, client: AgnosticClient, name: str, **kwargs: Any) -> None: ...
def aggregate(
self, pipeline: _Pipeline, *args: Any, **kwargs: Any
) -> AgnosticLatentCommandCursor: ...
def watch(
self,
pipeline: Optional[_Pipeline] = None,
full_document: Optional[str] = None,
resume_after: Optional[Mapping[str, Any]] = None,
max_await_time_ms: Optional[int] = None,
batch_size: Optional[int] = None,
collation: Optional[_CollationIn] = None,
start_at_operation_time: Optional[Timestamp] = None,
session: Optional[AgnosticClientSession] = None,
start_after: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
full_document_before_change: Optional[str] = None,
show_expanded_events: Optional[bool] = None,
) -> AgnosticChangeStream: ...
@property
def client(self) -> AgnosticClient: ...
def __getattr__(self, name: str) -> AgnosticCollection: ...
def __getitem__(self, name: str) -> AgnosticCollection: ...
def __call__(self, *args: Any, **kwargs: Any) -> None: ...
def wrap(self, obj: Any) -> Any: ...
def get_io_loop(self) -> Any: ...
class AgnosticCollection(AgnosticBaseProperties):
__motor_class_name__: str
__delegate_class__: Type[Collection]
def __hash__(self) -> int: ...
def __bool__(self) -> bool: ...
async def bulk_write(
self,
requests: Sequence[_WriteOp[_DocumentType]],
ordered: bool = True,
bypass_document_validation: bool = False,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
let: Optional[Mapping] = None,
) -> BulkWriteResult: ...
async def count_documents(
self,
filter: Mapping[str, Any],
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> int: ...
async def create_index(
self,
keys: _IndexKeyHint,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> str: ...
async def create_indexes(
self,
indexes: Sequence[IndexModel],
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> List[str]: ...
async def delete_many(
self,
filter: Mapping[str, Any],
collation: Optional[_CollationIn] = None,
hint: Optional[_IndexKeyHint] = None,
session: Optional[AgnosticClientSession] = None,
let: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
) -> DeleteResult: ...
async def delete_one(
self,
filter: Mapping[str, Any],
collation: Optional[_CollationIn] = None,
hint: Optional[_IndexKeyHint] = None,
session: Optional[AgnosticClientSession] = None,
let: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
) -> DeleteResult: ...
async def distinct(
self,
key: str,
filter: Optional[Mapping[str, Any]] = None,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> List[Any]: ...
async def drop(
self,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
encrypted_fields: Optional[Mapping[str, Any]] = None,
) -> None: ...
async def drop_index(
self,
index_or_name: _IndexKeyHint,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> None: ...
async def drop_indexes(
self,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> None: ...
async def estimated_document_count(
self, comment: Optional[Any] = None, **kwargs: Any
) -> int: ...
async def find_one(
self, filter: Optional[Any] = None, *args: Any, **kwargs: Any
) -> Optional[_DocumentType]: ...
async def find_one_and_delete(
self,
filter: Mapping[str, Any],
projection: Optional[Union[Mapping[str, Any], Iterable[str]]] = None,
sort: Optional[_IndexList] = None,
hint: Optional[_IndexKeyHint] = None,
session: Optional[AgnosticClientSession] = None,
let: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> _DocumentType: ...
async def find_one_and_replace(
self,
filter: Mapping[str, Any],
replacement: Mapping[str, Any],
projection: Optional[Union[Mapping[str, Any], Iterable[str]]] = None,
sort: Optional[_IndexList] = None,
upsert: bool = False,
return_document: bool = ReturnDocument.BEFORE,
hint: Optional[_IndexKeyHint] = None,
session: Optional[AgnosticClientSession] = None,
let: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> _DocumentType: ...
async def find_one_and_update(
self,
filter: Mapping[str, Any],
update: Union[Mapping[str, Any], _Pipeline],
projection: Optional[Union[Mapping[str, Any], Iterable[str]]] = None,
sort: Optional[_IndexList] = None,
upsert: bool = False,
return_document: bool = ReturnDocument.BEFORE,
array_filters: Optional[Sequence[Mapping[str, Any]]] = None,
hint: Optional[_IndexKeyHint] = None,
session: Optional[AgnosticClientSession] = None,
let: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> _DocumentType: ...
def full_name(self) -> str: ...
async def index_information(
self, session: Optional[AgnosticClientSession] = None, comment: Optional[Any] = None
) -> MutableMapping[str, Any]: ...
async def insert_many(
self,
documents: Iterable[Union[_DocumentType, RawBSONDocument]],
ordered: bool = True,
bypass_document_validation: bool = False,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
) -> InsertManyResult: ...
async def insert_one(
self,
document: Union[_DocumentType, RawBSONDocument],
bypass_document_validation: bool = False,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
) -> InsertOneResult: ...
def name(self) -> str: ...
async def options(
self, session: Optional[AgnosticClientSession] = None, comment: Optional[Any] = None
) -> MutableMapping[str, Any]: ...
async def rename(
self,
new_name: str,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> MutableMapping[str, Any]: ...
async def replace_one(
self,
filter: Mapping[str, Any],
replacement: Mapping[str, Any],
upsert: bool = False,
bypass_document_validation: bool = False,
collation: Optional[_CollationIn] = None,
hint: Optional[_IndexKeyHint] = None,
session: Optional[AgnosticClientSession] = None,
let: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
) -> UpdateResult: ...
async def update_many(
self,
filter: Mapping[str, Any],
update: Union[Mapping[str, Any], _Pipeline],
upsert: bool = False,
array_filters: Optional[Sequence[Mapping[str, Any]]] = None,
bypass_document_validation: Optional[bool] = None,
collation: Optional[_CollationIn] = None,
hint: Optional[_IndexKeyHint] = None,
session: Optional[AgnosticClientSession] = None,
let: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
) -> UpdateResult: ...
async def update_one(
self,
filter: Mapping[str, Any],
update: Union[Mapping[str, Any], _Pipeline],
upsert: bool = False,
bypass_document_validation: bool = False,
collation: Optional[_CollationIn] = None,
array_filters: Optional[Sequence[Mapping[str, Any]]] = None,
hint: Optional[_IndexKeyHint] = None,
session: Union[Optional[AgnosticClientSession], Optional[AgnosticClientSession]] = None,
let: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
) -> UpdateResult: ...
def with_options(
self,
codec_options: Optional[CodecOptions] = None,
read_preference: Optional[ReadPreference] = None,
write_concern: Optional[WriteConcern] = None,
read_concern: Optional[ReadConcern] = None,
) -> Collection[Mapping[str, Any]]: ...
async def list_search_indexes(
self,
name: Optional[str] = None,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> AgnosticCommandCursor: ...
async def create_search_index(
self,
model: Union[Mapping[str, SearchIndexModel], Any],
session: Optional[AgnosticClientSession] = None,
comment: Any = None,
**kwargs: Any,
) -> str: ...
async def create_search_indexes(
self,
models: List[SearchIndexModel],
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> List[str]: ...
async def drop_search_index(
self,
name: str,
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> None: ...
async def update_search_index(
self,
name: str,
definition: Mapping[str, Any],
session: Optional[AgnosticClientSession] = None,
comment: Optional[Any] = None,
**kwargs: Any,
) -> None: ...
def __init__(
self,
database: Database[_DocumentType],
name: str,
codec_options: Optional[CodecOptions[_DocumentTypeArg]] = None,
read_preference: Optional[_ServerMode] = None,
write_concern: Optional[WriteConcern] = None,
read_concern: Optional[ReadConcern] = None,
_delegate: Any = None,
**kwargs: Any,
) -> None: ...
def __getattr__(self, name: str) -> AgnosticCollection: ...
def __getitem__(self, name: str) -> AgnosticCollection: ...
def __call__(self, *args: Any, **kwargs: Any) -> Any: ...
def find(self, *args: Any, **kwargs: Any) -> AgnosticCursor: ...
def find_raw_batches(self, *args: Any, **kwargs: Any) -> AgnosticCursor: ...
def aggregate(
self, pipeline: _Pipeline, *args: Any, **kwargs: Any
) -> AgnosticCommandCursor: ...
def aggregate_raw_batches(
self, pipeline: _Pipeline, **kwargs: Any
) -> AgnosticCommandCursor: ...
def watch(
self,
pipeline: Optional[_Pipeline] = None,
full_document: Optional[str] = None,
resume_after: Optional[Mapping[str, Any]] = None,
max_await_time_ms: Optional[int] = None,
batch_size: Optional[int] = None,
collation: Optional[_CollationIn] = None,
start_at_operation_time: Optional[Timestamp] = None,
session: Optional[AgnosticClientSession] = None,
start_after: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
full_document_before_change: Optional[str] = None,
show_expanded_events: Optional[bool] = None,
) -> Any: ...
def list_indexes(
self, session: Optional[AgnosticClientSession] = None, **kwargs: Any
) -> AgnosticCommandCursor: ...
def wrap(self, obj: Any) -> Any: ...
def get_io_loop(self) -> Any: ...
class AgnosticBaseCursor(AgnosticBase):
def __init__(
self, cursor: Union[Cursor, CommandCursor, _LatentCursor], collection: AgnosticCollection
) -> None: ...
def address(self) -> Optional[_Address]: ...
def cursor_id(self) -> Optional[int]: ...
def alive(self) -> bool: ...
def session(self) -> Optional[AgnosticClientSession]: ...
async def _async_close(self) -> None: ...
async def _refresh(self) -> int: ...
def __aiter__(self) -> Any: ...
async def next(self) -> _DocumentType: ...
__anext__ = next
async def __aenter__(self) -> Any: ...
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Any: ...
def _get_more(self) -> int: ...
@property
def fetch_next(self) -> Any: ...
def next_object(self) -> Any: ...
def each(self, callback: Callable) -> None: ...
def _each_got_more(self, callback: Callable, future: Any) -> None: ...
def to_list(self, length: int) -> List: ...
def _to_list(self, length: int, the_list: List, future: Any, get_more_result: Any) -> None: ...
def get_io_loop(self) -> Any: ...
def batch_size(self, batch_size: int) -> AgnosticBaseCursor: ...
def _buffer_size(self) -> int: ...
def _query_flags(self) -> Optional[int]: ...
def _data(self) -> None: ...
def _killed(self) -> None: ...
async def close(self) -> None: ...
class AgnosticCursor(AgnosticBaseCursor):
__motor_class_name__: str
__delegate_class__: Type[Cursor]
def collation(self, collation: Optional[_CollationIn]) -> AgnosticCursor: ...
async def distinct(self, key: str) -> List: ...
async def explain(self) -> _DocumentType: ...
def add_option(self, mask: int) -> AgnosticCursor: ...
def remove_option(self, mask: int) -> AgnosticCursor: ...
def limit(self, limit: int) -> AgnosticCursor: ...
def skip(self, skip: int) -> AgnosticCursor: ...
def max_scan(self, max_scan: Optional[int]) -> AgnosticCursor: ...
def sort(
self, key_or_list: _Hint, direction: Optional[Union[int, str]] = None
) -> AgnosticCursor: ...
def hint(self, index: Optional[_Hint]) -> AgnosticCursor: ...
def where(self, code: Union[str, Code]) -> AgnosticCursor: ...
def max_await_time_ms(self, max_await_time_ms: Optional[int]) -> AgnosticCursor: ...
def max_time_ms(self, max_time_ms: Optional[int]) -> AgnosticCursor: ...
def min(self, spec: _Sort) -> AgnosticCursor: ...
def max(self, spec: _Sort) -> AgnosticCursor: ...
def comment(self, comment: Any) -> AgnosticCursor: ...
def allow_disk_use(self, allow_disk_use: bool) -> AgnosticCursor: ...
def rewind(self) -> AgnosticCursor: ...
def clone(self) -> AgnosticCursor: ...
def __copy__(self) -> AgnosticCursor: ...
def __deepcopy__(self, memo: Any) -> AgnosticCursor: ...
def _query_flags(self) -> int: ...
def _data(self) -> Any: ...
def _killed(self) -> Any: ...
class AgnosticRawBatchCursor(AgnosticCursor):
__motor_class_name__: str
__delegate_class__: Type[RawBatchCursor]
class AgnosticCommandCursor(AgnosticBaseCursor):
__motor_class_name__: str
__delegate_class__: Type[CommandCursor]
def _query_flags(self) -> int: ...
def _data(self) -> Any: ...
def _killed(self) -> Any: ...
class AgnosticRawBatchCommandCursor(AgnosticCommandCursor):
__motor_class_name__: str
__delegate_class__: Type[RawBatchCommandCursor]
class _LatentCursor:
def __init__(self, collection: AgnosticCollection): ...
def _CommandCursor__end_session(self, *args: Any, **kwargs: Any) -> None: ...
def _CommandCursor__die(self, *args: Any, **kwargs: Any) -> None: ...
def clone(self) -> _LatentCursor: ...
def rewind(self) -> _LatentCursor: ...
class AgnosticLatentCommandCursor(AgnosticCommandCursor):
__motor_class_name__: str
def __init__(self, collection: AgnosticCollection, start: Any, *args: Any, **kwargs: Any): ...
def _on_started(self, original_future: Any, future: Any) -> None: ...
class AgnosticChangeStream(AgnosticBase):
__motor_class_name__: str
__delegate_class__: Type[ChangeStream]
async def _close(self) -> None: ...
def resume_token(self) -> Optional[Mapping[str, Any]]: ...
def __init__(
self,
target: Union[
pymongo.MongoClient[_DocumentType], Database[_DocumentType], Collection[_DocumentType]
],
pipeline: Optional[_Pipeline],
full_document: Optional[str],
resume_after: Optional[Mapping[str, Any]],
max_await_time_ms: Optional[int],
batch_size: Optional[int],
collation: Optional[_CollationIn],
start_at_operation_time: Optional[Timestamp],
session: Optional[AgnosticClientSession],
start_after: Optional[Mapping[str, Any]],
comment: Optional[Any] = None,
full_document_before_change: Optional[str] = None,
show_expanded_events: Optional[bool] = None,
): ...
def _lazy_init(self) -> None: ...
def _try_next(self) -> Optional[_DocumentType]: ...
def alive(self) -> bool: ...
async def next(self) -> _DocumentType: ...
async def try_next(self) -> Optional[_DocumentType]: ...
async def close(self) -> None: ...
def __aiter__(self) -> AgnosticChangeStream: ...
__anext__ = next
async def __aenter__(self) -> AgnosticChangeStream: ...
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
def get_io_loop(self) -> Any: ...
def __enter__(self) -> None: ...
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
class AgnosticClientEncryption(AgnosticBase):
__motor_class_name__: str
__delegate_class__: Type[ClientEncryption]
def __init__(
self,
kms_providers: Mapping[str, Any],
key_vault_namespace: str,
key_vault_client: AgnosticClient,
codec_options: CodecOptions,
io_loop: Optional[Any] = None,
kms_tls_options: Optional[Mapping[str, Any]] = None,
): ...
async def create_data_key(
self,
kms_provider: str,
master_key: Optional[Mapping[str, Any]] = None,
key_alt_names: Optional[Sequence[str]] = None,
key_material: Optional[bytes] = None,
) -> Binary: ...
async def encrypt(
self,
value: Any,
algorithm: str,
key_id: Optional[Binary] = None,
key_alt_name: Optional[str] = None,
query_type: Optional[str] = None,
contention_factor: Optional[int] = None,
range_opts: Optional[RangeOpts] = None,
) -> Binary: ...
async def decrypt(self, value: Binary) -> Any: ...
async def close(self) -> None: ...
async def rewrap_many_data_key(
self,
filter: Mapping[str, Any],
provider: Optional[str] = None,
master_key: Optional[Mapping[str, Any]] = None,
) -> RewrapManyDataKeyResult: ...
async def delete_key(self, id: Binary) -> DeleteResult: ...
async def get_key(self, id: Binary) -> Optional[RawBSONDocument]: ...
async def add_key_alt_name(self, id: Binary, key_alt_name: str) -> Any: ...
async def get_key_by_alt_name(self, key_alt_name: str) -> Optional[RawBSONDocument]: ...
async def remove_key_alt_name(
self, id: Binary, key_alt_name: str
) -> Optional[RawBSONDocument]: ...
async def encrypt_expression(
self,
expression: Mapping[str, Any],
algorithm: str,
key_id: Optional[Binary] = None,
key_alt_name: Optional[str] = None,
query_type: Optional[str] = None,
contention_factor: Optional[int] = None,
range_opts: Optional[RangeOpts] = None,
) -> RawBSONDocument: ...
@property
def io_loop(self) -> Any: ...
def get_io_loop(self) -> Any: ...
async def __aenter__(self) -> AgnosticClientEncryption: ...
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
def __enter__(self) -> NoReturn: ...
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
async def get_keys(self) -> AgnosticCursor: ...
async def create_encrypted_collection(
self,
database: AgnosticDatabase,
name: str,
encrypted_fields: Mapping[str, Any],
kms_provider: Optional[str] = None,
master_key: Optional[Mapping[str, Any]] = None,
**kwargs: Any,
) -> Tuple[AgnosticCollection, Mapping[str, Any]]: ...

View File

@ -16,11 +16,12 @@
import functools
import inspect
from typing import Any, Callable, Dict
_class_cache = {}
_class_cache: Dict[Any, Any] = {}
def asynchronize(framework, sync_method, doc=None, wrap_class=None, unwrap_class=None):
def asynchronize(framework, sync_method: Callable, doc=None, wrap_class=None, unwrap_class=None):
"""Decorate `sync_method` so it returns a Future.
The method runs on a thread and resolves the Future when it completes.

View File

@ -13,6 +13,7 @@
# limitations under the License.
"""Asyncio support for Motor, an asynchronous driver for MongoDB."""
from typing import TypeVar
from . import core, motor_gridfs
from .frameworks import asyncio as asyncio_framework
@ -33,8 +34,10 @@ __all__ = [
"AsyncIOMotorClientEncryption",
]
T = TypeVar("T")
def create_asyncio_class(cls):
def create_asyncio_class(cls: T) -> T:
return create_class_with_framework(cls, asyncio_framework, "motor.motor_asyncio")

184
motor/motor_gridfs.pyi Normal file
View File

@ -0,0 +1,184 @@
# Copyright 2023-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GridFS type stubs for Motor, an asynchronous driver for MongoDB."""
import datetime
import os
from typing import Any, Iterable, List, Mapping, NoReturn, Optional, Type
from bson import ObjectId
from gridfs import DEFAULT_CHUNK_SIZE, GridFSBucket, GridIn, GridOut, GridOutCursor
from pymongo import WriteConcern
from pymongo.read_preferences import _ServerMode
from motor.core import (
AgnosticClientSession,
AgnosticCollection,
AgnosticCursor,
AgnosticDatabase,
)
_SEEK_SET = os.SEEK_SET
_SEEK_CUR = os.SEEK_CUR
_SEEK_END = os.SEEK_END
class AgnosticGridOutCursor(AgnosticCursor):
__motor_class_name__: str
__delegate_class__ = type[GridOutCursor]
async def _Cursor__die(self, synchronous: bool = False) -> None: ...
def next_object(self) -> AgnosticGridOutCursor: ...
class AgnosticGridOut(object):
__motor_class_name__: str
__delegate_class__: Type[GridOut]
_id: Any
aliases: Optional[List[str]]
chunk_size: int
filename: Optional[str]
name: Optional[str]
content_type: Optional[str]
length: int
upload_date: datetime.datetime
metadata: Optional[Mapping[str, Any]]
async def _ensure_file(self) -> None: ...
def close(self) -> None: ...
async def read(self, size: int = -1) -> NoReturn: ...
def readable(self) -> bool: ...
async def readchunk(self) -> bytes: ...
async def readline(self, size: int = -1) -> bytes: ...
def seek(self, pos: int, whence: int = _SEEK_SET) -> int: ...
def seekable(self) -> bool: ...
def tell(self) -> int: ...
def write(self, data: Any) -> None: ...
def __init__(
self,
root_collection: AgnosticCollection,
file_id: Optional[int] = None,
file_document: Optional[Any] = None,
delegate: Any = None,
session: Optional[AgnosticClientSession] = None,
) -> None: ...
def __aiter__(self) -> AgnosticGridOut: ...
async def __anext__(self) -> bytes: ...
def __getattr__(self, item: str) -> Any: ...
def open(self) -> Any: ...
def get_io_loop(self) -> Any: ...
async def stream_to_handler(self, request_handler: Any) -> None: ...
class AgnosticGridIn(object):
__motor_class_name__: str
__delegate_class__: Type[GridIn]
__getattr__: Any
_id: Any
filename: str
name: str
content_type: Optional[str]
length: int
chunk_size: int
upload_date: datetime.datetime
async def abort(self) -> None: ...
def closed(self) -> bool: ...
async def close(self) -> None: ...
def read(self, size: int = -1) -> NoReturn: ...
def readable(self) -> bool: ...
def seekable(self) -> bool: ...
async def write(self, data: Any) -> None: ...
def writeable(self) -> bool: ...
async def writelines(self, sequence: Iterable[Any]) -> None: ...
async def _exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Any: ...
async def set(self, name: str, value: Any) -> None: ...
def __init__(
self,
root_collection: AgnosticCollection,
delegate: Any = None,
session: Optional[AgnosticClientSession] = None,
**kwargs: Any
) -> None: ...
async def __aenter__(self) -> AgnosticGridIn: ...
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
def get_io_loop(self) -> Any: ...
class AgnosticGridFSBucket(object):
__motor_class_name__: str
__delegate_class__: Type[GridFSBucket]
async def delete(
self, file_id: Any, session: Optional[AgnosticClientSession] = None
) -> None: ...
async def download_to_stream(
self, file_id: Any, destination: Any, session: Optional[AgnosticClientSession] = None
) -> None: ...
async def download_to_stream_by_name(
self,
filename: str,
destination: Any,
revision: int = -1,
session: Optional[AgnosticClientSession] = None,
) -> None: ...
async def open_download_stream_by_name(
self, filename: str, revision: int = -1, session: Optional[AgnosticClientSession] = None
) -> GridOut: ...
async def open_download_stream(
self, file_id: Any, session: Optional[AgnosticClientSession] = None
) -> GridOut: ...
def open_upload_stream(
self,
filename: str,
chunk_size_bytes: Optional[int] = None,
metadata: Optional[Mapping[str, Any]] = None,
session: Optional[AgnosticClientSession] = None,
) -> GridIn: ...
def open_upload_stream_with_id(
self,
file_id: Any,
filename: str,
chunk_size_bytes: Optional[int] = None,
metadata: Optional[Mapping[str, Any]] = None,
session: Optional[AgnosticClientSession] = None,
) -> GridIn: ...
async def rename(
self, file_id: Any, new_filename: str, session: Optional[AgnosticClientSession] = None
) -> None: ...
async def upload_from_stream(
self,
filename: str,
source: Any,
chunk_size_bytes: Optional[int] = None,
metadata: Optional[Mapping[str, Any]] = None,
session: Optional[AgnosticClientSession] = None,
) -> ObjectId: ...
async def upload_from_stream_with_id(
self,
file_id: Any,
filename: str,
source: Any,
chunk_size_bytes: Optional[int] = None,
metadata: Optional[Mapping[str, Any]] = None,
session: Optional[AgnosticClientSession] = None,
) -> None: ...
def __init__(
self,
database: AgnosticDatabase,
bucket_name: str = "fs",
chunk_size_bytes: int = DEFAULT_CHUNK_SIZE,
write_concern: Optional[WriteConcern] = None,
read_preference: Optional[_ServerMode] = None,
collection: Optional[AgnosticCollection] = None,
) -> None: ...
def get_io_loop(self) -> Any: ...
def wrap(self, obj: Any) -> Any: ...
def find(self, *args: Any, **kwargs: Any) -> AgnosticGridOutCursor: ...
def _hash_gridout(gridout: AgnosticGridOut) -> str: ...

View File

@ -13,6 +13,7 @@
# limitations under the License.
"""Tornado support for Motor, an asynchronous driver for MongoDB."""
from typing import TypeVar
from . import core, motor_gridfs
from .frameworks import tornado as tornado_framework
@ -33,8 +34,10 @@ __all__ = [
"MotorClientEncryption",
]
T = TypeVar("T")
def create_motor_class(cls):
def create_motor_class(cls: T) -> T:
return create_class_with_framework(cls, tornado_framework, "motor.motor_tornado")

2
motor/py.typed Normal file
View File

@ -0,0 +1,2 @@
# PEP-561 Support File.
# "Package maintainers who wish to support type checking of their code MUST add a marker file named py.typed to their package supporting typing".

11
mypy.ini Normal file
View File

@ -0,0 +1,11 @@
[mypy]
check_untyped_defs = true
disallow_incomplete_defs = true
no_implicit_optional = true
pretty = true
show_error_context = true
show_error_codes = true
strict_equality = true
warn_unused_configs = true
warn_unused_ignores = true
warn_redundant_casts = true

View File

@ -1,3 +1,4 @@
import os
import sys
if sys.version_info[:2] < (3, 10):
@ -30,6 +31,7 @@ Operating System :: Microsoft :: Windows
Programming Language :: Python
Programming Language :: Python :: Implementation :: CPython
Programming Language :: Python :: Implementation :: PyPy
Typing :: Typed
"""
description = "Non-blocking MongoDB driver for Tornado or asyncio"
@ -136,7 +138,8 @@ class test(Command):
runner_class = unittest.TextTestRunner
runner = runner_class(**runner_kwargs)
env.setup()
if "SKIP_ENV_SETUP" not in os.environ:
env.setup()
if not self.tornado_warnings:
suppress_tornado_warnings()

View File

@ -0,0 +1,15 @@
import asyncio
from motor.core import AgnosticClient
async def _main():
client: AgnosticClient = AgnosticClient()
await client.test.test.insert_many(
{"a": 1}
) # error: Dict entry 0 has incompatible type "str": "int"; expected "Mapping[str, Any]": "int"
loop = asyncio.get_event_loop()
loop.run_until_complete(_main())
loop.close()

View File

@ -0,0 +1,15 @@
import asyncio
from motor.core import AgnosticClient
async def _main():
client: AgnosticClient = AgnosticClient()
client.test.test.insert_one(
[{}]
) # error: Argument 1 to "insert_one" of "Collection" has incompatible type "List[Dict[<nothing>, <nothing>]]"; expected "Mapping[str, Any]"
loop = asyncio.get_event_loop()
loop.run_until_complete(_main())
loop.close()

View File

@ -0,0 +1,26 @@
import asyncio
from bson.raw_bson import RawBSONDocument
from motor.core import AgnosticClient
async def _main():
client = AgnosticClient(document_class=RawBSONDocument)
coll = client.test.test
doc = {"my": "doc"}
await coll.insert_one(doc)
retrieved = await coll.find_one({"_id": doc["_id"]})
assert retrieved is not None
assert len(retrieved.raw) > 0
retrieved[
"foo"
] = "bar" # error: Unsupported target for indexed assignment ("RawBSONDocument") [index]
client.test.test.insert_one(
[{}]
) # error: Argument 1 to "insert_one" of "Collection" has incompatible type "List[Dict[<nothing>, <nothing>]]"; expected "Mapping[str, Any]"
loop = asyncio.get_event_loop()
loop.run_until_complete(_main())
loop.close()

33
test/test_mypy_fails.py Normal file
View File

@ -0,0 +1,33 @@
import os
import sys
import unittest
from typing import Iterable
try:
from mypy import api
except ImportError:
api = None
sys.path[0:0] = [""]
TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "mypy_fails")
def get_tests() -> Iterable[str]:
for dirpath, _, filenames in os.walk(TEST_PATH):
for filename in filenames:
yield os.path.join(dirpath, filename)
class TestMypyFails(unittest.TestCase):
def ensure_mypy_fails(self, filename: str) -> None:
if api is None:
raise unittest.SkipTest("Mypy is not installed")
stdout, stderr, exit_status = api.run([filename])
self.assertTrue(exit_status, msg=stdout)
def test_mypy_failures(self) -> None:
for filename in get_tests():
with self.subTest(filename=filename):
self.ensure_mypy_fails(filename)

266
test/test_typing.py Normal file
View File

@ -0,0 +1,266 @@
# Copyright 2023-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test that each file in mypy_fails/ actually fails mypy, and test some
sample client code that uses Motor typings.
"""
import unittest
from test.asyncio_tests import AsyncIOTestCase, asyncio_test
from typing import TYPE_CHECKING, Any, AsyncIterable, Dict, List, Union
from bson import CodecOptions
from bson.raw_bson import RawBSONDocument
from bson.son import SON
from pymongo.operations import DeleteOne, InsertOne, ReplaceOne
from pymongo.read_preferences import ReadPreference
from motor.core import AgnosticClient, AgnosticCollection
try:
from bson import ObjectId
from typing_extensions import NotRequired, TypedDict
class Movie(TypedDict):
name: str
year: int
class MovieWithId(TypedDict):
_id: ObjectId
name: str
year: int
class ImplicitMovie(TypedDict):
_id: NotRequired[ObjectId]
name: str
year: int
except ImportError:
Movie = dict # type:ignore[misc,assignment]
ImplicitMovie = dict # type: ignore[assignment,misc]
MovieWithId = dict # type: ignore[assignment,misc]
TypedDict = None
NotRequired = None # type: ignore[assignment]
def only_type_check(func):
def inner(*args, **kwargs):
if not TYPE_CHECKING:
raise unittest.SkipTest("Used for Type Checking Only")
func(*args, **kwargs)
return inner
class TestMotor(AsyncIOTestCase):
cx: AgnosticClient
@asyncio_test
async def test_insert_find(self) -> None:
doc = {"my": "doc"}
coll: AgnosticCollection = self.collection
coll2 = self.cx.test.test2
result = await coll.insert_one(doc)
self.assertEqual(result.inserted_id, doc["_id"])
retrieved = await coll.find_one({"_id": doc["_id"]})
if retrieved:
# Documents returned from find are mutable.
retrieved["new_field"] = 1
result2 = await coll2.insert_one(retrieved)
self.assertEqual(result2.inserted_id, result.inserted_id)
@asyncio_test
async def test_cursor_iterable(self) -> None:
async def to_list(iterable: AsyncIterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
return [gen async for gen in iterable]
await self.collection.insert_one({})
cursor = self.collection.find()
docs = await to_list(cursor)
self.assertTrue(docs)
@only_type_check
@asyncio_test
async def test_bulk_write(self) -> None:
await self.collection.insert_one({})
coll: AgnosticCollection = self.collection
requests: List[InsertOne[Movie]] = [InsertOne(Movie(name="American Graffiti", year=1973))]
result_one = await coll.bulk_write(requests)
self.assertTrue(result_one.acknowledged)
new_requests: List[Union[InsertOne[Movie], ReplaceOne[Movie]]] = []
input_list: List[Union[InsertOne[Movie], ReplaceOne[Movie]]] = [
InsertOne(Movie(name="American Graffiti", year=1973)),
ReplaceOne({}, Movie(name="American Graffiti", year=1973)),
]
for i in input_list:
new_requests.append(i)
result_two = await coll.bulk_write(new_requests)
self.assertTrue(result_two.acknowledged)
# Because ReplaceOne is not generic, type checking is not enforced for ReplaceOne in the first example.
@only_type_check
@asyncio_test
async def test_bulk_write_heterogeneous(self):
coll: AgnosticCollection = self.collection
requests: List[Union[InsertOne[Movie], ReplaceOne, DeleteOne]] = [
InsertOne(Movie(name="American Graffiti", year=1973)),
ReplaceOne({}, {"name": "American Graffiti", "year": "WRONG_TYPE"}),
DeleteOne({}),
]
result_one = await coll.bulk_write(requests)
self.assertTrue(result_one.acknowledged)
requests_two: List[Union[InsertOne[Movie], ReplaceOne[Movie], DeleteOne]] = [
InsertOne(Movie(name="American Graffiti", year=1973)),
ReplaceOne(
{},
{"name": "American Graffiti", "year": "WRONG_TYPE"},
),
DeleteOne({}),
]
result_two = await coll.bulk_write(requests_two)
self.assertTrue(result_two.acknowledged)
@asyncio_test
async def test_command(self) -> None:
result: Dict = await self.cx.admin.command("ping")
result.items()
@asyncio_test
async def test_list_collections(self) -> None:
cursor = await self.cx.test.list_collections()
value = await cursor.next()
value.items()
@asyncio_test
async def test_list_databases(self) -> None:
cursor = await self.cx.list_databases()
value = await cursor.next()
value.items()
@asyncio_test
async def test_default_document_type(self) -> None:
client = self.asyncio_client()
self.addCleanup(client.close)
coll = client.test.test
doc = {"my": "doc"}
await coll.insert_one(doc)
retrieved = await coll.find_one({"_id": doc["_id"]})
assert retrieved is not None
retrieved["a"] = 1
@asyncio_test
async def test_aggregate_pipeline(self) -> None:
coll3 = self.cx.test.test3
await coll3.insert_many(
[
{"x": 1, "tags": ["dog", "cat"]},
{"x": 2, "tags": ["cat"]},
{"x": 2, "tags": ["mouse", "cat", "dog"]},
{"x": 3, "tags": []},
]
)
class mydict(Dict[str, Any]):
pass
result = coll3.aggregate(
[
mydict({"$unwind": "$tags"}),
{"$group": {"_id": "$tags", "count": {"$sum": 1}}},
{"$sort": SON([("count", -1), ("_id", -1)])},
]
)
self.assertTrue(len([doc async for doc in result]))
@asyncio_test
async def test_with_transaction(self) -> None:
async def execute_transaction(session):
pass
async with await self.cx.start_session() as session:
return await session.with_transaction(
execute_transaction, read_preference=ReadPreference.PRIMARY
)
class TestDocumentType(AsyncIOTestCase):
@only_type_check
def test_typeddict_explicit_document_type(self) -> None:
out = MovieWithId(_id=ObjectId(), name="THX-1138", year=1971)
assert out is not None
# This should fail because the output is a Movie.
assert out["foo"] # type:ignore[typeddict-item]
assert out["_id"]
# This should work the same as the test above, but this time using NotRequired to allow
# automatic insertion of the _id field by insert_one.
@only_type_check
def test_typeddict_not_required_document_type(self) -> None:
out = ImplicitMovie(name="THX-1138", year=1971)
assert out is not None
# This should fail because the output is a Movie.
assert out["foo"] # type:ignore[typeddict-item]
# pyright gives reportTypedDictNotRequiredAccess for the following:
assert out["_id"]
@only_type_check
def test_typeddict_empty_document_type(self) -> None:
out = Movie(name="THX-1138", year=1971)
assert out is not None
# This should fail because the output is a Movie.
assert out["foo"] # type:ignore[typeddict-item]
# This should fail because _id is not included in our TypedDict definition.
assert out["_id"] # type:ignore[typeddict-item]
class TestCommandDocumentType(AsyncIOTestCase):
@only_type_check
async def test_default(self) -> None:
client: AgnosticClient = AgnosticClient()
result: Dict = await client.admin.command("ping")
result["a"] = 1
@only_type_check
async def test_explicit_document_type(self) -> None:
client: AgnosticClient = AgnosticClient()
codec_options: CodecOptions[Dict[str, Any]] = CodecOptions()
result = await client.admin.command("ping", codec_options=codec_options)
result["a"] = 1
@only_type_check
async def test_typeddict_document_type(self) -> None:
client: AgnosticClient = AgnosticClient()
codec_options: CodecOptions[Movie] = CodecOptions()
result = await client.admin.command("ping", codec_options=codec_options)
assert result["year"] == 1
assert result["name"] == "a"
@only_type_check
async def test_raw_bson_document_type(self) -> None:
client: AgnosticClient = AgnosticClient()
codec_options = CodecOptions(RawBSONDocument)
result: RawBSONDocument = await client.admin.command(
"ping", codec_options=codec_options
) # Fix once @overload for command works
assert len(result.raw) > 0
@only_type_check
async def test_son_document_type(self) -> None:
client = AgnosticClient(document_class=SON[str, Any])
codec_options = CodecOptions(SON[str, Any])
result = await client.admin.command("ping", codec_options=codec_options)
result["a"] = 1
if __name__ == "__main__":
unittest.main()

20
tox.ini
View File

@ -36,6 +36,9 @@ envlist =
# Check the sdist integrity.
manifest
# Typecheck with mypy
typecheck-mypy
[testenv]
passenv =
DB_IP
@ -75,7 +78,7 @@ deps =
{py37,py38,py39,py310,py311}: aiohttp
py312: setuptools
py312: setuptools==68.0.0
sphinx: sphinx
sphinx: aiohttp
@ -89,11 +92,10 @@ deps =
synchro312: tornado>=6,<7
synchro312: pynose
synchro312: setuptools
synchro312: setuptools==68.0.0
synchro312: pytest
pypy37: cryptography<3
setenv =
PYTHONWARNINGS="error,ignore:The distutils package is deprecated:DeprecationWarning"
commands =
@ -198,3 +200,15 @@ per-file-ignores =
# F401 'foo' imported but unused
motor/__init__.py: F401
[testenv:typecheck-mypy]
description = run mypy to typecheck
deps =
mypy==1.2.0
typing_extensions
setuptools==68.0.0
setenv = SKIP_ENV_SETUP=1
commands =
mypy --install-types --non-interactive motor/ --disallow-untyped-defs --follow-imports=skip --exclude 'motor/aiohttp/*' --exclude 'motor/frameworks/*' --exclude 'motor/metaprogramming.py' --exclude 'motor/web.py'
mypy --install-types --non-interactive --follow-imports=skip test/test_typing.py
python setup.py test -s test.test_mypy_fails