Merge branch 'master' of github.com:mongodb/mongo-python-driver

This commit is contained in:
Steven Silvester 2024-07-31 12:28:01 -05:00
commit a72820f661
No known key found for this signature in database
GPG Key ID: B1BF5EC3A8B32F91
44 changed files with 580 additions and 60 deletions

View File

@ -0,0 +1,5 @@
:mod:`change_stream` -- Watch changes on a collection, database, or cluster
===========================================================================
.. automodule:: pymongo.asynchronous.change_stream
:members:

View File

@ -0,0 +1,5 @@
:mod:`client_session` -- Logical sessions for sequential operations
===================================================================
.. automodule:: pymongo.asynchronous.client_session
:members:

View File

@ -0,0 +1,61 @@
:mod:`collection` -- Collection level operations
================================================
.. automodule:: pymongo.asynchronous.collection
:synopsis: Collection level operations
.. autoclass:: pymongo.asynchronous.collection.ReturnDocument
.. autoclass:: pymongo.asynchronous.collection.AsyncCollection(database, name, create=False, **kwargs)
.. describe:: c[name] || c.name
Get the `name` sub-collection of :class:`AsyncCollection` `c`.
Raises :class:`~pymongo.asynchronous.errors.InvalidName` if an invalid
collection name is used.
.. autoattribute:: full_name
.. autoattribute:: name
.. autoattribute:: database
.. autoattribute:: codec_options
.. autoattribute:: read_preference
.. autoattribute:: write_concern
.. autoattribute:: read_concern
.. automethod:: with_options
.. automethod:: bulk_write
.. automethod:: insert_one
.. automethod:: insert_many
.. automethod:: replace_one
.. automethod:: update_one
.. automethod:: update_many
.. automethod:: delete_one
.. automethod:: delete_many
.. automethod:: aggregate
.. automethod:: aggregate_raw_batches
.. automethod:: watch
.. automethod:: find(filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, session=None, allow_disk_use=None)
.. automethod:: find_raw_batches(filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, session=None, allow_disk_use=None)
.. automethod:: find_one(filter=None, *args, **kwargs)
.. automethod:: find_one_and_delete
.. automethod:: find_one_and_replace(filter, replacement, projection=None, sort=None, return_document=ReturnDocument.BEFORE, hint=None, session=None, **kwargs)
.. automethod:: find_one_and_update(filter, update, projection=None, sort=None, return_document=ReturnDocument.BEFORE, array_filters=None, hint=None, session=None, **kwargs)
.. automethod:: count_documents
.. automethod:: estimated_document_count
.. automethod:: distinct
.. automethod:: create_index
.. automethod:: create_indexes
.. automethod:: drop_index
.. automethod:: drop_indexes
.. automethod:: list_indexes
.. automethod:: index_information
.. automethod:: create_search_index
.. automethod:: create_search_indexes
.. automethod:: drop_search_index
.. automethod:: list_search_indexes
.. automethod:: update_search_index
.. automethod:: drop
.. automethod:: rename
.. automethod:: options
.. automethod:: __getitem__
.. automethod:: __getattr__

View File

@ -0,0 +1,6 @@
:mod:`command_cursor` -- Tools for iterating over MongoDB command results
=========================================================================
.. automodule:: pymongo.asynchronous.command_cursor
:synopsis: Tools for iterating over MongoDB command results
:members:

View File

@ -0,0 +1,16 @@
:mod:`cursor` -- Tools for iterating over MongoDB query results
===============================================================
.. automodule:: pymongo.asynchronous.cursor
:synopsis: Tools for iterating over MongoDB query results
.. autoclass:: pymongo.asynchronous.cursor.AsyncCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, session=None, allow_disk_use=None)
:members:
.. describe:: c[index]
See :meth:`__getitem__` and read the warning.
.. automethod:: __getitem__
.. autoclass:: pymongo.asynchronous.cursor.AsyncRawBatchCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, allow_disk_use=None)

View File

@ -0,0 +1,26 @@
:mod:`database` -- Database level operations
============================================
.. automodule:: pymongo.asynchronous.database
:synopsis: Database level operations
.. autoclass:: pymongo.asynchronous.database.AsyncDatabase
:members:
.. describe:: db[collection_name] || db.collection_name
Get the `collection_name` :class:`~pymongo.asynchronous.collection.AsyncCollection` of
:class:`AsyncDatabase` `db`.
Raises :class:`~pymongo.errors.InvalidName` if an invalid collection
name is used.
.. note:: Use dictionary style access if `collection_name` is an
attribute of the :class:`AsyncDatabase` class eg: db[`collection_name`].
.. automethod:: __getitem__
.. automethod:: __getattr__
.. autoattribute:: codec_options
.. autoattribute:: read_preference
.. autoattribute:: write_concern
.. autoattribute:: read_concern

