SERVER-111611 Enable pre-existing collection change stream tests for v2 design (#43568)

GitOrigin-RevId: 7bdc2fde7be841772ce24c5f219cc828e853235c
This commit is contained in:
Denis Grebennicov 2025-11-20 12:22:24 +01:00 committed by MongoDB Bot
parent 4536ea5807
commit a58ab22027
40 changed files with 1176 additions and 395 deletions

4
.github/CODEOWNERS vendored
View File

@ -199,7 +199,6 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/benchmarks_query.yml @10gen/query @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/bulk_write* @10gen/query-execution-write-exec @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/bulk_write_multi_op_sharded_collections_jscore_passthrough.yml @10gen/query-execution-router @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/change_streams* @10gen/query-execution-change-streams @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/clustered_collection_passthrough.yml @10gen/query-execution @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/concurrency_compute_mode.yml @10gen/query-integration-features @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/core_wildcard_indexes.yml @10gen/query-execution-write-exec @svc-auto-approve-bot
@ -249,6 +248,8 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/sharding_jscore_passthrough_with_config_shard.yml @10gen/server-cluster-scalability @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/sharded_collections_jscore_passthrough_with_config_shard.yml @10gen/server-cluster-scalability @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/concurrency_simultaneous_replication.yml @10gen/server-replication-reviewers @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/change_streams* @10gen/query-execution-change-streams @svc-auto-approve-bot
/buildscripts/resmokeconfig/suites/**/sharding_change_streams_v2.yml @10gen/query-execution-change-streams @svc-auto-approve-bot
# The following patterns are parsed from ./buildscripts/resmokelib/OWNERS.yml
/buildscripts/resmokelib/ @10gen/devprod-correctness @svc-auto-approve-bot
@ -1017,6 +1018,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
/jstests/libs/override_methods/**/implicit_identity_views.js @10gen/query-optimization @svc-auto-approve-bot
/jstests/libs/override_methods/**/implicitly_retry_on_conflicting_operation_during_fuzztest.js @10gen/server-catalog-and-routing-routing-and-topology @svc-auto-approve-bot
/jstests/libs/override_methods/**/validate_collections_on_shutdown.js @10gen/server-validate @svc-auto-approve-bot
/jstests/libs/override_methods/**/implicit_change_stream_v2.js @10gen/query-execution-change-streams @svc-auto-approve-bot
# The following patterns are parsed from ./jstests/libs/override_methods/collection_write_path/OWNERS.yml
/jstests/libs/override_methods/collection_write_path/**/* @10gen/server-collection-write-path @svc-auto-approve-bot

View File

@ -0,0 +1,72 @@
##########################################################
# THIS IS A GENERATED FILE -- DO NOT MODIFY.
# IF YOU WISH TO MODIFY THIS SUITE, MODIFY THE CORRESPONDING MATRIX SUITE MAPPING FILE
# AND REGENERATE THE MATRIX SUITES.
#
# matrix suite mapping file: buildscripts/resmokeconfig/matrix_suites/mappings/change_streams_secondary_reads_sharded_collections_v2.yml
# regenerate matrix suites: buildscripts/resmoke.py generate-matrix-suites && bazel run //:format
##########################################################
description:
This suite is a combination of change_streams_mongos_passthrough, change_streams_secondary_reads,
and change_streams_sharded_collections_passthrough. You can run any of these tests
individually to debug any issues that might arrise.
executor:
archive:
hooks:
- CheckReplDBHash
- ValidateCollections
config:
shell_options:
eval:
globalThis.testingReplication = true; await import('jstests/libs/override_methods/set_read_and_write_concerns.js');;
await import('jstests/libs/override_methods/set_read_preference_secondary.js');;
await import('jstests/libs/override_methods/implicitly_shard_accessed_collections.js');;
await import('jstests/libs/override_methods/implicit_change_stream_v2.js');
global_vars:
TestData:
defaultReadConcernLevel: null
setShellParameter: defaultFindReplicaSetHostTimeoutMS=120000
fixture:
class: ShardedClusterFixture
mongod_options:
bind_ip_all: ""
set_parameters:
enableTestCommands: 1
logComponentVerbosity:
command: 1
query: 1
replication: 3
verbosity: 0
periodicNoopIntervalSecs: 1
writePeriodicNoops: true
mongos_options:
bind_ip_all: ""
set_parameters:
enableTestCommands: 1
logComponentVerbosity:
command: 1
network:
asio: 2
verbosity: 1
verbosity: 0
num_rs_nodes_per_shard: 2
hooks:
- class: CheckReplDBHash
- class: ValidateCollections
- class: CleanEveryN
n: 20
matrix_suite: true
selector:
exclude_files:
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
- jstests/change_streams/report_post_batch_resume_token.js
- jstests/change_streams/resume_from_high_water_mark_token.js
exclude_with_any_tags:
- assumes_write_concern_unchanged
- assumes_against_mongod_not_mongos
- assumes_unsharded_collection
- assumes_no_implicit_collection_creation_on_get_collection
- assumes_read_preference_unchanged
roots:
- jstests/change_streams/**/*.js
test_kind: js_test

View File

@ -50,7 +50,6 @@ selector:
exclude_files:
- jstests/change_streams/does_not_implicitly_create_database.js
- jstests/change_streams/oplog_rewrite/*.js
- jstests/change_streams/projection_fakes_internal_event.js
- jstests/change_streams/split_large_event.js
- src/mongo/db/modules/enterprise/jstests/fle2/basic_create_collection.js
- src/mongo/db/modules/enterprise/jstests/fle2/fle2_bulk_write.js

View File

@ -50,7 +50,6 @@ selector:
exclude_files:
- jstests/change_streams/does_not_implicitly_create_database.js
- jstests/change_streams/oplog_rewrite/*.js
- jstests/change_streams/projection_fakes_internal_event.js
- jstests/change_streams/split_large_event.js
- src/mongo/db/modules/enterprise/jstests/fle2/basic_create_collection.js
- src/mongo/db/modules/enterprise/jstests/fle2/fle2_bulk_write.js

View File

@ -0,0 +1,24 @@
# This suite combines testing for change_streams_mongos_passthrough, change_streams_secondary_reads,
# and change_streams_sharded_collections_passthrough, while running change streams in v2 version.
# If part of this suite fails, you can comment out individual parts or run the above tests
# to dig deeper into what is failing.
base_suite: change_streams
description: >-
This suite is a combination of change_streams_mongos_passthrough,
change_streams_secondary_reads, and change_streams_sharded_collections_passthrough.
You can run any of these tests individually to debug any issues that might arrise.
overrides:
- "change_streams.mongos_passthrough"
- "change_streams.secondary_reads"
- "change_streams.base_eval"
eval:
- "change_streams.secondary_reads_eval"
- "change_streams.sharded_collections_passthrough_eval"
- "change_streams.change_streams_v2_eval"
#uncomment this line if you comment out the secondary reads portion of this file
# - "change_streams.causal_consistency"
excludes:
- "change_streams.mongos_passthrough_excludes"
- "change_streams.sharded_collections_passthrough_excludes"
- "change_streams.secondary_reads_excludes"
- "change_streams.batch_zero_excludes"

View File

@ -68,6 +68,17 @@
value:
eval: "await import('jstests/libs/override_methods/implicitly_shard_accessed_collections.js');"
- name: change_streams_v2_eval
value:
eval: "await import('jstests/libs/override_methods/implicit_change_stream_v2.js');"
# TODO:SERVER-113140 Report resumeToken for change streams when batchSize is 0.
- name: batch_zero_excludes
value:
exclude_files:
- jstests/change_streams/report_post_batch_resume_token.js
- jstests/change_streams/resume_from_high_water_mark_token.js
- name: secondary_reads
value:
executor:

View File

@ -104,7 +104,6 @@
exclude_files:
- jstests/change_streams/does_not_implicitly_create_database.js
- jstests/change_streams/oplog_rewrite/*.js
- jstests/change_streams/projection_fakes_internal_event.js
- jstests/change_streams/split_large_event.js
exclude_with_any_tags:
- assumes_against_mongod_not_mongos

View File

@ -45,9 +45,6 @@ filters:
- "bulk_write_multi_op_sharded_collections_jscore_passthrough.yml":
approvers:
- 10gen/query-execution-router
- "change_streams*":
approvers:
- 10gen/query-execution-change-streams
- "clustered_collection_passthrough.yml":
approvers:
- 10gen/query-execution
@ -195,3 +192,9 @@ filters:
- "concurrency_simultaneous_replication.yml":
approvers:
- 10gen/server-replication-reviewers
- "change_streams*":
approvers:
- 10gen/query-execution-change-streams
- "sharding_change_streams_v2.yml":
approvers:
- 10gen/query-execution-change-streams

View File

@ -0,0 +1,78 @@
test_kind: js_test
selector:
roots:
- jstests/change_streams/**/*.js
exclude_files:
# This test creates a collection (and index) inside a transaction. Even though the collections are
# unsharded this suite enables sharding in the test database which makes transactions against
# it distributed. This causes the following tests to fail since creating a collection in a
# distributed transaction is not allowed.
- jstests/change_streams/ddl_create_index_txn.js
# This test exercises the internal behavior of $changeStream v1 and is not compatible with v2.
- jstests/change_streams/create_event_from_chunk_migration.js
# TODO: SERVER-113247 Ensure placementHistory entries are recorded with the right timestamp.
- jstests/change_streams/reshard_collection_event.js
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
# TODO:SERVER-113140 Report resumeToken for change streams when batchSize is 0.
- jstests/change_streams/report_post_batch_resume_token.js
- jstests/change_streams/resume_from_high_water_mark_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
# set_read_and_write_concerns.js override when it refuses to replace the readConcern or
# writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be
# warranted.
##
# "Cowardly refusing to override write concern of command: ..."
- assumes_write_concern_unchanged
# Exclude any that assume sharding is disabled
- assumes_against_mongod_not_mongos
executor:
archive:
hooks:
- CheckReplDBHash
- CheckMetadataConsistencyInBackground
- ValidateCollections
config:
shell_options:
global_vars:
TestData:
defaultReadConcernLevel: null
# Enable causal consistency for change streams suites using 1 node replica sets. See
# change_streams.yml for detailed explanation.
eval: >-
globalThis.testingReplication = true;
await import('jstests/libs/override_methods/set_read_and_write_concerns.js');
await import('jstests/libs/override_methods/enable_sessions.js');
await import('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js');
await import('jstests/libs/override_methods/implicit_change_stream_v2.js');
# Set longer host discovery time to handle change stream resumable errors.
setShellParameter: defaultFindReplicaSetHostTimeoutMS=120000
hooks:
- class: CheckReplDBHash
- class: CheckMetadataConsistencyInBackground
- class: RunQueryStats
- class: ValidateCollections
- class: CheckOrphansDeleted
- class: CleanEveryN
n: 20
fixture:
class: ShardedClusterFixture
# Use two shards to make sure we will only talk to the primary shard for the database and will
# not delay changes to wait for notifications or a clock advancement from other shards.
num_shards: 2
mongos_options:
bind_ip_all: ""
set_parameters:
enableTestCommands: 1
mongod_options:
bind_ip_all: ""
set_parameters:
enableTestCommands: 1
periodicNoopIntervalSecs: 1
writePeriodicNoops: true

View File

@ -0,0 +1,81 @@
test_kind: js_test
selector:
roots:
- jstests/change_streams/**/*.js
exclude_files:
# Parallel Shell - we do not signal the override to end a txn when a parallel shell closes.
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# This test exercises the internal behavior of $changeStream v1 and is not compatible with v2.
- jstests/change_streams/create_event_from_chunk_migration.js
# TODO: SERVER-113247 Ensure placementHistory entries are recorded with the right timestamp.
- jstests/change_streams/reshard_collection_event.js
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
# TODO:SERVER-113140 Report resumeToken for change streams when batchSize is 0.
- jstests/change_streams/report_post_batch_resume_token.js
- jstests/change_streams/resume_from_high_water_mark_token.js
exclude_with_any_tags:
# These tests would fail with "Cowardly refusing to override write concern of command: ..."
- assumes_write_concern_unchanged
# No need to use a passthrough to add transactions to a test that already has its own
# transactions.
- uses_transactions
# These tests make assumptions about change stream results that are no longer true once operations
# get bundled into transactions.
- change_stream_does_not_expect_txns
# Exclude any that assume sharding is disabled
- assumes_against_mongod_not_mongos
executor:
archive:
hooks:
- CheckReplDBHash
- CheckReplOplogs
- CheckMetadataConsistencyInBackground
- ValidateCollections
config:
shell_options:
global_vars:
TestData:
networkErrorAndTxnOverrideConfig:
wrapCRUDinTransactions: true
# Enable the transactions passthrough.
eval: >-
globalThis.testingReplication = true;
await import("jstests/libs/override_methods/enable_sessions.js");
await import("jstests/libs/override_methods/txn_passthrough_cmd_massage.js");
await import("jstests/libs/override_methods/network_error_and_txn_override.js");
await import("jstests/libs/override_methods/implicit_filter_eot_changestreams.js");
await import('jstests/libs/override_methods/implicit_change_stream_v2.js');
# Set longer host discovery time to handle change stream resumable errors.
setShellParameter: defaultFindReplicaSetHostTimeoutMS=120000
hooks:
# The CheckReplDBHash hook waits until all operations have replicated to and have been applied
# on the secondaries, so we run the ValidateCollections hook after it to ensure we're
# validating the entire contents of the collection.
- class: CheckReplOplogs
- class: CheckReplDBHash
- class: CheckMetadataConsistencyInBackground
- class: RunQueryStats
- class: ValidateCollections
- class: CheckOrphansDeleted
- class: CleanEveryN
n: 20
fixture:
class: ShardedClusterFixture
# Use two shards to make sure we will only talk to the primary shard for the database and will
# not delay changes to wait for notifications or a clock advancement from other shards.
num_shards: 2
mongos_options:
bind_ip_all: ""
set_parameters:
enableTestCommands: 1
mongod_options:
bind_ip_all: ""
set_parameters:
enableTestCommands: 1
periodicNoopIntervalSecs: 1
writePeriodicNoops: true

View File

@ -6,8 +6,6 @@ selector:
exclude_files:
# Parallel Shell - we do not signal the override to end a txn when a parallel shell closes.
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# Stage not supported inside of a multi-document transaction: $indexStats.
- jstests/change_streams/projection_fakes_internal_event.js
# TODO: SERVER-98064 Investigate split_large_event.js failures in change_streams_multi_stmt_txn_sharded_collections_passthrough
- jstests/change_streams/split_large_event.js
exclude_with_any_tags:

View File

@ -0,0 +1,83 @@
test_kind: js_test
selector:
roots:
- jstests/change_streams/**/*.js
exclude_files:
# Parallel Shell - we do not signal the override to end a txn when a parallel shell closes.
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# TODO: SERVER-98064 Investigate split_large_event.js failures in change_streams_multi_stmt_txn_sharded_collections_passthrough
- jstests/change_streams/split_large_event.js
# This test exercises the internal behavior of $changeStream v1 and is not compatible with v2.
- jstests/change_streams/create_event_from_chunk_migration.js
# TODO: SERVER-113247 Ensure placementHistory entries are recorded with the right timestamp.
- jstests/change_streams/reshard_collection_event.js
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
# TODO:SERVER-113140 Report resumeToken for change streams when batchSize is 0.
- jstests/change_streams/report_post_batch_resume_token.js
- jstests/change_streams/resume_from_high_water_mark_token.js
exclude_with_any_tags:
# These tests would fail with "Cowardly refusing to override write concern of command: ..."
- assumes_write_concern_unchanged
# No need to use a passthrough to add transactions to a test that already has its own
# transactions.
- uses_transactions
# These tests make assumptions about change stream results that are no longer true once operations
# get bundled into transactions.
- change_stream_does_not_expect_txns
# Exclude any tests that don't support sharding.
- assumes_against_mongod_not_mongos
- assumes_unsharded_collection
executor:
archive:
hooks:
- CheckReplDBHash
- CheckReplOplogs
- CheckMetadataConsistencyInBackground
- ValidateCollections
config:
shell_options:
global_vars:
TestData:
networkErrorAndTxnOverrideConfig:
wrapCRUDinTransactions: true
# Enable the transactions passthrough.
eval: >-
globalThis.testingReplication = true;
await import("jstests/libs/override_methods/enable_sessions.js");
await import("jstests/libs/override_methods/txn_passthrough_cmd_massage.js");
await import("jstests/libs/override_methods/network_error_and_txn_override.js");
await import("jstests/libs/override_methods/implicit_filter_eot_changestreams.js");
await import("jstests/libs/override_methods/implicitly_shard_accessed_collections.js");
await import('jstests/libs/override_methods/implicit_change_stream_v2.js');
# Set longer host discovery time to handle change stream resumable errors.
setShellParameter: defaultFindReplicaSetHostTimeoutMS=120000
hooks:
# The CheckReplDBHash hook waits until all operations have replicated to and have been applied
# on the secondaries, so we run the ValidateCollections hook after it to ensure we're
# validating the entire contents of the collection.
- class: CheckReplOplogs
- class: CheckReplDBHash
- class: CheckMetadataConsistencyInBackground
- class: RunQueryStats
- class: ValidateCollections
- class: CheckOrphansDeleted
- class: CleanEveryN
n: 20
fixture:
class: ShardedClusterFixture
mongos_options:
bind_ip_all: ""
set_parameters:
enableTestCommands: 1
mongod_options:
bind_ip_all: ""
set_parameters:
enableTestCommands: 1
writePeriodicNoops: 1
periodicNoopIntervalSecs: 1
num_shards: 2

View File

@ -3,9 +3,6 @@ test_kind: js_test
selector:
roots:
- jstests/change_streams/**/*.js
exclude_files:
# Expects a change stream cursor to be open on each of 2 shards.
- jstests/change_streams/projection_fakes_internal_event.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the

View File

@ -0,0 +1,26 @@
test_kind: js_test
selector:
roots:
- jstests/sharding/query/change_streams/*.js
exclude_files:
# TODO:SERVER-113140 Report resumeToken for change streams when batchSize is 0.
- jstests/sharding/query/change_streams/change_streams_primary_shard_unaware.js
- jstests/sharding/query/change_streams/config_server_data_shard_error_handling.js
# TODO: SERVER-113459 ChangeStreamHandleTopologyChangeV2Stage does not respect maxTimeMS of getMore request when in Waiting state.
- jstests/sharding/query/change_streams/change_stream_enforce_max_time_ms_on_mongos.js
# Test is testing behavior specific to change streams v1.
- jstests/sharding/query/change_streams/change_streams_establishment_finds_new_shards.js
executor:
archive:
tests:
- jstests/sharding/*reshard*.js
config:
shell_options:
nodb: ""
# Ensure we always start change streams with v2 version.
eval: >-
await import('jstests/libs/override_methods/implicit_change_stream_v2.js');

View File

@ -265,6 +265,7 @@ selector:
# then remove them from the exclusion list.
- jstests/sharding/query/api_version/api_version_stage_allowance_checks.js
- jstests/sharding/query/log_remote_op_wait.js
- jstests/sharding/query/change_streams/projection_fakes_internal_event.js
- jstests/sharding/query/change_streams/change_stream_against_shard_mongod.js
- jstests/sharding/query/change_streams/change_stream_chunk_migration.js
- jstests/sharding/query/change_streams/change_stream_empty_apply_ops.js

View File

@ -590,6 +590,18 @@ tasks:
- func: "do setup"
- func: "run tests"
- <<: *task_template
name: change_streams_mongos_sessions_passthrough_v2
tags:
[
"assigned_to_jira_team_server_query_execution",
"default",
"change_streams",
]
commands:
- func: "do setup"
- func: "run tests"
- <<: *task_template
name: change_streams_multi_stmt_txn_mongos_passthrough
tags:
@ -602,6 +614,18 @@ tasks:
- func: "do setup"
- func: "run tests"
- <<: *task_template
name: change_streams_multi_stmt_txn_mongos_passthrough_v2
tags:
[
"assigned_to_jira_team_server_query_execution",
"default",
"change_streams",
]
commands:
- func: "do setup"
- func: "run tests"
- <<: *task_template
name: change_streams_multi_stmt_txn_passthrough
tags:
@ -626,6 +650,18 @@ tasks:
- func: "do setup"
- func: "run tests"
- <<: *task_template
name: change_streams_multi_stmt_txn_sharded_collections_passthrough_v2
tags:
[
"assigned_to_jira_team_server_query_execution",
"default",
"change_streams",
]
commands:
- func: "do setup"
- func: "run tests"
- <<: *gen_task_template
name: change_streams_multiversion_gen
tags:
@ -674,6 +710,19 @@ tasks:
- func: "do setup"
- func: "run tests"
- <<: *task_template
name: change_streams_secondary_reads_sharded_collections_v2
tags:
[
"assigned_to_jira_team_server_query_execution",
"default",
"change_streams",
"secondary_reads",
]
commands:
- func: "do setup"
- func: "run tests"
- <<: *gen_task_template
name: change_streams_sharded_collections_multiversion_gen
tags:
@ -1425,6 +1474,15 @@ tasks:
vars:
resmoke_jobs_max: 1
- <<: *gen_task_template
name: sharding_change_streams_v2_gen
tags:
["assigned_to_jira_team_server_query_execution", "default", "sharding"]
commands:
- func: "generate resmoke tasks"
vars:
use_large_distro: "true"
- <<: *gen_task_template
name: sharding_api_strict_passthrough_gen
tags:

View File

@ -177,8 +177,11 @@ buildvariants:
- name: causally_consistent_jscore_passthrough_gen
- name: change_streams
- name: change_streams_mongos_sessions_passthrough
- name: change_streams_mongos_sessions_passthrough_v2
- name: change_streams_multi_stmt_txn_mongos_passthrough
- name: change_streams_multi_stmt_txn_mongos_passthrough_v2
- name: change_streams_multi_stmt_txn_sharded_collections_passthrough
- name: change_streams_multi_stmt_txn_sharded_collections_passthrough_v2
- name: change_streams_per_shard_cursor_passthrough
- name: change_streams_sharded_collections_query_shape_hash_stability_multiversion_gen
- name: fle2_sharding_high_cardinality

View File

@ -34,7 +34,12 @@ assert.commandFailedWithCode(
);
// Verify change streams cannot be created on views.
assert.commandFailedWithCode(
db.runCommand({aggregate: normalViewName, pipeline: [{$changeStream: {}}], cursor: {}}),
ErrorCodes.CommandNotSupportedOnView,
);
let response = db.runCommand({aggregate: normalViewName, pipeline: [{$changeStream: {}}], cursor: {}});
if (response.ok) {
// In case we are running change streams version 2, the cursor may not be opened on the shard.
// To ensure the failure indeed occurs, we issue a getMore command to ensure that the cursor
// will be attempted to be opened on the shard and will fail.
assert.eq(response._changeStreamVersion, "v2", "Change stream of version v1 should fail immediately");
response = db.runCommand({getMore: response.cursor.id, collection: normalViewName});
}
assert.commandFailedWithCode(response, ErrorCodes.CommandNotSupportedOnView);

View File

@ -29,4 +29,6 @@ const csCursor2 = testColl.watch([{$match: {"fullDocument._id": {$gt: 1}}}, {$pr
// As long as SERVER-71565 is not fixed, we expect the new change stream to throw a fatal error
// because the resume token is not present in the new stream.
assert.throwsWithCode(() => csCursor2.hasNext(), ErrorCodes.ChangeStreamFatalError);
assert.throwsWithCode(() => {
assert.soon(() => csCursor2.hasNext());
}, ErrorCodes.ChangeStreamFatalError);

View File

@ -1,219 +0,0 @@
/**
* Tests that a user projection which fakes an internal topology-change event is handled gracefully
* in a sharded cluster.
* TODO SERVER-65778: rework this test when we can handle faked internal events more robustly.
*
* Tests that if a user fakes an internal event with a projection nothing crashes, so not valuable
* to test with a config shard.
* @tags: [assumes_read_preference_unchanged, config_shard_incompatible]
*/
import {ShardingTest} from "jstests/libs/shardingtest.js";
const numShards = 2;
const st = new ShardingTest({
shards: numShards,
rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}},
});
const mongosConn = st.s;
const testDB = mongosConn.getDB(jsTestName());
const adminDB = mongosConn.getDB("admin");
const testColl = testDB.test;
// Insert one test document that points to a valid shard, and one that points to an invalid shard.
// These will generate change events that look identical to a config.shards entry, except for 'ns'.
// It also means that the documentKey field in the resume token will look like a potentially valid
// new-shard document.
const existingShardDoc = testDB.getSiblingDB("config").shards.find({_id: st.rs0.name}).next();
const existingShardWrongNameDoc = {
_id: "nonExistentName",
host: existingShardDoc.host,
};
const existingShardWrongHostDoc = {
_id: st.rs1.name,
host: `${st.rs1.name}/${st.rs1.host}-wrong:${st.rs1.ports[0]}`,
};
const fakeShardDoc = {
_id: "shardX",
host: "shardX/nonExistentHost:27017",
};
const invalidShardDoc = {
_id: "shardY",
host: null,
};
const configDotShardsNs = {
db: "config",
coll: "shards",
};
assert.commandWorked(testColl.insert(existingShardWrongNameDoc));
assert.commandWorked(testColl.insert(existingShardWrongHostDoc));
assert.commandWorked(testColl.insert(existingShardDoc));
assert.commandWorked(testColl.insert(invalidShardDoc));
assert.commandWorked(testColl.insert(fakeShardDoc));
// Log the shard description documents that we just inserted into the collection.
jsTestLog("Shard docs: " + tojson(testColl.find().toArray()));
// Helper function which opens a stream with the given projection and asserts that its behaviour
// conforms to the specified arguments; it will either throw the given error code, or return the
// expected events. Passing an empty array will confirm that we see no events in the stream. We
// further confirm that the faked events do not cause additional cursors to be opened.
function assertChangeStreamBehaviour(projection, expectedEvents, expectedErrorCode = null) {
// Can't expect both to see events and to throw an exception.
assert(!(expectedEvents && expectedErrorCode));
// Generate a random ID for this stream.
const commentID = `${Math.random()}`;
// Create a change stream cursor with the specified projection.
let csCursor = testColl.watch([{$addFields: projection}], {
startAtOperationTime: Timestamp(1, 1),
comment: commentID,
});
// Confirm that the observed events match the expected events, if specified.
if (expectedEvents && expectedEvents.length > 0) {
for (let expectedEvent of expectedEvents) {
assert.soon(() => csCursor.hasNext());
const nextEvent = csCursor.next();
for (let fieldName in expectedEvent) {
assert.eq(expectedEvent[fieldName], nextEvent[fieldName], {expectedEvent, nextEvent});
}
}
}
// If there are no expected events, confirm that the token advances without seeing anything.
if (expectedEvents && expectedEvents.length == 0) {
const startPoint = csCursor.getResumeToken();
assert.soon(() => {
assert(!csCursor.hasNext(), () => tojson(csCursor.next()));
return bsonWoCompare(csCursor.getResumeToken(), startPoint) > 0;
});
}
// If we expect an error code, assert that we throw it soon.
if (expectedErrorCode) {
assert.soon(() => {
try {
assert.throwsWithCode(() => csCursor.hasNext(), expectedErrorCode);
} catch (err) {
return false;
}
return true;
});
} else {
// Otherwise, confirm that we still only have a single cursor on each shard. It's possible
// that the same cursor will be listed as both active and inactive, so group by cursorId.
const openCursors = adminDB
.aggregate([
{$currentOp: {idleCursors: true}},
{$match: {"cursor.originatingCommand.comment": commentID}},
{
$group: {
_id: {shard: "$shard", cursorId: "$cursor.cursorId"},
currentOps: {$push: "$$ROOT"},
},
},
])
.toArray();
assert.eq(
openCursors.length,
numShards,
// Dump all the running operations for better debuggability.
() => tojson(adminDB.aggregate([{$currentOp: {idleCursors: true}}]).toArray()),
);
}
// Close the change stream when we are done.
csCursor.close();
}
// Test that a projection which fakes a 'migrateChunkToNewShard' event is swallowed but has no
// effect.
let testProjection = {operationType: "migrateChunkToNewShard"};
assertChangeStreamBehaviour(testProjection, []);
// Test that a projection which fakes an event on config.shards with a non-string operationType is
// allowed to pass through.
testProjection = {
ns: configDotShardsNs,
operationType: null,
};
assertChangeStreamBehaviour(testProjection, [
{operationType: null, fullDocument: existingShardWrongNameDoc},
{operationType: null, fullDocument: existingShardWrongHostDoc},
{operationType: null, fullDocument: existingShardDoc},
{operationType: null, fullDocument: invalidShardDoc},
{operationType: null, fullDocument: fakeShardDoc},
]);
// Test that a projection which fakes an event on config.shards with a non-timestamp clusterTime
// is allowed to pass through.
testProjection = {
ns: configDotShardsNs,
clusterTime: null,
};
assertChangeStreamBehaviour(testProjection, [
{clusterTime: null, fullDocument: existingShardWrongNameDoc},
{clusterTime: null, fullDocument: existingShardWrongHostDoc},
{clusterTime: null, fullDocument: existingShardDoc},
{clusterTime: null, fullDocument: invalidShardDoc},
{clusterTime: null, fullDocument: fakeShardDoc},
]);
// Test that a projection which fakes an event on config.shards with a non-object fullDocument
// is allowed to pass through.
testProjection = {
ns: configDotShardsNs,
fullDocument: null,
};
assertChangeStreamBehaviour(testProjection, [
{fullDocument: null},
{fullDocument: null},
{fullDocument: null},
{fullDocument: null},
{fullDocument: null},
]);
// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument
// pointing to an existing shard is swallowed but has no effect.
testProjection = {
ns: configDotShardsNs,
fullDocument: existingShardDoc,
};
assertChangeStreamBehaviour(testProjection, []);
// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument
// pointing to an existing shard's host, but the wrong shard name, throws as it attempts to connect.
testProjection = {
ns: configDotShardsNs,
fullDocument: existingShardWrongNameDoc,
};
assertChangeStreamBehaviour(testProjection, null, ErrorCodes.ShardNotFound);
// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument
// pointing to an existing shard's name, but the wrong host, is swallowed and has no effect.
testProjection = {
ns: configDotShardsNs,
fullDocument: existingShardWrongHostDoc,
};
assertChangeStreamBehaviour(testProjection, []);
// Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument
// pointing to a non-existent shard throws as it attempts to connect.
testProjection = {
ns: configDotShardsNs,
fullDocument: fakeShardDoc,
};
assertChangeStreamBehaviour(testProjection, null, ErrorCodes.ShardNotFound);
// Test that a projection which fakes a new-shard event on config.shards with an invalid
// fullDocument throws a validation exception.
testProjection = {
ns: configDotShardsNs,
fullDocument: invalidShardDoc,
};
assertChangeStreamBehaviour(testProjection, null, ErrorCodes.TypeMismatch);
st.stop();

