PYTHON-3232 Improved change stream event visibility for C2C Replication (#1062)

This commit is contained in:
Julius Park 2022-09-27 15:31:20 -07:00 committed by GitHub
parent 2af12e6463
commit c874c96e29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 803 additions and 1 deletions

View File

@ -22,6 +22,10 @@ PyMongo 4.3 brings a number of improvements including:
deadlocks are still possible because libraries that PyMongo depends like
OpenSSL cannot be made fork() safe in multithreaded applications.
(`PYTHON-2484`_). For more info see :ref:`pymongo-fork-safe`.
- When used with MongoDB 6.0+, :class:`~pymongo.change_stream.ChangeStream` s
now allow for new types of events (such as DDL and C2C replication events)
to be recorded with the new parameter ``show_expanded_events``
that can be passed to methods such as :meth:`~pymongo.collection.Collection.watch`.
Bug fixes
.........

View File

@ -40,6 +40,7 @@ class _AggregationCommand(object):
user_fields=None,
result_processor=None,
comment=None,
show_expanded_events=None,
):
if "explain" in options:
raise ConfigurationError(
@ -60,6 +61,7 @@ class _AggregationCommand(object):
options["let"] = let
if comment is not None:
options["comment"] = comment
self._options = options
# This is the batchSize that will be used for setting the initial

View File

@ -109,6 +109,7 @@ class ChangeStream(Generic[_DocumentType]):
start_after: Optional[Mapping[str, Any]],
comment: Optional[Any] = None,
full_document_before_change: Optional[str] = None,
show_expanded_events: Optional[bool] = None,
) -> None:
if pipeline is None:
pipeline = []
@ -143,6 +144,7 @@ class ChangeStream(Generic[_DocumentType]):
self._comment = comment
self._closed = False
self._timeout = self._target._timeout
self._show_expanded_events = show_expanded_events
# Initialize cursor.
self._cursor = self._create_cursor()
@ -175,6 +177,10 @@ class ChangeStream(Generic[_DocumentType]):
if self._start_at_operation_time is not None:
options["startAtOperationTime"] = self._start_at_operation_time
if self._show_expanded_events:
options["showExpandedEvents"] = self._show_expanded_events
return options
def _command_options(self):
@ -230,6 +236,7 @@ class ChangeStream(Generic[_DocumentType]):
explicit_session,
result_processor=self._process_result,
comment=self._comment,
show_expanded_events=self._show_expanded_events,
)
return self._client._retryable_read(
cmd.get_cursor, self._target._read_preference_for(session), session

View File

@ -2495,6 +2495,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
start_after: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
full_document_before_change: Optional[str] = None,
show_expanded_events: Optional[bool] = None,
) -> CollectionChangeStream[_DocumentType]:
"""Watch changes on this collection.
@ -2579,12 +2580,16 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
This option and `resume_after` are mutually exclusive.
- `comment` (optional): A user-provided comment to attach to this
command.
- `show_expanded_events` (optional): Include expanded events such as DDL events like `dropIndexes`.
:Returns:
A :class:`~pymongo.change_stream.CollectionChangeStream` cursor.
.. versionchanged:: 4.3
Added `show_expanded_events` parameter.
.. versionchanged:: 4.2
Added ``full_document_before_change`` parameter.
Added ``full_document_before_change`` parameter.
.. versionchanged:: 4.1
Added ``comment`` parameter.
@ -2615,6 +2620,7 @@ class Collection(common.BaseObject, Generic[_DocumentType]):
start_after,
comment,
full_document_before_change,
show_expanded_events,
)
@_csot.apply

View File

@ -547,6 +547,7 @@ class Database(common.BaseObject, Generic[_DocumentType]):
start_after: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
full_document_before_change: Optional[str] = None,
show_expanded_events: Optional[bool] = None,
) -> DatabaseChangeStream[_DocumentType]:
"""Watch changes on this database.
@ -624,10 +625,14 @@ class Database(common.BaseObject, Generic[_DocumentType]):
This option and `resume_after` are mutually exclusive.
- `comment` (optional): A user-provided comment to attach to this
command.
- `show_expanded_events` (optional): Include expanded events such as DDL events like `dropIndexes`.
:Returns:
A :class:`~pymongo.change_stream.DatabaseChangeStream` cursor.
.. versionchanged:: 4.3
Added `show_expanded_events` parameter.
.. versionchanged:: 4.2
Added ``full_document_before_change`` parameter.
@ -657,6 +662,7 @@ class Database(common.BaseObject, Generic[_DocumentType]):
start_after,
comment,
full_document_before_change,
show_expanded_events=show_expanded_events,
)
def _command(

View File

@ -895,6 +895,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
start_after: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
full_document_before_change: Optional[str] = None,
show_expanded_events: Optional[bool] = None,
) -> ChangeStream[_DocumentType]:
"""Watch changes on this cluster.
@ -972,10 +973,14 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
This option and `resume_after` are mutually exclusive.
- `comment` (optional): A user-provided comment to attach to this
command.
- `show_expanded_events` (optional): Include expanded events such as DDL events like `dropIndexes`.
:Returns:
A :class:`~pymongo.change_stream.ClusterChangeStream` cursor.
.. versionchanged:: 4.3
Added `show_expanded_events` parameter.
.. versionchanged:: 4.2
Added ``full_document_before_change`` parameter.
@ -1005,6 +1010,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
start_after,
comment,
full_document_before_change,
show_expanded_events=show_expanded_events,
)
@property

View File

@ -0,0 +1,252 @@
{
"description": "disambiguatedPaths",
"schemaVersion": "1.4",
"createEntities": [
{
"client": {
"id": "client0",
"useMultipleMongoses": false
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "database0"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "collection0"
}
}
],
"runOnRequirements": [
{
"minServerVersion": "6.1.0",
"topologies": [
"replicaset",
"sharded-replicaset",
"load-balanced",
"sharded"
],
"serverless": "forbid"
}
],
"initialData": [
{
"collectionName": "collection0",
"databaseName": "database0",
"documents": []
}
],
"tests": [
{
"description": "disambiguatedPaths is not present when showExpandedEvents is false/unset",
"operations": [
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 1,
"a": {
"1": 1
}
}
}
},
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": []
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "updateOne",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"update": {
"$set": {
"a.1": 2
}
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "update",
"ns": {
"db": "database0",
"coll": "collection0"
},
"updateDescription": {
"updatedFields": {
"$$exists": true
},
"removedFields": {
"$$exists": true
},
"truncatedArrays": {
"$$exists": true
},
"disambiguatedPaths": {
"$$exists": false
}
}
}
}
]
},
{
"description": "disambiguatedPaths is present on updateDescription when an ambiguous path is present",
"operations": [
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 1,
"a": {
"1": 1
}
}
}
},
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "updateOne",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"update": {
"$set": {
"a.1": 2
}
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "update",
"ns": {
"db": "database0",
"coll": "collection0"
},
"updateDescription": {
"updatedFields": {
"$$exists": true
},
"removedFields": {
"$$exists": true
},
"truncatedArrays": {
"$$exists": true
},
"disambiguatedPaths": {
"a.1": [
"a",
"1"
]
}
}
}
}
]
},
{
"description": "disambiguatedPaths returns array indices as integers",
"operations": [
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 1,
"a": [
{
"1": 1
}
]
}
}
},
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "updateOne",
"object": "collection0",
"arguments": {
"filter": {
"_id": 1
},
"update": {
"$set": {
"a.0.1": 2
}
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "update",
"ns": {
"db": "database0",
"coll": "collection0"
},
"updateDescription": {
"updatedFields": {
"$$exists": true
},
"removedFields": {
"$$exists": true
},
"truncatedArrays": {
"$$exists": true
},
"disambiguatedPaths": {
"a.0.1": [
"a",
{
"$$type": "int"
},
"1"
]
}
}
}
}
]
}
]
}

View File

@ -0,0 +1,517 @@
{
"description": "change-streams-showExpandedEvents",
"schemaVersion": "1.7",
"runOnRequirements": [
{
"minServerVersion": "6.0.0",
"topologies": [
"replicaset",
"sharded-replicaset",
"sharded"
]
}
],
"createEntities": [
{
"client": {
"id": "client0",
"observeEvents": [
"commandStartedEvent"
],
"ignoreCommandMonitoringEvents": [
"killCursors"
],
"useMultipleMongoses": false
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "database0"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "collection0"
}
},
{
"database": {
"id": "database1",
"client": "client0",
"databaseName": "database1"
}
},
{
"collection": {
"id": "collection1",
"database": "database1",
"collectionName": "collection1"
}
},
{
"database": {
"id": "shardedDb",
"client": "client0",
"databaseName": "shardedDb"
}
},
{
"database": {
"id": "adminDb",
"client": "client0",
"databaseName": "admin"
}
},
{
"collection": {
"id": "shardedCollection",
"database": "shardedDb",
"collectionName": "shardedCollection"
}
}
],
"initialData": [
{
"collectionName": "collection0",
"databaseName": "database0",
"documents": []
}
],
"tests": [
{
"description": "when provided, showExpandedEvents is sent as a part of the aggregate command",
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
}
],
"expectEvents": [
{
"client": "client0",
"ignoreExtraEvents": true,
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": "collection0",
"cursor": {},
"pipeline": [
{
"$changeStream": {
"showExpandedEvents": true
}
}
]
},
"commandName": "aggregate",
"databaseName": "database0"
}
}
]
}
]
},
{
"description": "when omitted, showExpandedEvents is not sent as a part of the aggregate command",
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": []
},
"saveResultAsEntity": "changeStream0"
}
],
"expectEvents": [
{
"client": "client0",
"ignoreExtraEvents": true,
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": "collection0",
"cursor": {},
"pipeline": [
{
"$changeStream": {
"showExpandedEvents": {
"$$exists": false
}
}
}
]
},
"commandName": "aggregate",
"databaseName": "database0"
}
}
]
}
]
},
{
"description": "when showExpandedEvents is true, new fields on change stream events are handled appropriately",
"operations": [
{
"name": "dropCollection",
"object": "database0",
"arguments": {
"collection": "foo"
}
},
{
"name": "createCollection",
"object": "database0",
"arguments": {
"collection": "foo"
}
},
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"a": 1
}
}
},
{
"name": "createIndex",
"object": "collection0",
"arguments": {
"keys": {
"x": 1
},
"name": "x_1"
}
},
{
"name": "rename",
"object": "collection0",
"arguments": {
"to": "foo",
"dropTarget": true
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "insert",
"ns": {
"db": "database0",
"coll": "collection0"
},
"collectionUUID": {
"$$exists": true
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "createIndexes",
"ns": {
"db": "database0",
"coll": "collection0"
},
"operationDescription": {
"$$exists": true
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "rename",
"ns": {
"db": "database0",
"coll": "collection0"
},
"to": {
"db": "database0",
"coll": "foo"
},
"operationDescription": {
"dropTarget": {
"$$exists": true
},
"to": {
"db": "database0",
"coll": "foo"
}
}
}
}
]
},
{
"description": "when showExpandedEvents is true, createIndex events are reported",
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [
{
"$match": {
"operationType": {
"$ne": "create"
}
}
}
],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "createIndex",
"object": "collection0",
"arguments": {
"keys": {
"x": 1
},
"name": "x_1"
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "createIndexes"
}
}
]
},
{
"description": "when showExpandedEvents is true, dropIndexes events are reported",
"operations": [
{
"name": "createIndex",
"object": "collection0",
"arguments": {
"keys": {
"x": 1
},
"name": "x_1"
}
},
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "dropIndex",
"object": "collection0",
"arguments": {
"name": "x_1"
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "dropIndexes"
}
}
]
},
{
"description": "when showExpandedEvents is true, create events are reported",
"operations": [
{
"name": "dropCollection",
"object": "database0",
"arguments": {
"collection": "foo"
}
},
{
"name": "createChangeStream",
"object": "database0",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "createCollection",
"object": "database0",
"arguments": {
"collection": "foo"
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "create"
}
}
]
},
{
"description": "when showExpandedEvents is true, create events on views are reported",
"operations": [
{
"name": "dropCollection",
"object": "database0",
"arguments": {
"collection": "foo"
}
},
{
"name": "createChangeStream",
"object": "database0",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "createCollection",
"object": "database0",
"arguments": {
"collection": "foo",
"viewOn": "testName"
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "create"
}
}
]
},
{
"description": "when showExpandedEvents is true, modify events are reported",
"operations": [
{
"name": "createIndex",
"object": "collection0",
"arguments": {
"keys": {
"x": 1
},
"name": "x_2"
}
},
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "runCommand",
"object": "database0",
"arguments": {
"command": {
"collMod": "collection0"
},
"commandName": "collMod"
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "modify"
}
}
]
},
{
"description": "when showExpandedEvents is true, shardCollection events are reported",
"runOnRequirements": [
{
"topologies": [
"sharded-replicaset",
"sharded"
]
}
],
"operations": [
{
"name": "dropCollection",
"object": "shardedDb",
"arguments": {
"collection": "shardedCollection"
}
},
{
"name": "createCollection",
"object": "shardedDb",
"arguments": {
"collection": "shardedCollection"
}
},
{
"name": "createChangeStream",
"object": "shardedCollection",
"arguments": {
"pipeline": [],
"showExpandedEvents": true
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "runCommand",
"object": "adminDb",
"arguments": {
"command": {
"shardCollection": "shardedDb.shardedCollection",
"key": {
"_id": 1
}
},
"commandName": "shardCollection"
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"operationType": "shardCollection"
}
}
]
}
]
}

View File

@ -1127,6 +1127,8 @@ def prepare_spec_arguments(spec, arguments, opname, entity_map, with_txn_callbac
arguments["index_or_name"] = arguments.pop(arg_name)
elif opname == "rename" and arg_name == "to":
arguments["new_name"] = arguments.pop(arg_name)
elif opname == "rename" and arg_name == "dropTarget":
arguments["dropTarget"] = arguments.pop(arg_name)
elif arg_name == "cursorType":
cursor_type = arguments.pop(arg_name)
if cursor_type == "tailable":