View File

@ -0,0 +1,22 @@
:mod:`pymongo async` -- Async Python driver for MongoDB
=======================================================
.. automodule:: pymongo.asynchronous
:synopsis: Asynchronous Python driver for MongoDB
.. data:: AsyncMongoClient
Alias for :class:`pymongo.asynchronous.mongo_client.MongoClient`.
Sub-modules:
.. toctree::
:maxdepth: 2
change_stream
client_session
collection
command_cursor
cursor
database
mongo_client

View File

@ -0,0 +1,39 @@
:mod:`mongo_client` -- Tools for connecting to MongoDB
======================================================
.. automodule:: pymongo.asynchronous.mongo_client
:synopsis: Tools for connecting to MongoDB
.. autoclass:: pymongo.asynchronous.mongo_client.AsyncMongoClient(host='localhost', port=27017, document_class=dict, tz_aware=False, connect=True, **kwargs)
.. automethod:: aclose
.. describe:: c[db_name] || c.db_name
Get the `db_name` :class:`~pymongo.asynchronous.database.AsyncDatabase` on :class:`AsyncMongoClient` `c`.
Raises :class:`~pymongo.errors.InvalidName` if an invalid database name is used.
.. autoattribute:: topology_description
.. autoattribute:: address
.. autoattribute:: primary
.. autoattribute:: secondaries
.. autoattribute:: arbiters
.. autoattribute:: is_primary
.. autoattribute:: is_mongos
.. autoattribute:: nodes
.. autoattribute:: codec_options
.. autoattribute:: read_preference
.. autoattribute:: write_concern
.. autoattribute:: read_concern
.. autoattribute:: options
.. automethod:: start_session
.. automethod:: list_databases
.. automethod:: list_database_names
.. automethod:: drop_database
.. automethod:: get_default_database
.. automethod:: get_database
.. automethod:: server_info
.. automethod:: watch
.. automethod:: __getitem__
.. automethod:: __getattr__

View File

@ -9,6 +9,10 @@
Alias for :class:`pymongo.mongo_client.MongoClient`.
.. data:: AsyncMongoClient
Alias for :class:`pymongo.asynchronous.mongo_client.AsyncMongoClient`.
.. data:: ReadPreference
Alias for :class:`pymongo.read_preferences.ReadPreference`.
@ -27,8 +31,9 @@
Sub-modules:
.. toctree::
:maxdepth: 2
:maxdepth: 3
asynchronous/index
auth_oidc
change_stream
client_options

View File

@ -1892,6 +1892,9 @@ class AsyncGridOutCursor(AsyncCursor):
next_file = await super().next()
return AsyncGridOut(self._root_collection, file_document=next_file, session=self.session)
async def to_list(self) -> list[AsyncGridOut]:
return [x async for x in self] # noqa: C416,RUF100
__anext__ = next
def add_option(self, *args: Any, **kwargs: Any) -> NoReturn:

View File

@ -1878,6 +1878,9 @@ class GridOutCursor(Cursor):
next_file = super().next()
return GridOut(self._root_collection, file_document=next_file, session=self.session)
def to_list(self) -> list[GridOut]:
return [x for x in self] # noqa: C416,RUF100
__next__ = next
def add_option(self, *args: Any, **kwargs: Any) -> NoReturn:

View File

@ -87,7 +87,7 @@ def _resumable(exc: PyMongoError) -> bool:
return False
class ChangeStream(Generic[_DocumentType]):
class AsyncChangeStream(Generic[_DocumentType]):
"""The internal abstract base class for change stream cursors.
Should not be called directly by application developers. Use
@ -276,7 +276,7 @@ class ChangeStream(Generic[_DocumentType]):
self._closed = True
await self._cursor.close()
def __aiter__(self) -> ChangeStream[_DocumentType]:
def __aiter__(self) -> AsyncChangeStream[_DocumentType]:
return self
@property
@ -436,14 +436,14 @@ class ChangeStream(Generic[_DocumentType]):
return _bson_to_dict(change.raw, self._orig_codec_options)
return change
async def __aenter__(self) -> ChangeStream[_DocumentType]:
async def __aenter__(self) -> AsyncChangeStream[_DocumentType]:
return self
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self.close()
class CollectionChangeStream(ChangeStream[_DocumentType]):
class AsyncCollectionChangeStream(AsyncChangeStream[_DocumentType]):
"""A change stream that watches changes on a single collection.
Should not be called directly by application developers. Use
@ -463,7 +463,7 @@ class CollectionChangeStream(ChangeStream[_DocumentType]):
return self._target.database.client
class DatabaseChangeStream(ChangeStream[_DocumentType]):
class AsyncDatabaseChangeStream(AsyncChangeStream[_DocumentType]):
"""A change stream that watches changes on all collections in a database.
Should not be called directly by application developers. Use
@ -483,7 +483,7 @@ class DatabaseChangeStream(ChangeStream[_DocumentType]):
return self._target.client
class ClusterChangeStream(DatabaseChangeStream[_DocumentType]):
class AsyncClusterChangeStream(AsyncDatabaseChangeStream[_DocumentType]):
"""A change stream that watches changes on all collections in the cluster.
Should not be called directly by application developers. Use