View File

@ -51,3 +51,6 @@ filters:
- "validate_collections_on_shutdown.js":
approvers:
- 10gen/server-validate
- "implicit_change_stream_v2.js":
approvers:
- 10gen/query-execution-change-streams

View File

@ -0,0 +1,41 @@
/**
* Overrides the $changeStream aggregation pipeline to run in version "v2".
*/
import {OverrideHelpers} from "jstests/libs/override_methods/override_helpers.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
let featureFlagEnabled = null;
function isFeatureFlagEnabled(conn) {
if (featureFlagEnabled === null) {
featureFlagEnabled = FeatureFlagUtil.isPresentAndEnabled(conn, "ChangeStreamPreciseShardTargeting");
}
return featureFlagEnabled;
}
function isChangeStreamCommandWithoutVersion(cmdObj) {
return (
cmdObj &&
cmdObj.aggregate &&
// TODO: SERVER-111325 Implement DatabaseChangeStreamShardTargeterImpl module.
cmdObj.aggregate !== 1 &&
// TODO: SERVER-111381 Implement AllDatabasesChangeStreamShardTargeterImpl module.
cmdObj.aggregate.$db !== "admin" &&
Array.isArray(cmdObj.pipeline) &&
cmdObj.pipeline.length > 0 &&
typeof cmdObj.pipeline[0].$changeStream == "object" &&
cmdObj.pipeline[0].$changeStream.constructor === Object &&
!cmdObj.pipeline[0].$changeStream.hasOwnProperty("version")
);
}
function runChangeStreamWithV2Version(conn, _dbName, _commandName, commandObj, func, makeFuncArgs) {
// TODO: SERVER-52253 Enable feature flag for Improved change stream handling of cluster topology changes.
if (isChangeStreamCommandWithoutVersion(commandObj) && isFeatureFlagEnabled(conn)) {
commandObj.pipeline[0].$changeStream["version"] = "v2";
}
return func.apply(conn, makeFuncArgs(commandObj));
}
OverrideHelpers.prependOverrideInParallelShell("jstests/libs/override_methods/implicit_change_stream_v2.js");
OverrideHelpers.overrideRunCommand(runChangeStreamWithV2Version);

