diff --git a/docs/change_streams.md b/docs/change_streams.md index c37976e054c..4d7652c291a 100644 --- a/docs/change_streams.md +++ b/docs/change_streams.md @@ -512,7 +512,28 @@ emit events for various DDL operations. In addition, setting `showExpandedEvents streams return the additional fields `collectionUUID` (for various change stream event types) and `updateDescription.disambiguatedPaths` (for update events). -#### `matchCollectionUUIDForUpdateLookup` (public) +#### `ignoreRemovedShards` (public) + +The `ignoreRemovedShards` parameter is only meaningful when a sharded cluster change stream uses the +v2 reader with precise shard targeting. The parameter controls the behavior when the change stream +encounters a time range for which a now-removed shard held relevant data. + +The possible values for this parameter are: + +- `false` (default, "Strict mode"): The change stream fails if it cannot retrieve events from a + required shard that has since been removed from the cluster. This guarantees that no events are + silently lost due to shard removal. +- `true` ("Ignore-Removed-Shards mode" or "IRS mode"): The change stream continues reading even when + a required shard has been removed. Events that would have originated from the removed shard during + the affected time range (a _degraded segment_) are skipped. Normal event delivery resumes once the + stream advances past the degraded time range. + +#### `allChangesForCluster` (public) + +This flag must be set when opening an all-cluster change stream. Will normally be set by drivers +automatically when opening an all-cluster change stream. + +#### `matchCollectionUUIDForUpdateLookup` (internal) The `matchCollectionUUIDForUpdateLookup` field can be used to ensure that "updateLookup" operations are performed on the correct collection in case multiple collections with the same name have existed @@ -526,10 +547,25 @@ document was looked up for. 
If set to `true`, "updateLookup" operations will com UUID of the change event with the UUID of the collection. If there is a UUID mismatch, the returned `fullDocument` field of the event will be set to `null`. -#### `allChangesForCluster` (public) +#### `version` (internal) -This flag must be set when opening an all-cluster change stream. Will normally be set by drivers -automatically when opening an all-cluster change stream. +The `version` parameter selects the change stream reader version, which controls the shard-targeting +behavior in sharded cluster deployments. On replica sets, this parameter is accepted but has no +effect. The supported values are: + +- `"v1"`: Opens cursors on all shards in the cluster, including the config server, regardless of + whether they can hold data for the monitored namespace. This is the legacy behavior from earlier + versions before v9.0. +- `"v2"` (default when shard placement history is available): Uses _precise shard targeting_ to open + cursors only on the shards that actually hold data for the monitored namespace at any given + cluster time. See [Precise Shard Targeting in Sharded Cluster Change Streams] below for details. + +If the `version` parameter is omitted, the server automatically selects `"v2"` when shard placement +history is available, and falls back to `"v1"` otherwise. Explicitly specifying `"v2"` when +placement history is not available silently falls back to `"v1"`. + +This parameter is used for testing and is not intended to be set by change stream consumers. The +server selects the appropriate version automatically. #### `showSystemEvents` (internal) @@ -631,16 +667,84 @@ and how close the change stream has advanced towards the end of the node's oplog Opening a change stream on a sharded cluster works differently. Here, the consumer opens the change stream against a _mongos_ instance. 
The _mongos_ instance will then use the cluster's topology -information to open the cursors on the config server and the data shards on behalf of the consumer. -Because of the ordering guarantee provided by change streams, _mongos_ must wait until all cursors -have either responded with events, or ran into a timeout and reported that currently no more events -are available for them. The latter is why change streams in a sharded cluster can have higher -latency than change streams in replica sets. +information to open the cursors on the appropriate data shards on behalf of the consumer. Because of +the ordering guarantee provided by change streams, _mongos_ must wait until all cursors have either +responded with events, or run into a timeout and reported that currently no more events are +available for them. The latter is why change streams in a sharded cluster can have higher latency +than change streams in replica sets. For sharded cluster change streams, the merging of the multiple streams of change events from the different cursors is performed by the [`AsyncResultsMerger`](https://github.com/mongodb/mongo/blob/eb4c6148f6a25c444be39a0e330506834526d935/src/mongo/s/query/exec/async_results_merger.h#L100). +### Precise Shard Targeting in Sharded Cluster Change Streams + +By default, sharded cluster change streams use _precise shard targeting_ (also called "v2" +internally). Instead of opening cursors on all shards in the cluster, the change stream queries the +config server's placement history to determine which shards actually hold data for the monitored +namespace, and opens cursors only on those shards. This eliminates unnecessary polling of shards +that have no relevant data and reduces end-to-end latency. + +#### Initial shard set + +When a v2 change stream is opened, _mongos_ queries the config server's placement history for the +monitored namespace at the requested start time.
The resulting set of shards is used to establish +the initial set of remote cursors. If the start time precedes the point at which the placement +history was first recorded (e.g., immediately after an FCV upgrade that introduced placement history +tracking), the change stream may fall back to targeting all shards until the historical placement +information becomes available. + +The initial set of targeted shards differs by scope: + +- **Collection-level change streams**: Cursors are opened only on shards that can hold chunks of the + target collection. If the collection's database does not exist yet at the requested start time, a + cursor is instead opened on the config server to detect when the database and collection are + created. +- **Database-level change streams**: Cursors are opened on shards that can hold any collection + belonging to the target database. If the database does not yet exist at the requested start time, + a cursor is opened on the config server to detect database creation. +- **Cluster-level change streams**: Cursors are always opened on all data-bearing shards plus the + config server. + +#### Dynamic cursor adjustment via control events + +As data placement changes (e.g., due to chunk migrations, `reshardCollection`, `movePrimary`, or +`dropCollection`), the change stream must adjust its cursor targeting accordingly. This is +accomplished through _control events_, which are internal, consumer-invisible events that carry +placement change information. + +When a DDL operation modifies data placement, the config server writes a `NamespacePlacementChanged` +entry to its oplog and notifies the affected data shards. Each affected shard writes a corresponding +no-op oplog entry. The `$_internalChangeStreamInjectControlEvents` stage on each shard converts +these no-op entries into control events (tagged with `$changeStreamControlEvent` metadata), which +are then forwarded to the `$_internalChangeStreamHandleTopologyChangeV2` stage on _mongos_.
This +stage processes the control events and opens or closes shard cursors as appropriate. Control events +are never returned to the consumer. + +#### Read modes in v2 + +v2 change streams can operate in two modes, controlled by the `ignoreRemovedShards` parameter: + +- **Strict mode** (default, `ignoreRemovedShards: false`): If the change stream encounters a time + range for which a now-removed shard held relevant data, the stream fails. This guarantees + exactly-once delivery even across shard removal events. +- **Ignore-Removed-Shards (IRS) mode** (`ignoreRemovedShards: true`): The change stream continues + reading even when a required shard has been removed. Events from the removed shard during + _degraded segments_ (bounded time ranges where the shard was required but is gone) are skipped. + Once the stream advances past a degraded segment, normal event delivery resumes. This mode is + appropriate for consumers that can tolerate event loss during shard decommissioning. + +#### FCV compatibility and fallback + +v2 change streams require placement history to be recorded in the config server. This recording is +automatically performed in FCV 9.0 and higher. If the cluster's FCV is downgraded to a version that +does not maintain placement history (i.e. any FCV before 9.0), an active v2 change stream +transparently falls back to v1 mode without losing events (by internally reopening its cursors). +Resume tokens produced by v2 change streams are valid in v1 streams and vice versa. + +An existing v1 stream does not automatically upgrade to v2 when the FCV is upgraded. A new v2 stream +must be explicitly opened to take advantage of precise shard targeting. + ## Change Stream Pipeline Building A change stream pipeline issued by a consumer contains the `$changeStream` meta stage. This stage is @@ -682,42 +786,9 @@ non-DML change events in case `showExpandedEvents` is not set. 
### Sharded Cluster Pipelines -For sharded cluster change streams, _mongos_ will first expand the `$changeStream` stage into the -following internal stages: - -- `$_internalChangeStreamOplogMatch` -- `$_internalChangeStreamUnwindTransaction` -- `$_internalChangeStreamTransform` -- `$_internalChangeStreamCheckInvalidate` (only present for collection-level and database-level - change streams) -- `$_internalChangeStreamCheckResumability` -- `$_internalChangeStreamAddPreImage` (only present if `fullDocumentBeforeChange` is not set to - `off`) -- `$_internalChangeStreamAddPostImage` (only present if `fullDocument` is not set to `default`) -- user-defined `$match` expression (only present if the user's change stream pipeline contains a - `$match` stage) -- user-defined `$project` expression (only present if the user's change stream pipeline contains a - `$project` stage) -- `$_internalChangeStreamSplitLargeEvent` (only present if the change stream is opened with the - `$changeStreamSplitLargeEvent` pipeline step) - ---- - -- `$_internalChangeStreamHandleTopologyChange` -- `$_internalChangeStreamEnsureResumeTokenPresent` (only present if the change stream resume token - is not a high water mark token) - -Additionally, the change stream pipeline on a sharded cluster will contain a `$match` stage to -filter out all non-DML change events in case `showExpandedEvents` is not set. - -After building the initial pipeline stages, _mongos_ will split the pipeline into two parts: - -- a part that is executed on data shards ("shard pipeline") and -- a part that is executed on _mongos_ ("merge pipeline"). - -The pipeline split point is above the `$_internalChangeStreamHandleTopologyChange` stage. _mongos_ -will also add a `$mergeCursors` stage that aggregates the responses from different shards and the -config server into a single, sorted stream. 
+For sharded cluster change streams, _mongos_ will expand the `$changeStream` stage into multiple +internal stages split across a _shard pipeline_ (executed on each data shard) and a _merge pipeline_ +(executed on _mongos_). The exact stages depend on the change stream reader version. #### Data Shard Pipeline @@ -729,6 +800,8 @@ The shard pipeline will look like this: - `$_internalChangeStreamCheckInvalidate` (only present for collection-level and database-level change streams) - `$_internalChangeStreamCheckResumability` +- `$_internalChangeStreamInjectControlEvents` (only present in v2 change streams; converts data + placement change events into internal control events) - `$_internalChangeStreamAddPreImage` (only present if `fullDocumentBeforeChange` is not set to `off`) - `$_internalChangeStreamAddPostImage` (only present if `fullDocument` is not set to `default`) @@ -744,10 +817,18 @@ The shard pipeline will look like this: The merge pipeline on _mongos_ will look like this: - `$mergeCursors` -- `$_internalChangeStreamHandleTopologyChange` +- `$_internalChangeStreamHandleTopologyChangeV2` (v2, default) _or_ + `$_internalChangeStreamHandleTopologyChange` (v1, legacy) - `$_internalChangeStreamEnsureResumeTokenPresent` (only present if the change stream resume token is not a high water mark token) +The pipeline split point is above the topology-change handler stage. _mongos_ also adds a +`$mergeCursors` stage that aggregates the responses from different shards (and in some cases the +config server) into a single, sorted stream. + +Additionally, the change stream pipeline on a sharded cluster will contain a `$match` stage to +filter out all non-DML change events in case `showExpandedEvents` is not set. 
+ ### Details of individual Pipeline Stages #### `$_internalChangeStreamOplogMatch` @@ -857,17 +938,59 @@ The `ChangeStreamEnsureResumeTokenPresent` code is #### `$_internalChangeStreamHandleTopologyChange` -This stage is only present in sharded cluster change streams and is always part of the _mongos_ +This stage is only present in v1 sharded cluster change streams and is always part of the _mongos_ merge pipeline. The stage is responsible for opening additional cursors to shards that have been -added to the cluster. It will handle "insert" events into the `config.shards` collection that were -observed from the config server. +added to the cluster. It handles "insert" events into the `config.shards` collection that were +observed from the config server. In v2 change streams, this stage is replaced by +`$_internalChangeStreamHandleTopologyChangeV2`. The `DocumentSourceChangeStreamHandleTopologyChange` code can be found [here](https://github.com/mongodb/mongo/blob/eb4c6148f6a25c444be39a0e330506834526d935/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h#L63). The `ChangeStreamHandleTopologyChangeStage` code can be found [here](https://github.com/mongodb/mongo/blob/eb4c6148f6a25c444be39a0e330506834526d935/src/mongo/db/exec/agg/change_stream_handle_topology_change_stage.cpp#L121). +#### `$_internalChangeStreamHandleTopologyChangeV2` + +This stage is only present in v2 sharded cluster change streams and is always part of the _mongos_ +merge pipeline. It replaces the `$_internalChangeStreamHandleTopologyChange` stage used by v1 change +streams. Rather than tracking additions to `config.shards`, this stage processes _control events_, +and dynamically opens or closes remote shard cursors as data placement changes are detected. This +enables the precise shard targeting that distinguishes v2 from v1 change streams. 
+ +The `DocumentSourceChangeStreamHandleTopologyChangeV2` code can be found +[here](https://github.com/mongodb/mongo/blob/329eda6d27129c10a4c0aedb261fd83c5e291516/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change_v2.h#L55). +The `ChangeStreamHandleTopologyChangeV2Stage` code can be found +[here](https://github.com/mongodb/mongo/blob/329eda6d27129c10a4c0aedb261fd83c5e291516/src/mongo/db/exec/agg/change_stream_handle_topology_change_v2_stage.h#L63). + +#### `$_internalChangeStreamInjectControlEvents` + +This stage is only present in v2 sharded cluster change streams and runs on each data shard as part +of the shard pipeline (i.e., it appears between `$_internalChangeStreamCheckResumability` and +`$_internalChangeStreamAddPreImage`). Its role is to intercept specific no-op oplog entries that +carry placement change notifications and convert them into internal _control events_. These oplog +entries are written on the config server or participating shards when a DDL operation modifies data +placement. + +The following oplog entry types are handled on the config server: + +- `insert` entries for the `config.databases` collection (also called 'DatabaseCreated' internally) +- `namespacePlacementChanged` entries + +On data-bearing shards, the following oplog entry types are handled: + +- `moveChunk` +- `movePrimary` +- `namespacePlacementChanged` + +Control events are not visible to change stream consumers. They are forwarded from the data shards +to the `$_internalChangeStreamHandleTopologyChangeV2` stage on _mongos_, which uses the placement +information they carry to adjust cursor targeting. After processing a control event, _mongos_ either +opens new shard cursors (for shards that have gained relevant data) or closes existing shard cursors +(for shards that have lost all relevant data).
+ +The `DocumentSourceChangeStreamInjectControlEvents` code can be found +[here](https://github.com/mongodb/mongo/blob/329eda6d27129c10a4c0aedb261fd83c5e291516/src/mongo/db/pipeline/document_source_change_stream_inject_control_events.h#L61). + ## Missing documentation (to be completed) - How are user-defined match expressions are handled, rewritten and pushed down. -- Changes to pipeline building and behavior due to SPM-1941.