View File

@ -36,7 +36,7 @@ guaranteed monotonic reads, even when reading from replica set secondaries.
.. seealso:: The MongoDB documentation on `causal-consistency <https://dochub.mongodb.org/core/causal-consistency>`_.
.. _transactions-ref:
.. _async-transactions-ref:
Transactions
============
@ -93,7 +93,7 @@ running either commitTransaction or abortTransaction, the session is unpinned.
.. seealso:: The MongoDB documentation on `transactions <https://dochub.mongodb.org/core/transactions>`_.
.. _snapshot-reads-ref:
.. _async-snapshot-reads-ref:
Snapshot Reads
==============

View File

@ -48,7 +48,7 @@ from pymongo.asynchronous.aggregation import (
_CollectionRawAggregationCommand,
)
from pymongo.asynchronous.bulk import _AsyncBulk
from pymongo.asynchronous.change_stream import CollectionChangeStream
from pymongo.asynchronous.change_stream import AsyncCollectionChangeStream
from pymongo.asynchronous.command_cursor import (
AsyncCommandCursor,
AsyncRawBatchCommandCursor,
@ -417,12 +417,12 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]):
comment: Optional[Any] = None,
full_document_before_change: Optional[str] = None,
show_expanded_events: Optional[bool] = None,
) -> CollectionChangeStream[_DocumentType]:
) -> AsyncCollectionChangeStream[_DocumentType]:
"""Watch changes on this collection.
Performs an aggregation with an implicit initial ``$changeStream``
stage and returns a
:class:`~pymongo.change_stream.CollectionChangeStream` cursor which
:class:`~pymongo.asynchronous.change_stream.AsyncCollectionChangeStream` cursor which
iterates over changes on this collection.
.. code-block:: python
@ -431,10 +431,10 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]):
async for change in stream:
print(change)
The :class:`~pymongo.change_stream.CollectionChangeStream` iterable
The :class:`~pymongo.asynchronous.change_stream.AsyncCollectionChangeStream` iterable
blocks until the next change document is returned or an error is
raised. If the
:meth:`~pymongo.change_stream.CollectionChangeStream.next` method
:meth:`~pymongo.asynchronous.change_stream.AsyncCollectionChangeStream.next` method
encounters a network error when retrieving a batch from the server,
it will automatically attempt to recreate the cursor such that no
change events are missed. Any error encountered during the resume
@ -501,7 +501,7 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]):
command.
:param show_expanded_events: Include expanded events such as DDL events like `dropIndexes`.
:return: A :class:`~pymongo.change_stream.CollectionChangeStream` cursor.
:return: A :class:`~pymongo.asynchronous.change_stream.AsyncCollectionChangeStream` cursor.
.. versionchanged:: 4.3
Added `show_expanded_events` parameter.
@ -525,7 +525,7 @@ class AsyncCollection(common.BaseObject, Generic[_DocumentType]):
.. _change streams specification:
https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md
"""
change_stream = CollectionChangeStream(
change_stream = AsyncCollectionChangeStream(
self,
pipeline,
full_document,

View File

@ -346,6 +346,17 @@ class AsyncCommandCursor(Generic[_DocumentType]):
else:
return None
async def _next_batch(self, result: list) -> bool:
"""Get all available documents from the cursor."""
if not len(self._data) and not self._killed:
await self._refresh()
if len(self._data):
result.extend(self._data)
self._data.clear()
return True
else:
return False
async def try_next(self) -> Optional[_DocumentType]:
"""Advance the cursor without blocking indefinitely.
@ -371,7 +382,11 @@ class AsyncCommandCursor(Generic[_DocumentType]):
await self.close()
async def to_list(self) -> list[_DocumentType]:
return [x async for x in self] # noqa: C416,RUF100
res: list[_DocumentType] = []
while self.alive:
if not await self._next_batch(res):
break
return res
class AsyncRawBatchCommandCursor(AsyncCommandCursor[_DocumentType]):

View File

@ -1260,6 +1260,20 @@ class AsyncCursor(Generic[_DocumentType]):
else:
raise StopAsyncIteration
async def _next_batch(self, result: list) -> bool:
"""Get all available documents from the cursor."""
if not self._exhaust_checked:
self._exhaust_checked = True
await self._supports_exhaust()
if self._empty:
return False
if len(self._data) or await self._refresh():
result.extend(self._data)
self._data.clear()
return True
else:
return False
async def __anext__(self) -> _DocumentType:
return await self.next()
@ -1273,7 +1287,11 @@ class AsyncCursor(Generic[_DocumentType]):
await self.close()
async def to_list(self) -> list[_DocumentType]:
return [x async for x in self] # noqa: C416,RUF100
res: list[_DocumentType] = []
while self.alive:
if not await self._next_batch(res):
break
return res
class AsyncRawBatchCursor(AsyncCursor, Generic[_DocumentType]):

View File

@ -35,7 +35,7 @@ from bson.dbref import DBRef
from bson.timestamp import Timestamp
from pymongo import _csot, common
from pymongo.asynchronous.aggregation import _DatabaseAggregationCommand
from pymongo.asynchronous.change_stream import DatabaseChangeStream
from pymongo.asynchronous.change_stream import AsyncDatabaseChangeStream
from pymongo.asynchronous.collection import AsyncCollection
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
from pymongo.common import _ecoc_coll_name, _esc_coll_name
@ -332,12 +332,12 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]):
comment: Optional[Any] = None,
full_document_before_change: Optional[str] = None,
show_expanded_events: Optional[bool] = None,
) -> DatabaseChangeStream[_DocumentType]:
) -> AsyncDatabaseChangeStream[_DocumentType]:
"""Watch changes on this database.
Performs an aggregation with an implicit initial ``$changeStream``
stage and returns a
:class:`~pymongo.change_stream.DatabaseChangeStream` cursor which
:class:`~pymongo.asynchronous.change_stream.AsyncDatabaseChangeStream` cursor which
iterates over changes on all collections in this database.
Introduced in MongoDB 4.0.
@ -348,10 +348,10 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]):
async for change in stream:
print(change)
The :class:`~pymongo.change_stream.DatabaseChangeStream` iterable
The :class:`~pymongo.asynchronous.change_stream.AsyncDatabaseChangeStream` iterable
blocks until the next change document is returned or an error is
raised. If the
:meth:`~pymongo.change_stream.DatabaseChangeStream.next` method
:meth:`~pymongo.asynchronous.change_stream.AsyncDatabaseChangeStream.next` method
encounters a network error when retrieving a batch from the server,
it will automatically attempt to recreate the cursor such that no
change events are missed. Any error encountered during the resume
@ -409,7 +409,7 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]):
command.
:param show_expanded_events: Include expanded events such as DDL events like `dropIndexes`.
:return: A :class:`~pymongo.change_stream.DatabaseChangeStream` cursor.
:return: A :class:`~pymongo.asynchronous.change_stream.AsyncDatabaseChangeStream` cursor.
.. versionchanged:: 4.3
Added `show_expanded_events` parameter.
@ -430,7 +430,7 @@ class AsyncDatabase(common.BaseObject, Generic[_DocumentType]):
.. _change streams specification:
https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md
"""
change_stream = DatabaseChangeStream(
change_stream = AsyncDatabaseChangeStream(
self,
pipeline,
full_document,

View File

@ -60,7 +60,7 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS, CodecOptions, TypeRegistry
from bson.timestamp import Timestamp
from pymongo import _csot, common, helpers_shared, uri_parser
from pymongo.asynchronous import client_session, database, periodic_executor
from pymongo.asynchronous.change_stream import ChangeStream, ClusterChangeStream
from pymongo.asynchronous.change_stream import AsyncChangeStream, AsyncClusterChangeStream
from pymongo.asynchronous.client_session import _EmptyServerSession
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
from pymongo.asynchronous.settings import TopologySettings
@ -919,7 +919,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
comment: Optional[Any] = None,
full_document_before_change: Optional[str] = None,
show_expanded_events: Optional[bool] = None,
) -> ChangeStream[_DocumentType]:
) -> AsyncChangeStream[_DocumentType]:
"""Watch changes on this cluster.
Performs an aggregation with an implicit initial ``$changeStream``
@ -1017,7 +1017,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
.. _change streams specification:
https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md
"""
change_stream = ClusterChangeStream(
change_stream = AsyncClusterChangeStream(
self.admin,
pipeline,
full_document,

View File

@ -12,7 +12,10 @@
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Tools to parse mongo client options."""
"""Tools to parse mongo client options.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, cast

View File

@ -15,6 +15,8 @@
"""Tools for working with `collations`_.
.. _collations: https://www.mongodb.com/docs/manual/reference/collation/
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations

View File

@ -12,7 +12,10 @@
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Advanced options for MongoDB drivers implemented on top of PyMongo."""
"""Advanced options for MongoDB drivers implemented on top of PyMongo.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
from collections import namedtuple

View File

@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Support for automatic client-side field level encryption."""
"""Support for automatic client-side field level encryption.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Mapping, Optional

View File

@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exceptions raised by PyMongo."""
"""Exceptions raised by PyMongo.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
from ssl import SSLCertVerificationError as _CertificateError # noqa: F401

View File

@ -25,6 +25,9 @@ These loggers can be registered using :func:`register` or
or
``MongoClient(event_listeners=[CommandLogger()])``
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations

View File

@ -20,6 +20,9 @@
are included in the PyMongo distribution under the
:mod:`~pymongo.event_loggers` submodule.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
Use :func:`register` to register global listeners for specific events.
Listeners must inherit from one of the abstract classes below and implement
the correct functions for that class.

View File

@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Operation class definitions."""
"""Operation class definitions.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
import enum

View File

@ -12,7 +12,10 @@
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""AsyncConnection pool options for AsyncMongoClient/MongoClient."""
"""Pool options for AsyncMongoClient/MongoClient.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
import copy

View File

@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tools for working with read concerns."""
"""Tools for working with read concerns.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
from typing import Any, Optional