View File

@ -264,7 +264,7 @@ export function ChangeStreamTest(_db, options) {
});
updateResumeToken(res.cursor, res.cursor.firstBatch);
_allCursors.push({db: _db.getName(), coll: collName, cursorId: res.cursor.id});
return res.cursor;
return {...res.cursor, _changeStreamVersion: res._changeStreamVersion};
});
};

View File

@ -21,7 +21,8 @@ const st = new ShardingTest({
});
const testDB = st.s.getDB(jsTestName());
const coll = testDB.test;
const collName = "test";
const coll = testDB[collName];
// The set of errors which might be thrown if we attempt to getMore after stopping a shard.
const expectedStopShardErrors = [
@ -92,7 +93,10 @@ for (let i = 1; i <= 10; ++i) {
// ... and read all the corresponding events from the stream.
assert.soon(() => {
return csCursor.hasNext() && csCursor.next().documentKey._id === 10;
if (csCursor.hasNext()) {
return csCursor.next().documentKey._id === 10;
}
return false;
});
// Issue a "find" query to retrieve the first few documents, leaving the cursor open.
@ -137,7 +141,15 @@ assert.commandFailedWithCode(err, expectedStopShardErrors);
assert(!("errorLabels" in err), err);
// Now confirm that attempting to open a new stream fails on the initial aggregate.
err = assert.throws(() => coll.watch([]));
{
err = assert.throws(() => {
let res = coll.watch([]);
assert.soon(() => {
const cursorId = res._cursorid ?? res.cursor.id;
res = assert.commandWorked(testDB.runCommand({getMore: cursorId, collection: collName}));
});
});
}
assert.commandFailedWithCode(err, ErrorCodes.FailedToSatisfyReadPreference);
// Confirm that the response includes the "ResumableChangeStreamError" error label.

View File

@ -6,6 +6,7 @@
// ]
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {funWithArgs} from "jstests/libs/parallel_shell_helpers.js";
const rsNodeOptions = {
// Use a higher frequency for periodic noops to speed up the test.
@ -13,7 +14,7 @@ const rsNodeOptions = {
};
const st = new ShardingTest({shards: 1, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}});
jsTestLog("Starting new shard (but not adding to shard set yet)");
jsTest.log.info("Starting new shard (but not adding to shard set yet)");
const newShard = new ReplSetTest({name: "newShard", nodes: 1, nodeOptions: rsNodeOptions});
newShard.startSet({shardsvr: ""});
newShard.initiate();
@ -37,25 +38,47 @@ assert.commandWorked(
);
// While opening the cursor, wait for the failpoint and add the new shard.
function addShardAndMigrate(mongosHost, newShardURL, newShardName, collFullName) {
const mongos = new Mongo(mongosHost);
const db = mongos.getDB("admin");
jsTest.log.info("Looking for failpoint shardedAggregateHangBeforeEstablishingShardCursors in the logs");
checkLog.contains(db, "shardedAggregateHangBeforeEstablishingShardCursors fail point enabled");
jsTest.log.info(`Adding new shard ${newShardURL}`);
assert.commandWorked(db.adminCommand({addShard: newShardURL, name: newShardName}));
jsTest.log.info("Moving chunk to new shard");
assert.commandWorked(
db.adminCommand({
moveChunk: collFullName,
find: {_id: 20},
to: newShardName,
_waitForDelete: true,
}),
);
jsTest.log.info("Disabling failpoint shardedAggregateHangBeforeEstablishingShardCursors");
assert.commandWorked(
db.adminCommand({
configureFailPoint: "shardedAggregateHangBeforeEstablishingShardCursors",
mode: "off",
}),
);
}
const awaitNewShard = startParallelShell(
`
checkLog.contains(db,
"shardedAggregateHangBeforeEstablishingShardCursors fail point enabled");
assert.commandWorked(
db.adminCommand({addShard: "${newShard.getURL()}", name: "${newShard.name}"}));
// Migrate the [10, MaxKey] chunk to "newShard".
assert.commandWorked(db.adminCommand({moveChunk: "${mongosColl.getFullName()}",
find: {_id: 20},
to: "${newShard.name}",
_waitForDelete: true}));
assert.commandWorked(
db.adminCommand(
{configureFailPoint: "shardedAggregateHangBeforeEstablishingShardCursors",
mode: "off"}));`,
funWithArgs(
addShardAndMigrate,
mongos.host, // Instead of st.s
newShard.getURL(),
newShard.name,
mongosColl.getFullName(),
),
mongos.port,
);
jsTestLog("Opening $changeStream cursor");
jsTest.log.info("Opening $changeStream cursor");
const changeStream = mongosColl.aggregate([{$changeStream: {}}]);
assert(!changeStream.hasNext(), "Do not expect any results yet");
@ -68,7 +91,7 @@ assert.commandWorked(mongosColl.insert({_id: 20}, {writeConcern: {w: "majority"}
// Expect to see them both.
for (let id of [0, 20]) {
jsTestLog("Expecting Item " + id);
jsTest.log.info("Expecting Item " + id);
assert.soon(() => changeStream.hasNext());
let next = changeStream.next();
assert.eq(next.operationType, "insert");

View File

@ -176,18 +176,26 @@ function testUnshardedBecomesSharded(collToWatch) {
],
});
// Verify that the kNewShardDetected event is successfully delivered to mongoS even in cases
// where the event does not match the user's filter.
// TODO SERVER-30784: remove this test-case, or rework it without the failpoint, when the
// kNewShardDetected event is the only way we detect a new shard for the collection.
mongosDB.adminCommand({configureFailPoint: "throwChangeStreamTopologyChangeExceptionToClient", mode: "alwaysOn"});
ChangeStreamTest.assertChangeStreamThrowsCode({
db: mongosDB,
collName: collToWatch,
pipeline: [{$changeStream: {resumeAfter: preShardCollectionResumeToken}}, {$match: {operationType: "delete"}}],
expectedCode: ErrorCodes.ChangeStreamTopologyChange,
});
mongosDB.adminCommand({configureFailPoint: "throwChangeStreamTopologyChangeExceptionToClient", mode: "off"});
if (cursor._changeStreamVersion === "v1") {
// Verify that the kNewShardDetected event is successfully delivered to mongoS even in cases
// where the event does not match the user's filter.
// TODO SERVER-30784: remove this test-case, or rework it without the failpoint, when the
// kNewShardDetected event is the only way we detect a new shard for the collection.
mongosDB.adminCommand({
configureFailPoint: "throwChangeStreamTopologyChangeExceptionToClient",
mode: "alwaysOn",
});
ChangeStreamTest.assertChangeStreamThrowsCode({
db: mongosDB,
collName: collToWatch,
pipeline: [
{$changeStream: {resumeAfter: preShardCollectionResumeToken}},
{$match: {operationType: "delete"}},
],
expectedCode: ErrorCodes.ChangeStreamTopologyChange,
});
mongosDB.adminCommand({configureFailPoint: "throwChangeStreamTopologyChangeExceptionToClient", mode: "off"});
}
cst.cleanUp();
}

View File

@ -77,7 +77,7 @@ function killShardAndTestErrorForChangeStream({sharding, changeStream, cursor})
return false;
} catch (e) {
// Log the error message received, which should state that the cursor cannot be read.
jsTest.log.info("Cursor threw an error. Exception message:", e.toString());
jsTest.log.info("Cursor threw an error", {exception: e});
// Most importantly verify that the error is resumable.
return isResumableChangeStreamError(e);
}

View File

@ -0,0 +1,381 @@
/**
* Tests that a user projection which fakes an internal topology-change event is handled gracefully
* in a sharded cluster.
* TODO SERVER-65778: rework this test when we can handle faked internal events more robustly.
*
* Tests that if a user fakes an internal event with a projection nothing crashes, so not valuable
* to test with a config shard.
* @tags: [assumes_read_preference_unchanged, config_shard_incompatible]
*/
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {describe, it, before, after} from "jstests/libs/mochalite.js";
import {assertCreateCollection} from "jstests/libs/collection_drop_recreate.js";
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
describe("$changeStream", function () {
const numShards = 2;
const configDotShardsNs = {
db: "config",
coll: "shards",
};
let st;
let adminDB;
let testColl;
const startAtOperationTime = Timestamp(1, 1);
let existingShardWrongNameDoc;
let existingShardWrongHostDoc;
let existingShardDoc;
let invalidShardDoc;
let fakeShardDoc;
before(function () {
st = new ShardingTest({
shards: numShards,
rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}},
});
const mongosConn = st.s;
const testDB = mongosConn.getDB(jsTestName());
adminDB = mongosConn.getDB("admin");
testColl = assertCreateCollection(testDB, "test");
// Insert one test document that points to a valid shard, and one that points to an invalid shard.
// These will generate change events that look identical to a config.shards entry, except for 'ns'.
// It also means that the documentKey field in the resume token will look like a potentially valid
// new-shard document.
existingShardDoc = testDB.getSiblingDB("config").shards.find({_id: st.rs0.name}).next();
existingShardWrongNameDoc = {
_id: "nonExistentName",
host: existingShardDoc.host,
};
existingShardWrongHostDoc = {
...existingShardDoc,
_id: st.rs1.name,
host: `${st.rs1.name}/${st.rs1.host}-wrong:${st.rs1.ports[0]}`,
};
fakeShardDoc = {
_id: "shardX",
host: "shardX/nonExistentHost:27017",
};
invalidShardDoc = {
_id: "shardY",
host: null,
};
assert.commandWorked(testColl.insert(existingShardWrongNameDoc));
assert.commandWorked(testColl.insert(existingShardWrongHostDoc));
assert.commandWorked(testColl.insert(existingShardDoc));
assert.commandWorked(testColl.insert(invalidShardDoc));
assert.commandWorked(testColl.insert(fakeShardDoc));
// TODO: SERVER-113286 Generate NamespacePlacementChanged on addition of first shard in the sharded cluster.
if (FeatureFlagUtil.isPresentAndEnabled(testDB, "ChangeStreamPreciseShardTargeting")) {
assert.commandWorked(testDB.adminCommand({resetPlacementHistory: 1}));
}
// Log the shard description documents that we just inserted into the collection.
jsTest.log.info("Shard docs: ", {docs: testColl.find().toArray()});
});
after(function () {
st.stop();
});
describe("returns nothing when projecting valid internal events for changeStreams v1, but returns all of them in v2", function () {
// Helper function which opens a stream with the given projection and asserts that its behaviour
// conforms to the specified arguments; it will return the expected events. Passing an empty array will confirm that we see no events in the stream. We
// further confirm that the faked events do not cause additional cursors to be opened.
function assertChangeStreamBehaviour(projection, expectedEvents) {
// Generate a random ID for this stream.
const commentID = `${Math.random()}`;
// Create a change stream cursor with the specified projection.
let csCursor = testColl.watch([{$addFields: projection}], {
startAtOperationTime,
comment: commentID,
});
const changeStreamVersion = csCursor.getChangeStreamVersion();
const expectedEventsForGivenChangeStreamVersion = expectedEvents[changeStreamVersion];
// Confirm that the observed events match the expected events, if specified.
if (expectedEventsForGivenChangeStreamVersion.length > 0) {
for (let expectedEvent of expectedEventsForGivenChangeStreamVersion) {
assert.soon(() => csCursor.hasNext());
const nextEvent = csCursor.next();
for (let fieldName in expectedEvent) {
assert.eq(expectedEvent[fieldName], nextEvent[fieldName], {expectedEvent, nextEvent});
}
}
} else {
// If there are no expected events, confirm that the token advances without seeing anything.
const startPoint = csCursor.getResumeToken();
assert.soon(() => {
assert(!csCursor.hasNext(), () => tojson(csCursor.next()));
return bsonWoCompare(csCursor.getResumeToken(), startPoint) > 0;
});
}
// Otherwise, confirm that we still only have a single cursor on each shard. It's possible
// that the same cursor will be listed as both active and inactive, so group by cursorId.
const openCursors = adminDB
.aggregate([
{$currentOp: {idleCursors: true}},
{$match: {"cursor.originatingCommand.comment": commentID}},
{
$group: {
_id: {shard: "$shard", cursorId: "$cursor.cursorId"},
currentOps: {$push: "$$ROOT"},
},
},
])
.toArray();
const assertFn = changeStreamVersion === "v1" ? assert.eq : assert.lte;
assertFn(
openCursors.length,
numShards,
// Dump all the running operations for better debuggability.
() => tojson(adminDB.aggregate([{$currentOp: {idleCursors: true}}]).toArray()),
);
// Close the change stream when we are done.
csCursor.close();
}
it("Test that a projection which fakes a 'migrateChunkToNewShard' event is swallowed but has no effect", function () {
assertChangeStreamBehaviour(
{operationType: "migrateChunkToNewShard"},
{
v1: [],
v2: [
{operationType: "migrateChunkToNewShard"},
{operationType: "migrateChunkToNewShard"},
{operationType: "migrateChunkToNewShard"},
{operationType: "migrateChunkToNewShard"},
{operationType: "migrateChunkToNewShard"},
],
},
);
});
it("Test that a projection which fakes an event on config.shards with a non-string operationType is allowed to pass through", function () {
const testProjection = {
ns: configDotShardsNs,
operationType: null,
};
const expectedEvents = [
{operationType: null, fullDocument: existingShardWrongNameDoc},
{operationType: null, fullDocument: existingShardWrongHostDoc},
{operationType: null, fullDocument: existingShardDoc},
{operationType: null, fullDocument: invalidShardDoc},
{operationType: null, fullDocument: fakeShardDoc},
];
assertChangeStreamBehaviour(testProjection, {v1: expectedEvents, v2: expectedEvents});
});
it("Test that a projection which fakes an event on config.shards with a non-timestamp clusterTime is allowed to pass through", function () {
const testProjection = {
ns: configDotShardsNs,
clusterTime: null,
};
const expectedEvents = [
{clusterTime: null, fullDocument: existingShardWrongNameDoc},
{clusterTime: null, fullDocument: existingShardWrongHostDoc},
{clusterTime: null, fullDocument: existingShardDoc},
{clusterTime: null, fullDocument: invalidShardDoc},
{clusterTime: null, fullDocument: fakeShardDoc},
];
assertChangeStreamBehaviour(testProjection, {v1: expectedEvents, v2: expectedEvents});
});
it("Test that a projection which fakes an event on config.shards with a non-object fullDocument is allowed to pass through", function () {
const testProjection = {
ns: configDotShardsNs,
fullDocument: null,
};
const expectedEvents = [
{fullDocument: null},
{fullDocument: null},
{fullDocument: null},
{fullDocument: null},
{fullDocument: null},
];
assertChangeStreamBehaviour(testProjection, {v1: expectedEvents, v2: expectedEvents});
});
it("Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument pointing to an existing shard is swallowed but has no effect", function () {
const testProjection = {
ns: configDotShardsNs,
fullDocument: existingShardDoc,
};
assertChangeStreamBehaviour(testProjection, {
v1: [],
v2: [
{fullDocument: existingShardDoc},
{fullDocument: existingShardDoc},
{fullDocument: existingShardDoc},
{fullDocument: existingShardDoc},
{fullDocument: existingShardDoc},
],
});
});
it("Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument pointing to an existing shard's name, but the wrong host, is swallowed and has no effect", function () {
const testProjection = {
ns: configDotShardsNs,
fullDocument: existingShardWrongHostDoc,
};
assertChangeStreamBehaviour(testProjection, {
v1: [],
v2: [
{fullDocument: existingShardWrongHostDoc},
{fullDocument: existingShardWrongHostDoc},
{fullDocument: existingShardWrongHostDoc},
{fullDocument: existingShardWrongHostDoc},
{fullDocument: existingShardWrongHostDoc},
],
});
});
});
describe("Throws an exception when handling shard insertion document coming from the shard in invalid format", function () {
function assertChangeStreamShouldThrowForV1(projection, expectedErrorCode) {
const isMultiversion =
TestData.useRandomBinVersionsWithinReplicaSet || TestData.mixedBinVersions || TestData.mongosBinVersion;
if (isMultiversion) {
return;
}
// Generate a random ID for this stream.
const commentID = `${Math.random()}`;
const testDB = st.s.getDB(jsTestName());
// Create a change stream cursor with the specified projection.
let csCursor = testColl.watch([{$addFields: projection}], {
startAtOperationTime,
version: "v1",
comment: commentID,
});
assert.throwsWithCode(() => {
let res = csCursor;
assert.soon(() => {
const cursorId = res._cursorid ?? res.cursor.id;
res = assert.commandWorked(testDB.runCommand({getMore: cursorId, collection: "test"}));
});
}, [expectedErrorCode, undefined]);
}
it("Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument pointing to an existing shard's host, but the wrong shard name, throws as it attempts to connect", function () {
const testProjection = {
ns: configDotShardsNs,
fullDocument: existingShardWrongNameDoc,
};
assertChangeStreamShouldThrowForV1(testProjection, ErrorCodes.ShardNotFound);
});
it("Test that a projection which fakes a new-shard event on config.shards with a valid fullDocument pointing to a non-existent shard throws as it attempts to connect", function () {
const testProjection = {
ns: configDotShardsNs,
fullDocument: fakeShardDoc,
};
assertChangeStreamShouldThrowForV1(testProjection, ErrorCodes.ShardNotFound);
});
it("Test that a projection which fakes a new-shard event on config.shards with an invalid fullDocument throws a validation exception", function () {
const testProjection = {
ns: configDotShardsNs,
fullDocument: invalidShardDoc,
};
assertChangeStreamShouldThrowForV1(testProjection, ErrorCodes.TypeMismatch);
});
});
describe("returns projected 'createDatabase' events", function () {
let csTest;
let tmpColl;
const databaseCreatedEventForExistingDbProjection = {
operationType: "insert",
ns: {db: "config", coll: "databases"},
fullDocument: {_id: "tmp"},
};
const databaseCreatedEventForNonExistingDbProjection = {
operationType: "insert",
ns: {db: "config", coll: "databases"},
fullDocument: {_id: "non-existent-db"},
};
const invalidDatabaseCreatedEventProjection = {
operationType: "insert",
ns: {coll: "databases"},
};
before(function () {
const testDB = st.s.getDB(jsTestName());
csTest = new ChangeStreamTest(testDB);
tmpColl = assertCreateCollection(testDB, "tmp");
assert.commandWorked(tmpColl.insert({a: 1}));
assert.commandWorked(tmpColl.insert({a: 2}));
});
it("for existing database", function () {
// Open change stream over 'tmpColl'.
const cursor = csTest.startWatchingChanges({
pipeline: [{$project: databaseCreatedEventForExistingDbProjection}],
collection: tmpColl,
startAtOperationTime,
});
// Ensure that the event is not swallowed and instead the insert is observed.
csTest.assertNextChangesEqual({
cursor,
expectedChanges: databaseCreatedEventForExistingDbProjection,
});
csTest.cleanUp();
});
it("for non-existing database", function () {
// Open change stream over 'tmpColl'.
const cursor = csTest.startWatchingChanges({
pipeline: [{$project: databaseCreatedEventForNonExistingDbProjection}],
collection: tmpColl,
startAtOperationTime,
});
// Ensure that the event is not swallowed and instead the insert is observed.
csTest.assertNextChangesEqual({
cursor,
expectedChanges: databaseCreatedEventForNonExistingDbProjection,
});
csTest.cleanUp();
});
it("for invalid createDatabase event", function () {
// Open change stream over 'tmpColl'.
const cursor = csTest.startWatchingChanges({
pipeline: [{$project: invalidDatabaseCreatedEventProjection}],
collection: tmpColl,
startAtOperationTime,
});
// Ensure that the event is not swallowed and instead the insert is observed.
csTest.assertNextChangesEqual({
cursor,
expectedChanges: invalidDatabaseCreatedEventProjection,
});
csTest.cleanUp();
});
});
});

