diff --git a/doc/changelog.rst b/doc/changelog.rst index 24c80efa2..b8f346e57 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -22,6 +22,10 @@ PyMongo 4.3 brings a number of improvements including: deadlocks are still possible because libraries that PyMongo depends like OpenSSL cannot be made fork() safe in multithreaded applications. (`PYTHON-2484`_). For more info see :ref:`pymongo-fork-safe`. +- When used with MongoDB 6.0+, :class:`~pymongo.change_stream.ChangeStream` s + now allow for new types of events (such as DDL and C2C replication events) + to be recorded with the new parameter ``show_expanded_events`` + that can be passed to methods such as :meth:`~pymongo.collection.Collection.watch`. Bug fixes ......... diff --git a/pymongo/aggregation.py b/pymongo/aggregation.py index 62fe4bd05..a13f164f5 100644 --- a/pymongo/aggregation.py +++ b/pymongo/aggregation.py @@ -40,6 +40,7 @@ class _AggregationCommand(object): user_fields=None, result_processor=None, comment=None, + show_expanded_events=None, ): if "explain" in options: raise ConfigurationError( @@ -60,6 +61,7 @@ class _AggregationCommand(object): options["let"] = let if comment is not None: options["comment"] = comment + self._options = options # This is the batchSize that will be used for setting the initial diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index 0edf513a3..775f93c79 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -109,6 +109,7 @@ class ChangeStream(Generic[_DocumentType]): start_after: Optional[Mapping[str, Any]], comment: Optional[Any] = None, full_document_before_change: Optional[str] = None, + show_expanded_events: Optional[bool] = None, ) -> None: if pipeline is None: pipeline = [] @@ -143,6 +144,7 @@ class ChangeStream(Generic[_DocumentType]): self._comment = comment self._closed = False self._timeout = self._target._timeout + self._show_expanded_events = show_expanded_events # Initialize cursor. self._cursor = self._create_cursor() @@ -175,6 +177,10 @@ class ChangeStream(Generic[_DocumentType]): if self._start_at_operation_time is not None: options["startAtOperationTime"] = self._start_at_operation_time + + if self._show_expanded_events: + options["showExpandedEvents"] = self._show_expanded_events + return options def _command_options(self): @@ -230,6 +236,7 @@ class ChangeStream(Generic[_DocumentType]): explicit_session, result_processor=self._process_result, comment=self._comment, + show_expanded_events=self._show_expanded_events, ) return self._client._retryable_read( cmd.get_cursor, self._target._read_preference_for(session), session diff --git a/pymongo/collection.py b/pymongo/collection.py index 9a9ba5661..8f1afc575 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -2495,6 +2495,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): start_after: Optional[Mapping[str, Any]] = None, comment: Optional[Any] = None, full_document_before_change: Optional[str] = None, + show_expanded_events: Optional[bool] = None, ) -> CollectionChangeStream[_DocumentType]: """Watch changes on this collection. @@ -2579,12 +2580,16 @@ class Collection(common.BaseObject, Generic[_DocumentType]): This option and `resume_after` are mutually exclusive. - `comment` (optional): A user-provided comment to attach to this command. + - `show_expanded_events` (optional): Include expanded events such as DDL events like `dropIndexes`. :Returns: A :class:`~pymongo.change_stream.CollectionChangeStream` cursor. + .. versionchanged:: 4.3 + Added `show_expanded_events` parameter. + .. versionchanged:: 4.2 - Added ``full_document_before_change`` parameter. + Added ``full_document_before_change`` parameter. .. versionchanged:: 4.1 Added ``comment`` parameter. @@ -2615,6 +2620,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]): start_after, comment, full_document_before_change, + show_expanded_events, ) @_csot.apply diff --git a/pymongo/database.py b/pymongo/database.py index 4f87a58dd..59328a1b5 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -547,6 +547,7 @@ class Database(common.BaseObject, Generic[_DocumentType]): start_after: Optional[Mapping[str, Any]] = None, comment: Optional[Any] = None, full_document_before_change: Optional[str] = None, + show_expanded_events: Optional[bool] = None, ) -> DatabaseChangeStream[_DocumentType]: """Watch changes on this database. @@ -624,10 +625,14 @@ class Database(common.BaseObject, Generic[_DocumentType]): This option and `resume_after` are mutually exclusive. - `comment` (optional): A user-provided comment to attach to this command. + - `show_expanded_events` (optional): Include expanded events such as DDL events like `dropIndexes`. :Returns: A :class:`~pymongo.change_stream.DatabaseChangeStream` cursor. + .. versionchanged:: 4.3 + Added `show_expanded_events` parameter. + .. versionchanged:: 4.2 Added ``full_document_before_change`` parameter. @@ -657,6 +662,7 @@ class Database(common.BaseObject, Generic[_DocumentType]): start_after, comment, full_document_before_change, + show_expanded_events=show_expanded_events, ) def _command( diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 7e4e4f10c..7d16e5877 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -895,6 +895,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): start_after: Optional[Mapping[str, Any]] = None, comment: Optional[Any] = None, full_document_before_change: Optional[str] = None, + show_expanded_events: Optional[bool] = None, ) -> ChangeStream[_DocumentType]: """Watch changes on this cluster. @@ -972,10 +973,14 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): This option and `resume_after` are mutually exclusive. - `comment` (optional): A user-provided comment to attach to this command. + - `show_expanded_events` (optional): Include expanded events such as DDL events like `dropIndexes`. :Returns: A :class:`~pymongo.change_stream.ClusterChangeStream` cursor. + .. versionchanged:: 4.3 + Added `show_expanded_events` parameter. + .. versionchanged:: 4.2 Added ``full_document_before_change`` parameter. @@ -1005,6 +1010,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]): start_after, comment, full_document_before_change, + show_expanded_events=show_expanded_events, ) @property diff --git a/test/change_streams/unified/change-streams-disambiguatedPaths.json b/test/change_streams/unified/change-streams-disambiguatedPaths.json new file mode 100644 index 000000000..91d8e66da --- /dev/null +++ b/test/change_streams/unified/change-streams-disambiguatedPaths.json @@ -0,0 +1,252 @@ +{ + "description": "disambiguatedPaths", + "schemaVersion": "1.4", + "createEntities": [ + { + "client": { + "id": "client0", + "useMultipleMongoses": false + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "collection0" + } + } + ], + "runOnRequirements": [ + { + "minServerVersion": "6.1.0", + "topologies": [ + "replicaset", + "sharded-replicaset", + "load-balanced", + "sharded" + ], + "serverless": "forbid" + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [] + } + ], + "tests": [ + { + "description": "disambiguatedPaths is not present when showExpandedEvents is false/unset", + "operations": [ + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": { + "1": 1 + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "a.1": 2 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "update", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "updateDescription": { + "updatedFields": { + "$$exists": true + }, + "removedFields": { + "$$exists": true + }, + "truncatedArrays": { + "$$exists": true + }, + "disambiguatedPaths": { + "$$exists": false + } + } + } + } + ] + }, + { + "description": "disambiguatedPaths is present on updateDescription when an ambiguous path is present", + "operations": [ + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": { + "1": 1 + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "a.1": 2 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "update", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "updateDescription": { + "updatedFields": { + "$$exists": true + }, + "removedFields": { + "$$exists": true + }, + "truncatedArrays": { + "$$exists": true + }, + "disambiguatedPaths": { + "a.1": [ + "a", + "1" + ] + } + } + } + } + ] + }, + { + "description": "disambiguatedPaths returns array indices as integers", + "operations": [ + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": [ + { + "1": 1 + } + ] + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "a.0.1": 2 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "update", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "updateDescription": { + "updatedFields": { + "$$exists": true + }, + "removedFields": { + "$$exists": true + }, + "truncatedArrays": { + "$$exists": true + }, + "disambiguatedPaths": { + "a.0.1": [ + "a", + { + "$$type": "int" + }, + "1" + ] + } + } + } + } + ] + } + ] +} diff --git a/test/change_streams/unified/change-streams-showExpandedEvents.json b/test/change_streams/unified/change-streams-showExpandedEvents.json new file mode 100644 index 000000000..3eed2f534 --- /dev/null +++ b/test/change_streams/unified/change-streams-showExpandedEvents.json @@ -0,0 +1,517 @@ +{ + "description": "change-streams-showExpandedEvents", + "schemaVersion": "1.7", + "runOnRequirements": [ + { + "minServerVersion": "6.0.0", + "topologies": [ + "replicaset", + "sharded-replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent" + ], + "ignoreCommandMonitoringEvents": [ + "killCursors" + ], + "useMultipleMongoses": false + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "collection0" + } + }, + { + "database": { + "id": "database1", + "client": "client0", + "databaseName": "database1" + } + }, + { + "collection": { + "id": "collection1", + "database": "database1", + "collectionName": "collection1" + } + }, + { + "database": { + "id": "shardedDb", + "client": "client0", + "databaseName": "shardedDb" + } + }, + { + "database": { + "id": "adminDb", + "client": "client0", + "databaseName": "admin" + } + }, + { + "collection": { + "id": "shardedCollection", + "database": "shardedDb", + "collectionName": "shardedCollection" + } + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [] + } + ], + "tests": [ + { + "description": "when provided, showExpandedEvents is sent as a part of the aggregate command", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + } + ], + "expectEvents": [ + { + "client": "client0", + "ignoreExtraEvents": true, + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "showExpandedEvents": true + } + } + ] + }, + "commandName": "aggregate", + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "when omitted, showExpandedEvents is not sent as a part of the aggregate command", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [] + }, + "saveResultAsEntity": "changeStream0" + } + ], + "expectEvents": [ + { + "client": "client0", + "ignoreExtraEvents": true, + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "showExpandedEvents": { + "$$exists": false + } + } + } + ] + }, + "commandName": "aggregate", + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "when showExpandedEvents is true, new fields on change stream events are handled appropriately", + "operations": [ + { + "name": "dropCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "createCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "a": 1 + } + } + }, + { + "name": "createIndex", + "object": "collection0", + "arguments": { + "keys": { + "x": 1 + }, + "name": "x_1" + } + }, + { + "name": "rename", + "object": "collection0", + "arguments": { + "to": "foo", + "dropTarget": true + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "insert", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "collectionUUID": { + "$$exists": true + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "createIndexes", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "operationDescription": { + "$$exists": true + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "rename", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "to": { + "db": "database0", + "coll": "foo" + }, + "operationDescription": { + "dropTarget": { + "$$exists": true + }, + "to": { + "db": "database0", + "coll": "foo" + } + } + } + } + ] + }, + { + "description": "when showExpandedEvents is true, createIndex events are reported", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "operationType": { + "$ne": "create" + } + } + } + ], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "createIndex", + "object": "collection0", + "arguments": { + "keys": { + "x": 1 + }, + "name": "x_1" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "createIndexes" + } + } + ] + }, + { + "description": "when showExpandedEvents is true, dropIndexes events are reported", + "operations": [ + { + "name": "createIndex", + "object": "collection0", + "arguments": { + "keys": { + "x": 1 + }, + "name": "x_1" + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "dropIndex", + "object": "collection0", + "arguments": { + "name": "x_1" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "dropIndexes" + } + } + ] + }, + { + "description": "when showExpandedEvents is true, create events are reported", + "operations": [ + { + "name": "dropCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "createChangeStream", + "object": "database0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "createCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "create" + } + } + ] + }, + { + "description": "when showExpandedEvents is true, create events on views are reported", + "operations": [ + { + "name": "dropCollection", + "object": "database0", + "arguments": { + "collection": "foo" + } + }, + { + "name": "createChangeStream", + "object": "database0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "createCollection", + "object": "database0", + "arguments": { + "collection": "foo", + "viewOn": "testName" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "create" + } + } + ] + }, + { + "description": "when showExpandedEvents is true, modify events are reported", + "operations": [ + { + "name": "createIndex", + "object": "collection0", + "arguments": { + "keys": { + "x": 1 + }, + "name": "x_2" + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "runCommand", + "object": "database0", + "arguments": { + "command": { + "collMod": "collection0" + }, + "commandName": "collMod" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "modify" + } + } + ] + }, + { + "description": "when showExpandedEvents is true, shardCollection events are reported", + "runOnRequirements": [ + { + "topologies": [ + "sharded-replicaset", + "sharded" + ] + } + ], + "operations": [ + { + "name": "dropCollection", + "object": "shardedDb", + "arguments": { + "collection": "shardedCollection" + } + }, + { + "name": "createCollection", + "object": "shardedDb", + "arguments": { + "collection": "shardedCollection" + } + }, + { + "name": "createChangeStream", + "object": "shardedCollection", + "arguments": { + "pipeline": [], + "showExpandedEvents": true + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "runCommand", + "object": "adminDb", + "arguments": { + "command": { + "shardCollection": "shardedDb.shardedCollection", + "key": { + "_id": 1 + } + }, + "commandName": "shardCollection" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "shardCollection" + } + } + ] + } + ] +} diff --git a/test/utils.py b/test/utils.py index 1ac726d2d..33a594d15 100644 --- a/test/utils.py +++ b/test/utils.py @@ -1127,6 +1127,8 @@ def prepare_spec_arguments(spec, arguments, opname, entity_map, with_txn_callbac arguments["index_or_name"] = arguments.pop(arg_name) elif opname == "rename" and arg_name == "to": arguments["new_name"] = arguments.pop(arg_name) + elif opname == "rename" and arg_name == "dropTarget": + arguments["dropTarget"] = arguments.pop(arg_name) elif arg_name == "cursorType": cursor_type = arguments.pop(arg_name) if cursor_type == "tailable":