View File

@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for choosing which member of a replica set to read from."""
"""Utilities for choosing which member of a replica set to read from.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations

View File

@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Result class definitions."""
"""Result class definitions.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
from typing import Any, Mapping, Optional, cast

View File

@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Represent one server the driver is connected to."""
"""Represent one server the driver is connected to.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
import time

View File

@ -423,7 +423,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
Performs an aggregation with an implicit initial ``$changeStream``
stage and returns a
:class:`~pymongo.change_stream.CollectionChangeStream` cursor which
:class:`~pymongo.synchronous.change_stream.CollectionChangeStream` cursor which
iterates over changes on this collection.
.. code-block:: python
@ -432,10 +432,10 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
async for change in stream:
print(change)
The :class:`~pymongo.change_stream.CollectionChangeStream` iterable
The :class:`~pymongo.synchronous.change_stream.CollectionChangeStream` iterable
blocks until the next change document is returned or an error is
raised. If the
:meth:`~pymongo.change_stream.CollectionChangeStream.next` method
:meth:`~pymongo.synchronous.change_stream.CollectionChangeStream.next` method
encounters a network error when retrieving a batch from the server,
it will automatically attempt to recreate the cursor such that no
change events are missed. Any error encountered during the resume
@ -502,7 +502,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
command.
:param show_expanded_events: Include expanded events such as DDL events like `dropIndexes`.
:return: A :class:`~pymongo.change_stream.CollectionChangeStream` cursor.
:return: A :class:`~pymongo.synchronous.change_stream.CollectionChangeStream` cursor.
.. versionchanged:: 4.3
Added `show_expanded_events` parameter.

View File

