diff --git a/jstests/change_stream_fsm/test_change_stream_sharding_command_generator.js b/jstests/change_stream_fsm/test_change_stream_sharding_command_generator.js index 62966763c64..a0760c78d45 100644 --- a/jstests/change_stream_fsm/test_change_stream_sharding_command_generator.js +++ b/jstests/change_stream_fsm/test_change_stream_sharding_command_generator.js @@ -367,7 +367,13 @@ describe("ChangeStreamReader integration", function () { const numTotalInserts = numCommands * InsertDocCommand.numDocs; const insertCommands = []; for (let i = 0; i < numCommands; i++) { - insertCommands.push(new InsertDocCommand(dbName, collName, ctx.shards, {exists: true, nonEmpty: i > 0})); + insertCommands.push( + new InsertDocCommand({ + dbName, + collName, + collectionCtx: {exists: true}, + }), + ); } Writer.run(ctx.st.s, writerInstanceName, insertCommands, TEST_SEED); @@ -429,12 +435,12 @@ describe("ChangeStreamReader integration", function () { const expectedEventTypes = ["insert", "insert", "insert", "drop", "invalidate"]; // Build commands: 3 InsertDocCommands + drop. - const collectionCtx = {exists: true, nonEmpty: false}; + const collectionCtx = {exists: true}; const commands = [ - new InsertDocCommand(dbName, collName, this.shards, collectionCtx), - new InsertDocCommand(dbName, collName, this.shards, {...collectionCtx, nonEmpty: true}), - new InsertDocCommand(dbName, collName, this.shards, {...collectionCtx, nonEmpty: true}), - new DropCollectionCommand(dbName, collName, this.shards, collectionCtx), + new InsertDocCommand({dbName, collName, collectionCtx}), + new InsertDocCommand({dbName, collName, collectionCtx}), + new InsertDocCommand({dbName, collName, collectionCtx}), + new DropCollectionCommand({dbName, collName, shardSet: this.shards}), ]; /** @@ -528,11 +534,11 @@ describe("ChangeStreamReader integration", function () { const expectedEventTypes = ["insert", "insert", "drop", "invalidate"]; // Build commands: 2 InsertDocCommands + drop. - const collectionCtx = {exists: true, nonEmpty: false}; + const collectionCtx = {exists: true}; const commands = [ - new InsertDocCommand(dbName, collName, this.shards, collectionCtx), - new InsertDocCommand(dbName, collName, this.shards, {...collectionCtx, nonEmpty: true}), - new DropCollectionCommand(dbName, collName, this.shards, collectionCtx), + new InsertDocCommand({dbName, collName, collectionCtx}), + new InsertDocCommand({dbName, collName, collectionCtx}), + new DropCollectionCommand({dbName, collName, shardSet: this.shards}), ]; const setupCollection = () => { @@ -622,12 +628,12 @@ describe("ChangeStreamReader integration", function () { const expectedEventTypes = ["insert", "insert", "insert", "insert"]; // Build commands: 2 InsertDocCommands into each collection (interleaved). - const collectionCtx = {exists: true, nonEmpty: false}; + const collectionCtx = {exists: true}; const commands = [ - new InsertDocCommand(dbName, collName1, this.shards, collectionCtx), - new InsertDocCommand(dbName, collName2, this.shards, collectionCtx), - new InsertDocCommand(dbName, collName1, this.shards, {...collectionCtx, nonEmpty: true}), - new InsertDocCommand(dbName, collName2, this.shards, {...collectionCtx, nonEmpty: true}), + new InsertDocCommand({dbName, collName: collName1, collectionCtx}), + new InsertDocCommand({dbName, collName: collName2, collectionCtx}), + new InsertDocCommand({dbName, collName: collName1, collectionCtx}), + new InsertDocCommand({dbName, collName: collName2, collectionCtx}), ]; // Setup: drop and recreate collections (drop to ensure clean state before test). diff --git a/jstests/libs/util/change_stream/change_stream_commands.js b/jstests/libs/util/change_stream/change_stream_commands.js index fe49fae5126..7cbab6d480c 100644 --- a/jstests/libs/util/change_stream/change_stream_commands.js +++ b/jstests/libs/util/change_stream/change_stream_commands.js @@ -67,13 +67,17 @@ function getDbPrimary(connection, dbName) { /** * Base command class. + * + * All Command subclasses take a single options object so callers always pass + * named fields. This avoids positional-arg bugs (e.g. silently dropping + * primaryShard by passing it where collectionCtx is expected). */ class Command { - constructor(dbName, collName, shardSet, collectionCtx = {}) { + constructor({dbName = null, collName = null, shardSet = null, collectionCtx = null} = {}) { this.dbName = dbName; this.collName = collName; this.shardSet = shardSet; - this.collectionCtx = collectionCtx; + this.collectionCtx = collectionCtx ?? {}; } /** @@ -151,8 +155,8 @@ class InsertDocCommand extends Command { } } - constructor(dbName, collName, shardSet, collectionCtx, documents = null) { - super(dbName, collName, shardSet, collectionCtx); + constructor({dbName, collName, collectionCtx, documents = null}) { + super({dbName, collName, collectionCtx}); if (documents) { this.documents = documents; } else { @@ -208,12 +212,21 @@ class InsertDocCommand extends Command { /** * Create database command. - * Targets a random primary shard from the shard set using the seeded PRNG. + * + * Takes an options object to avoid positional-arg confusion (in particular, + * silently dropping primaryShard by passing it where collectionCtx is expected). + * + * If primaryShard is omitted, one is picked at random from shardSet using the + * seeded PRNG. Caller must provide either primaryShard or a non-empty shardSet. */ class CreateDatabaseCommand extends Command { - constructor(dbName, collName, shardSet, collectionCtx, primaryShard = null) { - super(dbName, collName, shardSet, collectionCtx); - this.primaryShard = primaryShard ?? shardSet[Random.randInt(shardSet.length)]._id; + constructor({dbName, shardSet = null, primaryShard = null}) { + super({dbName, shardSet}); + assert( + primaryShard || (shardSet && shardSet.length), + "CreateDatabaseCommand requires primaryShard or a non-empty shardSet", + ); + this.primaryShard = primaryShard || shardSet[Random.randInt(shardSet.length)]._id; } execute(connection) { @@ -232,12 +245,13 @@ class CreateDatabaseCommand extends Command { /** * Create unsplittable collection command. - * Targets a random shard from the shard set using the seeded PRNG. + * If dataShard is omitted, one is picked at random from shardSet using the + * seeded PRNG. */ class CreateUnsplittableCollectionCommand extends Command { - constructor(dbName, collName, shardSet, collectionCtx, dataShard = null) { - super(dbName, collName, shardSet, collectionCtx); - this.dataShard = dataShard ?? shardSet[Random.randInt(shardSet.length)]._id; + constructor({dbName, collName, shardSet, collectionCtx, dataShard = null}) { + super({dbName, collName, shardSet, collectionCtx}); + this.dataShard = dataShard ?? (shardSet?.length ? shardSet[Random.randInt(shardSet.length)]._id : null); } execute(connection) { @@ -261,8 +275,16 @@ class CreateUnsplittableCollectionCommand extends Command { /** * Create untracked collection command. + * + * collectionCtx defaults to {exists: false} since this command is by definition used to + * bring a collection into existence — that's the value callers would otherwise have to + * pass in every call site. */ class CreateUntrackedCollectionCommand extends Command { + constructor({dbName, collName, collectionCtx = {exists: false}} = {}) { + super({dbName, collName, collectionCtx}); + } + execute(connection) { assert.commandWorked(connection.getDB(this.dbName).createCollection(this.collName)); } @@ -291,8 +313,10 @@ class CreateUntrackedCollectionCommand extends Command { * This was verified empirically through testing. */ class DropCollectionCommand extends Command { - constructor(dbName, collName, shardSet, collectionCtx) { - super(dbName, collName, shardSet, collectionCtx); + // Default to {exists: true}: the common case is dropping an existing collection, + // so callers shouldn't have to spell that out at every call site. + constructor({dbName, collName, shardSet, collectionCtx = {exists: true}}) { + super({dbName, collName, shardSet, collectionCtx}); } execute(connection) { @@ -338,10 +362,6 @@ class DropCollectionCommand extends Command { * This means we only emit the 'dropDatabase' event, not individual 'drop' events. */ class DropDatabaseCommand extends Command { - constructor(dbName, collName, shardSet, collectionCtx) { - super(dbName, collName, shardSet, collectionCtx); - } - execute(connection) { assert.commandWorked(connection.getDB(this.dbName).dropDatabase()); } @@ -497,11 +517,10 @@ class CreateIndexCommand extends Command { * @param {string} dbName - Database name. * @param {string} collName - Collection name. * @param {Array} shardSet - Array of shard objects. - * @param {Object} collectionCtx - Collection state (shardKeySpec = current shard key). * @param {Object} indexSpec - The index specification to create. */ - constructor(dbName, collName, shardSet, collectionCtx, indexSpec) { - super(dbName, collName, shardSet, collectionCtx); + constructor({dbName, collName, shardSet, indexSpec}) { + super({dbName, collName, shardSet}); assert(indexSpec, "indexSpec must be provided to CreateIndexCommand"); this.indexSpec = indexSpec; } @@ -537,11 +556,10 @@ class DropIndexCommand extends Command { * @param {string} dbName - Database name. * @param {string} collName - Collection name. * @param {Array} shardSet - Array of shard objects. - * @param {Object} collectionCtx - Collection state (shardKeySpec = current shard key). * @param {Object} indexSpec - The index specification to drop. */ - constructor(dbName, collName, shardSet, collectionCtx, indexSpec) { - super(dbName, collName, shardSet, collectionCtx); + constructor({dbName, collName, shardSet, indexSpec}) { + super({dbName, collName, shardSet}); assert(indexSpec, "indexSpec must be provided to DropIndexCommand"); this.indexSpec = indexSpec; } @@ -585,8 +603,8 @@ class ShardCollectionCommand extends Command { * @param {Object} collectionCtx - Collection state. * @param {Object} shardKey - The shard key to use for sharding. */ - constructor(dbName, collName, shardSet, collectionCtx, shardKey) { - super(dbName, collName, shardSet, collectionCtx); + constructor({dbName, collName, shardSet, collectionCtx, shardKey}) { + super({dbName, collName, shardSet, collectionCtx}); assert(shardKey, "shardKey must be provided to ShardCollectionCommand"); this.shardKey = shardKey; } @@ -653,10 +671,6 @@ class ShardCollectionCommand extends Command { * Precondition (guaranteed by FSM): collection exists and is sharded. */ class UnshardCollectionCommand extends Command { - constructor(dbName, collName, shardSet, collectionCtx) { - super(dbName, collName, shardSet, collectionCtx); - } - execute(connection) { const dbDoc = connection.getDB("config").databases.findOne({_id: this.dbName}); assert(dbDoc, `${this}: database ${this.dbName} not in config.databases`); @@ -711,19 +725,19 @@ class ReshardCollectionCommand extends Command { * @param {string} collName - Collection name. * @param {Array} shardSet - Array of shard objects. * @param {Object} collectionCtx - Collection state. - * @param {Object} newShardKey - The new shard key to reshard to. + * @param {Object} shardKey - The new shard key to reshard to. */ - constructor( + constructor({ dbName, collName, shardSet, collectionCtx, - newShardKey, + shardKey, numInitialChunks = ReshardCollectionCommand.numInitialChunks, - ) { - super(dbName, collName, shardSet, collectionCtx); - assert(newShardKey, "newShardKey must be provided to ReshardCollectionCommand"); - this.newShardKey = newShardKey; + }) { + super({dbName, collName, shardSet, collectionCtx}); + assert(shardKey, "shardKey must be provided to ReshardCollectionCommand"); + this.shardKey = shardKey; this.numInitialChunks = numInitialChunks; } @@ -735,11 +749,11 @@ class ReshardCollectionCommand extends Command { _reconfigureZonesForShardSet(connection, ns, this.shardSet); const zoneName = _getZoneName(ns); - const shardKeyField = Object.keys(this.newShardKey)[0]; + const shardKeyField = Object.keys(this.shardKey)[0]; assert.commandWorked( connection.adminCommand({ reshardCollection: ns, - key: this.newShardKey, + key: this.shardKey, numInitialChunks: this.numInitialChunks, zones: [ { @@ -753,7 +767,7 @@ class ReshardCollectionCommand extends Command { } toString() { - const type = isHashedShardKey(this.newShardKey) ? "hashed" : "range"; + const type = isHashedShardKey(this.shardKey) ? "hashed" : "range"; return `ReshardCollectionCommand(${type})`; } @@ -783,8 +797,13 @@ class ReshardCollectionCommand extends Command { class RenameCommand extends Command { // Subclasses must set: this.targetShouldExist, this.crossDatabase. - constructor(dbName, collName, shardSet, collectionCtx) { - super(dbName, collName, shardSet, collectionCtx); + constructor({dbName, collName, shardSet, collectionCtx, dropAfterRename = true}) { + super({dbName, collName, shardSet, collectionCtx}); + // When true, drops the renamed-to collection at the end of execute() so the same + // rename can run again later in a long-lived test (FSM cleanup). Tests that need + // to observe the post-rename state should set this to false and drop the renamed + // collection themselves. + this.dropAfterRename = dropAfterRename; } execute(connection) { @@ -803,8 +822,9 @@ class RenameCommand extends Command { }), ); - // Drop the renamed collection to clean up for subsequent renames. - assert.commandWorked(connection.getDB(targetDb).runCommand({drop: targetColl})); + if (this.dropAfterRename) { + assert.commandWorked(connection.getDB(targetDb).runCommand({drop: targetColl})); + } } toString() { @@ -860,32 +880,32 @@ class RenameCommand extends Command { // Concrete rename command classes. class RenameToNonExistentSameDbCommand extends RenameCommand { - constructor(dbName, collName, shardSet, collectionCtx) { - super(dbName, collName, shardSet, collectionCtx); + constructor(opts) { + super(opts); this.targetShouldExist = false; this.crossDatabase = false; } } class RenameToExistentSameDbCommand extends RenameCommand { - constructor(dbName, collName, shardSet, collectionCtx) { - super(dbName, collName, shardSet, collectionCtx); + constructor(opts) { + super(opts); this.targetShouldExist = true; this.crossDatabase = false; } } class RenameToNonExistentDifferentDbCommand extends RenameCommand { - constructor(dbName, collName, shardSet, collectionCtx) { - super(dbName, collName, shardSet, collectionCtx); + constructor(opts) { + super(opts); this.targetShouldExist = false; this.crossDatabase = true; } } class RenameToExistentDifferentDbCommand extends RenameCommand { - constructor(dbName, collName, shardSet, collectionCtx) { - super(dbName, collName, shardSet, collectionCtx); + constructor(opts) { + super(opts); this.targetShouldExist = true; this.crossDatabase = true; } @@ -956,8 +976,8 @@ class MoveCommandBase extends Command { * Moves the primary shard for a database to a different shard. */ class MovePrimaryCommand extends MoveCommandBase { - constructor(dbName, collName, shardSet, collectionCtx, targetShard = null) { - super(dbName, collName, shardSet, collectionCtx); + constructor({dbName, collName, shardSet, collectionCtx, targetShard = null}) { + super({dbName, collName, shardSet, collectionCtx}); this.targetShard = targetShard; } @@ -1040,8 +1060,8 @@ class MoveCollectionCommand extends MoveCommandBase { * are reported by getChangeEvents(). */ class MoveChunkCommand extends MoveCommandBase { - constructor(dbName, collName, shardSet, collectionCtx) { - super(dbName, collName, shardSet, collectionCtx); + constructor({dbName, collName, shardSet, collectionCtx}) { + super({dbName, collName, shardSet, collectionCtx}); // Always insert enough documents for proper chunk distribution, // even when the collection already has data — prior inserts may // have fewer docs than shardSet.length requires for splitting. diff --git a/jstests/libs/util/change_stream/change_stream_sharding_command_generator.js b/jstests/libs/util/change_stream/change_stream_sharding_command_generator.js index 361c92e09cb..203ee1f8508 100644 --- a/jstests/libs/util/change_stream/change_stream_sharding_command_generator.js +++ b/jstests/libs/util/change_stream/change_stream_sharding_command_generator.js @@ -75,19 +75,20 @@ class ShardingCommandGenerator { const CommandClass = ShardingCommandGenerator.actionToCommandClass[action]; assert(CommandClass !== undefined, `No command class found for action ${action}`); + const baseOpts = { + dbName: params.getDbName(), + collName: params.getCollName(), + shardSet: params.getShardSet(), + collectionCtx: {...collectionCtx}, + }; + // Commands that need the target shard key as a separate parameter. if (CommandClass === ShardCollectionCommand || CommandClass === ReshardCollectionCommand) { assert(targetShardKey, `${CommandClass.name} requires targetShardKey`); - return new CommandClass( - params.getDbName(), - params.getCollName(), - params.getShardSet(), - {...collectionCtx}, - targetShardKey, - ); + return new CommandClass({...baseOpts, shardKey: targetShardKey}); } - return new CommandClass(params.getDbName(), params.getCollName(), params.getShardSet(), {...collectionCtx}); + return new CommandClass(baseOpts); } /** @@ -463,13 +464,13 @@ class ShardingCommandGenerator { const shardCtx = {...ctx, exists: false}; commands.push( - new ShardCollectionCommand( - params.getDbName(), - params.getCollName(), - params.getShardSet(), - shardCtx, - targetShardKey, - ), + new ShardCollectionCommand({ + dbName: params.getDbName(), + collName: params.getCollName(), + shardSet: params.getShardSet(), + collectionCtx: shardCtx, + shardKey: targetShardKey, + }), ); // Update context - collection is now sharded ctx.exists = true; @@ -480,13 +481,12 @@ class ShardingCommandGenerator { // Create shard key index if action requires it (collection must already exist). if (ShardingCommandGenerator.actionsRequiringIndex.has(action)) { commands.push( - new CreateIndexCommand( - params.getDbName(), - params.getCollName(), - params.getShardSet(), - ctx, - targetShardKey, - ), + new CreateIndexCommand({ + dbName: params.getDbName(), + collName: params.getCollName(), + shardSet: params.getShardSet(), + indexSpec: targetShardKey, + }), ); } // NOTE: We intentionally do NOT drop indexes before dropping collections. @@ -496,10 +496,14 @@ class ShardingCommandGenerator { // Drop collection before dropping database (simplifies change event matching). if (action === Action.DROP_DATABASE && collectionCtx.exists) { // NOTE: We do NOT drop indexes here - see comment above about dropIndexes coverage. - // Pass a COPY to DropCollectionCommand so it sees exists:true. - // We'll set ctx.exists = false for the subsequent DropDatabaseCommand. + // DropCollectionCommand defaults to {exists: true}, which is what we want here + // (we only enter this branch when ctx.exists is true). commands.push( - new DropCollectionCommand(params.getDbName(), params.getCollName(), params.getShardSet(), {...ctx}), + new DropCollectionCommand({ + dbName: params.getDbName(), + collName: params.getCollName(), + shardSet: params.getShardSet(), + }), ); // After dropping collection, update context for DropDatabaseCommand to reflect that collection no longer exists. ctx.exists = false; @@ -519,18 +523,13 @@ class ShardingCommandGenerator { const shouldDropIndex = action === Action.UNSHARD_COLLECTION || bsonWoCompare(oldShardKey, targetShardKey) !== 0; if (shouldDropIndex && oldShardKey) { - // Update context for DropIndexCommand's event count calculation: - // - After reshard: shardKeySpec = new shard key (collection still sharded) - // - After unshard: shardKeySpec = null (collection is now untracked, no shard key) - const postActionCtx = {...ctx, shardKeySpec: targetShardKey}; commands.push( - new DropIndexCommand( - params.getDbName(), - params.getCollName(), - params.getShardSet(), - postActionCtx, - oldShardKey, - ), + new DropIndexCommand({ + dbName: params.getDbName(), + collName: params.getCollName(), + shardSet: params.getShardSet(), + indexSpec: oldShardKey, + }), ); } } diff --git a/jstests/libs/util/change_stream/change_stream_sharding_utils.js b/jstests/libs/util/change_stream/change_stream_sharding_utils.js index e151fec9b75..77428a5b2b1 100644 --- a/jstests/libs/util/change_stream/change_stream_sharding_utils.js +++ b/jstests/libs/util/change_stream/change_stream_sharding_utils.js @@ -269,7 +269,7 @@ function ensureDatabasesExist(writerDefs, mongos, fsmShards) { if (startState !== State.DATABASE_ABSENT && !createdDbs.has(w.dbName)) { createdDbs.add(w.dbName); mongos.getDB(w.dbName).dropDatabase(); - new CreateDatabaseCommand(w.dbName, w.collName, fsmShards).execute(mongos); + new CreateDatabaseCommand({dbName: w.dbName, shardSet: fsmShards}).execute(mongos); } } } diff --git a/jstests/libs/util/change_stream/change_stream_timeseries_commands.js b/jstests/libs/util/change_stream/change_stream_timeseries_commands.js index 2e7c41b12f7..7bce539c1bc 100644 --- a/jstests/libs/util/change_stream/change_stream_timeseries_commands.js +++ b/jstests/libs/util/change_stream/change_stream_timeseries_commands.js @@ -7,7 +7,7 @@ import {ChangeStreamWatchMode} from "jstests/libs/query/change_stream_util.js"; */ export class CreateTimeseriesCollectionCommand extends Command { constructor({dbName, collName, timeField, metaField}) { - super(dbName, collName, /* shardSet */ null, /* collectionCtx */ {}); + super({dbName, collName}); this.timeField = timeField; this.metaField = metaField; } @@ -81,7 +81,7 @@ export class TimeseriesInsertCommand extends Command { * If true, this event is only visible when ctx.rawData === true. */ constructor({insertNss, eventNss, insertDoc, expectedFullDocument, requiresRawData = false}) { - super(insertNss.db, insertNss.coll, /* shardSet */ null, /* collectionCtx */ {}); + super({dbName: insertNss.db, collName: insertNss.coll}); this.insertNss = insertNss; this.eventNss = eventNss; this.insertDoc = insertDoc; @@ -139,7 +139,7 @@ export class TimeseriesInsertCommand extends Command { export class FCVUpgradeCommand extends Command { constructor({timeseriesCollections} = {}) { - super(/* dbName */ null, /* collName */ null, /* shardSet */ null, /* collectionCtx */ {}); + super(); this.timeseriesCollections = timeseriesCollections; } @@ -185,7 +185,7 @@ export class FCVUpgradeCommand extends Command { export class FCVDowngradeCommand extends Command { constructor({timeseriesCollections, targetFCV} = {}) { - super(/* dbName */ null, /* collName */ null, /* shardSet */ null, /* collectionCtx */ {}); + super(); this.timeseriesCollections = timeseriesCollections; this.targetFCV = targetFCV; } diff --git a/jstests/multiVersion/genericChangeStreams/change_stream_v2_fcv_upgrade_downgrade.js b/jstests/multiVersion/genericChangeStreams/change_stream_v2_fcv_upgrade_downgrade.js index 120bbdd3cd7..9769bf017a5 100644 --- a/jstests/multiVersion/genericChangeStreams/change_stream_v2_fcv_upgrade_downgrade.js +++ b/jstests/multiVersion/genericChangeStreams/change_stream_v2_fcv_upgrade_downgrade.js @@ -36,7 +36,7 @@ import {removeShard} from "jstests/sharding/libs/remove_shard_util.js"; * The command auto-generates a unique _id and knows which change events it produces. */ function makeInsertCmd(dbName, collName) { - return new InsertDocCommand(dbName, collName, /* shardSet */ null, /* collectionCtx */ {exists: true}); + return new InsertDocCommand({dbName, collName, collectionCtx: {exists: true}}); } /** diff --git a/jstests/multiVersion/genericChangeStreams/change_streams_timeseries_fcv_upgrade_downgrade.js b/jstests/multiVersion/genericChangeStreams/change_streams_timeseries_fcv_upgrade_downgrade.js index 2041f4719df..c622770a242 100644 --- a/jstests/multiVersion/genericChangeStreams/change_streams_timeseries_fcv_upgrade_downgrade.js +++ b/jstests/multiVersion/genericChangeStreams/change_streams_timeseries_fcv_upgrade_downgrade.js @@ -42,10 +42,6 @@ function makeNss(dbName, collName) { return {db: dbName, coll: collName}; } -function createDbCmdOnShard(dbName, shard) { - return new CreateDatabaseCommand(dbName, null, null, null, shard); -} - function withChangeStreamTest(db, fn) { const cst = new ChangeStreamTest(db); try { @@ -295,8 +291,11 @@ for (const [downgradeFCV, testHelper] of crossProduct( } return db1Name == db2Name - ? [createDbCmdOnShard(db1Name, db1PrimaryShard)] - : [createDbCmdOnShard(db1Name, db1PrimaryShard), createDbCmdOnShard(db2Name, db2PrimaryShard)]; + ? [new CreateDatabaseCommand({dbName: db1Name, primaryShard: db1PrimaryShard})] + : [ + new CreateDatabaseCommand({dbName: db1Name, primaryShard: db1PrimaryShard}), + new CreateDatabaseCommand({dbName: db2Name, primaryShard: db2PrimaryShard}), + ]; })(); const createTs1CollCmd = new CreateTimeseriesCollectionCommand({ @@ -621,8 +620,11 @@ for (const [downgradeFCV, testHelper] of crossProduct( } return db1Name == db2Name - ? [createDbCmdOnShard(db1Name, db1PrimaryShard)] - : [createDbCmdOnShard(db1Name, db1PrimaryShard), createDbCmdOnShard(db2Name, db2PrimaryShard)]; + ? [new CreateDatabaseCommand({dbName: db1Name, primaryShard: db1PrimaryShard})] + : [ + new CreateDatabaseCommand({dbName: db1Name, primaryShard: db1PrimaryShard}), + new CreateDatabaseCommand({dbName: db2Name, primaryShard: db2PrimaryShard}), + ]; })(); const createTs1CollCmd = new CreateTimeseriesCollectionCommand({ diff --git a/jstests/sharding/query/change_streams/change_stream_collection_v2_strict_whitebox.js b/jstests/sharding/query/change_streams/change_stream_collection_v2_strict_whitebox.js index f88264f93db..db65e673123 100644 --- a/jstests/sharding/query/change_streams/change_stream_collection_v2_strict_whitebox.js +++ b/jstests/sharding/query/change_streams/change_stream_collection_v2_strict_whitebox.js @@ -84,7 +84,7 @@ describe("collection v2 strict whitebox", function () { }); beforeEach(function () { - new CreateDatabaseCommand(dbName, null, null, null, st.shard0.shardName).execute(st.s); + new CreateDatabaseCommand({dbName, primaryShard: st.shard0.shardName}).execute(st.s); }); afterEach(function () { @@ -105,7 +105,13 @@ describe("collection v2 strict whitebox", function () { it("sharded collection across two shards - initial placement", function () { coll = db[collName]; - new ShardCollectionCommand(dbName, collName, allShards, {exists: false}, {_id: 1}).execute(st.s); + new ShardCollectionCommand({ + dbName, + collName, + shardSet: allShards, + collectionCtx: {exists: false}, + shardKey: {_id: 1}, + }).execute(st.s); distributeCollectionDataOverShards(db, coll, { middle: {_id: 0}, chunks: [ @@ -135,7 +141,7 @@ describe("collection v2 strict whitebox", function () { it("unsharded collection - single shard cursor", function () { coll = db[collName]; - new CreateUntrackedCollectionCommand(dbName, collName, null, {exists: false}).execute(st.s); + new CreateUntrackedCollectionCommand({dbName, collName}).execute(st.s); const comment = "strict_unsharded"; csTest = new ChangeStreamTest(db); @@ -175,7 +181,7 @@ describe("collection v2 strict whitebox", function () { ); // Create DB to trigger DbAbsent -> DbPresent transition. - new CreateDatabaseCommand(dbName, null, null, null, st.shard2.shardName).execute(st.s); + new CreateDatabaseCommand({dbName, primaryShard: st.shard2.shardName}).execute(st.s); awaitLogMessageCodes(st.s, [kDbAbsentEvent], () => csTest.assertNoChange(csCursor), { [kDbAbsentEvent]: (attr) => assertExpectedShardsInLog(attr, 1), }); @@ -192,7 +198,12 @@ describe("collection v2 strict whitebox", function () { assert.commandWorked(db.adminCommand({split: coll.getFullName(), middle: {_id: 10}})); const docs = [{_id: -11}, {_id: 0}, {_id: 11}]; - new InsertDocCommand(dbName, collName, allShards, {exists: true, shardKeySpec: {_id: 1}}, docs).execute(st.s); + new InsertDocCommand({ + dbName, + collName, + collectionCtx: {exists: true, shardKeySpec: {_id: 1}}, + documents: docs, + }).execute(st.s); const comment = "strict_movechunk_lifecycle"; csTest = new ChangeStreamTest(db); @@ -280,7 +291,7 @@ describe("collection v2 strict whitebox", function () { const originalShardId = st.shard0.shardName; const targetShardId = st.shard1.shardName; - new CreateUntrackedCollectionCommand(dbName, collName, null, {exists: false}).execute(st.s); + new CreateUntrackedCollectionCommand({dbName, collName}).execute(st.s); const comment = "strict_move_primary"; csTest = new ChangeStreamTest(db); @@ -295,7 +306,7 @@ describe("collection v2 strict whitebox", function () { }); assertOpenCursors(st, [originalShardId], /*expectedConfigCursor=*/ false, cursorCommentFilter(comment)); - new MovePrimaryCommand(dbName, collName, null, null, targetShardId).execute(st.s); + new MovePrimaryCommand({dbName, collName, targetShard: targetShardId}).execute(st.s); awaitLogMessageCodes(st.s, [kPlacementRefresh], () => csTest.assertNoChange(csCursor)); assertOpenCursors(st, [targetShardId], /*expectedConfigCursor=*/ false, cursorCommentFilter(comment)); }); @@ -309,8 +320,19 @@ describe("collection v2 strict whitebox", function () { {_id: 2, a: 2}, ]; [ - new ShardCollectionCommand(dbName, collName, allShards, {exists: false}, {_id: 1}), - new InsertDocCommand(dbName, collName, null, {exists: true, shardKeySpec: {_id: 1}}, docs), + new ShardCollectionCommand({ + dbName, + collName, + shardSet: allShards, + collectionCtx: {exists: false}, + shardKey: {_id: 1}, + }), + new InsertDocCommand({ + dbName, + collName, + collectionCtx: {exists: true, shardKeySpec: {_id: 1}}, + documents: docs, + }), ].forEach((cmd) => cmd.execute(st.s)); distributeCollectionDataOverShards(db, coll, { middle: {_id: 0}, @@ -343,7 +365,14 @@ describe("collection v2 strict whitebox", function () { const collCtx = {exists: true, shardKeySpec: {_id: 1}}; const newShardKey = {a: 1}; const chunkCount = 2; - new ReshardCollectionCommand(dbName, collName, targetShards, collCtx, newShardKey, chunkCount).execute(st.s); + new ReshardCollectionCommand({ + dbName, + collName, + shardSet: targetShards, + collectionCtx: collCtx, + shardKey: newShardKey, + numInitialChunks: chunkCount, + }).execute(st.s); awaitLogMessageCodes(st.s, [kPlacementRefresh], () => csTest.assertNoChange(csCursor)); assertOpenCursors( st, diff --git a/jstests/sharding/query/change_streams/change_stream_database_v2_strict_whitebox.js b/jstests/sharding/query/change_streams/change_stream_database_v2_strict_whitebox.js index 7d70b3363e9..0ba6921687f 100644 --- a/jstests/sharding/query/change_streams/change_stream_database_v2_strict_whitebox.js +++ b/jstests/sharding/query/change_streams/change_stream_database_v2_strict_whitebox.js @@ -87,7 +87,7 @@ describe("database v2 strict whitebox", function () { }); beforeEach(function () { - new CreateDatabaseCommand(dbName, null, null, null, st.shard0.shardName).execute(st.s); + new CreateDatabaseCommand({dbName, primaryShard: st.shard0.shardName}).execute(st.s); }); afterEach(function () { @@ -108,7 +108,13 @@ describe("database v2 strict whitebox", function () { it("sharded collection across two shards - initial placement", function () { coll = db[collName]; - new ShardCollectionCommand(dbName, collName, allShards, {exists: false}, {_id: 1}).execute(st.s); + new ShardCollectionCommand({ + dbName, + collName, + shardSet: allShards, + collectionCtx: {exists: false}, + shardKey: {_id: 1}, + }).execute(st.s); distributeCollectionDataOverShards(db, coll, { middle: {_id: 0}, chunks: [ @@ -138,7 +144,7 @@ describe("database v2 strict whitebox", function () { it("unsharded collection - single shard cursor", function () { coll = db[collName]; - new CreateUntrackedCollectionCommand(dbName, collName, null, {exists: false}).execute(st.s); + new CreateUntrackedCollectionCommand({dbName, collName}).execute(st.s); const comment = "db_strict_unsharded"; csTest = new ChangeStreamTest(db); @@ -178,7 +184,7 @@ describe("database v2 strict whitebox", function () { ); // Create DB to trigger DbAbsent -> DbPresent transition. - new CreateDatabaseCommand(dbName, null, null, null, st.shard2.shardName).execute(st.s); + new CreateDatabaseCommand({dbName, primaryShard: st.shard2.shardName}).execute(st.s); awaitLogMessageCodes(st.s, [kDbAbsentEvent], () => csTest.assertNoChange(csCursor), { [kDbAbsentEvent]: (attr) => assertExpectedShardsInLog(attr, 1), }); @@ -195,7 +201,12 @@ describe("database v2 strict whitebox", function () { assert.commandWorked(db.adminCommand({split: coll.getFullName(), middle: {_id: 10}})); const docs = [{_id: -11}, {_id: 0}, {_id: 11}]; - new InsertDocCommand(dbName, collName, allShards, {exists: true, shardKeySpec: {_id: 1}}, docs).execute(st.s); + new InsertDocCommand({ + dbName, + collName, + collectionCtx: {exists: true, shardKeySpec: {_id: 1}}, + documents: docs, + }).execute(st.s); const comment = "db_strict_movechunk_lifecycle"; csTest = new ChangeStreamTest(db); @@ -269,7 +280,7 @@ describe("database v2 strict whitebox", function () { const originalShardId = st.shard0.shardName; const targetShardId = st.shard1.shardName; - new CreateUntrackedCollectionCommand(dbName, collName, null, {exists: false}).execute(st.s); + new CreateUntrackedCollectionCommand({dbName, collName}).execute(st.s); const comment = "db_strict_move_primary"; csTest = new ChangeStreamTest(db); @@ -284,7 +295,7 @@ describe("database v2 strict whitebox", function () { }); assertOpenCursors(st, [originalShardId], /*expectedConfigCursor=*/ false, cursorCommentFilter(comment)); - new MovePrimaryCommand(dbName, collName, null, null, targetShardId).execute(st.s); + new MovePrimaryCommand({dbName, collName, targetShard: targetShardId}).execute(st.s); awaitLogMessageCodes(st.s, [kPlacementRefresh], () => csTest.assertNoChange(csCursor)); assertOpenCursors(st, [targetShardId], /*expectedConfigCursor=*/ false, cursorCommentFilter(comment)); }); @@ -298,8 +309,19 @@ describe("database v2 strict whitebox", function () { {_id: 2, a: 2}, ]; [ - new ShardCollectionCommand(dbName, collName, allShards, {exists: false}, {_id: 1}), - new InsertDocCommand(dbName, collName, null, {exists: true, shardKeySpec: {_id: 1}}, docs), + new ShardCollectionCommand({ + dbName, + collName, + shardSet: allShards, + collectionCtx: {exists: false}, + shardKey: {_id: 1}, + }), + new InsertDocCommand({ + dbName, + collName, + collectionCtx: {exists: true, shardKeySpec: {_id: 1}}, + documents: docs, + }), ].forEach((cmd) => cmd.execute(st.s)); distributeCollectionDataOverShards(db, coll, { middle: {_id: 0}, @@ -332,7 +354,14 @@ describe("database v2 strict whitebox", function () { const collCtx = {exists: true, shardKeySpec: {_id: 1}}; const newShardKey = {a: 1}; const chunkCount = 2; - new ReshardCollectionCommand(dbName, collName, targetShards, collCtx, newShardKey, chunkCount).execute(st.s); + new ReshardCollectionCommand({ + dbName, + collName, + shardSet: targetShards, + collectionCtx: collCtx, + shardKey: newShardKey, + numInitialChunks: chunkCount, + }).execute(st.s); awaitLogMessageCodes(st.s, [kPlacementRefresh], () => csTest.assertNoChange(csCursor)); // shard0 is still the DB primary, so the database targeter keeps a cursor there // even though no collection chunks reside on it after resharding. diff --git a/jstests/sharding/query/change_streams/change_stream_resume_after_rename.js b/jstests/sharding/query/change_streams/change_stream_resume_after_rename.js index c8d560c35fd..c38a27941da 100644 --- a/jstests/sharding/query/change_streams/change_stream_resume_after_rename.js +++ b/jstests/sharding/query/change_streams/change_stream_resume_after_rename.js @@ -2,20 +2,27 @@ * Tests that a change stream can resume correctly after rename invalidation. * * @tags: [ - * requires_sharding, - * uses_change_streams, * assumes_balancer_off, * does_not_support_stepdowns, + * featureFlagChangeStreamPreciseShardTargeting, + * requires_sharding, + * uses_change_streams, * ] */ import {describe, it, before, afterEach, after} from "jstests/libs/mochalite.js"; -import {ChangeStreamWatchMode} from "jstests/libs/query/change_stream_util.js"; +import { + assertOpenCursors, + ChangeStreamTest, + ChangeStreamWatchMode, + cursorCommentFilter, +} from "jstests/libs/query/change_stream_util.js"; import {ChangeStreamReader, ChangeStreamReadingMode} from "jstests/libs/util/change_stream/change_stream_reader.js"; import {Writer} from "jstests/libs/util/change_stream/change_stream_writer.js"; import {Connector} from "jstests/libs/util/change_stream/change_stream_connector.js"; import { CreateDatabaseCommand, CreateUnsplittableCollectionCommand, + DropCollectionCommand, InsertDocCommand, RenameToNonExistentSameDbCommand, } from "jstests/libs/util/change_stream/change_stream_commands.js"; @@ -64,15 +71,54 @@ describe("$changeStream", function () { // - Original name recreated by insert (lands on shard0) // // Expected events: [create, rename, invalidate, create, insert] + // Pre-create the database (with primary on shard0) outside the writer so that the + // target-collection stream below can open with deterministic placement (cursor on the + // DB primary). enableSharding is not visible to change streams. + new CreateDatabaseCommand({dbName: testDb, primaryShard: shards[0]._id}).execute(st.s); + const commands = [ - new CreateDatabaseCommand(testDb, testColl, [shards[0]], null), - new CreateUnsplittableCollectionCommand(testDb, testColl, [shards[1]], {exists: false, isSharded: false}), - new RenameToNonExistentSameDbCommand(testDb, testColl, shards, {exists: true, isSharded: false}), - new InsertDocCommand(testDb, testColl, shards, {exists: false, isSharded: false}), + new CreateUnsplittableCollectionCommand({ + dbName: testDb, + collName: testColl, + shardSet: [shards[1]], + collectionCtx: {exists: false, isSharded: false}, + }), + new RenameToNonExistentSameDbCommand({ + dbName: testDb, + collName: testColl, + shardSet: shards, + collectionCtx: {exists: true, isSharded: false}, + // Keep testColl_renamed alive after the rename so a follow-up watch + // can observe the v2 targeter placing its cursor on shard1. The + // invalidate is triggered by the rename itself (rename-to-watched- + // namespace), not by the manual drop done in afterEach cleanup. + dropAfterRename: false, + }), + new InsertDocCommand({ + dbName: testDb, + collName: testColl, + collectionCtx: {exists: false, isSharded: false}, + }), ]; const expectedEvents = buildExpectedEvents(commands, ChangeStreamWatchMode.kCollection); const startTime = getCurrentClusterTime(st.s); + // Open a change stream on the rename target collection BEFORE running the writer to + // observe v2 targeter retargeting from the DB primary to the non-primary shard. + const targetColl = `${testColl}_renamed`; // matches RenameToNonExistentSameDbCommand + const comment = "rename_target_retargeting"; + const csTest = new ChangeStreamTest(st.s.getDB(testDb)); + const targetCursor = csTest.startWatchingChanges({ + pipeline: [{$changeStream: {version: "v2", startAtOperationTime: startTime}}], + collection: targetColl, + aggregateOptions: {comment, cursor: {batchSize: 1}}, + }); + + // Initial placement: the target collection does not yet exist, but the DB is present + // with primary on shard0. The v2 targeter falls back to the DB primary, so a single + // cursor is opened on shard0 and no config-server placement watcher is needed. + assertOpenCursors(st, [shards[0]._id], /*expectedConfigCursor=*/ false, cursorCommentFilter(comment)); + // Execute commands via Writer. const writerName = "writer_rename_resume"; Writer.run(st.s, writerName, commands); @@ -83,7 +129,7 @@ describe("$changeStream", function () { dbName: testDb, collName: testColl, watchMode: ChangeStreamWatchMode.kCollection, - readingMode: ChangeStreamReadingMode.kContinuous, + readingMode: ChangeStreamReadingMode.kFetchOneAndResume, startAtClusterTime: startTime, numberOfEventsToRead: expectedEvents.length, instanceName: readerName, @@ -93,5 +139,56 @@ describe("$changeStream", function () { // Build VerifierContext and run SingleReaderVerificationTestCase. const ctx = new VerifierContext({[readerName]: readerConfig}, {[readerName]: createMatcher(expectedEvents)}); new SingleReaderVerificationTestCase(readerName).run(st.s, ctx); + + // Wait for the writer so all events (including the trailing drop) have flushed. + Connector.waitForDone(st.s, writerName); + Writer.joinAll(); + + // Sanity check: with dropAfterRename: false, the rename target should still exist + // on shard1 after the writer finishes. If this fails, an unexpected drop happened. + const collsOnTargetDb = st.s.getDB(testDb).getCollectionNames(); + assert( + collsOnTargetDb.includes(targetColl), + `Expected ${targetColl} to still exist after writer; found collections: ${tojsononeline(collsOnTargetDb)}`, + ); + + // A rename TO the watched namespace invalidates a collection-level change stream: + // the cursor delivers the rename event and then an immediate `invalidate`. + // See https://www.mongodb.com/docs/manual/reference/method/db.collection.renameCollection/ + // Receiving the rename here also implicitly proves the targeter retargeted from + // shard0 to shard1. + csTest.assertNextChangesEqual({ + cursor: targetCursor, + expectedChanges: [ + { + operationType: "rename", + ns: {db: testDb, coll: testColl}, + to: {db: testDb, coll: targetColl}, + }, + {operationType: "invalidate"}, + ], + expectInvalidate: true, + }); + + // Verify the v2 targeter places cursors on shard1 for testColl_renamed by opening + // a fresh stream. The original cursor's post-rename placement is unobservable + // because the rename-target invalidate closes its shard cursors before any + // inspection window; a new cursor on the now-existing collection takes the + // direct-target path and is observable. + const verifyComment = "verify_post_rename_placement"; + const verifyCursor = csTest.startWatchingChanges({ + pipeline: [{$changeStream: {version: "v2"}}], + collection: targetColl, + aggregateOptions: {comment: verifyComment, cursor: {batchSize: 0}}, + }); + // Drive a getMore so shard cursors materialize and become idle. + csTest.assertNoChange(verifyCursor); + assertOpenCursors(st, [shards[1]._id], /*expectedConfigCursor=*/ false, cursorCommentFilter(verifyComment)); + + // Clean up the lingering testColl_renamed so afterEach's dropDatabase doesn't trip + // on a surprise collection (the rename command no longer drops it for us). + new DropCollectionCommand({dbName: testDb, collName: targetColl}).execute(st.s); + + csTest.cleanUp(); }); }); diff --git a/jstests/sharding/query/change_streams/test_change_stream_sharding_move_chunk_events.js b/jstests/sharding/query/change_streams/test_change_stream_sharding_move_chunk_events.js index 530a856c636..5bdb307d048 100644 --- a/jstests/sharding/query/change_streams/test_change_stream_sharding_move_chunk_events.js +++ b/jstests/sharding/query/change_streams/test_change_stream_sharding_move_chunk_events.js @@ -37,19 +37,19 @@ function buildCommands({dbName, collName, shards, shardingType, nonEmpty}) { const commands = []; let ctx = {exists: false, nonEmpty: false, shardKeySpec: null, isUnsplittable: false}; - commands.push(new CreateDatabaseCommand(dbName, collName, shards, ctx)); + commands.push(new CreateDatabaseCommand({dbName, shardSet: shards})); if (nonEmpty) { - commands.push(new InsertDocCommand(dbName, collName, shards, ctx)); + commands.push(new InsertDocCommand({dbName, collName, collectionCtx: ctx})); ctx = {exists: true, nonEmpty: true, shardKeySpec: null, isUnsplittable: false}; - commands.push(new CreateIndexCommand(dbName, collName, shards, ctx, shardKey)); + commands.push(new CreateIndexCommand({dbName, collName, shardSet: shards, indexSpec: shardKey})); } - commands.push(new ShardCollectionCommand(dbName, collName, shards, ctx, shardKey)); + commands.push(new ShardCollectionCommand({dbName, collName, shardSet: shards, collectionCtx: ctx, shardKey})); const shardedCtx = {exists: true, nonEmpty, shardKeySpec: shardKey, isUnsplittable: false}; - commands.push(new MoveChunkCommand(dbName, collName, shards, shardedCtx)); - commands.push(new UnshardCollectionCommand(dbName, collName, shards, shardedCtx)); + commands.push(new MoveChunkCommand({dbName, collName, shardSet: shards, collectionCtx: shardedCtx})); + commands.push(new UnshardCollectionCommand({dbName, collName, shardSet: shards, collectionCtx: shardedCtx})); return commands; }