PYTHON-3588 Expose an API to create a cursor from a command response (#1263)

This commit is contained in:
Noah Stapp 2023-06-28 14:19:04 -07:00 committed by GitHub
parent 820823891d
commit cae124c32c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1723 additions and 8 deletions

View File

@ -56,7 +56,7 @@ cpjson () {
cd "$SPECS"/source/$1
find . -name '*.json' | grep -Ev "${BLOCKLIST}" | cpio -pdm \
$PYMONGO/test/$2
printf "\nIgnored files for ${PWD}\n"
printf "\nIgnored files for ${PWD}:\n"
IGNORED_FILES="$(printf "\n%s\n" "$(diff <(find . -name '*.json' | sort) \
<(find . -name '*.json' | grep -Ev "${BLOCKLIST}" | sort))" | \
sed -e '/^[0-9]/d' | sed -e 's|< ./||g' )"
@ -126,6 +126,9 @@ do
retryable-writes|retryable_writes)
cpjson retryable-writes/tests/ retryable_writes
;;
run-command|run_command)
cpjson run-command/tests/ run_command
;;
sdam|SDAM|server-discovery-and-monitoring|discovery_and_monitoring)
cpjson server-discovery-and-monitoring/tests/errors \
discovery_and_monitoring/errors

View File

@ -98,7 +98,8 @@ use the script provided in ``.evergreen/resync-specs.sh``.::
git clone git@github.com:mongodb/specifications.git
export MDB_SPECS=~/specifications
cd ~/mongo-python-driver/.evergreen
./resync-specs.sh -b "connection-string*" crud bson-corpus
./resync-specs.sh -b "<regex>" spec1 spec2 ...
./resync-specs.sh -b "connection-string*" crud bson-corpus # Updates crud and bson-corpus specs while ignoring all files with the regex "connection-string*"
cd ..
The ``-b`` flag adds as a regex pattern to block files you do not wish to

View File

@ -1,6 +1,12 @@
Changelog
=========
Changes in Version 4.5
-----------------------
- Added :meth:`~pymongo.database.Database.cursor_command`
and :meth:`~pymongo.command_cursor.CommandCursor.try_next` to support executing an arbitrary command that returns a cursor.
Changes in Version 4.4
-----------------------

View File

@ -292,7 +292,7 @@ class CommandCursor(Generic[_DocumentType]):
__next__ = next
def _try_next(self, get_more_allowed):
def _try_next(self, get_more_allowed: bool) -> Optional[_DocumentType]:
"""Advance the cursor blocking for at most one getMore command."""
if not len(self.__data) and not self.__killed and get_more_allowed:
self._refresh()
@ -301,6 +301,25 @@ class CommandCursor(Generic[_DocumentType]):
else:
return None
def try_next(self) -> Optional[_DocumentType]:
"""Advance the cursor without blocking indefinitely.
This method returns the next document without waiting
indefinitely for data.
If no document is cached locally then this method runs a single
getMore command. If the getMore yields any documents, the next
document is returned, otherwise, if the getMore returns no documents
(because there is no additional data) then ``None`` is returned.
:Returns:
The next document or ``None`` when no document is available
after running a single getMore or when the cursor is closed.
.. versionadded:: 4.5
"""
return self._try_next(get_more_allowed=True)
def __enter__(self) -> "CommandCursor[_DocumentType]":
return self

View File