@ -346,6 +346,17 @@ class CommandCursor(Generic[_DocumentType]):
else:
return None
def _next_batch(self, result: list) -> bool:
"""Get all available documents from the cursor."""
if not len(self._data) and not self._killed:
self._refresh()
if len(self._data):
result.extend(self._data)
self._data.clear()
return True
else:
return False
def try_next(self) -> Optional[_DocumentType]:
"""Advance the cursor without blocking indefinitely.
@ -371,7 +382,11 @@ class CommandCursor(Generic[_DocumentType]):
self.close()
def to_list(self) -> list[_DocumentType]:
return [x for x in self] # noqa: C416,RUF100
res: list[_DocumentType] = []
while self.alive:
if not self._next_batch(res):
break
return res
class RawBatchCommandCursor(CommandCursor[_DocumentType]):

View File

@ -1258,6 +1258,20 @@ class Cursor(Generic[_DocumentType]):
else:
raise StopIteration
def _next_batch(self, result: list) -> bool:
"""Get all available documents from the cursor."""
if not self._exhaust_checked:
self._exhaust_checked = True
self._supports_exhaust()
if self._empty:
return False
if len(self._data) or self._refresh():
result.extend(self._data)
self._data.clear()
return True
else:
return False
def __next__(self) -> _DocumentType:
return self.next()
@ -1271,7 +1285,11 @@ class Cursor(Generic[_DocumentType]):
self.close()
def to_list(self) -> list[_DocumentType]:
return [x for x in self] # noqa: C416,RUF100
res: list[_DocumentType] = []
while self.alive:
if not self._next_batch(res):
break
return res
class RawBatchCursor(Cursor, Generic[_DocumentType]):

View File

@ -337,7 +337,7 @@ class Database(common.BaseObject, Generic[_DocumentType]):
Performs an aggregation with an implicit initial ``$changeStream``
stage and returns a
:class:`~pymongo.change_stream.DatabaseChangeStream` cursor which
:class:`~pymongo.synchronous.change_stream.DatabaseChangeStream` cursor which
iterates over changes on all collections in this database.
Introduced in MongoDB 4.0.
@ -348,10 +348,10 @@ class Database(common.BaseObject, Generic[_DocumentType]):
async for change in stream:
print(change)
The :class:`~pymongo.change_stream.DatabaseChangeStream` iterable
The :class:`~pymongo.synchronous.change_stream.DatabaseChangeStream` iterable
blocks until the next change document is returned or an error is
raised. If the
:meth:`~pymongo.change_stream.DatabaseChangeStream.next` method
:meth:`~pymongo.synchronous.change_stream.DatabaseChangeStream.next` method
encounters a network error when retrieving a batch from the server,
it will automatically attempt to recreate the cursor such that no
change events are missed. Any error encountered during the resume
@ -409,7 +409,7 @@ class Database(common.BaseObject, Generic[_DocumentType]):
command.
:param show_expanded_events: Include expanded events such as DDL events like `dropIndexes`.
:return: A :class:`~pymongo.change_stream.DatabaseChangeStream` cursor.
:return: A :class:`~pymongo.synchronous.change_stream.DatabaseChangeStream` cursor.
.. versionchanged:: 4.3
Added `show_expanded_events` parameter.

View File

@ -12,7 +12,10 @@
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Represent a deployment of MongoDB servers."""
"""Represent a deployment of MongoDB servers.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
from random import sample

View File

@ -13,7 +13,10 @@
# permissions and limitations under the License.
"""Tools to parse and validate a MongoDB URI."""
"""Tools to parse and validate a MongoDB URI.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
import re
@ -143,8 +146,21 @@ def parse_host(entity: str, default_port: Optional[int] = DEFAULT_PORT) -> _Addr
)
host, port = host.split(":", 1)
if isinstance(port, str):
if not port.isdigit() or int(port) > 65535 or int(port) <= 0:
raise ValueError(f"Port must be an integer between 0 and 65535: {port!r}")
if not port.isdigit():
# Special case check for mistakes like "mongodb://localhost:27017 ".
if all(c.isspace() or c.isdigit() for c in port):
for c in port:
if c.isspace():
raise ValueError(f"Port contains whitespace character: {c!r}")
# A non-digit port indicates that the URI is invalid, likely because the password
# or username were not escaped.
raise ValueError(
"Port contains non-digit characters. Hint: username and password must be escaped according to "
"RFC 3986, use urllib.parse.quote_plus"
)
if int(port) > 65535 or int(port) <= 0:
raise ValueError("Port must be an integer between 0 and 65535")
port = int(port)
# Normalize hostname to lowercase, since DNS is case-insensitive:

View File

