From 4e1035ece889b1526ecb4d305e11c5f6d96a15c7 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 10 Sep 2024 12:16:03 -0500 Subject: [PATCH] PYTHON-4747 Sync collection.py to master --- pymongo/collection.py | 25 + pymongo/synchronous/collection.py | 879 ++++++++++++++++-------------- 2 files changed, 500 insertions(+), 404 deletions(-) create mode 100644 pymongo/collection.py diff --git a/pymongo/collection.py b/pymongo/collection.py new file mode 100644 index 000000000..f726ed037 --- /dev/null +++ b/pymongo/collection.py @@ -0,0 +1,25 @@ +# Copyright 2024-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Re-import of synchronous Collection API for compatibility.""" +from __future__ import annotations + +from pymongo.synchronous.collection import * # noqa: F403 +from pymongo.synchronous.collection import __doc__ as original_doc + +__doc__ = original_doc +__all__ = [ # noqa: F405 + "Collection", + "ReturnDocument", +] diff --git a/pymongo/synchronous/collection.py b/pymongo/synchronous/collection.py index ddfe9f1df..93e24432e 100644 --- a/pymongo/synchronous/collection.py +++ b/pymongo/synchronous/collection.py @@ -15,6 +15,7 @@ """Collection level utilities for Mongo.""" from __future__ import annotations +import warnings from collections import abc from typing import ( TYPE_CHECKING, @@ -40,24 +41,16 @@ from bson.objectid import ObjectId from bson.raw_bson import RawBSONDocument from bson.son import SON from bson.timestamp import Timestamp -from pymongo import ASCENDING, _csot, common, helpers, message -from pymongo.aggregation import ( - _CollectionAggregationCommand, - _CollectionRawAggregationCommand, -) -from pymongo.bulk import _Bulk -from pymongo.change_stream import CollectionChangeStream +from pymongo import ASCENDING, _csot, common, helpers_shared, message from pymongo.collation import validate_collation_or_none -from pymongo.command_cursor import CommandCursor, RawBatchCommandCursor from pymongo.common import _ecoc_coll_name, _esc_coll_name -from pymongo.cursor import Cursor, RawBatchCursor from pymongo.errors import ( ConfigurationError, InvalidName, InvalidOperation, OperationFailure, ) -from pymongo.helpers import _check_write_command_response +from pymongo.helpers_shared import _check_write_command_response from pymongo.message import _UNICODE_REPLACE_CODEC_OPTIONS from pymongo.operations import ( DeleteMany, @@ -72,7 +65,7 @@ from pymongo.operations import ( _IndexList, _Op, ) -from pymongo.read_concern import DEFAULT_READ_CONCERN, ReadConcern +from pymongo.read_concern import DEFAULT_READ_CONCERN from pymongo.read_preferences import ReadPreference, _ServerMode from pymongo.results import ( BulkWriteResult, @@ -81,9 +74,25 @@ from pymongo.results import ( InsertOneResult, UpdateResult, ) +from pymongo.synchronous.aggregation import ( + _CollectionAggregationCommand, + _CollectionRawAggregationCommand, +) +from pymongo.synchronous.bulk import _Bulk +from pymongo.synchronous.change_stream import CollectionChangeStream +from pymongo.synchronous.command_cursor import ( + CommandCursor, + RawBatchCommandCursor, +) +from pymongo.synchronous.cursor import ( + Cursor, + RawBatchCursor, +) from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline from pymongo.write_concern import DEFAULT_WRITE_CONCERN, WriteConcern, validate_boolean +_IS_SYNC = True + T = TypeVar("T") _FIND_AND_MODIFY_DOC_FIELDS = {"value": 1} @@ -114,12 +123,14 @@ class ReturnDocument: if TYPE_CHECKING: - from pymongo.aggregation import _AggregationCommand - from pymongo.client_session import ClientSession + import bson from pymongo.collation import Collation - from pymongo.database import Database - from pymongo.pool import Connection - from pymongo.server import Server + from pymongo.read_concern import ReadConcern + from pymongo.synchronous.aggregation import _AggregationCommand + from pymongo.synchronous.client_session import ClientSession + from pymongo.synchronous.database import Database + from pymongo.synchronous.pool import Connection + from pymongo.synchronous.server import Server class Collection(common.BaseObject, Generic[_DocumentType]): @@ -155,8 +166,8 @@ class Collection(common.BaseObject, Generic[_DocumentType]): :param database: the database to get a collection from :param name: the name of the collection to get - :param create: if ``True``, force collection - creation even without options being set + :param create: If ``True``, force collection + creation even without options being set. :param codec_options: An instance of :class:`~bson.codec_options.CodecOptions`. If ``None`` (the default) database.codec_options is used. @@ -171,11 +182,11 @@ class Collection(common.BaseObject, Generic[_DocumentType]): :param collation: An instance of :class:`~pymongo.collation.Collation`. If a collation is provided, it will be passed to the create collection command. - :param session: a + :param session: A :class:`~pymongo.client_session.ClientSession` that is used with - the create collection command - :param kwargs: additional keyword arguments will - be passed as options for the create collection command + the create collection command. + :param kwargs: Additional keyword arguments will + be passed as options for the create collection command. .. versionchanged:: 4.2 Added the ``clusteredIndex`` and ``encryptedFields`` parameters. @@ -220,6 +231,10 @@ class Collection(common.BaseObject, Generic[_DocumentType]): ) if not isinstance(name, str): raise TypeError("name must be an instance of str") + from pymongo.synchronous.database import Database + + if not isinstance(database, Database): + raise TypeError(f"Collection requires a Database but {type(database)} given") if not name or ".." in name: raise InvalidName("collection names cannot be empty") @@ -229,33 +244,315 @@ class Collection(common.BaseObject, Generic[_DocumentType]): raise InvalidName("collection names must not start or end with '.': %r" % name) if "\x00" in name: raise InvalidName("collection names must not contain the null character") - collation = validate_collation_or_none(kwargs.pop("collation", None)) - self.__database: Database[_DocumentType] = database - self.__name = name - self.__full_name = f"{self.__database.name}.{self.__name}" - self.__write_response_codec_options = self.codec_options._replace( + self._database: Database[_DocumentType] = database + self._name = name + self._full_name = f"{self._database.name}.{self._name}" + self._write_response_codec_options = self.codec_options._replace( unicode_decode_error_handler="replace", document_class=dict ) self._timeout = database.client.options.timeout - encrypted_fields = kwargs.pop("encryptedFields", None) - if create or kwargs or collation: - if encrypted_fields: - common.validate_is_mapping("encrypted_fields", encrypted_fields) - opts = {"clusteredIndex": {"key": {"_id": 1}, "unique": True}} - self.__create( - _esc_coll_name(encrypted_fields, name), opts, None, session, qev2_required=True + + if create or kwargs: + if _IS_SYNC: + warnings.warn( + "The `create` and `kwargs` arguments to Collection are deprecated and will be removed in PyMongo 5.0", + DeprecationWarning, + stacklevel=2, ) - self.__create(_ecoc_coll_name(encrypted_fields, name), opts, None, session) - self.__create(name, kwargs, collation, session, encrypted_fields=encrypted_fields) - self.create_index([("__safeContent__", ASCENDING)], session) + self._create(kwargs, session) # type: ignore[unused-coroutine] else: - self.__create(name, kwargs, collation, session) + raise ValueError("Collection does not support the `create` or `kwargs` arguments.") + + def __getattr__(self, name: str) -> Collection[_DocumentType]: + """Get a sub-collection of this collection by name. + + Raises InvalidName if an invalid collection name is used. + + :param name: the name of the collection to get + """ + if name.startswith("_"): + full_name = f"{self._name}.{name}" + raise AttributeError( + f"{type(self).__name__} has no attribute {name!r}. To access the {full_name}" + f" collection, use database['{full_name}']." + ) + return self.__getitem__(name) + + def __getitem__(self, name: str) -> Collection[_DocumentType]: + return Collection( + self._database, + f"{self._name}.{name}", + False, + self.codec_options, + self.read_preference, + self.write_concern, + self.read_concern, + ) + + def __repr__(self) -> str: + return f"{type(self).__name__}({self._database!r}, {self._name!r})" + + def __eq__(self, other: Any) -> bool: + if isinstance(other, Collection): + return self._database == other.database and self._name == other.name + return NotImplemented + + def __ne__(self, other: Any) -> bool: + return not self == other + + def __hash__(self) -> int: + return hash((self._database, self._name)) + + def __bool__(self) -> NoReturn: + raise NotImplementedError( + f"{type(self).__name__} objects do not implement truth " + "value testing or bool(). Please compare " + "with None instead: collection is not None" + ) + + @property + def full_name(self) -> str: + """The full name of this :class:`Collection`. + + The full name is of the form `database_name.collection_name`. + """ + return self._full_name + + @property + def name(self) -> str: + """The name of this :class:`Collection`.""" + return self._name + + @property + def database(self) -> Database[_DocumentType]: + """The :class:`~pymongo.database.Database` that this + :class:`Collection` is a part of. + """ + return self._database + + def with_options( + self, + codec_options: Optional[bson.CodecOptions[_DocumentTypeArg]] = None, + read_preference: Optional[_ServerMode] = None, + write_concern: Optional[WriteConcern] = None, + read_concern: Optional[ReadConcern] = None, + ) -> Collection[_DocumentType]: + """Get a clone of this collection changing the specified settings. + + >>> coll1.read_preference + Primary() + >>> from pymongo import ReadPreference + >>> coll2 = coll1.with_options(read_preference=ReadPreference.SECONDARY) + >>> coll1.read_preference + Primary() + >>> coll2.read_preference + Secondary(tag_sets=None) + + :param codec_options: An instance of + :class:`~bson.codec_options.CodecOptions`. If ``None`` (the + default) the :attr:`codec_options` of this :class:`Collection` + is used. + :param read_preference: The read preference to use. If + ``None`` (the default) the :attr:`read_preference` of this + :class:`Collection` is used. See :mod:`~pymongo.read_preferences` + for options. + :param write_concern: An instance of + :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the + default) the :attr:`write_concern` of this :class:`Collection` + is used. + :param read_concern: An instance of + :class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the + default) the :attr:`read_concern` of this :class:`Collection` + is used. + """ + return Collection( + self._database, + self._name, + False, + codec_options or self.codec_options, + read_preference or self.read_preference, + write_concern or self.write_concern, + read_concern or self.read_concern, + ) + + def _write_concern_for_cmd( + self, cmd: Mapping[str, Any], session: Optional[ClientSession] + ) -> WriteConcern: + raw_wc = cmd.get("writeConcern") + if raw_wc is not None: + return WriteConcern(**raw_wc) + else: + return self._write_concern_for(session) + + # See PYTHON-3084. + __iter__ = None + + def __next__(self) -> NoReturn: + raise TypeError("'Collection' object is not iterable") + + next = __next__ + + def __call__(self, *args: Any, **kwargs: Any) -> NoReturn: + """This is only here so that some API misusages are easier to debug.""" + if "." not in self._name: + raise TypeError( + f"'{type(self).__name__}' object is not callable. If you " + "meant to call the '%s' method on a 'Database' " + "object it is failing because no such method " + "exists." % self._name + ) + raise TypeError( + f"'{type(self).__name__}' object is not callable. If you meant to " + f"call the '%s' method on a '{type(self).__name__}' object it is " + "failing because no such method exists." % self._name.split(".")[-1] + ) + + def watch( + self, + pipeline: Optional[_Pipeline] = None, + full_document: Optional[str] = None, + resume_after: Optional[Mapping[str, Any]] = None, + max_await_time_ms: Optional[int] = None, + batch_size: Optional[int] = None, + collation: Optional[_CollationIn] = None, + start_at_operation_time: Optional[Timestamp] = None, + session: Optional[ClientSession] = None, + start_after: Optional[Mapping[str, Any]] = None, + comment: Optional[Any] = None, + full_document_before_change: Optional[str] = None, + show_expanded_events: Optional[bool] = None, + ) -> CollectionChangeStream[_DocumentType]: + """Watch changes on this collection. + + Performs an aggregation with an implicit initial ``$changeStream`` + stage and returns a + :class:`~pymongo.change_stream.CollectionChangeStream` cursor which + iterates over changes on this collection. + + .. code-block:: python + + with db.collection.watch() as stream: + for change in stream: + print(change) + + The :class:`~pymongo.change_stream.CollectionChangeStream` iterable + blocks until the next change document is returned or an error is + raised. If the + :meth:`~pymongo.change_stream.CollectionChangeStream.next` method + encounters a network error when retrieving a batch from the server, + it will automatically attempt to recreate the cursor such that no + change events are missed. Any error encountered during the resume + attempt indicates there may be an outage and will be raised. + + .. code-block:: python + + try: + with db.coll.watch([{"$match": {"operationType": "insert"}}]) as stream: + for insert_change in stream: + print(insert_change) + except pymongo.errors.PyMongoError: + # The ChangeStream encountered an unrecoverable error or the + # resume attempt failed to recreate the cursor. + logging.error("...") + + For a precise description of the resume process see the + `change streams specification`_. + + .. note:: Using this helper method is preferred to directly calling + :meth:`~pymongo.collection.Collection.aggregate` with a + ``$changeStream`` stage, for the purpose of supporting + resumability. + + .. warning:: This Collection's :attr:`read_concern` must be + ``ReadConcern("majority")`` in order to use the ``$changeStream`` + stage. + + :param pipeline: A list of aggregation pipeline stages to + append to an initial ``$changeStream`` stage. Not all + pipeline stages are valid after a ``$changeStream`` stage, see the + MongoDB documentation on change streams for the supported stages. + :param full_document: The fullDocument to pass as an option + to the ``$changeStream`` stage. Allowed values: 'updateLookup', + 'whenAvailable', 'required'. When set to 'updateLookup', the + change notification for partial updates will include both a delta + describing the changes to the document, as well as a copy of the + entire document that was changed from some time after the change + occurred. + :param full_document_before_change: Allowed values: 'whenAvailable' + and 'required'. Change events may now result in a + 'fullDocumentBeforeChange' response field. + :param resume_after: A resume token. If provided, the + change stream will start returning changes that occur directly + after the operation specified in the resume token. A resume token + is the _id value of a change document. + :param max_await_time_ms: The maximum time in milliseconds + for the server to wait for changes before responding to a getMore + operation. + :param batch_size: The maximum number of documents to return + per batch. + :param collation: The :class:`~pymongo.collation.Collation` + to use for the aggregation. + :param start_at_operation_time: If provided, the resulting + change stream will only return changes that occurred at or after + the specified :class:`~bson.timestamp.Timestamp`. Requires + MongoDB >= 4.0. + :param session: a + :class:`~pymongo.client_session.ClientSession`. + :param start_after: The same as `resume_after` except that + `start_after` can resume notifications after an invalidate event. + This option and `resume_after` are mutually exclusive. + :param comment: A user-provided comment to attach to this + command. + :param show_expanded_events: Include expanded events such as DDL events like `dropIndexes`. + + :return: A :class:`~pymongo.change_stream.CollectionChangeStream` cursor. + + .. versionchanged:: 4.3 + Added `show_expanded_events` parameter. + + .. versionchanged:: 4.2 + Added ``full_document_before_change`` parameter. + + .. versionchanged:: 4.1 + Added ``comment`` parameter. + + .. versionchanged:: 3.9 + Added the ``start_after`` parameter. + + .. versionchanged:: 3.7 + Added the ``start_at_operation_time`` parameter. + + .. versionadded:: 3.6 + + .. seealso:: The MongoDB documentation on `changeStreams `_. + + .. _change streams specification: + https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md + """ + change_stream = CollectionChangeStream( + self, + pipeline, + full_document, + resume_after, + max_await_time_ms, + batch_size, + collation, + start_at_operation_time, + session, + start_after, + comment, + full_document_before_change, + show_expanded_events, + ) + + change_stream._initialize_cursor() + return change_stream def _conn_for_writes( self, session: Optional[ClientSession], operation: str ) -> ContextManager[Connection]: - return self.__database.client._conn_for_writes(session, operation) + return self._database.client._conn_for_writes(session, operation) def _command( self, @@ -297,9 +594,9 @@ class Collection(common.BaseObject, Generic[_DocumentType]): :return: The result document. """ - with self.__database.client._tmp_session(session) as s: + with self._database.client._tmp_session(session) as s: return conn.command( - self.__database.name, + self._database.name, command, read_preference or self._read_preference_for(session), codec_options or self.codec_options, @@ -310,12 +607,12 @@ class Collection(common.BaseObject, Generic[_DocumentType]): parse_write_concern_error=True, collation=collation, session=s, - client=self.__database.client, + client=self._database.client, retryable_write=retryable_write, user_fields=user_fields, ) - def __create( + def _create_helper( self, name: str, options: MutableMapping[str, Any], @@ -350,117 +647,30 @@ class Collection(common.BaseObject, Generic[_DocumentType]): session=session, ) - def __getattr__(self, name: str) -> Collection[_DocumentType]: - """Get a sub-collection of this collection by name. - - Raises InvalidName if an invalid collection name is used. - - :param name: the name of the collection to get - """ - if name.startswith("_"): - full_name = f"{self.__name}.{name}" - raise AttributeError( - f"Collection has no attribute {name!r}. To access the {full_name}" - f" collection, use database['{full_name}']." - ) - return self.__getitem__(name) - - def __getitem__(self, name: str) -> Collection[_DocumentType]: - return Collection( - self.__database, - f"{self.__name}.{name}", - False, - self.codec_options, - self.read_preference, - self.write_concern, - self.read_concern, - ) - - def __repr__(self) -> str: - return f"Collection({self.__database!r}, {self.__name!r})" - - def __eq__(self, other: Any) -> bool: - if isinstance(other, Collection): - return self.__database == other.database and self.__name == other.name - return NotImplemented - - def __ne__(self, other: Any) -> bool: - return not self == other - - def __hash__(self) -> int: - return hash((self.__database, self.__name)) - - def __bool__(self) -> NoReturn: - raise NotImplementedError( - "Collection objects do not implement truth " - "value testing or bool(). Please compare " - "with None instead: collection is not None" - ) - - @property - def full_name(self) -> str: - """The full name of this :class:`Collection`. - - The full name is of the form `database_name.collection_name`. - """ - return self.__full_name - - @property - def name(self) -> str: - """The name of this :class:`Collection`.""" - return self.__name - - @property - def database(self) -> Database[_DocumentType]: - """The :class:`~pymongo.database.Database` that this - :class:`Collection` is a part of. - """ - return self.__database - - def with_options( + def _create( self, - codec_options: Optional[CodecOptions[_DocumentTypeArg]] = None, - read_preference: Optional[_ServerMode] = None, - write_concern: Optional[WriteConcern] = None, - read_concern: Optional[ReadConcern] = None, - ) -> Collection[_DocumentType]: - """Get a clone of this collection changing the specified settings. - - >>> coll1.read_preference - Primary() - >>> from pymongo import ReadPreference - >>> coll2 = coll1.with_options(read_preference=ReadPreference.SECONDARY) - >>> coll1.read_preference - Primary() - >>> coll2.read_preference - Secondary(tag_sets=None) - - :param codec_options: An instance of - :class:`~bson.codec_options.CodecOptions`. If ``None`` (the - default) the :attr:`codec_options` of this :class:`Collection` - is used. - :param read_preference: The read preference to use. If - ``None`` (the default) the :attr:`read_preference` of this - :class:`Collection` is used. See :mod:`~pymongo.read_preferences` - for options. - :param write_concern: An instance of - :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the - default) the :attr:`write_concern` of this :class:`Collection` - is used. - :param read_concern: An instance of - :class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the - default) the :attr:`read_concern` of this :class:`Collection` - is used. - """ - return Collection( - self.__database, - self.__name, - False, - codec_options or self.codec_options, - read_preference or self.read_preference, - write_concern or self.write_concern, - read_concern or self.read_concern, - ) + options: MutableMapping[str, Any], + session: Optional[ClientSession], + ) -> None: + collation = validate_collation_or_none(options.pop("collation", None)) + encrypted_fields = options.pop("encryptedFields", None) + if encrypted_fields: + common.validate_is_mapping("encrypted_fields", encrypted_fields) + opts = {"clusteredIndex": {"key": {"_id": 1}, "unique": True}} + self._create_helper( + _esc_coll_name(encrypted_fields, self._name), + opts, + None, + session, + qev2_required=True, + ) + self._create_helper(_ecoc_coll_name(encrypted_fields, self._name), opts, None, session) + self._create_helper( + self._name, options, collation, session, encrypted_fields=encrypted_fields + ) + self.create_index([("__safeContent__", ASCENDING)], session) + else: + self._create_helper(self._name, options, collation, session) @_csot.apply def bulk_write( @@ -584,18 +794,18 @@ class Collection(common.BaseObject, Generic[_DocumentType]): command["bypassDocumentValidation"] = True result = conn.command( - self.__database.name, + self._database.name, command, write_concern=write_concern, - codec_options=self.__write_response_codec_options, + codec_options=self._write_response_codec_options, session=session, - client=self.__database.client, + client=self._database.client, retryable_write=retryable_write, ) _check_write_command_response(result) - self.__database.client._retryable_write( + self._database.client._retryable_write( acknowledged, _insert_command, session, operation=_Op.INSERT ) @@ -788,7 +998,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): "Must be connected to MongoDB 4.2+ to use hint on unacknowledged update commands." ) if not isinstance(hint, str): - hint = helpers._index_document(hint) + hint = helpers_shared._index_document(hint) update_doc["hint"] = hint command = {"update": self.name, "ordered": ordered, "updates": [update_doc]} if let is not None: @@ -803,14 +1013,16 @@ class Collection(common.BaseObject, Generic[_DocumentType]): # The command result has to be published for APM unmodified # so we make a shallow copy here before adding updatedExisting. - result = conn.command( - self.__database.name, - command, - write_concern=write_concern, - codec_options=self.__write_response_codec_options, - session=session, - client=self.__database.client, - retryable_write=retryable_write, + result = ( + conn.command( + self._database.name, + command, + write_concern=write_concern, + codec_options=self._write_response_codec_options, + session=session, + client=self._database.client, + retryable_write=retryable_write, + ) ).copy() _check_write_command_response(result) # Add the updatedExisting field for compatibility. @@ -869,7 +1081,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): comment=comment, ) - return self.__database.client._retryable_write( + return self._database.client._retryable_write( (write_concern or self.write_concern).acknowledged and not multi, _update, session, @@ -1224,15 +1436,15 @@ class Collection(common.BaseObject, Generic[_DocumentType]): .. versionchanged:: 3.6 Added ``session`` parameter. """ - dbo = self.__database.client.get_database( - self.__database.name, + dbo = self._database.client.get_database( + self._database.name, self.codec_options, self.read_preference, self.write_concern, self.read_concern, ) dbo.drop_collection( - self.__name, session=session, comment=comment, encrypted_fields=encrypted_fields + self._name, session=session, comment=comment, encrypted_fields=encrypted_fields ) def _delete( @@ -1267,7 +1479,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): "Must be connected to MongoDB 4.4+ to use hint on unacknowledged delete commands." ) if not isinstance(hint, str): - hint = helpers._index_document(hint) + hint = helpers_shared._index_document(hint) delete_doc["hint"] = hint command = {"delete": self.name, "ordered": ordered, "deletes": [delete_doc]} @@ -1280,12 +1492,12 @@ class Collection(common.BaseObject, Generic[_DocumentType]): # Delete command. result = conn.command( - self.__database.name, + self._database.name, command, write_concern=write_concern, - codec_options=self.__write_response_codec_options, + codec_options=self._write_response_codec_options, session=session, - client=self.__database.client, + client=self._database.client, retryable_write=retryable_write, ) _check_write_command_response(result) @@ -1324,7 +1536,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): comment=comment, ) - return self.__database.client._retryable_write( + return self._database.client._retryable_write( (write_concern or self.write_concern).acknowledged and not multi, _delete, session, @@ -1711,7 +1923,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): .. versionadded:: 3.6 """ # OP_MSG is required to support encryption. - if self.__database.client._encrypter: + if self._database.client._encrypter: raise InvalidOperation("find_raw_batches does not support auto encryption") return RawBatchCursor(self, *args, **kwargs) @@ -1731,7 +1943,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): cmd, read_preference=read_preference, allowable_errors=["ns missing"], - codec_options=self.__write_response_codec_options, + codec_options=self._write_response_codec_options, read_concern=self.read_concern, collation=collation, session=session, @@ -1754,7 +1966,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): cmd, read_preference, allowable_errors=[26], # Ignore NamespaceNotFound. - codec_options=self.__write_response_codec_options, + codec_options=self._write_response_codec_options, read_concern=self.read_concern, collation=collation, session=session, @@ -1803,7 +2015,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): conn: Connection, read_preference: Optional[_ServerMode], ) -> int: - cmd: dict[str, Any] = {"count": self.__name} + cmd: dict[str, Any] = {"count": self._name} cmd.update(kwargs) return self._count_cmd(session, conn, read_preference, cmd, collation=None) @@ -1879,9 +2091,9 @@ class Collection(common.BaseObject, Generic[_DocumentType]): if comment is not None: kwargs["comment"] = comment pipeline.append({"$group": {"_id": 1, "n": {"$sum": 1}}}) - cmd = {"aggregate": self.__name, "pipeline": pipeline, "cursor": {}} + cmd = {"aggregate": self._name, "pipeline": pipeline, "cursor": {}} if "hint" in kwargs and not isinstance(kwargs["hint"], str): - kwargs["hint"] = helpers._index_document(kwargs["hint"]) + kwargs["hint"] = helpers_shared._index_document(kwargs["hint"]) collation = validate_collation_or_none(kwargs.pop("collation", None)) cmd.update(kwargs) @@ -1900,12 +2112,15 @@ class Collection(common.BaseObject, Generic[_DocumentType]): def _retryable_non_cursor_read( self, - func: Callable[[Optional[ClientSession], Server, Connection, Optional[_ServerMode]], T], + func: Callable[ + [Optional[ClientSession], Server, Connection, Optional[_ServerMode]], + T, + ], session: Optional[ClientSession], operation: str, ) -> T: """Non-cursor read helper to handle implicit session creation.""" - client = self.__database.client + client = self._database.client with client._tmp_session(session) as s: return client._retryable_read(func, self._read_preference_for(s), s, operation) @@ -1954,10 +2169,10 @@ class Collection(common.BaseObject, Generic[_DocumentType]): common.validate_list("indexes", indexes) if comment is not None: kwargs["comment"] = comment - return self.__create_indexes(indexes, session, **kwargs) + return self._create_indexes(indexes, session, **kwargs) @_csot.apply - def __create_indexes( + def _create_indexes( self, indexes: Sequence[IndexModel], session: Optional[ClientSession], **kwargs: Any ) -> list[str]: """Internal createIndexes helper. @@ -2117,7 +2332,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): if comment is not None: cmd_options["comment"] = comment index = IndexModel(keys, **kwargs) - return self.__create_indexes([index], session, **cmd_options)[0] + return (self._create_indexes([index], session, **cmd_options))[0] def drop_indexes( self, @@ -2150,7 +2365,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): """ if comment is not None: kwargs["comment"] = comment - self.drop_index("*", session=session, **kwargs) + self._drop_index("*", session=session, **kwargs) @_csot.apply def drop_index( @@ -2199,14 +2414,24 @@ class Collection(common.BaseObject, Generic[_DocumentType]): when connected to MongoDB >= 3.4. """ + self._drop_index(index_or_name, session, comment, **kwargs) + + @_csot.apply + def _drop_index( + self, + index_or_name: _IndexKeyHint, + session: Optional[ClientSession] = None, + comment: Optional[Any] = None, + **kwargs: Any, + ) -> None: name = index_or_name if isinstance(index_or_name, list): - name = helpers._gen_index_name(index_or_name) + name = helpers_shared._gen_index_name(index_or_name) if not isinstance(name, str): raise TypeError("index_or_name must be an instance of str or list") - cmd = {"dropIndexes": self.__name, "index": name} + cmd = {"dropIndexes": self._name, "index": name} cmd.update(kwargs) if comment is not None: cmd["comment"] = comment @@ -2247,6 +2472,13 @@ class Collection(common.BaseObject, Generic[_DocumentType]): .. versionadded:: 3.0 """ + return self._list_indexes(session, comment) + + def _list_indexes( + self, + session: Optional[ClientSession] = None, + comment: Optional[Any] = None, + ) -> CommandCursor[MutableMapping[str, Any]]: codec_options: CodecOptions = CodecOptions(SON) coll = cast( Collection[MutableMapping[str, Any]], @@ -2261,14 +2493,14 @@ class Collection(common.BaseObject, Generic[_DocumentType]): conn: Connection, read_preference: _ServerMode, ) -> CommandCursor[MutableMapping[str, Any]]: - cmd = {"listIndexes": self.__name, "cursor": {}} + cmd = {"listIndexes": self._name, "cursor": {}} if comment is not None: cmd["comment"] = comment try: - cursor = self._command(conn, cmd, read_preference, codec_options, session=session)[ - "cursor" - ] + cursor = ( + self._command(conn, cmd, read_preference, codec_options, session=session) + )["cursor"] except OperationFailure as exc: # Ignore NamespaceNotFound errors to match the behavior # of reading from *.system.indexes. @@ -2286,8 +2518,8 @@ class Collection(common.BaseObject, Generic[_DocumentType]): cmd_cursor._maybe_pin_connection(conn) return cmd_cursor - with self.__database.client._tmp_session(session, False) as s: - return self.__database.client._retryable_read( + with self._database.client._tmp_session(session, False) as s: + return self._database.client._retryable_read( _cmd, read_pref, s, operation=_Op.LIST_INDEXES ) @@ -2325,7 +2557,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): .. versionchanged:: 3.6 Added ``session`` parameter. """ - cursor = self.list_indexes(session=session, comment=comment) + cursor = self._list_indexes(session=session, comment=comment) info = {} for index in cursor: index["key"] = list(index["key"].items()) @@ -2378,7 +2610,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): user_fields={"cursor": {"firstBatch": 1}}, ) - return self.__database.client._retryable_read( + return self._database.client._retryable_read( cmd.get_cursor, cmd.get_read_preference(session), # type: ignore[arg-type] session, @@ -2414,7 +2646,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): """ if not isinstance(model, SearchIndexModel): model = SearchIndexModel(**model) - return self.create_search_indexes([model], session, comment, **kwargs)[0] + return (self._create_search_indexes([model], session, comment, **kwargs))[0] def create_search_indexes( self, @@ -2438,6 +2670,15 @@ class Collection(common.BaseObject, Generic[_DocumentType]): .. versionadded:: 4.5 """ + return self._create_search_indexes(models, session, comment, **kwargs) + + def _create_search_indexes( + self, + models: list[SearchIndexModel], + session: Optional[ClientSession] = None, + comment: Optional[Any] = None, + **kwargs: Any, + ) -> list[str]: if comment is not None: kwargs["comment"] = comment @@ -2482,7 +2723,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): .. versionadded:: 4.5 """ - cmd = {"dropSearchIndex": self.__name, "name": name} + cmd = {"dropSearchIndex": self._name, "name": name} cmd.update(kwargs) if comment is not None: cmd["comment"] = comment @@ -2518,7 +2759,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): .. versionadded:: 4.5 """ - cmd = {"updateSearchIndex": self.__name, "name": name, "definition": definition} + cmd = {"updateSearchIndex": self._name, "name": name, "definition": definition} cmd.update(kwargs) if comment is not None: cmd["comment"] = comment @@ -2551,16 +2792,14 @@ class Collection(common.BaseObject, Generic[_DocumentType]): .. versionchanged:: 3.6 Added ``session`` parameter. """ - dbo = self.__database.client.get_database( - self.__database.name, + dbo = self._database.client.get_database( + self._database.name, self.codec_options, self.read_preference, self.write_concern, self.read_concern, ) - cursor = dbo.list_collections( - session=session, filter={"name": self.__name}, comment=comment - ) + cursor = dbo.list_collections(session=session, filter={"name": self._name}, comment=comment) result = None for doc in cursor: @@ -2601,7 +2840,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): user_fields={"cursor": {"firstBatch": 1}}, ) - return self.__database.client._retryable_read( + return self._database.client._retryable_read( cmd.get_cursor, cmd.get_read_preference(session), # type: ignore[arg-type] session, @@ -2692,7 +2931,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): .. _aggregate command: https://mongodb.com/docs/manual/reference/command/aggregate """ - with self.__database.client._tmp_session(session, close=False) as s: + with self._database.client._tmp_session(session, close=False) as s: return self._aggregate( _CollectionAggregationCommand, pipeline, @@ -2735,11 +2974,11 @@ class Collection(common.BaseObject, Generic[_DocumentType]): .. versionadded:: 3.6 """ # OP_MSG is required to support encryption. - if self.__database.client._encrypter: + if self._database.client._encrypter: raise InvalidOperation("aggregate_raw_batches does not support auto encryption") if comment is not None: kwargs["comment"] = comment - with self.__database.client._tmp_session(session, close=False) as s: + with self._database.client._tmp_session(session, close=False) as s: return cast( RawBatchCursor[_DocumentType], self._aggregate( @@ -2752,144 +2991,6 @@ class Collection(common.BaseObject, Generic[_DocumentType]): ), ) - def watch( - self, - pipeline: Optional[_Pipeline] = None, - full_document: Optional[str] = None, - resume_after: Optional[Mapping[str, Any]] = None, - max_await_time_ms: Optional[int] = None, - batch_size: Optional[int] = None, - collation: Optional[_CollationIn] = None, - start_at_operation_time: Optional[Timestamp] = None, - session: Optional[ClientSession] = None, - start_after: Optional[Mapping[str, Any]] = None, - comment: Optional[Any] = None, - full_document_before_change: Optional[str] = None, - show_expanded_events: Optional[bool] = None, - ) -> CollectionChangeStream[_DocumentType]: - """Watch changes on this collection. - - Performs an aggregation with an implicit initial ``$changeStream`` - stage and returns a - :class:`~pymongo.change_stream.CollectionChangeStream` cursor which - iterates over changes on this collection. - - .. code-block:: python - - with db.collection.watch() as stream: - for change in stream: - print(change) - - The :class:`~pymongo.change_stream.CollectionChangeStream` iterable - blocks until the next change document is returned or an error is - raised. If the - :meth:`~pymongo.change_stream.CollectionChangeStream.next` method - encounters a network error when retrieving a batch from the server, - it will automatically attempt to recreate the cursor such that no - change events are missed. Any error encountered during the resume - attempt indicates there may be an outage and will be raised. - - .. code-block:: python - - try: - with db.collection.watch([{"$match": {"operationType": "insert"}}]) as stream: - for insert_change in stream: - print(insert_change) - except pymongo.errors.PyMongoError: - # The ChangeStream encountered an unrecoverable error or the - # resume attempt failed to recreate the cursor. - logging.error("...") - - For a precise description of the resume process see the - `change streams specification`_. - - .. note:: Using this helper method is preferred to directly calling - :meth:`~pymongo.collection.Collection.aggregate` with a - ``$changeStream`` stage, for the purpose of supporting - resumability. - - .. warning:: This Collection's :attr:`read_concern` must be - ``ReadConcern("majority")`` in order to use the ``$changeStream`` - stage. - - :param pipeline: A list of aggregation pipeline stages to - append to an initial ``$changeStream`` stage. Not all - pipeline stages are valid after a ``$changeStream`` stage, see the - MongoDB documentation on change streams for the supported stages. - :param full_document: The fullDocument to pass as an option - to the ``$changeStream`` stage. Allowed values: 'updateLookup', - 'whenAvailable', 'required'. When set to 'updateLookup', the - change notification for partial updates will include both a delta - describing the changes to the document, as well as a copy of the - entire document that was changed from some time after the change - occurred. - :param full_document_before_change: Allowed values: 'whenAvailable' - and 'required'. Change events may now result in a - 'fullDocumentBeforeChange' response field. - :param resume_after: A resume token. If provided, the - change stream will start returning changes that occur directly - after the operation specified in the resume token. A resume token - is the _id value of a change document. - :param max_await_time_ms: The maximum time in milliseconds - for the server to wait for changes before responding to a getMore - operation. - :param batch_size: The maximum number of documents to return - per batch. - :param collation: The :class:`~pymongo.collation.Collation` - to use for the aggregation. - :param start_at_operation_time: If provided, the resulting - change stream will only return changes that occurred at or after - the specified :class:`~bson.timestamp.Timestamp`. Requires - MongoDB >= 4.0. - :param session: a - :class:`~pymongo.client_session.ClientSession`. - :param start_after: The same as `resume_after` except that - `start_after` can resume notifications after an invalidate event. - This option and `resume_after` are mutually exclusive. - :param comment: A user-provided comment to attach to this - command. - :param show_expanded_events: Include expanded events such as DDL events like `dropIndexes`. - - :return: A :class:`~pymongo.change_stream.CollectionChangeStream` cursor. - - .. versionchanged:: 4.3 - Added `show_expanded_events` parameter. - - .. versionchanged:: 4.2 - Added ``full_document_before_change`` parameter. - - .. versionchanged:: 4.1 - Added ``comment`` parameter. - - .. versionchanged:: 3.9 - Added the ``start_after`` parameter. - - .. versionchanged:: 3.7 - Added the ``start_at_operation_time`` parameter. - - .. versionadded:: 3.6 - - .. seealso:: The MongoDB documentation on `changeStreams `_. - - .. _change streams specification: - https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.md - """ - return CollectionChangeStream( - self, - pipeline, - full_document, - resume_after, - max_await_time_ms, - batch_size, - collation, - start_at_operation_time, - session, - start_after, - comment, - full_document_before_change, - show_expanded_events, - ) - @_csot.apply def rename( self, @@ -2936,22 +3037,22 @@ class Collection(common.BaseObject, Generic[_DocumentType]): if "$" in new_name and not new_name.startswith("oplog.$main"): raise InvalidName("collection names must not contain '$'") - new_name = f"{self.__database.name}.{new_name}" - cmd = {"renameCollection": self.__full_name, "to": new_name} + new_name = f"{self._database.name}.{new_name}" + cmd = {"renameCollection": self._full_name, "to": new_name} cmd.update(kwargs) if comment is not None: cmd["comment"] = comment write_concern = self._write_concern_for_cmd(cmd, session) with self._conn_for_writes(session, operation=_Op.RENAME) as conn: - with self.__database.client._tmp_session(session) as s: + with self._database.client._tmp_session(session) as s: return conn.command( "admin", cmd, write_concern=write_concern, parse_write_concern_error=True, session=s, - client=self.__database.client, + client=self._database.client, ) def distinct( @@ -2998,7 +3099,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): """ if not isinstance(key, str): raise TypeError("key must be an instance of str") - cmd = {"distinct": self.__name, "key": key} + cmd = {"distinct": self._name, "key": key} if filter is not None: if "query" in kwargs: raise ConfigurationError("can't pass both filter and query") @@ -3014,28 +3115,21 @@ class Collection(common.BaseObject, Generic[_DocumentType]): conn: Connection, read_preference: Optional[_ServerMode], ) -> list: - return self._command( - conn, - cmd, - read_preference=read_preference, - read_concern=self.read_concern, - collation=collation, - session=session, - user_fields={"values": 1}, + return ( + self._command( + conn, + cmd, + read_preference=read_preference, + read_concern=self.read_concern, + collation=collation, + session=session, + user_fields={"values": 1}, + ) )["values"] return self._retryable_non_cursor_read(_cmd, session, operation=_Op.DISTINCT) - def _write_concern_for_cmd( - self, cmd: Mapping[str, Any], session: Optional[ClientSession] - ) -> WriteConcern: - raw_wc = cmd.get("writeConcern") - if raw_wc is not None: - return WriteConcern(**raw_wc) - else: - return self._write_concern_for(session) - - def __find_and_modify( + def _find_and_modify( self, filter: Mapping[str, Any], projection: Optional[Union[Mapping[str, Any], Iterable[str]]], @@ -3055,25 +3149,25 @@ class Collection(common.BaseObject, Generic[_DocumentType]): "return_document must be ReturnDocument.BEFORE or ReturnDocument.AFTER" ) collation = validate_collation_or_none(kwargs.pop("collation", None)) - cmd = {"findAndModify": self.__name, "query": filter, "new": return_document} + cmd = {"findAndModify": self._name, "query": filter, "new": return_document} if let is not None: common.validate_is_mapping("let", let) cmd["let"] = let cmd.update(kwargs) if projection is not None: - cmd["fields"] = helpers._fields_list_to_dict(projection, "projection") + cmd["fields"] = helpers_shared._fields_list_to_dict(projection, "projection") if sort is not None: - cmd["sort"] = helpers._index_document(sort) + cmd["sort"] = helpers_shared._index_document(sort) if upsert is not None: validate_boolean("upsert", upsert) cmd["upsert"] = upsert if hint is not None: if not isinstance(hint, str): - hint = helpers._index_document(hint) + hint = helpers_shared._index_document(hint) write_concern = self._write_concern_for_cmd(cmd, session) - def _find_and_modify( + def _find_and_modify_helper( session: Optional[ClientSession], conn: Connection, retryable_write: bool ) -> Any: acknowledged = write_concern.acknowledged @@ -3107,9 +3201,9 @@ class Collection(common.BaseObject, Generic[_DocumentType]): return out.get("value") - return self.__database.client._retryable_write( + return self._database.client._retryable_write( write_concern.acknowledged, - _find_and_modify, + _find_and_modify_helper, session, operation=_Op.FIND_AND_MODIFY, ) @@ -3199,7 +3293,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): kwargs["remove"] = True if comment is not None: kwargs["comment"] = comment - return self.__find_and_modify( + return self._find_and_modify( filter, projection, sort, let=let, hint=hint, session=session, **kwargs ) @@ -3298,7 +3392,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): kwargs["update"] = replacement if comment is not None: kwargs["comment"] = comment - return self.__find_and_modify( + return self._find_and_modify( filter, projection, sort, @@ -3362,7 +3456,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): The *upsert* option can be used to create the document if it doesn't already exist. - >>> db.example.delete_many({}).deleted_count + >>> (db.example.delete_many({})).deleted_count 1 >>> db.example.find_one_and_update( ... {'_id': 'userid'}, @@ -3446,7 +3540,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): kwargs["update"] = update if comment is not None: kwargs["comment"] = comment - return self.__find_and_modify( + return self._find_and_modify( filter, projection, sort, @@ -3458,26 +3552,3 @@ class Collection(common.BaseObject, Generic[_DocumentType]): session=session, **kwargs, ) - - # See PYTHON-3084. - __iter__ = None - - def __next__(self) -> NoReturn: - raise TypeError("'Collection' object is not iterable") - - next = __next__ - - def __call__(self, *args: Any, **kwargs: Any) -> NoReturn: - """This is only here so that some API misusages are easier to debug.""" - if "." not in self.__name: - raise TypeError( - "'Collection' object is not callable. If you " - "meant to call the '%s' method on a 'Database' " - "object it is failing because no such method " - "exists." % self.__name - ) - raise TypeError( - "'Collection' object is not callable. If you meant to " - "call the '%s' method on a 'Collection' object it is " - "failing because no such method exists." % self.__name.split(".")[-1] - )