View File

@ -99,7 +99,9 @@ public:
_mergeCursors = stage->getSourceStage();
_mergeCursors->recognizeControlEvents();
_mergeCursors->setInitialHighWaterMark(ResumeToken(resumeTokenData).toBSON());
_initializationResumeToken = ResumeToken(resumeTokenData);
_mergeCursors->setInitialHighWaterMark(_initializationResumeToken.toBSON());
_originalAggregateCommand = expCtx->getOriginalAggregateCommand().getOwned();
}
@ -113,11 +115,26 @@ public:
// Build the change stream pipeline command to be run on the data shards.
// '_originalAggregateCommand' already contains the relevant match expressions
// for the oplog.
auto cmdObj = change_stream::topology_helpers::createUpdatedCommandForNewShard(
expCtx,
atClusterTime,
_originalAggregateCommand,
ChangeStreamReaderVersionEnum::kV2);
auto cmdObj = [&]() {
// If the 'atClusterTime' matches the clusterTime of
// '_initializationResumeToken', we should open the $changeStream cursors by
// passing the original resume token.
const bool isInitialRequest =
atClusterTime == _initializationResumeToken.getClusterTime();
if (isInitialRequest) {
return change_stream::topology_helpers::createUpdatedCommandForNewShard(
expCtx,
_initializationResumeToken,
_originalAggregateCommand,
ChangeStreamReaderVersionEnum::kV2);
}
return change_stream::topology_helpers::createUpdatedCommandForNewShard(
expCtx,
atClusterTime,
_originalAggregateCommand,
ChangeStreamReaderVersionEnum::kV2);
}();
LOGV2_DEBUG(10657554,
3,
@ -385,6 +402,9 @@ private:
// opening change streams on data shards.
BSONObj _originalAggregateCommand;
// ResumeToken used for initializing the CursorManager.
ResumeToken _initializationResumeToken;
// Pointer to the preceding 'MergeCursors' stage. Will be set in 'initialize()'.
exec::agg::MergeCursorsStage* _mergeCursors = nullptr;

View File

@ -62,16 +62,21 @@ BSONObj replaceResumeTokenAndVersionInCommand(
MutableDocument changeStreamStage(
pipeline[0][DocumentSourceChangeStream::kStageName].getDocument());
changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken);
if (changeStreamVersion.has_value()) {
changeStreamStage[DocumentSourceChangeStreamSpec::kVersionFieldName] =
Value(ChangeStreamReaderVersion_serializer(*changeStreamVersion));
}
// If the command was initially specified with a startAtOperationTime, we need to remove it to
// use the new resume token.
// Provide 'resumeToken' as part 'startAfter' for resuming the changeStream. 'startAfter' and
// not 'resumeAfter' attribute is provided, to allow opening the change stream cursors with an
// invalidation 'resumeToken' resume token.
// All other forms of resuming the change stream are set to null if provided in the original
// command.
changeStreamStage[DocumentSourceChangeStreamSpec::kStartAfterFieldName] = Value(resumeToken);
changeStreamStage[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = Value();
changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value();
pipeline[0] =
Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}});
MutableDocument newCmd(std::move(originalCmd));
@ -84,9 +89,18 @@ BSONObj createUpdatedCommandForNewShard(
Timestamp atClusterTime,
const BSONObj& originalAggregateCommand,
const boost::optional<ChangeStreamReaderVersionEnum>& changeStreamVersion) {
auto resumeTokenForNewShard =
ResumeToken::makeHighWaterMarkToken(atClusterTime, expCtx->getChangeStreamTokenVersion());
return createUpdatedCommandForNewShard(
expCtx,
ResumeToken::makeHighWaterMarkToken(atClusterTime, expCtx->getChangeStreamTokenVersion()),
originalAggregateCommand,
changeStreamVersion);
}
BSONObj createUpdatedCommandForNewShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const ResumeToken& resumeTokenForNewShard,
const BSONObj& originalAggregateCommand,
const boost::optional<ChangeStreamReaderVersionEnum>& changeStreamVersion) {
// Create a new shard command object containing the new resume token, adding the 'resumeAfter'
// field to the '$changeStream' spec. An example input 'originalAggregateCommand' is:
// {"aggregate":"test","pipeline":[{"$changeStream":{"fullDocument":"default"}}],"cursor":{"batchSize":101}}

View File

@ -34,6 +34,7 @@
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/resume_token.h"
#include <boost/optional/optional.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
@ -58,6 +59,16 @@ BSONObj replaceResumeTokenAndVersionInCommand(
* The change stream version can be optionally specified to force a specific change stream reader
* version on the shard.
*/
BSONObj createUpdatedCommandForNewShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const ResumeToken& resumeTokenForNewShard,
const BSONObj& originalAggregateCommand,
const boost::optional<ChangeStreamReaderVersionEnum>& changeStreamVersion);
/**
* Calls createUpdatedCommandForNewShard() with the HighWaterMark ResumeToken created from the
* 'atClusterTime' timestamp.
*/
BSONObj createUpdatedCommandForNewShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Timestamp atClusterTime,

View File

@ -883,6 +883,9 @@ boost::optional<Document> CommonMongodProcessInterface::doLookupSingleDocument(
auto foreignExpCtx = makeCopyFromExpressionContext(
expCtx, nss, collectionUUID, std::unique_ptr<CollatorInterface>());
// Clearing the change stream spec as the aggregate request is not a change stream.
foreignExpCtx->setChangeStreamSpec(boost::none);
// If we are here, we are either executing the pipeline normally or running in one of the
// execution stat explain verbosities. In either case, we disable explain on the foreign
// context so that we actually retrieve the document.

View File

@ -288,26 +288,10 @@ std::vector<repl::MutableOplogEntry> buildMoveChunkOplogEntries(
bool firstCollectionChunkOnRecipient) {
const auto nss = NamespaceStringUtil::serialize(collName, SerializationContext::stateDefault());
std::vector<repl::MutableOplogEntry> oplogEntries;
{
repl::MutableOplogEntry oplogEntry;
StringData opName("moveChunk");
oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
oplogEntry.setNss(collName);
oplogEntry.setUuid(collUUID);
oplogEntry.setTid(collName.tenantId());
oplogEntry.setObject(BSON("msg" << BSON(opName << nss)));
oplogEntry.setObject2(BSON(opName << nss << "donor" << donor << "recipient" << recipient
<< "allCollectionChunksMigratedFromDonor"
<< noMoreCollectionChunksOnDonor));
oplogEntry.setOpTime(repl::OpTime());
oplogEntry.setWallClockTime(opCtx->fastClockSource().now());
oplogEntries.push_back(std::move(oplogEntry));
}
// Conditionally emit the legacy 'migrateLastChunkFromShard' and 'migrateChunkToNewShard' op
// entry types, consumed by V1 change stream readers.
// 'moveChunk' oplog entry must be the last one to be emitted to ensure V2 change stream reader
// correctness.
if (noMoreCollectionChunksOnDonor) {
repl::MutableOplogEntry legacyOplogEntry;
@ -344,6 +328,24 @@ std::vector<repl::MutableOplogEntry> buildMoveChunkOplogEntries(
oplogEntries.push_back(std::move(legacyOplogEntry));
}
{
repl::MutableOplogEntry oplogEntry;
StringData opName("moveChunk");
oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
oplogEntry.setNss(collName);
oplogEntry.setUuid(collUUID);
oplogEntry.setTid(collName.tenantId());
oplogEntry.setObject(BSON("msg" << BSON(opName << nss)));
oplogEntry.setObject2(BSON(opName << nss << "donor" << donor << "recipient" << recipient
<< "allCollectionChunksMigratedFromDonor"
<< noMoreCollectionChunksOnDonor));
oplogEntry.setOpTime(repl::OpTime());
oplogEntry.setWallClockTime(opCtx->fastClockSource().now());
oplogEntries.push_back(std::move(oplogEntry));
}
return oplogEntries;
}

View File

@ -120,9 +120,6 @@ ShardTargeterDecision ChangeStreamShardTargeterDbPresentStateEventHandler::handl
// In case the set of shards is empty, it means the underlying database is no longer present.
// We open a cursor on the configsvr and change the event handler to the database absent state.
if (shards.empty()) {
tassert(10917002,
"Opened cursors set must be empty",
readerCtx.getCurrentlyTargetedDataShards().empty());
readerCtx.openCursorOnConfigServer(clusterTime + 1);
ctx.setEventHandler(buildDbAbsentStateEventHandler());
}

View File

@ -84,6 +84,51 @@ public:
ChangeStreamReadMode::kStrict, ChangeStreamType::kAllDatabases, boost::none /* nss */);
}
bool assertMoveChunk(const ChangeStream& changeStream,
const MatchExpression& matchExpr,
const NamespaceString& nss,
ShardId fromShard,
ShardId toShard) {
// buildMoveChunkOplogEntries() may build multiple oplog entries. The MatchExpression should
// only potentially match the last one depending on the 'nss' attribute.
auto moveChunkOplogEntriesWithMatchingNss =
buildMoveChunkOplogEntries(opCtx(),
nss,
boost::none /* collUUID */,
fromShard,
toShard,
true /* noMoreCollectionChunksOnDonor */,
false /* firstCollectionChunkOnRecipient */);
auto lastMoveChunkOplogEntryIt = --moveChunkOplogEntriesWithMatchingNss.end();
for (auto it = moveChunkOplogEntriesWithMatchingNss.begin();
it != lastMoveChunkOplogEntryIt;
++it) {
auto moveChunkWithMatchingNssBSON = it->toBSON();
BSONMatchableDocument matchingDoc(moveChunkWithMatchingNssBSON);
ASSERT_FALSE(exec::matcher::matches(&matchExpr, &matchingDoc));
}
auto moveChunkWithMatchingNssBSON = lastMoveChunkOplogEntryIt->toBSON();
BSONMatchableDocument matchingDoc(moveChunkWithMatchingNssBSON);
return exec::matcher::matches(&matchExpr, &matchingDoc);
}
void assertMoveChunkTrue(const ChangeStream& changeStream,
const MatchExpression& matchExpr,
const NamespaceString& nss,
ShardId fromShard,
ShardId toShard) {
ASSERT_TRUE(assertMoveChunk(changeStream, matchExpr, nss, fromShard, toShard));
}
void assertMoveChunkFalse(const ChangeStream& changeStream,
const MatchExpression& matchExpr,
const NamespaceString& nss,
ShardId fromShard,
ShardId toShard) {
ASSERT_FALSE(assertMoveChunk(changeStream, matchExpr, nss, fromShard, toShard));
}
private:
ServiceContext::UniqueOperationContext _opCtx;
boost::intrusive_ptr<ExpressionContext> _expCtx;
@ -222,33 +267,8 @@ TEST_F(
// Ensure 'controlEventFilter' matches only those moveChunk events that affect the relevant
// namespace.
{
auto moveChunkOplogEntriesWithMatchingNss =
buildMoveChunkOplogEntries(opCtx(),
matchingNss,
boost::none /* collUUID */,
fromShard,
toShard,
true /* noMoreCollectionChunksOnDonor */,
false /* firstCollectionChunkOnRecipient */);
auto moveChunkWithMatchingNssBSON = moveChunkOplogEntriesWithMatchingNss[0].toBSON();
BSONMatchableDocument matchingDoc(moveChunkWithMatchingNssBSON);
ASSERT_EQ(*changeStream.getNamespace(), matchingNss);
ASSERT_TRUE(exec::matcher::matches(matchExpr.get(), &matchingDoc));
auto moveChunkOplogEntriesWithoutMatchingNss =
buildMoveChunkOplogEntries(opCtx(),
nonMatchingNss,
boost::none /* collUUID */,
fromShard,
toShard,
true /* noMoreCollectionChunksOnDonor */,
false /* firstCollectionChunkOnRecipient */);
auto moveChunkWithoutMatchingNssBSON = moveChunkOplogEntriesWithoutMatchingNss[0].toBSON();
BSONMatchableDocument nonMatchingDoc(moveChunkWithoutMatchingNssBSON);
ASSERT_NE(*changeStream.getNamespace(), nonMatchingNss);
ASSERT_FALSE(exec::matcher::matches(matchExpr.get(), &nonMatchingDoc));
}
assertMoveChunkTrue(changeStream, *matchExpr, matchingNss, fromShard, toShard);
assertMoveChunkFalse(changeStream, *matchExpr, nonMatchingNss, fromShard, toShard);
}
TEST_F(
@ -346,44 +366,9 @@ TEST_F(
// Ensure 'controlEventFilter' matches only those moveChunk events that affect the relevant
// namespace.
{
auto moveChunkOplogEntriesWithMatchingNss1 =
buildMoveChunkOplogEntries(opCtx(),
matchingNss1,
boost::none /* collUUID */,
fromShard,
toShard,
true /* noMoreCollectionChunksOnDonor */,
false /* firstCollectionChunkOnRecipient */);
auto moveChunkWithMatchingNss1BSON = moveChunkOplogEntriesWithMatchingNss1[0].toBSON();
BSONMatchableDocument matchingDoc1(moveChunkWithMatchingNss1BSON);
ASSERT_TRUE(exec::matcher::matches(matchExpr.get(), &matchingDoc1));
auto moveChunkOplogEntriesWithMatchingNss2 =
buildMoveChunkOplogEntries(opCtx(),
matchingNss2,
boost::none /* collUUID */,
fromShard,
toShard,
true /* noMoreCollectionChunksOnDonor */,
false /* firstCollectionChunkOnRecipient */);
auto moveChunkWithMatchingNss2BSON = moveChunkOplogEntriesWithMatchingNss2[0].toBSON();
BSONMatchableDocument matchingDoc2(moveChunkWithMatchingNss2BSON);
ASSERT_TRUE(exec::matcher::matches(matchExpr.get(), &matchingDoc2));
auto moveChunkOplogEntriesWithoutMatchingNss =
buildMoveChunkOplogEntries(opCtx(),
nonMatchingNss,
boost::none /* collUUID */,
fromShard,
toShard,
true /* noMoreCollectionChunksOnDonor */,
false /* firstCollectionChunkOnRecipient */);
auto moveChunkWithoutMatchingNssBSON = moveChunkOplogEntriesWithoutMatchingNss[0].toBSON();
BSONMatchableDocument nonMatchingDoc(moveChunkWithoutMatchingNssBSON);
ASSERT_NE(*changeStream.getNamespace(), nonMatchingNss);
ASSERT_FALSE(exec::matcher::matches(matchExpr.get(), &nonMatchingDoc));
}
assertMoveChunkTrue(changeStream, *matchExpr, matchingNss1, fromShard, toShard);
assertMoveChunkTrue(changeStream, *matchExpr, matchingNss2, fromShard, toShard);
assertMoveChunkFalse(changeStream, *matchExpr, nonMatchingNss, fromShard, toShard);
}
TEST_F(
@ -440,31 +425,8 @@ TEST_F(
}
// Ensure 'controlEventFilter' matches all moveChunk events.
{
auto moveChunkOplogEntriesWithNss1 =
buildMoveChunkOplogEntries(opCtx(),
nss1,
boost::none /* collUUID */,
fromShard,
toShard,
true /* noMoreCollectionChunksOnDonor */,
false /* firstCollectionChunkOnRecipient */);
auto moveChunkWithNss1BSON = moveChunkOplogEntriesWithNss1[0].toBSON();
BSONMatchableDocument matchingDoc1(moveChunkWithNss1BSON);
ASSERT_TRUE(exec::matcher::matches(matchExpr.get(), &matchingDoc1));
auto moveChunkOplogEntriesWithNss2 =
buildMoveChunkOplogEntries(opCtx(),
nss2,
boost::none /* collUUID */,
fromShard,
toShard,
true /* noMoreCollectionChunksOnDonor */,
false /* firstCollectionChunkOnRecipient */);
auto moveChunkWithNss2BSON = moveChunkOplogEntriesWithNss2[0].toBSON();
BSONMatchableDocument matchingDoc2(moveChunkWithNss2BSON);
ASSERT_TRUE(exec::matcher::matches(matchExpr.get(), &matchingDoc2));
}
assertMoveChunkTrue(changeStream, *matchExpr, nss1, fromShard, toShard);
assertMoveChunkTrue(changeStream, *matchExpr, nss2, fromShard, toShard);
}
TEST_F(

View File

@ -30,9 +30,9 @@
#include "mongo/s/change_streams/control_events.h"
#include "mongo/db/namespace_spec_gen.h"
#include "mongo/util/database_name_util.h"
#include <boost/optional/optional.hpp>
#include <fmt/format.h>
namespace mongo {
namespace {
@ -94,7 +94,24 @@ NamespacePlacementChangedControlEvent NamespacePlacementChangedControlEvent::cre
auto nsField = assertFieldType(event, kNamespaceField, BSONType::object).getDocument();
auto nssSpec = NamespaceSpec::parse(nsField.toBson(),
IDLParserContext("NamespacePlacementChangedControlEvent"));
NamespaceString nss = NamespaceStringUtil::deserialize(*nssSpec.getDb(), *nssSpec.getColl());
NamespaceString nss = [&]() {
if (nssSpec.getDb()) {
if (nssSpec.getColl()) {
return NamespaceStringUtil::deserialize(*nssSpec.getDb(), *nssSpec.getColl());
} else {
return NamespaceString(*nssSpec.getDb());
}
}
tassert(11161100,
fmt::format("Invalid NamespacePlacementChangedControlEvent. Collection name is "
"provided, but no database name is present {}",
event.toString()),
!nssSpec.getColl());
return NamespaceString::kEmpty;
}();
return NamespacePlacementChangedControlEvent{clusterTime, nss};
}

View File

@ -109,6 +109,48 @@ TEST(
ASSERT_EQ(parseControlEvent(event), expectedControlEvent);
}
TEST(
ControlEventTest,
GivenValidNamespacePlacementChangedControlEventAsDocumentWithDbFieldOnly_WhenCallingParseControlEvent_ThenParsingIsSuccessful) {
Timestamp ts;
NamespaceString nss(NamespaceString::kDefaultInitialSyncIdNamespace.dbName());
auto nssSpec = [&]() {
NamespaceSpec nssSpec;
nssSpec.setDb(nss.dbName());
return nssSpec;
}();
Document event =
Document(BSON("operationType" << NamespacePlacementChangedControlEvent::opType
<< "clusterTime" << ts << "ns" << nssSpec.toBSON()));
ControlEvent expectedControlEvent = NamespacePlacementChangedControlEvent{ts, nss};
ASSERT_EQ(parseControlEvent(event), expectedControlEvent);
}
TEST(
ControlEventTest,
GivenValidNamespacePlacementChangedControlEventAsDocumentForTheWholeCluster_WhenCallingParseControlEvent_ThenParsingIsSuccessful) {
Timestamp ts;
Document event =
Document(BSON("operationType" << NamespacePlacementChangedControlEvent::opType
<< "clusterTime" << ts << "ns" << NamespaceSpec().toBSON()));
ControlEvent expectedControlEvent =
NamespacePlacementChangedControlEvent{ts, NamespaceString::kEmpty};
ASSERT_EQ(parseControlEvent(event), expectedControlEvent);
}
TEST(
ControlEventTest,
GivenInvalidNamespacePlacementChangedControlEventAsDocumentWithOnlyCollectionBeingProvided_WhenCallingParseControlEvent_ThenParsingThrowsAnException) {
Document event =
Document(BSON("operationType" << NamespacePlacementChangedControlEvent::opType));
ASSERT_THROWS(parseControlEvent(event), DBException);
}
TEST(
ControlEventTest,
GivenInvalidNamespacePlacementChangedControlEventAsDocument_WhenCallingParseControlEvent_ThenParsingThrowsAnException) {

View File

@ -187,7 +187,16 @@ DB.prototype.runReadCommand = function (obj, extra, queryOptions) {
// runCommand uses this impl to actually execute the command
DB.prototype._runCommandImpl = function (name, obj, options) {
const session = this.getSession();
return session._getSessionAwareClient().runCommand(session, name, obj, options);
const result = session._getSessionAwareClient().runCommand(session, name, obj, options);
// Set change stream version into the result object for easier assertions.
if (obj.pipeline && obj.pipeline.length > 0 && obj.pipeline[0].$changeStream) {
// TODO: SERVER-52253 Enable feature flag for Improved change stream handling of cluster topology changes.
const changeStreamVersion = obj.pipeline[0].$changeStream.version ?? "v1";
result._changeStreamVersion = changeStreamVersion;
}
return result;
};
DB.prototype.runCommand = function (obj, extra, queryOptions) {

View File

@ -628,6 +628,11 @@ Mongo.prototype._extractChangeStreamOptions = function (options) {
options.maxAwaitTimeMS = 15 * 1000;
}
if (options.hasOwnProperty("version")) {
changeStreamOptions.version = options.version;
delete options.version;
}
return [{$changeStream: changeStreamOptions}, options];
};

View File

@ -656,6 +656,10 @@ function DBCommandCursor(db, cmdResult, batchSize, maxAwaitTimeMS, txnNumber) {
// If the command result represents a change stream cursor, update our postBatchResumeToken.
this._updatePostBatchResumeToken(cmdResult.cursor);
if (cmdResult._changeStreamVersion) {
this._changeStreamVersion = cmdResult._changeStreamVersion;
}
this._cursorid = cmdResult.cursor.id;
this._batchSize = batchSize;
this._maxAwaitTimeMS = maxAwaitTimeMS;
@ -808,6 +812,10 @@ DBCommandCursor.prototype.getResumeToken = function () {
return this._resumeToken;
};
DBCommandCursor.prototype.getChangeStreamVersion = function () {
return this._changeStreamVersion;
};
DBCommandCursor.prototype.getClusterTime = function () {
// Return the read timestamp for snapshot reads, or undefined for other readConcern levels.
return this._atClusterTime;
@ -828,6 +836,7 @@ DBCommandCursor.prototype.help = function () {
print(
"\t.getResumeToken() - for a change stream cursor, obtains the most recent valid resume token, if it exists.",
);
print("\t.getChangeStreamVersion() - for a change stream cursor, obtains the change stream version, if it exists.");
print("\t.getClusterTime() - returns the read timestamp for snapshot reads.");
print("\t.pretty() - pretty print each document, possibly over multiple lines");
print("\t.close()");