@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tools for working with write concerns."""
"""Tools for working with write concerns.
.. seealso:: This module is compatible with both the synchronous and asynchronous PyMongo APIs.
"""
from __future__ import annotations
from typing import Any, Optional, Union

View File

@ -1380,6 +1380,43 @@ class TestCursor(AsyncIntegrationTest):
self.assertEqual("getMore", started[1].command_name)
self.assertNotIn("$readPreference", started[1].command)
@async_client_context.require_replica_set
async def test_to_list_tailable(self):
oplog = self.client.local.oplog.rs
last = await oplog.find().sort("$natural", pymongo.DESCENDING).limit(-1).next()
ts = last["ts"]
c = oplog.find(
{"ts": {"$gte": ts}}, cursor_type=pymongo.CursorType.TAILABLE_AWAIT, oplog_replay=True
)
docs = await c.to_list()
self.assertGreaterEqual(len(docs), 1)
async def test_to_list_empty(self):
c = self.db.does_not_exist.find()
docs = await c.to_list()
self.assertEqual([], docs)
@async_client_context.require_replica_set
async def test_command_cursor_to_list(self):
c = await self.db.test.aggregate([{"$changeStream": {}}])
docs = await c.to_list()
self.assertGreaterEqual(len(docs), 0)
@async_client_context.require_replica_set
async def test_command_cursor_to_list_empty(self):
c = await self.db.does_not_exist.aggregate([{"$changeStream": {}}])
docs = await c.to_list()
self.assertEqual([], docs)
class TestRawBatchCursor(AsyncIntegrationTest):
async def test_find_raw(self):

View File

@ -0,0 +1,103 @@
# Copyright 2023-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import os
from test import unittest
from test.asynchronous import AsyncIntegrationTest
from test.utils import async_single_client
from unittest.mock import patch
from bson import json_util
from pymongo.errors import OperationFailure
from pymongo.logger import _DEFAULT_DOCUMENT_LENGTH
_IS_SYNC = False
# https://github.com/mongodb/specifications/tree/master/source/command-logging-and-monitoring/tests#prose-tests
class TestLogger(AsyncIntegrationTest):
async def test_default_truncation_limit(self):
docs = [{"x": "y"} for _ in range(100)]
db = self.db
with patch.dict("os.environ"):
os.environ.pop("MONGOB_LOG_MAX_DOCUMENT_LENGTH", None)
with self.assertLogs("pymongo.command", level="DEBUG") as cm:
await db.test.insert_many(docs)
cmd_started_log = json_util.loads(cm.records[0].message)
self.assertEqual(len(cmd_started_log["command"]), _DEFAULT_DOCUMENT_LENGTH + 3)
cmd_succeeded_log = json_util.loads(cm.records[1].message)
self.assertLessEqual(len(cmd_succeeded_log["reply"]), _DEFAULT_DOCUMENT_LENGTH + 3)
with self.assertLogs("pymongo.command", level="DEBUG") as cm:
await db.test.find({}).to_list()
cmd_succeeded_log = json_util.loads(cm.records[1].message)
self.assertEqual(len(cmd_succeeded_log["reply"]), _DEFAULT_DOCUMENT_LENGTH + 3)
async def test_configured_truncation_limit(self):
cmd = {"hello": True}
db = self.db
with patch.dict("os.environ", {"MONGOB_LOG_MAX_DOCUMENT_LENGTH": "5"}):
with self.assertLogs("pymongo.command", level="DEBUG") as cm:
await db.command(cmd)
cmd_started_log = json_util.loads(cm.records[0].message)
self.assertEqual(len(cmd_started_log["command"]), 5 + 3)
cmd_succeeded_log = json_util.loads(cm.records[1].message)
self.assertLessEqual(len(cmd_succeeded_log["reply"]), 5 + 3)
with self.assertRaises(OperationFailure):
await db.command({"notARealCommand": True})
cmd_failed_log = json_util.loads(cm.records[-1].message)
self.assertEqual(len(cmd_failed_log["failure"]), 5 + 3)
async def test_truncation_multi_byte_codepoints(self):
document_lengths = ["20000", "20001", "20002"]
multi_byte_char_str_len = 50_000
str_to_repeat = ""
multi_byte_char_str = ""
for i in range(multi_byte_char_str_len):
multi_byte_char_str += str_to_repeat
for length in document_lengths:
with patch.dict("os.environ", {"MONGOB_LOG_MAX_DOCUMENT_LENGTH": length}):
with self.assertLogs("pymongo.command", level="DEBUG") as cm:
await self.db.test.insert_one({"x": multi_byte_char_str})
cmd_started_log = json_util.loads(cm.records[0].message)["command"]
cmd_started_log = cmd_started_log[:-3]
last_3_bytes = cmd_started_log.encode()[-3:].decode()
self.assertEqual(last_3_bytes, str_to_repeat)
async def test_logging_without_listeners(self):
c = await async_single_client()
self.assertEqual(len(c._event_listeners.event_listeners()), 0)
with self.assertLogs("pymongo.connection", level="DEBUG") as cm:
await c.db.test.insert_one({"x": "1"})
self.assertGreater(len(cm.records), 0)
with self.assertLogs("pymongo.command", level="DEBUG") as cm:
await c.db.test.insert_one({"x": "1"})
self.assertGreater(len(cm.records), 0)
with self.assertLogs("pymongo.serverSelection", level="DEBUG") as cm:
await c.db.test.insert_one({"x": "1"})
self.assertGreater(len(cm.records), 0)
if __name__ == "__main__":
unittest.main()

View File

@ -1371,6 +1371,43 @@ class TestCursor(IntegrationTest):
self.assertEqual("getMore", started[1].command_name)
self.assertNotIn("$readPreference", started[1].command)
@client_context.require_replica_set
def test_to_list_tailable(self):
oplog = self.client.local.oplog.rs
last = oplog.find().sort("$natural", pymongo.DESCENDING).limit(-1).next()
ts = last["ts"]
c = oplog.find(
{"ts": {"$gte": ts}}, cursor_type=pymongo.CursorType.TAILABLE_AWAIT, oplog_replay=True
)
docs = c.to_list()
self.assertGreaterEqual(len(docs), 1)
def test_to_list_empty(self):
c = self.db.does_not_exist.find()
docs = c.to_list()
self.assertEqual([], docs)
@client_context.require_replica_set
def test_command_cursor_to_list(self):
c = self.db.test.aggregate([{"$changeStream": {}}])
docs = c.to_list()
self.assertGreaterEqual(len(docs), 0)
@client_context.require_replica_set
def test_command_cursor_to_list_empty(self):
c = self.db.does_not_exist.aggregate([{"$changeStream": {}}])
docs = c.to_list()
self.assertEqual([], docs)
class TestRawBatchCursor(IntegrationTest):
def test_find_raw(self):

View File

@ -14,8 +14,7 @@
from __future__ import annotations
import os
from test import unittest
from test.test_client import IntegrationTest
from test import IntegrationTest, unittest
from test.utils import single_client
from unittest.mock import patch
@ -23,6 +22,8 @@ from bson import json_util
from pymongo.errors import OperationFailure
from pymongo.logger import _DEFAULT_DOCUMENT_LENGTH
_IS_SYNC = True
# https://github.com/mongodb/specifications/tree/master/source/command-logging-and-monitoring/tests#prose-tests
class TestLogger(IntegrationTest):
@ -42,7 +43,7 @@ class TestLogger(IntegrationTest):
self.assertLessEqual(len(cmd_succeeded_log["reply"]), _DEFAULT_DOCUMENT_LENGTH + 3)
with self.assertLogs("pymongo.command", level="DEBUG") as cm:
list(db.test.find({}))
db.test.find({}).to_list()
cmd_succeeded_log = json_util.loads(cm.records[1].message)
self.assertEqual(len(cmd_succeeded_log["reply"]), _DEFAULT_DOCUMENT_LENGTH + 3)

View File

@ -160,10 +160,6 @@ class TestURI(unittest.TestCase):
self.assertRaises(InvalidURI, parse_uri, "http://foo@foobar.com")
self.assertRaises(ValueError, parse_uri, "mongodb://::1", 27017)
# Extra whitespace should be visible in error message.
with self.assertRaisesRegex(ValueError, "'27017 '"):
parse_uri("mongodb://localhost:27017 ")
orig: dict = {
"nodelist": [("localhost", 27017)],
"username": None,
@ -536,6 +532,28 @@ class TestURI(unittest.TestCase):
self.assertEqual(user, res["username"])
self.assertEqual(pwd, res["password"])
def test_do_not_include_password_in_port_message(self):
with self.assertRaisesRegex(ValueError, "Port must be an integer between 0 and 65535"):
parse_uri("mongodb://localhost:65536")
with self.assertRaisesRegex(
ValueError, "Port contains non-digit characters. Hint: username "
) as ctx:
parse_uri("mongodb://user:PASS /@localhost:27017")
self.assertNotIn("PASS", str(ctx.exception))
# This "invalid" case is technically a valid URI:
res = parse_uri("mongodb://user:1234/@localhost:27017")
self.assertEqual([("user", 1234)], res["nodelist"])
self.assertEqual("@localhost:27017", res["database"])
def test_port_with_whitespace(self):
with self.assertRaisesRegex(ValueError, "Port contains whitespace character: ' '"):
parse_uri("mongodb://localhost:27017 ")
with self.assertRaisesRegex(ValueError, "Port contains whitespace character: ' '"):
parse_uri("mongodb://localhost: 27017")
with self.assertRaisesRegex(ValueError, r"Port contains whitespace character: '\\n'"):
parse_uri("mongodb://localhost:27\n017")
if __name__ == "__main__":
unittest.main()

View File

@ -34,6 +34,10 @@ replacements = {
"AsyncRawBatchCursor": "RawBatchCursor",
"AsyncRawBatchCommandCursor": "RawBatchCommandCursor",
"AsyncClientSession": "ClientSession",
"AsyncChangeStream": "ChangeStream",
"AsyncCollectionChangeStream": "CollectionChangeStream",
"AsyncDatabaseChangeStream": "DatabaseChangeStream",
"AsyncClusterChangeStream": "ClusterChangeStream",
"_AsyncBulk": "_Bulk",
"AsyncConnection": "Connection",
"async_command": "command",
@ -88,6 +92,8 @@ replacements = {
"get_async_mock_client": "get_mock_client",
"aconnect": "_connect",
"aclose": "close",
"async-transactions-ref": "transactions-ref",
"async-snapshot-reads-ref": "snapshot-reads-ref",
}
docstring_replacements: dict[tuple[str, str], str] = {
@ -148,6 +154,7 @@ converted_tests = [
"test_collection.py",
"test_cursor.py",
"test_database.py",
"test_logger.py",
"test_session.py",
"test_transactions.py",
]