From cae124c32c6d1d9e1e32b5731fe23b511e9e45eb Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 28 Jun 2023 14:19:04 -0700 Subject: [PATCH] PYTHON-3588 Expose an API to create a cursor from a command response (#1263) --- .evergreen/resync-specs.sh | 5 +- CONTRIBUTING.rst | 3 +- doc/changelog.rst | 6 + pymongo/command_cursor.py | 21 +- pymongo/database.py | 109 ++- test/run_command/unified/runCommand.json | 635 +++++++++++++ .../run_command/unified/runCursorCommand.json | 877 ++++++++++++++++++ test/test_database.py | 20 + test/test_run_command.py | 17 + test/unified_format.py | 36 +- tox.ini | 2 + 11 files changed, 1723 insertions(+), 8 deletions(-) create mode 100644 test/run_command/unified/runCommand.json create mode 100644 test/run_command/unified/runCursorCommand.json create mode 100644 test/test_run_command.py diff --git a/.evergreen/resync-specs.sh b/.evergreen/resync-specs.sh index 817a2d96b..a74a0125e 100755 --- a/.evergreen/resync-specs.sh +++ b/.evergreen/resync-specs.sh @@ -56,7 +56,7 @@ cpjson () { cd "$SPECS"/source/$1 find . -name '*.json' | grep -Ev "${BLOCKLIST}" | cpio -pdm \ $PYMONGO/test/$2 - printf "\nIgnored files for ${PWD}\n" + printf "\nIgnored files for ${PWD}:\n" IGNORED_FILES="$(printf "\n%s\n" "$(diff <(find . -name '*.json' | sort) \ <(find . -name '*.json' | grep -Ev "${BLOCKLIST}" | sort))" | \ sed -e '/^[0-9]/d' | sed -e 's|< ./||g' )" @@ -126,6 +126,9 @@ do retryable-writes|retryable_writes) cpjson retryable-writes/tests/ retryable_writes ;; + run-command|run_command) + cpjson run-command/tests/ run_command + ;; sdam|SDAM|server-discovery-and-monitoring|discovery_and_monitoring) cpjson server-discovery-and-monitoring/tests/errors \ discovery_and_monitoring/errors diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index a457b3e4c..a897d0e06 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -98,7 +98,8 @@ use the script provided in ``.evergreen/resync-specs.sh``.:: git clone git@github.com:mongodb/specifications.git export MDB_SPECS=~/specifications cd ~/mongo-python-driver/.evergreen - ./resync-specs.sh -b "connection-string*" crud bson-corpus + ./resync-specs.sh -b "" spec1 spec2 ... + ./resync-specs.sh -b "connection-string*" crud bson-corpus # Updates crud and bson-corpus specs while ignoring all files with the regex "connection-string*" cd .. The ``-b`` flag adds as a regex pattern to block files you do not wish to diff --git a/doc/changelog.rst b/doc/changelog.rst index eae105b61..b112d3bc0 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -1,6 +1,12 @@ Changelog ========= +Changes in Version 4.5 +----------------------- + +- Added :meth:`~pymongo.database.Database.cursor_command` + and :meth:`~pymongo.command_cursor.CommandCursor.try_next` to support executing an arbitrary command that returns a cursor. + Changes in Version 4.4 ----------------------- diff --git a/pymongo/command_cursor.py b/pymongo/command_cursor.py index c831dfb49..7a2e52868 100644 --- a/pymongo/command_cursor.py +++ b/pymongo/command_cursor.py @@ -292,7 +292,7 @@ class CommandCursor(Generic[_DocumentType]): __next__ = next - def _try_next(self, get_more_allowed): + def _try_next(self, get_more_allowed: bool) -> Optional[_DocumentType]: """Advance the cursor blocking for at most one getMore command.""" if not len(self.__data) and not self.__killed and get_more_allowed: self._refresh() @@ -301,6 +301,25 @@ class CommandCursor(Generic[_DocumentType]): else: return None + def try_next(self) -> Optional[_DocumentType]: + """Advance the cursor without blocking indefinitely. + + This method returns the next document without waiting + indefinitely for data. + + If no document is cached locally then this method runs a single + getMore command. If the getMore yields any documents, the next + document is returned, otherwise, if the getMore returns no documents + (because there is no additional data) then ``None`` is returned. + + :Returns: + The next document or ``None`` when no document is available + after running a single getMore or when the cursor is closed. + + .. versionadded:: 4.5 + """ + return self._try_next(get_more_allowed=True) + def __enter__(self) -> "CommandCursor[_DocumentType]": return self diff --git a/pymongo/database.py b/pymongo/database.py index 1fa9913c6..7829c28fe 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -42,7 +42,7 @@ from pymongo.change_stream import DatabaseChangeStream from pymongo.collection import Collection from pymongo.command_cursor import CommandCursor from pymongo.common import _ecoc_coll_name, _esc_coll_name -from pymongo.errors import CollectionInvalid, InvalidName +from pymongo.errors import CollectionInvalid, InvalidName, InvalidOperation from pymongo.read_preferences import ReadPreference, _ServerMode from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline @@ -833,6 +833,113 @@ class Database(common.BaseObject, Generic[_DocumentType]): **kwargs, ) + @_csot.apply + def cursor_command( + self, + command: Union[str, MutableMapping[str, Any]], + value: Any = 1, + check: bool = True, + allowable_errors: Optional[Sequence[Union[str, int]]] = None, + read_preference: Optional[_ServerMode] = None, + codec_options: Optional[bson.codec_options.CodecOptions[_CodecDocumentType]] = None, + session: Optional[ClientSession] = None, + comment: Optional[Any] = None, + batch_size: Optional[int] = None, + max_time_ms: Optional[int] = None, + **kwargs: Any, + ) -> CommandCursor: + """Issue a MongoDB command and parse the response as a cursor. + + If the response from the server does not include a cursor field, an error will be thrown. + + Otherwise, behaves identically to issuing a normal MongoDB command. + + :Parameters: + - `command`: document representing the command to be issued, + or the name of the command (for simple commands only). + + .. note:: the order of keys in the `command` document is + significant (the "verb" must come first), so commands + which require multiple keys (e.g. `findandmodify`) + should use an instance of :class:`~bson.son.SON` or + a string and kwargs instead of a Python `dict`. + + - `value` (optional): value to use for the command verb when + `command` is passed as a string + - `check` (optional): check the response for errors, raising + :class:`~pymongo.errors.OperationFailure` if there are any + - `allowable_errors`: if `check` is ``True``, error messages + in this list will be ignored by error-checking + - `read_preference` (optional): The read preference for this + operation. See :mod:`~pymongo.read_preferences` for options. + If the provided `session` is in a transaction, defaults to the + read preference configured for the transaction. + Otherwise, defaults to + :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`. + - `codec_options`: A :class:`~bson.codec_options.CodecOptions` + instance. + - `session` (optional): A + :class:`~pymongo.client_session.ClientSession`. + - `comment` (optional): A user-provided comment to attach to this + command. + - `**kwargs` (optional): additional keyword arguments will + be added to the command document before it is sent + + + .. note:: :meth:`command` does **not** obey this Database's + :attr:`read_preference` or :attr:`codec_options`. You must use the + ``read_preference`` and ``codec_options`` parameters instead. + + .. note:: :meth:`command` does **not** apply any custom TypeDecoders + when decoding the command response. + + .. note:: If this client has been configured to use MongoDB Stable + API (see :ref:`versioned-api-ref`), then :meth:`command` will + automatically add API versioning options to the given command. + Explicitly adding API versioning options in the command and + declaring an API version on the client is not supported. + + .. seealso:: The MongoDB documentation on `commands `_. + """ + with self.__client._tmp_session(session, close=False) as tmp_session: + opts = codec_options or DEFAULT_CODEC_OPTIONS + + if read_preference is None: + read_preference = ( + tmp_session and tmp_session._txn_read_preference() + ) or ReadPreference.PRIMARY + with self.__client._socket_for_reads(read_preference, tmp_session) as ( + sock_info, + read_preference, + ): + response = self._command( + sock_info, + command, + value, + check, + allowable_errors, + read_preference, + opts, + session=tmp_session, + **kwargs, + ) + coll = self.get_collection("$cmd", read_preference=read_preference) + if response.get("cursor"): + cmd_cursor = CommandCursor( + coll, + response["cursor"], + sock_info.address, + batch_size=batch_size or 0, + max_await_time_ms=max_time_ms, + session=tmp_session, + explicit_session=session is not None, + comment=comment, + ) + cmd_cursor._maybe_pin_connection(sock_info) + return cmd_cursor + else: + raise InvalidOperation("Command does not return a cursor.") + def _retryable_read_command( self, command, diff --git a/test/run_command/unified/runCommand.json b/test/run_command/unified/runCommand.json new file mode 100644 index 000000000..007e514bd --- /dev/null +++ b/test/run_command/unified/runCommand.json @@ -0,0 +1,635 @@ +{ + "description": "runCommand", + "schemaVersion": "1.3", + "createEntities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "commandStartedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "db" + } + }, + { + "collection": { + "id": "collection", + "database": "db", + "collectionName": "collection" + } + }, + { + "database": { + "id": "dbWithRC", + "client": "client", + "databaseName": "dbWithRC", + "databaseOptions": { + "readConcern": { + "level": "local" + } + } + } + }, + { + "database": { + "id": "dbWithWC", + "client": "client", + "databaseName": "dbWithWC", + "databaseOptions": { + "writeConcern": { + "w": 0 + } + } + } + }, + { + "session": { + "id": "session", + "client": "client" + } + }, + { + "client": { + "id": "clientWithStableApi", + "observeEvents": [ + "commandStartedEvent" + ], + "serverApi": { + "version": "1", + "strict": true + } + } + }, + { + "database": { + "id": "dbWithStableApi", + "client": "clientWithStableApi", + "databaseName": "dbWithStableApi" + } + } + ], + "initialData": [ + { + "collectionName": "collection", + "databaseName": "db", + "documents": [] + } + ], + "tests": [ + { + "description": "always attaches $db and implicit lsid to given command and omits default readPreference", + "operations": [ + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "command": { + "ping": 1, + "$db": "db", + "lsid": { + "$$exists": true + }, + "$readPreference": { + "$$exists": false + } + }, + "commandName": "ping" + } + } + ] + } + ] + }, + { + "description": "always gossips the $clusterTime on the sent command", + "runOnRequirements": [ + { + "topologies": [ + "replicaset", + "sharded" + ] + } + ], + "operations": [ + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "commandName": "ping" + } + }, + { + "commandStartedEvent": { + "command": { + "ping": 1, + "$clusterTime": { + "$$exists": true + } + }, + "commandName": "ping" + } + } + ] + } + ] + }, + { + "description": "attaches the provided session lsid to given command", + "operations": [ + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + }, + "session": "session" + }, + "expectResult": { + "ok": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "command": { + "ping": 1, + "lsid": { + "$$sessionLsid": "session" + }, + "$db": "db" + }, + "commandName": "ping" + } + } + ] + } + ] + }, + { + "description": "attaches the provided $readPreference to given command", + "runOnRequirements": [ + { + "topologies": [ + "replicaset", + "sharded-replicaset", + "load-balanced", + "sharded" + ] + } + ], + "operations": [ + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + }, + "readPreference": { + "mode": "nearest" + } + }, + "expectResult": { + "ok": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "command": { + "ping": 1, + "$readPreference": { + "mode": "nearest" + }, + "$db": "db" + }, + "commandName": "ping" + } + } + ] + } + ] + }, + { + "description": "does not attach $readPreference to given command on standalone", + "runOnRequirements": [ + { + "topologies": [ + "single" + ] + } + ], + "operations": [ + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + }, + "readPreference": { + "mode": "nearest" + } + }, + "expectResult": { + "ok": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "command": { + "ping": 1, + "$readPreference": { + "$$exists": false + }, + "$db": "db" + }, + "commandName": "ping" + } + } + ] + } + ] + }, + { + "description": "does not attach primary $readPreference to given command", + "operations": [ + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + }, + "readPreference": { + "mode": "primary" + } + }, + "expectResult": { + "ok": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "command": { + "ping": 1, + "$readPreference": { + "$$exists": false + }, + "$db": "db" + }, + "commandName": "ping" + } + } + ] + } + ] + }, + { + "description": "does not inherit readConcern specified at the db level", + "operations": [ + { + "name": "runCommand", + "object": "dbWithRC", + "arguments": { + "commandName": "aggregate", + "command": { + "aggregate": "collection", + "pipeline": [], + "cursor": {} + } + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection", + "readConcern": { + "$$exists": false + }, + "$db": "dbWithRC" + }, + "commandName": "aggregate" + } + } + ] + } + ] + }, + { + "description": "does not inherit writeConcern specified at the db level", + "operations": [ + { + "name": "runCommand", + "object": "dbWithWC", + "arguments": { + "commandName": "insert", + "command": { + "insert": "collection", + "documents": [ + { + "foo": "bar" + } + ], + "ordered": true + } + }, + "expectResult": { + "ok": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "command": { + "insert": "collection", + "writeConcern": { + "$$exists": false + }, + "$db": "dbWithWC" + }, + "commandName": "insert" + } + } + ] + } + ] + }, + { + "description": "does not retry retryable errors on given command", + "runOnRequirements": [ + { + "minServerVersion": "4.2" + } + ], + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "client", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "ping" + ], + "closeConnection": true + } + } + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectError": { + "isClientError": true + } + } + ] + }, + { + "description": "attaches transaction fields to given command", + "runOnRequirements": [ + { + "minServerVersion": "4.0", + "topologies": [ + "replicaset" + ] + }, + { + "minServerVersion": "4.2", + "topologies": [ + "sharded-replicaset", + "load-balanced" + ] + } + ], + "operations": [ + { + "name": "withTransaction", + "object": "session", + "arguments": { + "callback": [ + { + "name": "runCommand", + "object": "db", + "arguments": { + "session": "session", + "commandName": "insert", + "command": { + "insert": "collection", + "documents": [ + { + "foo": "transaction" + } + ], + "ordered": true + } + }, + "expectResult": { + "$$unsetOrMatches": { + "insertedId": { + "$$unsetOrMatches": 1 + } + } + } + } + ] + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "command": { + "insert": "collection", + "documents": [ + { + "foo": "transaction" + } + ], + "ordered": true, + "lsid": { + "$$sessionLsid": "session" + }, + "txnNumber": 1, + "startTransaction": true, + "autocommit": false, + "readConcern": { + "$$exists": false + }, + "writeConcern": { + "$$exists": false + } + }, + "commandName": "insert", + "databaseName": "db" + } + }, + { + "commandStartedEvent": { + "command": { + "commitTransaction": 1, + "lsid": { + "$$sessionLsid": "session" + }, + "txnNumber": 1, + "autocommit": false, + "writeConcern": { + "$$exists": false + }, + "readConcern": { + "$$exists": false + } + }, + "commandName": "commitTransaction", + "databaseName": "admin" + } + } + ] + } + ] + }, + { + "description": "attaches apiVersion fields to given command when stableApi is configured on the client", + "runOnRequirements": [ + { + "minServerVersion": "5.0" + } + ], + "operations": [ + { + "name": "runCommand", + "object": "dbWithStableApi", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + } + ], + "expectEvents": [ + { + "client": "clientWithStableApi", + "events": [ + { + "commandStartedEvent": { + "command": { + "ping": 1, + "$db": "dbWithStableApi", + "apiVersion": "1", + "apiStrict": true, + "apiDeprecationErrors": { + "$$unsetOrMatches": false + } + }, + "commandName": "ping" + } + } + ] + } + ] + } + ] +} diff --git a/test/run_command/unified/runCursorCommand.json b/test/run_command/unified/runCursorCommand.json new file mode 100644 index 000000000..4f1ec8a01 --- /dev/null +++ b/test/run_command/unified/runCursorCommand.json @@ -0,0 +1,877 @@ +{ + "description": "runCursorCommand", + "schemaVersion": "1.9", + "createEntities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "commandStartedEvent", + "connectionReadyEvent", + "connectionCheckedOutEvent", + "connectionCheckedInEvent" + ] + } + }, + { + "session": { + "id": "session", + "client": "client" + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "db" + } + }, + { + "collection": { + "id": "collection", + "database": "db", + "collectionName": "collection" + } + } + ], + "initialData": [ + { + "collectionName": "collection", + "databaseName": "db", + "documents": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + }, + { + "_id": 4, + "x": 44 + }, + { + "_id": 5, + "x": 55 + } + ] + } + ], + "tests": [ + { + "description": "successfully executes checkMetadataConsistency cursor creating command", + "runOnRequirements": [ + { + "minServerVersion": "7.0", + "topologies": [ + "sharded" + ] + } + ], + "operations": [ + { + "name": "runCursorCommand", + "object": "db", + "arguments": { + "commandName": "checkMetadataConsistency", + "command": { + "checkMetadataConsistency": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "command": { + "checkMetadataConsistency": 1, + "$db": "db", + "lsid": { + "$$exists": true + } + }, + "commandName": "checkMetadataConsistency" + } + } + ] + } + ] + }, + { + "description": "errors if the command response is not a cursor", + "operations": [ + { + "name": "createCommandCursor", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectError": { + "isClientError": true + } + } + ] + }, + { + "description": "creates an implicit session that is reused across getMores", + "operations": [ + { + "name": "runCursorCommand", + "object": "db", + "arguments": { + "commandName": "find", + "command": { + "find": "collection", + "batchSize": 2 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + }, + { + "_id": 4, + "x": 44 + }, + { + "_id": 5, + "x": 55 + } + ] + }, + { + "name": "assertSameLsidOnLastTwoCommands", + "object": "testRunner", + "arguments": { + "client": "client" + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection", + "batchSize": 2, + "$db": "db", + "lsid": { + "$$exists": true + } + }, + "commandName": "find" + } + }, + { + "commandStartedEvent": { + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "collection", + "$db": "db", + "lsid": { + "$$exists": true + } + }, + "commandName": "getMore" + } + } + ] + } + ] + }, + { + "description": "accepts an explicit session that is reused across getMores", + "operations": [ + { + "name": "runCursorCommand", + "object": "db", + "arguments": { + "commandName": "find", + "session": "session", + "command": { + "find": "collection", + "batchSize": 2 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + }, + { + "_id": 4, + "x": 44 + }, + { + "_id": 5, + "x": 55 + } + ] + }, + { + "name": "assertSameLsidOnLastTwoCommands", + "object": "testRunner", + "arguments": { + "client": "client" + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection", + "batchSize": 2, + "$db": "db", + "lsid": { + "$$sessionLsid": "session" + } + }, + "commandName": "find" + } + }, + { + "commandStartedEvent": { + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "collection", + "$db": "db", + "lsid": { + "$$sessionLsid": "session" + } + }, + "commandName": "getMore" + } + } + ] + } + ] + }, + { + "description": "returns pinned connections to the pool when the cursor is exhausted", + "runOnRequirements": [ + { + "topologies": [ + "load-balanced" + ] + } + ], + "operations": [ + { + "name": "createCommandCursor", + "object": "db", + "arguments": { + "commandName": "find", + "batchSize": 2, + "session": "session", + "command": { + "find": "collection", + "batchSize": 2 + } + }, + "saveResultAsEntity": "cursor" + }, + { + "name": "assertNumberConnectionsCheckedOut", + "object": "testRunner", + "arguments": { + "client": "client", + "connections": 1 + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "cursor", + "expectResult": { + "_id": 1, + "x": 11 + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "cursor", + "expectResult": { + "_id": 2, + "x": 22 + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "cursor", + "expectResult": { + "_id": 3, + "x": 33 + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "cursor", + "expectResult": { + "_id": 4, + "x": 44 + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "cursor", + "expectResult": { + "_id": 5, + "x": 55 + } + }, + { + "name": "assertNumberConnectionsCheckedOut", + "object": "testRunner", + "arguments": { + "client": "client", + "connections": 0 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection", + "batchSize": 2, + "$db": "db", + "lsid": { + "$$sessionLsid": "session" + } + }, + "commandName": "find" + } + }, + { + "commandStartedEvent": { + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "collection", + "$db": "db", + "lsid": { + "$$sessionLsid": "session" + } + }, + "commandName": "getMore" + } + }, + { + "commandStartedEvent": { + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "collection", + "$db": "db", + "lsid": { + "$$sessionLsid": "session" + } + }, + "commandName": "getMore" + } + } + ] + }, + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionReadyEvent": {} + }, + { + "connectionCheckedOutEvent": {} + }, + { + "connectionCheckedInEvent": {} + } + ] + } + ] + }, + { + "description": "returns pinned connections to the pool when the cursor is closed", + "runOnRequirements": [ + { + "topologies": [ + "load-balanced" + ] + } + ], + "operations": [ + { + "name": "createCommandCursor", + "object": "db", + "arguments": { + "commandName": "find", + "command": { + "find": "collection", + "batchSize": 2 + } + }, + "saveResultAsEntity": "cursor" + }, + { + "name": "assertNumberConnectionsCheckedOut", + "object": "testRunner", + "arguments": { + "client": "client", + "connections": 1 + } + }, + { + "name": "close", + "object": "cursor" + }, + { + "name": "assertNumberConnectionsCheckedOut", + "object": "testRunner", + "arguments": { + "client": "client", + "connections": 0 + } + } + ] + }, + { + "description": "supports configuring getMore batchSize", + "operations": [ + { + "name": "runCursorCommand", + "object": "db", + "arguments": { + "commandName": "find", + "batchSize": 5, + "command": { + "find": "collection", + "batchSize": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + }, + { + "_id": 4, + "x": 44 + }, + { + "_id": 5, + "x": 55 + } + ] + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection", + "batchSize": 1, + "$db": "db", + "lsid": { + "$$exists": true + } + }, + "commandName": "find" + } + }, + { + "commandStartedEvent": { + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "collection", + "batchSize": 5, + "$db": "db", + "lsid": { + "$$exists": true + } + }, + "commandName": "getMore" + } + } + ] + } + ] + }, + { + "description": "supports configuring getMore maxTimeMS", + "operations": [ + { + "name": "runCursorCommand", + "object": "db", + "arguments": { + "commandName": "find", + "maxTimeMS": 300, + "command": { + "find": "collection", + "maxTimeMS": 200, + "batchSize": 1 + } + }, + "ignoreResultAndError": true + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "ignoreExtraEvents": true, + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection", + "maxTimeMS": 200, + "batchSize": 1, + "$db": "db", + "lsid": { + "$$exists": true + } + }, + "commandName": "find" + } + }, + { + "commandStartedEvent": { + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "collection", + "$db": "db", + "maxTimeMS": 300, + "lsid": { + "$$exists": true + } + }, + "commandName": "getMore" + } + } + ] + } + ] + }, + { + "description": "supports configuring getMore comment", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "operations": [ + { + "name": "runCursorCommand", + "object": "db", + "arguments": { + "commandName": "find", + "comment": { + "hello": "getMore" + }, + "command": { + "find": "collection", + "batchSize": 1, + "comment": { + "hello": "find" + } + } + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + }, + { + "_id": 4, + "x": 44 + }, + { + "_id": 5, + "x": 55 + } + ] + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection", + "batchSize": 1, + "comment": { + "hello": "find" + }, + "$db": "db", + "lsid": { + "$$exists": true + } + }, + "commandName": "find" + } + }, + { + "commandStartedEvent": { + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "collection", + "comment": { + "hello": "getMore" + }, + "$db": "db", + "lsid": { + "$$exists": true + } + }, + "commandName": "getMore" + } + } + ] + } + ] + }, + { + "description": "does not close the cursor when receiving an empty batch", + "runOnRequirements": [ + { + "serverless": "forbid" + } + ], + "operations": [ + { + "name": "dropCollection", + "object": "db", + "arguments": { + "collection": "cappedCollection" + } + }, + { + "name": "createCollection", + "object": "db", + "arguments": { + "collection": "cappedCollection", + "capped": true, + "size": 4096, + "max": 3 + }, + "saveResultAsEntity": "cappedCollection" + }, + { + "name": "insertMany", + "object": "cappedCollection", + "arguments": { + "documents": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + } + ] + } + }, + { + "name": "createCommandCursor", + "object": "db", + "arguments": { + "cursorType": "tailable", + "commandName": "find", + "batchSize": 2, + "command": { + "find": "cappedCollection", + "tailable": true + } + }, + "saveResultAsEntity": "cursor" + }, + { + "name": "iterateOnce", + "object": "cursor" + }, + { + "name": "iterateOnce", + "object": "cursor" + }, + { + "name": "iterateOnce", + "object": "cursor" + }, + { + "name": "close", + "object": "cursor" + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "command": { + "drop": "cappedCollection" + }, + "commandName": "drop" + } + }, + { + "commandStartedEvent": { + "command": { + "create": "cappedCollection" + }, + "commandName": "create" + } + }, + { + "commandStartedEvent": { + "command": { + "insert": "cappedCollection" + }, + "commandName": "insert" + } + }, + { + "commandStartedEvent": { + "command": { + "find": "cappedCollection", + "$db": "db", + "lsid": { + "$$exists": true + } + }, + "commandName": "find" + } + }, + { + "commandStartedEvent": { + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "cappedCollection", + "$db": "db", + "lsid": { + "$$exists": true + } + }, + "commandName": "getMore" + } + }, + { + "commandStartedEvent": { + "command": { + "killCursors": "cappedCollection", + "cursors": { + "$$type": "array" + } + }, + "commandName": "killCursors" + } + } + ] + } + ] + } + ] +} diff --git a/test/test_database.py b/test/test_database.py index 140d169db..041b339e6 100644 --- a/test/test_database.py +++ b/test/test_database.py @@ -18,6 +18,8 @@ import re import sys from typing import Any, Iterable, List, Mapping, Union +from pymongo.command_cursor import CommandCursor + sys.path[0:0] = [""] from test import IntegrationTest, client_context, unittest @@ -42,6 +44,7 @@ from pymongo.errors import ( CollectionInvalid, ExecutionTimeout, InvalidName, + InvalidOperation, OperationFailure, WriteConcernError, ) @@ -407,6 +410,23 @@ class TestDatabase(IntegrationTest): for doc in result["cursor"]["firstBatch"]: self.assertTrue(isinstance(doc["r"], Regex)) + def test_cursor_command(self): + db = self.client.pymongo_test + db.test.drop() + + docs = [{"_id": i, "doc": i} for i in range(3)] + db.test.insert_many(docs) + + cursor = db.cursor_command("find", "test") + + self.assertIsInstance(cursor, CommandCursor) + + result_docs = list(cursor) + self.assertEqual(docs, result_docs) + + def test_cursor_command_invalid(self): + self.assertRaises(InvalidOperation, self.db.cursor_command, "usersInfo", "test") + def test_password_digest(self): self.assertRaises(TypeError, auth._password_digest, 5) self.assertRaises(TypeError, auth._password_digest, True) diff --git a/test/test_run_command.py b/test/test_run_command.py new file mode 100644 index 000000000..848fd2cb9 --- /dev/null +++ b/test/test_run_command.py @@ -0,0 +1,17 @@ +import os +import unittest +from test.unified_format import generate_test_classes + +_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "run_command") + + +globals().update( + generate_test_classes( + os.path.join(_TEST_PATH, "unified"), + module=__name__, + ) +) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/unified_format.py b/test/unified_format.py index 90cb442b2..72db9e7d4 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -64,10 +64,11 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS from bson.objectid import ObjectId from bson.regex import RE_TYPE, Regex from gridfs import GridFSBucket, GridOut -from pymongo import ASCENDING, MongoClient, _csot +from pymongo import ASCENDING, CursorType, MongoClient, _csot from pymongo.change_stream import ChangeStream from pymongo.client_session import ClientSession, TransactionOptions, _TxnState from pymongo.collection import Collection +from pymongo.command_cursor import CommandCursor from pymongo.database import Database from pymongo.encryption import ClientEncryption from pymongo.encryption_options import _HAVE_PYMONGOCRYPT @@ -1087,6 +1088,31 @@ class UnifiedSpecTestMixinV1(IntegrationTest): kwargs["command"] = ordered_command return target.command(**kwargs) + def _databaseOperation_runCursorCommand(self, target, **kwargs): + return list(self._databaseOperation_createCommandCursor(target, **kwargs)) + + def _databaseOperation_createCommandCursor(self, target, **kwargs): + self.__raise_if_unsupported("createCommandCursor", target, Database) + # Ensure the first key is the command name. + ordered_command = SON([(kwargs.pop("command_name"), 1)]) + ordered_command.update(kwargs["command"]) + kwargs["command"] = ordered_command + + cursor_type = kwargs.pop("cursor_type", "nonTailable") + if cursor_type == CursorType.TAILABLE: + ordered_command["tailable"] = True + elif cursor_type == CursorType.TAILABLE_AWAIT: + ordered_command["tailable"] = True + ordered_command["awaitData"] = True + elif cursor_type != "nonTailable": + self.fail(f"unknown cursorType: {cursor_type}") + + if "maxTimeMS" in kwargs: + kwargs["max_time_ms"] = kwargs["maxTimeMS"] + del kwargs["maxTimeMS"] + + return target.cursor_command(**kwargs) + def _databaseOperation_listCollections(self, target, *args, **kwargs): if "batch_size" in kwargs: kwargs["cursor"] = {"batchSize": kwargs.pop("batch_size")} @@ -1150,7 +1176,9 @@ class UnifiedSpecTestMixinV1(IntegrationTest): return next(target) def _cursor_iterateUntilDocumentOrError(self, target, *args, **kwargs): - self.__raise_if_unsupported("iterateUntilDocumentOrError", target, NonLazyCursor) + self.__raise_if_unsupported( + "iterateUntilDocumentOrError", target, NonLazyCursor, CommandCursor + ) while target.alive: try: return next(target) @@ -1159,7 +1187,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest): return None def _cursor_close(self, target, *args, **kwargs): - self.__raise_if_unsupported("close", target, NonLazyCursor) + self.__raise_if_unsupported("close", target, NonLazyCursor, CommandCursor) return target.close() def _clientEncryptionOperation_createDataKey(self, target, *args, **kwargs): @@ -1250,7 +1278,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest): doc.setdefault("metadata", {})["contentType"] = doc.pop("contentType") elif isinstance(target, ChangeStream): method_name = f"_changeStreamOperation_{opname}" - elif isinstance(target, NonLazyCursor): + elif isinstance(target, (NonLazyCursor, CommandCursor)): method_name = f"_cursor_{opname}" elif isinstance(target, ClientSession): method_name = f"_sessionOperation_{opname}" diff --git a/tox.ini b/tox.ini index bdabf1770..240126f8a 100644 --- a/tox.ini +++ b/tox.ini @@ -91,6 +91,8 @@ commands = [testenv:typecheck] description = run mypy and pyright to typecheck +extras = + {[testenv:typecheck-mypy]extras} deps = {[testenv:typecheck-mypy]deps} {[testenv:typecheck-pyright]deps}