@ -42,7 +42,7 @@ from pymongo.change_stream import DatabaseChangeStream
from pymongo.collection import Collection
from pymongo.command_cursor import CommandCursor
from pymongo.common import _ecoc_coll_name, _esc_coll_name
from pymongo.errors import CollectionInvalid, InvalidName
from pymongo.errors import CollectionInvalid, InvalidName, InvalidOperation
from pymongo.read_preferences import ReadPreference, _ServerMode
from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline
@ -833,6 +833,113 @@ class Database(common.BaseObject, Generic[_DocumentType]):
**kwargs,
)
@_csot.apply
def cursor_command(
self,
command: Union[str, MutableMapping[str, Any]],
value: Any = 1,
check: bool = True,
allowable_errors: Optional[Sequence[Union[str, int]]] = None,
read_preference: Optional[_ServerMode] = None,
codec_options: Optional[bson.codec_options.CodecOptions[_CodecDocumentType]] = None,
session: Optional[ClientSession] = None,
comment: Optional[Any] = None,
batch_size: Optional[int] = None,
max_time_ms: Optional[int] = None,
**kwargs: Any,
) -> CommandCursor:
"""Issue a MongoDB command and parse the response as a cursor.
If the response from the server does not include a cursor field, an error will be thrown.
Otherwise, behaves identically to issuing a normal MongoDB command.
:Parameters:
- `command`: document representing the command to be issued,
or the name of the command (for simple commands only).
.. note:: the order of keys in the `command` document is
significant (the "verb" must come first), so commands
which require multiple keys (e.g. `findandmodify`)
should use an instance of :class:`~bson.son.SON` or
a string and kwargs instead of a Python `dict`.
- `value` (optional): value to use for the command verb when
`command` is passed as a string
- `check` (optional): check the response for errors, raising
:class:`~pymongo.errors.OperationFailure` if there are any
- `allowable_errors`: if `check` is ``True``, error messages
in this list will be ignored by error-checking
- `read_preference` (optional): The read preference for this
operation. See :mod:`~pymongo.read_preferences` for options.
If the provided `session` is in a transaction, defaults to the
read preference configured for the transaction.
Otherwise, defaults to
:attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`.
- `codec_options`: A :class:`~bson.codec_options.CodecOptions`
instance.
- `session` (optional): A
:class:`~pymongo.client_session.ClientSession`.
- `comment` (optional): A user-provided comment to attach to this
command.
- `**kwargs` (optional): additional keyword arguments will
be added to the command document before it is sent
.. note:: :meth:`command` does **not** obey this Database's
:attr:`read_preference` or :attr:`codec_options`. You must use the
``read_preference`` and ``codec_options`` parameters instead.
.. note:: :meth:`command` does **not** apply any custom TypeDecoders
when decoding the command response.
.. note:: If this client has been configured to use MongoDB Stable
API (see :ref:`versioned-api-ref`), then :meth:`command` will
automatically add API versioning options to the given command.
Explicitly adding API versioning options in the command and
declaring an API version on the client is not supported.
.. seealso:: The MongoDB documentation on `commands <https://dochub.mongodb.org/core/commands>`_.
"""
with self.__client._tmp_session(session, close=False) as tmp_session:
opts = codec_options or DEFAULT_CODEC_OPTIONS
if read_preference is None:
read_preference = (
tmp_session and tmp_session._txn_read_preference()
) or ReadPreference.PRIMARY
with self.__client._socket_for_reads(read_preference, tmp_session) as (
sock_info,
read_preference,
):
response = self._command(
sock_info,
command,
value,
check,
allowable_errors,
read_preference,
opts,
session=tmp_session,
**kwargs,
)
coll = self.get_collection("$cmd", read_preference=read_preference)
if response.get("cursor"):
cmd_cursor = CommandCursor(
coll,
response["cursor"],
sock_info.address,
batch_size=batch_size or 0,
max_await_time_ms=max_time_ms,
session=tmp_session,
explicit_session=session is not None,
comment=comment,
)
cmd_cursor._maybe_pin_connection(sock_info)
return cmd_cursor
else:
raise InvalidOperation("Command does not return a cursor.")
def _retryable_read_command(
self,
command,

View File

@ -0,0 +1,635 @@
{
"description": "runCommand",
"schemaVersion": "1.3",
"createEntities": [
{
"client": {
"id": "client",
"useMultipleMongoses": false,
"observeEvents": [
"commandStartedEvent"
]
}
},
{
"database": {
"id": "db",
"client": "client",
"databaseName": "db"
}
},
{
"collection": {
"id": "collection",
"database": "db",
"collectionName": "collection"
}
},
{
"database": {
"id": "dbWithRC",
"client": "client",
"databaseName": "dbWithRC",
"databaseOptions": {
"readConcern": {
"level": "local"
}
}
}
},
{
"database": {
"id": "dbWithWC",
"client": "client",
"databaseName": "dbWithWC",
"databaseOptions": {
"writeConcern": {
"w": 0
}
}
}
},
{
"session": {
"id": "session",
"client": "client"
}
},
{
"client": {
"id": "clientWithStableApi",
"observeEvents": [
"commandStartedEvent"
],
"serverApi": {
"version": "1",
"strict": true
}
}
},
{
"database": {
"id": "dbWithStableApi",
"client": "clientWithStableApi",
"databaseName": "dbWithStableApi"
}
}
],
"initialData": [
{
"collectionName": "collection",
"databaseName": "db",
"documents": []
}
],
"tests": [
{
"description": "always attaches $db and implicit lsid to given command and omits default readPreference",
"operations": [
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectResult": {
"ok": 1
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"command": {
"ping": 1,
"$db": "db",
"lsid": {
"$$exists": true
},
"$readPreference": {
"$$exists": false
}
},
"commandName": "ping"
}
}
]
}
]
},
{
"description": "always gossips the $clusterTime on the sent command",
"runOnRequirements": [
{
"topologies": [
"replicaset",
"sharded"
]
}
],
"operations": [
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectResult": {
"ok": 1
}
},
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectResult": {
"ok": 1
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"commandName": "ping"
}
},
{
"commandStartedEvent": {
"command": {
"ping": 1,
"$clusterTime": {
"$$exists": true
}
},
"commandName": "ping"
}
}
]
}
]
},
{
"description": "attaches the provided session lsid to given command",
"operations": [
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
},
"session": "session"
},
"expectResult": {
"ok": 1
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"command": {
"ping": 1,
"lsid": {
"$$sessionLsid": "session"
},
"$db": "db"
},
"commandName": "ping"
}
}
]
}
]
},
{
"description": "attaches the provided $readPreference to given command",
"runOnRequirements": [
{
"topologies": [
"replicaset",
"sharded-replicaset",
"load-balanced",
"sharded"
]
}
],
"operations": [
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
},
"readPreference": {
"mode": "nearest"
}
},
"expectResult": {
"ok": 1
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"command": {
"ping": 1,
"$readPreference": {
"mode": "nearest"
},
"$db": "db"
},
"commandName": "ping"
}
}
]
}
]
},
{
"description": "does not attach $readPreference to given command on standalone",
"runOnRequirements": [
{
"topologies": [
"single"
]
}
],
"operations": [
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
},
"readPreference": {
"mode": "nearest"
}
},
"expectResult": {
"ok": 1
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"command": {
"ping": 1,
"$readPreference": {
"$$exists": false
},
"$db": "db"
},
"commandName": "ping"
}
}
]
}
]
},
{
"description": "does not attach primary $readPreference to given command",
"operations": [
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
},
"readPreference": {
"mode": "primary"
}
},
"expectResult": {
"ok": 1
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"command": {
"ping": 1,
"$readPreference": {
"$$exists": false
},
"$db": "db"
},
"commandName": "ping"
}
}
]
}
]
},
{
"description": "does not inherit readConcern specified at the db level",
"operations": [
{
"name": "runCommand",
"object": "dbWithRC",
"arguments": {
"commandName": "aggregate",
"command": {
"aggregate": "collection",
"pipeline": [],
"cursor": {}
}
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": "collection",
"readConcern": {
"$$exists": false
},
"$db": "dbWithRC"
},
"commandName": "aggregate"
}
}
]
}
]
},
{
"description": "does not inherit writeConcern specified at the db level",
"operations": [
{
"name": "runCommand",
"object": "dbWithWC",
"arguments": {
"commandName": "insert",
"command": {
"insert": "collection",
"documents": [
{
"foo": "bar"
}
],
"ordered": true
}
},
"expectResult": {
"ok": 1
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"command": {
"insert": "collection",
"writeConcern": {
"$$exists": false
},
"$db": "dbWithWC"
},
"commandName": "insert"
}
}
]
}
]
},
{
"description": "does not retry retryable errors on given command",
"runOnRequirements": [
{
"minServerVersion": "4.2"
}
],
"operations": [
{
"name": "failPoint",
"object": "testRunner",
"arguments": {
"client": "client",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"ping"
],
"closeConnection": true
}
}
}
},
{
"name": "runCommand",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectError": {
"isClientError": true
}
}
]
},
{
"description": "attaches transaction fields to given command",
"runOnRequirements": [
{
"minServerVersion": "4.0",
"topologies": [
"replicaset"
]
},
{
"minServerVersion": "4.2",
"topologies": [
"sharded-replicaset",
"load-balanced"
]
}
],
"operations": [
{
"name": "withTransaction",
"object": "session",
"arguments": {
"callback": [
{
"name": "runCommand",
"object": "db",
"arguments": {
"session": "session",
"commandName": "insert",
"command": {
"insert": "collection",
"documents": [
{
"foo": "transaction"
}
],
"ordered": true
}
},
"expectResult": {
"$$unsetOrMatches": {
"insertedId": {
"$$unsetOrMatches": 1
}
}
}
}
]
}
}
],
"expectEvents": [
{
"client": "client",
"events": [
{
"commandStartedEvent": {
"command": {
"insert": "collection",
"documents": [
{
"foo": "transaction"
}
],
"ordered": true,
"lsid": {
"$$sessionLsid": "session"
},
"txnNumber": 1,
"startTransaction": true,
"autocommit": false,
"readConcern": {
"$$exists": false
},
"writeConcern": {
"$$exists": false
}
},
"commandName": "insert",
"databaseName": "db"
}
},
{
"commandStartedEvent": {
"command": {
"commitTransaction": 1,
"lsid": {
"$$sessionLsid": "session"
},
"txnNumber": 1,
"autocommit": false,
"writeConcern": {
"$$exists": false
},
"readConcern": {
"$$exists": false
}
},
"commandName": "commitTransaction",
"databaseName": "admin"
}
}
]
}
]
},
{
"description": "attaches apiVersion fields to given command when stableApi is configured on the client",
"runOnRequirements": [
{
"minServerVersion": "5.0"
}
],
"operations": [
{
"name": "runCommand",
"object": "dbWithStableApi",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectResult": {
"ok": 1
}
}
],
"expectEvents": [
{
"client": "clientWithStableApi",
"events": [
{
"commandStartedEvent": {
"command": {
"ping": 1,
"$db": "dbWithStableApi",
"apiVersion": "1",
"apiStrict": true,
"apiDeprecationErrors": {
"$$unsetOrMatches": false
}
},
"commandName": "ping"
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,877 @@
{
"description": "runCursorCommand",
"schemaVersion": "1.9",
"createEntities": [
{
"client": {
"id": "client",
"useMultipleMongoses": false,
"observeEvents": [
"commandStartedEvent",
"connectionReadyEvent",
"connectionCheckedOutEvent",
"connectionCheckedInEvent"
]
}
},
{
"session": {
"id": "session",
"client": "client"
}
},
{
"database": {
"id": "db",
"client": "client",
"databaseName": "db"
}
},
{
"collection": {
"id": "collection",
"database": "db",
"collectionName": "collection"
}
}
],
"initialData": [
{
"collectionName": "collection",
"databaseName": "db",
"documents": [
{
"_id": 1,
"x": 11
},
{
"_id": 2,
"x": 22
},
{
"_id": 3,
"x": 33
},
{
"_id": 4,
"x": 44
},
{
"_id": 5,
"x": 55
}
]
}
],
"tests": [
{
"description": "successfully executes checkMetadataConsistency cursor creating command",
"runOnRequirements": [
{
"minServerVersion": "7.0",
"topologies": [
"sharded"
]
}
],
"operations": [
{
"name": "runCursorCommand",
"object": "db",
"arguments": {
"commandName": "checkMetadataConsistency",
"command": {
"checkMetadataConsistency": 1
}
}
}
],
"expectEvents": [
{
"client": "client",
"eventType": "command",
"events": [
{
"commandStartedEvent": {
"command": {
"checkMetadataConsistency": 1,
"$db": "db",
"lsid": {
"$$exists": true
}
},
"commandName": "checkMetadataConsistency"
}
}
]
}
]
},
{
"description": "errors if the command response is not a cursor",
"operations": [
{
"name": "createCommandCursor",
"object": "db",
"arguments": {
"commandName": "ping",
"command": {
"ping": 1
}
},
"expectError": {
"isClientError": true
}
}
]
},
{
"description": "creates an implicit session that is reused across getMores",
"operations": [
{
"name": "runCursorCommand",
"object": "db",
"arguments": {
"commandName": "find",
"command": {
"find": "collection",
"batchSize": 2
}
},
"expectResult": [
{
"_id": 1,
"x": 11
},
{
"_id": 2,
"x": 22
},
{
"_id": 3,
"x": 33
},
{
"_id": 4,
"x": 44
},
{
"_id": 5,
"x": 55
}
]
},
{
"name": "assertSameLsidOnLastTwoCommands",
"object": "testRunner",
"arguments": {
"client": "client"
}
}
],
"expectEvents": [
{
"client": "client",
"eventType": "command",
"events": [
{
"commandStartedEvent": {
"command": {
"find": "collection",
"batchSize": 2,
"$db": "db",
"lsid": {
"$$exists": true
}
},
"commandName": "find"
}
},
{
"commandStartedEvent": {
"command": {
"getMore": {
"$$type": [
"int",
"long"
]
},
"collection": "collection",
"$db": "db",
"lsid": {
"$$exists": true
}
},
"commandName": "getMore"
}
}
]
}
]
},
{
"description": "accepts an explicit session that is reused across getMores",
"operations": [
{
"name": "runCursorCommand",
"object": "db",
"arguments": {
"commandName": "find",
"session": "session",
"command": {
"find": "collection",
"batchSize": 2
}
},
"expectResult": [
{
"_id": 1,
"x": 11
},
{
"_id": 2,
"x": 22
},
{
"_id": 3,
"x": 33
},
{
"_id": 4,
"x": 44
},
{
"_id": 5,
"x": 55
}
]
},
{
"name": "assertSameLsidOnLastTwoCommands",
"object": "testRunner",
"arguments": {
"client": "client"
}
}
],
"expectEvents": [
{
"client": "client",
"eventType": "command",
"events": [
{
"commandStartedEvent": {
"command": {
"find": "collection",
"batchSize": 2,
"$db": "db",
"lsid": {
"$$sessionLsid": "session"
}
},
"commandName": "find"
}
},
{
"commandStartedEvent": {
"command": {
"getMore": {
"$$type": [
"int",
"long"
]
},
"collection": "collection",
"$db": "db",
"lsid": {
"$$sessionLsid": "session"
}
},
"commandName": "getMore"
}
}
]
}
]
},
{
"description": "returns pinned connections to the pool when the cursor is exhausted",
"runOnRequirements": [
{
"topologies": [
"load-balanced"
]
}
],
"operations": [
{
"name": "createCommandCursor",
"object": "db",
"arguments": {
"commandName": "find",
"batchSize": 2,
"session": "session",
"command": {
"find": "collection",
"batchSize": 2
}
},
"saveResultAsEntity": "cursor"
},
{
"name": "assertNumberConnectionsCheckedOut",
"object": "testRunner",
"arguments": {
"client": "client",
"connections": 1
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "cursor",
"expectResult": {
"_id": 1,
"x": 11
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "cursor",
"expectResult": {
"_id": 2,
"x": 22
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "cursor",
"expectResult": {
"_id": 3,
"x": 33
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "cursor",
"expectResult": {
"_id": 4,
"x": 44
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "cursor",
"expectResult": {
"_id": 5,
"x": 55
}
},
{
"name": "assertNumberConnectionsCheckedOut",
"object": "testRunner",
"arguments": {
"client": "client",
"connections": 0
}
}
],
"expectEvents": [
{
"client": "client",
"eventType": "command",
"events": [
{
"commandStartedEvent": {
"command": {
"find": "collection",
"batchSize": 2,
"$db": "db",
"lsid": {
"$$sessionLsid": "session"
}
},
"commandName": "find"
}
},
{
"commandStartedEvent": {
"command": {
"getMore": {
"$$type": [
"int",
"long"
]
},
"collection": "collection",
"$db": "db",
"lsid": {
"$$sessionLsid": "session"
}
},
"commandName": "getMore"
}
},
{
"commandStartedEvent": {
"command": {
"getMore": {
"$$type": [
"int",
"long"
]
},
"collection": "collection",
"$db": "db",
"lsid": {
"$$sessionLsid": "session"
}
},
"commandName": "getMore"
}
}
]
},
{
"client": "client",
"eventType": "cmap",
"events": [
{
"connectionReadyEvent": {}
},
{
"connectionCheckedOutEvent": {}
},
{
"connectionCheckedInEvent": {}
}
]
}
]
},
{
"description": "returns pinned connections to the pool when the cursor is closed",
"runOnRequirements": [
{
"topologies": [
"load-balanced"
]
}
],
"operations": [
{
"name": "createCommandCursor",
"object": "db",
"arguments": {
"commandName": "find",
"command": {
"find": "collection",
"batchSize": 2
}
},
"saveResultAsEntity": "cursor"
},
{
"name": "assertNumberConnectionsCheckedOut",
"object": "testRunner",
"arguments": {
"client": "client",
"connections": 1
}
},
{
"name": "close",
"object": "cursor"
},
{
"name": "assertNumberConnectionsCheckedOut",
"object": "testRunner",
"arguments": {
"client": "client",
"connections": 0
}
}
]
},
{
"description": "supports configuring getMore batchSize",
"operations": [
{
"name": "runCursorCommand",
"object": "db",
"arguments": {
"commandName": "find",
"batchSize": 5,
"command": {
"find": "collection",
"batchSize": 1
}
},
"expectResult": [
{
"_id": 1,
"x": 11
},
{
"_id": 2,
"x": 22
},
{
"_id": 3,
"x": 33
},
{
"_id": 4,
"x": 44
},
{
"_id": 5,
"x": 55
}
]
}
],
"expectEvents": [
{
"client": "client",
"eventType": "command",
"events": [
{
"commandStartedEvent": {
"command": {
"find": "collection",
"batchSize": 1,
"$db": "db",
"lsid": {
"$$exists": true
}
},
"commandName": "find"
}
},
{
"commandStartedEvent": {
"command": {
"getMore": {
"$$type": [
"int",
"long"
]
},
"collection": "collection",
"batchSize": 5,
"$db": "db",
"lsid": {
"$$exists": true
}
},
"commandName": "getMore"
}
}
]
}
]
},
{
"description": "supports configuring getMore maxTimeMS",
"operations": [
{
"name": "runCursorCommand",
"object": "db",
"arguments": {
"commandName": "find",
"maxTimeMS": 300,
"command": {
"find": "collection",
"maxTimeMS": 200,
"batchSize": 1
}
},
"ignoreResultAndError": true
}
],
"expectEvents": [
{
"client": "client",
"eventType": "command",
"ignoreExtraEvents": true,
"events": [
{
"commandStartedEvent": {
"command": {
"find": "collection",
"maxTimeMS": 200,
"batchSize": 1,
"$db": "db",
"lsid": {
"$$exists": true
}
},
"commandName": "find"
}
},
{
"commandStartedEvent": {
"command": {
"getMore": {
"$$type": [
"int",
"long"
]
},
"collection": "collection",
"$db": "db",
"maxTimeMS": 300,
"lsid": {
"$$exists": true
}
},
"commandName": "getMore"
}
}
]
}
]
},
{
"description": "supports configuring getMore comment",
"runOnRequirements": [
{
"minServerVersion": "4.4"
}
],
"operations": [
{
"name": "runCursorCommand",
"object": "db",
"arguments": {
"commandName": "find",
"comment": {
"hello": "getMore"
},
"command": {
"find": "collection",
"batchSize": 1,
"comment": {
"hello": "find"
}
}
},
"expectResult": [
{
"_id": 1,
"x": 11
},
{
"_id": 2,
"x": 22
},
{
"_id": 3,
"x": 33
},
{
"_id": 4,
"x": 44
},
{
"_id": 5,
"x": 55
}
]
}
],
"expectEvents": [
{
"client": "client",
"eventType": "command",
"events": [
{
"commandStartedEvent": {
"command": {
"find": "collection",
"batchSize": 1,
"comment": {
"hello": "find"
},
"$db": "db",
"lsid": {
"$$exists": true
}
},
"commandName": "find"
}
},
{
"commandStartedEvent": {
"command": {
"getMore": {
"$$type": [
"int",
"long"
]
},
"collection": "collection",
"comment": {
"hello": "getMore"
},
"$db": "db",
"lsid": {
"$$exists": true
}
},
"commandName": "getMore"
}
}
]
}
]
},
{
"description": "does not close the cursor when receiving an empty batch",
"runOnRequirements": [
{
"serverless": "forbid"
}
],
"operations": [
{
"name": "dropCollection",
"object": "db",
"arguments": {
"collection": "cappedCollection"
}
},
{
"name": "createCollection",
"object": "db",
"arguments": {
"collection": "cappedCollection",
"capped": true,
"size": 4096,
"max": 3
},
"saveResultAsEntity": "cappedCollection"
},
{
"name": "insertMany",
"object": "cappedCollection",
"arguments": {
"documents": [
{
"_id": 1,
"x": 11
},
{
"_id": 2,
"x": 22
}
]
}
},
{
"name": "createCommandCursor",
"object": "db",
"arguments": {
"cursorType": "tailable",
"commandName": "find",
"batchSize": 2,
"command": {
"find": "cappedCollection",
"tailable": true
}
},
"saveResultAsEntity": "cursor"
},
{
"name": "iterateOnce",
"object": "cursor"
},
{
"name": "iterateOnce",
"object": "cursor"
},
{
"name": "iterateOnce",
"object": "cursor"
},
{
"name": "close",
"object": "cursor"
}
],
"expectEvents": [
{
"client": "client",
"eventType": "command",
"events": [
{
"commandStartedEvent": {
"command": {
"drop": "cappedCollection"
},
"commandName": "drop"
}
},
{
"commandStartedEvent": {
"command": {
"create": "cappedCollection"
},
"commandName": "create"
}
},
{
"commandStartedEvent": {
"command": {
"insert": "cappedCollection"
},
"commandName": "insert"
}
},
{
"commandStartedEvent": {
"command": {
"find": "cappedCollection",
"$db": "db",
"lsid": {
"$$exists": true
}
},
"commandName": "find"
}
},
{
"commandStartedEvent": {
"command": {
"getMore": {
"$$type": [
"int",
"long"
]
},
"collection": "cappedCollection",
"$db": "db",
"lsid": {
"$$exists": true
}
},
"commandName": "getMore"
}
},
{
"commandStartedEvent": {
"command": {
"killCursors": "cappedCollection",
"cursors": {
"$$type": "array"
}
},
"commandName": "killCursors"
}
}
]
}
]
}
]
}

View File

@ -18,6 +18,8 @@ import re
import sys
from typing import Any, Iterable, List, Mapping, Union
from pymongo.command_cursor import CommandCursor
sys.path[0:0] = [""]
from test import IntegrationTest, client_context, unittest
@ -42,6 +44,7 @@ from pymongo.errors import (
CollectionInvalid,
ExecutionTimeout,
InvalidName,
InvalidOperation,
OperationFailure,
WriteConcernError,
)
@ -407,6 +410,23 @@ class TestDatabase(IntegrationTest):
for doc in result["cursor"]["firstBatch"]:
self.assertTrue(isinstance(doc["r"], Regex))
def test_cursor_command(self):
db = self.client.pymongo_test
db.test.drop()
docs = [{"_id": i, "doc": i} for i in range(3)]
db.test.insert_many(docs)
cursor = db.cursor_command("find", "test")
self.assertIsInstance(cursor, CommandCursor)
result_docs = list(cursor)
self.assertEqual(docs, result_docs)
def test_cursor_command_invalid(self):
self.assertRaises(InvalidOperation, self.db.cursor_command, "usersInfo", "test")
def test_password_digest(self):
self.assertRaises(TypeError, auth._password_digest, 5)
self.assertRaises(TypeError, auth._password_digest, True)

17
test/test_run_command.py Normal file
View File

@ -0,0 +1,17 @@
import os
import unittest
from test.unified_format import generate_test_classes
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "run_command")
globals().update(
generate_test_classes(
os.path.join(_TEST_PATH, "unified"),
module=__name__,
)
)
if __name__ == "__main__":
unittest.main()

View File

@ -64,10 +64,11 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS
from bson.objectid import ObjectId
from bson.regex import RE_TYPE, Regex
from gridfs import GridFSBucket, GridOut
from pymongo import ASCENDING, MongoClient, _csot
from pymongo import ASCENDING, CursorType, MongoClient, _csot
from pymongo.change_stream import ChangeStream
from pymongo.client_session import ClientSession, TransactionOptions, _TxnState
from pymongo.collection import Collection
from pymongo.command_cursor import CommandCursor
from pymongo.database import Database
from pymongo.encryption import ClientEncryption
from pymongo.encryption_options import _HAVE_PYMONGOCRYPT
@ -1087,6 +1088,31 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
kwargs["command"] = ordered_command
return target.command(**kwargs)
def _databaseOperation_runCursorCommand(self, target, **kwargs):
return list(self._databaseOperation_createCommandCursor(target, **kwargs))
def _databaseOperation_createCommandCursor(self, target, **kwargs):
self.__raise_if_unsupported("createCommandCursor", target, Database)
# Ensure the first key is the command name.
ordered_command = SON([(kwargs.pop("command_name"), 1)])
ordered_command.update(kwargs["command"])
kwargs["command"] = ordered_command
cursor_type = kwargs.pop("cursor_type", "nonTailable")
if cursor_type == CursorType.TAILABLE:
ordered_command["tailable"] = True
elif cursor_type == CursorType.TAILABLE_AWAIT:
ordered_command["tailable"] = True
ordered_command["awaitData"] = True
elif cursor_type != "nonTailable":
self.fail(f"unknown cursorType: {cursor_type}")
if "maxTimeMS" in kwargs:
kwargs["max_time_ms"] = kwargs["maxTimeMS"]
del kwargs["maxTimeMS"]
return target.cursor_command(**kwargs)
def _databaseOperation_listCollections(self, target, *args, **kwargs):
if "batch_size" in kwargs:
kwargs["cursor"] = {"batchSize": kwargs.pop("batch_size")}
@ -1150,7 +1176,9 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
return next(target)
def _cursor_iterateUntilDocumentOrError(self, target, *args, **kwargs):
self.__raise_if_unsupported("iterateUntilDocumentOrError", target, NonLazyCursor)
self.__raise_if_unsupported(
"iterateUntilDocumentOrError", target, NonLazyCursor, CommandCursor
)
while target.alive:
try:
return next(target)
@ -1159,7 +1187,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
return None
def _cursor_close(self, target, *args, **kwargs):
self.__raise_if_unsupported("close", target, NonLazyCursor)
self.__raise_if_unsupported("close", target, NonLazyCursor, CommandCursor)
return target.close()
def _clientEncryptionOperation_createDataKey(self, target, *args, **kwargs):
@ -1250,7 +1278,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
doc.setdefault("metadata", {})["contentType"] = doc.pop("contentType")
elif isinstance(target, ChangeStream):
method_name = f"_changeStreamOperation_{opname}"
elif isinstance(target, NonLazyCursor):
elif isinstance(target, (NonLazyCursor, CommandCursor)):
method_name = f"_cursor_{opname}"
elif isinstance(target, ClientSession):
method_name = f"_sessionOperation_{opname}"

View File

@ -91,6 +91,8 @@ commands =
[testenv:typecheck]
description = run mypy and pyright to typecheck
extras =
{[testenv:typecheck-mypy]extras}
deps =
{[testenv:typecheck-mypy]deps}
{[testenv:typecheck-pyright]deps}