SERVER-125560 rename coordinator white box testing (#52682)
Co-authored-by: Paolo Polato <paolo.polato@mongodb.com> GitOrigin-RevId: 91fe9b90749df3ca48eae33eb6fdd971dcac7cbf
This commit is contained in:
parent
d6f365c88b
commit
814d7b8e84
@ -367,7 +367,13 @@ describe("ChangeStreamReader integration", function () {
|
||||
const numTotalInserts = numCommands * InsertDocCommand.numDocs;
|
||||
const insertCommands = [];
|
||||
for (let i = 0; i < numCommands; i++) {
|
||||
insertCommands.push(new InsertDocCommand(dbName, collName, ctx.shards, {exists: true, nonEmpty: i > 0}));
|
||||
insertCommands.push(
|
||||
new InsertDocCommand({
|
||||
dbName,
|
||||
collName,
|
||||
collectionCtx: {exists: true},
|
||||
}),
|
||||
);
|
||||
}
|
||||
Writer.run(ctx.st.s, writerInstanceName, insertCommands, TEST_SEED);
|
||||
|
||||
@ -429,12 +435,12 @@ describe("ChangeStreamReader integration", function () {
|
||||
const expectedEventTypes = ["insert", "insert", "insert", "drop", "invalidate"];
|
||||
|
||||
// Build commands: 3 InsertDocCommands + drop.
|
||||
const collectionCtx = {exists: true, nonEmpty: false};
|
||||
const collectionCtx = {exists: true};
|
||||
const commands = [
|
||||
new InsertDocCommand(dbName, collName, this.shards, collectionCtx),
|
||||
new InsertDocCommand(dbName, collName, this.shards, {...collectionCtx, nonEmpty: true}),
|
||||
new InsertDocCommand(dbName, collName, this.shards, {...collectionCtx, nonEmpty: true}),
|
||||
new DropCollectionCommand(dbName, collName, this.shards, collectionCtx),
|
||||
new InsertDocCommand({dbName, collName, collectionCtx}),
|
||||
new InsertDocCommand({dbName, collName, collectionCtx}),
|
||||
new InsertDocCommand({dbName, collName, collectionCtx}),
|
||||
new DropCollectionCommand({dbName, collName, shardSet: this.shards}),
|
||||
];
|
||||
|
||||
/**
|
||||
@ -528,11 +534,11 @@ describe("ChangeStreamReader integration", function () {
|
||||
const expectedEventTypes = ["insert", "insert", "drop", "invalidate"];
|
||||
|
||||
// Build commands: 2 InsertDocCommands + drop.
|
||||
const collectionCtx = {exists: true, nonEmpty: false};
|
||||
const collectionCtx = {exists: true};
|
||||
const commands = [
|
||||
new InsertDocCommand(dbName, collName, this.shards, collectionCtx),
|
||||
new InsertDocCommand(dbName, collName, this.shards, {...collectionCtx, nonEmpty: true}),
|
||||
new DropCollectionCommand(dbName, collName, this.shards, collectionCtx),
|
||||
new InsertDocCommand({dbName, collName, collectionCtx}),
|
||||
new InsertDocCommand({dbName, collName, collectionCtx}),
|
||||
new DropCollectionCommand({dbName, collName, shardSet: this.shards}),
|
||||
];
|
||||
|
||||
const setupCollection = () => {
|
||||
@ -622,12 +628,12 @@ describe("ChangeStreamReader integration", function () {
|
||||
const expectedEventTypes = ["insert", "insert", "insert", "insert"];
|
||||
|
||||
// Build commands: 2 InsertDocCommands into each collection (interleaved).
|
||||
const collectionCtx = {exists: true, nonEmpty: false};
|
||||
const collectionCtx = {exists: true};
|
||||
const commands = [
|
||||
new InsertDocCommand(dbName, collName1, this.shards, collectionCtx),
|
||||
new InsertDocCommand(dbName, collName2, this.shards, collectionCtx),
|
||||
new InsertDocCommand(dbName, collName1, this.shards, {...collectionCtx, nonEmpty: true}),
|
||||
new InsertDocCommand(dbName, collName2, this.shards, {...collectionCtx, nonEmpty: true}),
|
||||
new InsertDocCommand({dbName, collName: collName1, collectionCtx}),
|
||||
new InsertDocCommand({dbName, collName: collName2, collectionCtx}),
|
||||
new InsertDocCommand({dbName, collName: collName1, collectionCtx}),
|
||||
new InsertDocCommand({dbName, collName: collName2, collectionCtx}),
|
||||
];
|
||||
|
||||
// Setup: drop and recreate collections (drop to ensure clean state before test).
|
||||
|
||||
@ -67,13 +67,17 @@ function getDbPrimary(connection, dbName) {
|
||||
|
||||
/**
|
||||
* Base command class.
|
||||
*
|
||||
* All Command subclasses take a single options object so callers always pass
|
||||
* named fields. This avoids positional-arg bugs (e.g. silently dropping
|
||||
* primaryShard by passing it where collectionCtx is expected).
|
||||
*/
|
||||
class Command {
|
||||
constructor(dbName, collName, shardSet, collectionCtx = {}) {
|
||||
constructor({dbName = null, collName = null, shardSet = null, collectionCtx = null} = {}) {
|
||||
this.dbName = dbName;
|
||||
this.collName = collName;
|
||||
this.shardSet = shardSet;
|
||||
this.collectionCtx = collectionCtx;
|
||||
this.collectionCtx = collectionCtx ?? {};
|
||||
}
|
||||
|
||||
/**
|
||||
@ -151,8 +155,8 @@ class InsertDocCommand extends Command {
|
||||
}
|
||||
}
|
||||
|
||||
constructor(dbName, collName, shardSet, collectionCtx, documents = null) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
constructor({dbName, collName, collectionCtx, documents = null}) {
|
||||
super({dbName, collName, collectionCtx});
|
||||
if (documents) {
|
||||
this.documents = documents;
|
||||
} else {
|
||||
@ -208,12 +212,21 @@ class InsertDocCommand extends Command {
|
||||
|
||||
/**
|
||||
* Create database command.
|
||||
* Targets a random primary shard from the shard set using the seeded PRNG.
|
||||
*
|
||||
* Takes an options object to avoid positional-arg confusion (in particular,
|
||||
* silently dropping primaryShard by passing it where collectionCtx is expected).
|
||||
*
|
||||
* If primaryShard is omitted, one is picked at random from shardSet using the
|
||||
* seeded PRNG. Caller must provide either primaryShard or a non-empty shardSet.
|
||||
*/
|
||||
class CreateDatabaseCommand extends Command {
|
||||
constructor(dbName, collName, shardSet, collectionCtx, primaryShard = null) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
this.primaryShard = primaryShard ?? shardSet[Random.randInt(shardSet.length)]._id;
|
||||
constructor({dbName, shardSet = null, primaryShard = null}) {
|
||||
super({dbName, shardSet});
|
||||
assert(
|
||||
primaryShard || (shardSet && shardSet.length),
|
||||
"CreateDatabaseCommand requires primaryShard or a non-empty shardSet",
|
||||
);
|
||||
this.primaryShard = primaryShard || shardSet[Random.randInt(shardSet.length)]._id;
|
||||
}
|
||||
|
||||
execute(connection) {
|
||||
@ -232,12 +245,13 @@ class CreateDatabaseCommand extends Command {
|
||||
|
||||
/**
|
||||
* Create unsplittable collection command.
|
||||
* Targets a random shard from the shard set using the seeded PRNG.
|
||||
* If dataShard is omitted, one is picked at random from shardSet using the
|
||||
* seeded PRNG.
|
||||
*/
|
||||
class CreateUnsplittableCollectionCommand extends Command {
|
||||
constructor(dbName, collName, shardSet, collectionCtx, dataShard = null) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
this.dataShard = dataShard ?? shardSet[Random.randInt(shardSet.length)]._id;
|
||||
constructor({dbName, collName, shardSet, collectionCtx, dataShard = null}) {
|
||||
super({dbName, collName, shardSet, collectionCtx});
|
||||
this.dataShard = dataShard ?? (shardSet?.length ? shardSet[Random.randInt(shardSet.length)]._id : null);
|
||||
}
|
||||
|
||||
execute(connection) {
|
||||
@ -261,8 +275,16 @@ class CreateUnsplittableCollectionCommand extends Command {
|
||||
|
||||
/**
|
||||
* Create untracked collection command.
|
||||
*
|
||||
* collectionCtx defaults to {exists: false} since this command is by definition used to
|
||||
* bring a collection into existence — that's the value callers would otherwise have to
|
||||
* pass in every call site.
|
||||
*/
|
||||
class CreateUntrackedCollectionCommand extends Command {
|
||||
constructor({dbName, collName, collectionCtx = {exists: false}} = {}) {
|
||||
super({dbName, collName, collectionCtx});
|
||||
}
|
||||
|
||||
execute(connection) {
|
||||
assert.commandWorked(connection.getDB(this.dbName).createCollection(this.collName));
|
||||
}
|
||||
@ -291,8 +313,10 @@ class CreateUntrackedCollectionCommand extends Command {
|
||||
* This was verified empirically through testing.
|
||||
*/
|
||||
class DropCollectionCommand extends Command {
|
||||
constructor(dbName, collName, shardSet, collectionCtx) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
// Default to {exists: true}: the common case is dropping an existing collection,
|
||||
// so callers shouldn't have to spell that out at every call site.
|
||||
constructor({dbName, collName, shardSet, collectionCtx = {exists: true}}) {
|
||||
super({dbName, collName, shardSet, collectionCtx});
|
||||
}
|
||||
|
||||
execute(connection) {
|
||||
@ -338,10 +362,6 @@ class DropCollectionCommand extends Command {
|
||||
* This means we only emit the 'dropDatabase' event, not individual 'drop' events.
|
||||
*/
|
||||
class DropDatabaseCommand extends Command {
|
||||
constructor(dbName, collName, shardSet, collectionCtx) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
}
|
||||
|
||||
execute(connection) {
|
||||
assert.commandWorked(connection.getDB(this.dbName).dropDatabase());
|
||||
}
|
||||
@ -497,11 +517,10 @@ class CreateIndexCommand extends Command {
|
||||
* @param {string} dbName - Database name.
|
||||
* @param {string} collName - Collection name.
|
||||
* @param {Array} shardSet - Array of shard objects.
|
||||
* @param {Object} collectionCtx - Collection state (shardKeySpec = current shard key).
|
||||
* @param {Object} indexSpec - The index specification to create.
|
||||
*/
|
||||
constructor(dbName, collName, shardSet, collectionCtx, indexSpec) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
constructor({dbName, collName, shardSet, indexSpec}) {
|
||||
super({dbName, collName, shardSet});
|
||||
assert(indexSpec, "indexSpec must be provided to CreateIndexCommand");
|
||||
this.indexSpec = indexSpec;
|
||||
}
|
||||
@ -537,11 +556,10 @@ class DropIndexCommand extends Command {
|
||||
* @param {string} dbName - Database name.
|
||||
* @param {string} collName - Collection name.
|
||||
* @param {Array} shardSet - Array of shard objects.
|
||||
* @param {Object} collectionCtx - Collection state (shardKeySpec = current shard key).
|
||||
* @param {Object} indexSpec - The index specification to drop.
|
||||
*/
|
||||
constructor(dbName, collName, shardSet, collectionCtx, indexSpec) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
constructor({dbName, collName, shardSet, indexSpec}) {
|
||||
super({dbName, collName, shardSet});
|
||||
assert(indexSpec, "indexSpec must be provided to DropIndexCommand");
|
||||
this.indexSpec = indexSpec;
|
||||
}
|
||||
@ -585,8 +603,8 @@ class ShardCollectionCommand extends Command {
|
||||
* @param {Object} collectionCtx - Collection state.
|
||||
* @param {Object} shardKey - The shard key to use for sharding.
|
||||
*/
|
||||
constructor(dbName, collName, shardSet, collectionCtx, shardKey) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
constructor({dbName, collName, shardSet, collectionCtx, shardKey}) {
|
||||
super({dbName, collName, shardSet, collectionCtx});
|
||||
assert(shardKey, "shardKey must be provided to ShardCollectionCommand");
|
||||
this.shardKey = shardKey;
|
||||
}
|
||||
@ -653,10 +671,6 @@ class ShardCollectionCommand extends Command {
|
||||
* Precondition (guaranteed by FSM): collection exists and is sharded.
|
||||
*/
|
||||
class UnshardCollectionCommand extends Command {
|
||||
constructor(dbName, collName, shardSet, collectionCtx) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
}
|
||||
|
||||
execute(connection) {
|
||||
const dbDoc = connection.getDB("config").databases.findOne({_id: this.dbName});
|
||||
assert(dbDoc, `${this}: database ${this.dbName} not in config.databases`);
|
||||
@ -711,19 +725,19 @@ class ReshardCollectionCommand extends Command {
|
||||
* @param {string} collName - Collection name.
|
||||
* @param {Array} shardSet - Array of shard objects.
|
||||
* @param {Object} collectionCtx - Collection state.
|
||||
* @param {Object} newShardKey - The new shard key to reshard to.
|
||||
* @param {Object} shardKey - The new shard key to reshard to.
|
||||
*/
|
||||
constructor(
|
||||
constructor({
|
||||
dbName,
|
||||
collName,
|
||||
shardSet,
|
||||
collectionCtx,
|
||||
newShardKey,
|
||||
shardKey,
|
||||
numInitialChunks = ReshardCollectionCommand.numInitialChunks,
|
||||
) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
assert(newShardKey, "newShardKey must be provided to ReshardCollectionCommand");
|
||||
this.newShardKey = newShardKey;
|
||||
}) {
|
||||
super({dbName, collName, shardSet, collectionCtx});
|
||||
assert(shardKey, "shardKey must be provided to ReshardCollectionCommand");
|
||||
this.shardKey = shardKey;
|
||||
this.numInitialChunks = numInitialChunks;
|
||||
}
|
||||
|
||||
@ -735,11 +749,11 @@ class ReshardCollectionCommand extends Command {
|
||||
_reconfigureZonesForShardSet(connection, ns, this.shardSet);
|
||||
|
||||
const zoneName = _getZoneName(ns);
|
||||
const shardKeyField = Object.keys(this.newShardKey)[0];
|
||||
const shardKeyField = Object.keys(this.shardKey)[0];
|
||||
assert.commandWorked(
|
||||
connection.adminCommand({
|
||||
reshardCollection: ns,
|
||||
key: this.newShardKey,
|
||||
key: this.shardKey,
|
||||
numInitialChunks: this.numInitialChunks,
|
||||
zones: [
|
||||
{
|
||||
@ -753,7 +767,7 @@ class ReshardCollectionCommand extends Command {
|
||||
}
|
||||
|
||||
toString() {
|
||||
const type = isHashedShardKey(this.newShardKey) ? "hashed" : "range";
|
||||
const type = isHashedShardKey(this.shardKey) ? "hashed" : "range";
|
||||
return `ReshardCollectionCommand(${type})`;
|
||||
}
|
||||
|
||||
@ -783,8 +797,13 @@ class ReshardCollectionCommand extends Command {
|
||||
class RenameCommand extends Command {
|
||||
// Subclasses must set: this.targetShouldExist, this.crossDatabase.
|
||||
|
||||
constructor(dbName, collName, shardSet, collectionCtx) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
constructor({dbName, collName, shardSet, collectionCtx, dropAfterRename = true}) {
|
||||
super({dbName, collName, shardSet, collectionCtx});
|
||||
// When true, drops the renamed-to collection at the end of execute() so the same
|
||||
// rename can run again later in a long-lived test (FSM cleanup). Tests that need
|
||||
// to observe the post-rename state should set this to false and drop the renamed
|
||||
// collection themselves.
|
||||
this.dropAfterRename = dropAfterRename;
|
||||
}
|
||||
|
||||
execute(connection) {
|
||||
@ -803,8 +822,9 @@ class RenameCommand extends Command {
|
||||
}),
|
||||
);
|
||||
|
||||
// Drop the renamed collection to clean up for subsequent renames.
|
||||
assert.commandWorked(connection.getDB(targetDb).runCommand({drop: targetColl}));
|
||||
if (this.dropAfterRename) {
|
||||
assert.commandWorked(connection.getDB(targetDb).runCommand({drop: targetColl}));
|
||||
}
|
||||
}
|
||||
|
||||
toString() {
|
||||
@ -860,32 +880,32 @@ class RenameCommand extends Command {
|
||||
|
||||
// Concrete rename command classes.
|
||||
class RenameToNonExistentSameDbCommand extends RenameCommand {
|
||||
constructor(dbName, collName, shardSet, collectionCtx) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
constructor(opts) {
|
||||
super(opts);
|
||||
this.targetShouldExist = false;
|
||||
this.crossDatabase = false;
|
||||
}
|
||||
}
|
||||
|
||||
class RenameToExistentSameDbCommand extends RenameCommand {
|
||||
constructor(dbName, collName, shardSet, collectionCtx) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
constructor(opts) {
|
||||
super(opts);
|
||||
this.targetShouldExist = true;
|
||||
this.crossDatabase = false;
|
||||
}
|
||||
}
|
||||
|
||||
class RenameToNonExistentDifferentDbCommand extends RenameCommand {
|
||||
constructor(dbName, collName, shardSet, collectionCtx) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
constructor(opts) {
|
||||
super(opts);
|
||||
this.targetShouldExist = false;
|
||||
this.crossDatabase = true;
|
||||
}
|
||||
}
|
||||
|
||||
class RenameToExistentDifferentDbCommand extends RenameCommand {
|
||||
constructor(dbName, collName, shardSet, collectionCtx) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
constructor(opts) {
|
||||
super(opts);
|
||||
this.targetShouldExist = true;
|
||||
this.crossDatabase = true;
|
||||
}
|
||||
@ -956,8 +976,8 @@ class MoveCommandBase extends Command {
|
||||
* Moves the primary shard for a database to a different shard.
|
||||
*/
|
||||
class MovePrimaryCommand extends MoveCommandBase {
|
||||
constructor(dbName, collName, shardSet, collectionCtx, targetShard = null) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
constructor({dbName, collName, shardSet, collectionCtx, targetShard = null}) {
|
||||
super({dbName, collName, shardSet, collectionCtx});
|
||||
this.targetShard = targetShard;
|
||||
}
|
||||
|
||||
@ -1040,8 +1060,8 @@ class MoveCollectionCommand extends MoveCommandBase {
|
||||
* are reported by getChangeEvents().
|
||||
*/
|
||||
class MoveChunkCommand extends MoveCommandBase {
|
||||
constructor(dbName, collName, shardSet, collectionCtx) {
|
||||
super(dbName, collName, shardSet, collectionCtx);
|
||||
constructor({dbName, collName, shardSet, collectionCtx}) {
|
||||
super({dbName, collName, shardSet, collectionCtx});
|
||||
// Always insert enough documents for proper chunk distribution,
|
||||
// even when the collection already has data — prior inserts may
|
||||
// have fewer docs than shardSet.length requires for splitting.
|
||||
|
||||
@ -75,19 +75,20 @@ class ShardingCommandGenerator {
|
||||
const CommandClass = ShardingCommandGenerator.actionToCommandClass[action];
|
||||
assert(CommandClass !== undefined, `No command class found for action ${action}`);
|
||||
|
||||
const baseOpts = {
|
||||
dbName: params.getDbName(),
|
||||
collName: params.getCollName(),
|
||||
shardSet: params.getShardSet(),
|
||||
collectionCtx: {...collectionCtx},
|
||||
};
|
||||
|
||||
// Commands that need the target shard key as a separate parameter.
|
||||
if (CommandClass === ShardCollectionCommand || CommandClass === ReshardCollectionCommand) {
|
||||
assert(targetShardKey, `${CommandClass.name} requires targetShardKey`);
|
||||
return new CommandClass(
|
||||
params.getDbName(),
|
||||
params.getCollName(),
|
||||
params.getShardSet(),
|
||||
{...collectionCtx},
|
||||
targetShardKey,
|
||||
);
|
||||
return new CommandClass({...baseOpts, shardKey: targetShardKey});
|
||||
}
|
||||
|
||||
return new CommandClass(params.getDbName(), params.getCollName(), params.getShardSet(), {...collectionCtx});
|
||||
return new CommandClass(baseOpts);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -463,13 +464,13 @@ class ShardingCommandGenerator {
|
||||
|
||||
const shardCtx = {...ctx, exists: false};
|
||||
commands.push(
|
||||
new ShardCollectionCommand(
|
||||
params.getDbName(),
|
||||
params.getCollName(),
|
||||
params.getShardSet(),
|
||||
shardCtx,
|
||||
targetShardKey,
|
||||
),
|
||||
new ShardCollectionCommand({
|
||||
dbName: params.getDbName(),
|
||||
collName: params.getCollName(),
|
||||
shardSet: params.getShardSet(),
|
||||
collectionCtx: shardCtx,
|
||||
shardKey: targetShardKey,
|
||||
}),
|
||||
);
|
||||
// Update context - collection is now sharded
|
||||
ctx.exists = true;
|
||||
@ -480,13 +481,12 @@ class ShardingCommandGenerator {
|
||||
// Create shard key index if action requires it (collection must already exist).
|
||||
if (ShardingCommandGenerator.actionsRequiringIndex.has(action)) {
|
||||
commands.push(
|
||||
new CreateIndexCommand(
|
||||
params.getDbName(),
|
||||
params.getCollName(),
|
||||
params.getShardSet(),
|
||||
ctx,
|
||||
targetShardKey,
|
||||
),
|
||||
new CreateIndexCommand({
|
||||
dbName: params.getDbName(),
|
||||
collName: params.getCollName(),
|
||||
shardSet: params.getShardSet(),
|
||||
indexSpec: targetShardKey,
|
||||
}),
|
||||
);
|
||||
}
|
||||
// NOTE: We intentionally do NOT drop indexes before dropping collections.
|
||||
@ -496,10 +496,14 @@ class ShardingCommandGenerator {
|
||||
// Drop collection before dropping database (simplifies change event matching).
|
||||
if (action === Action.DROP_DATABASE && collectionCtx.exists) {
|
||||
// NOTE: We do NOT drop indexes here - see comment above about dropIndexes coverage.
|
||||
// Pass a COPY to DropCollectionCommand so it sees exists:true.
|
||||
// We'll set ctx.exists = false for the subsequent DropDatabaseCommand.
|
||||
// DropCollectionCommand defaults to {exists: true}, which is what we want here
|
||||
// (we only enter this branch when ctx.exists is true).
|
||||
commands.push(
|
||||
new DropCollectionCommand(params.getDbName(), params.getCollName(), params.getShardSet(), {...ctx}),
|
||||
new DropCollectionCommand({
|
||||
dbName: params.getDbName(),
|
||||
collName: params.getCollName(),
|
||||
shardSet: params.getShardSet(),
|
||||
}),
|
||||
);
|
||||
// After dropping collection, update context for DropDatabaseCommand to reflect that collection no longer exists.
|
||||
ctx.exists = false;
|
||||
@ -519,18 +523,13 @@ class ShardingCommandGenerator {
|
||||
const shouldDropIndex =
|
||||
action === Action.UNSHARD_COLLECTION || bsonWoCompare(oldShardKey, targetShardKey) !== 0;
|
||||
if (shouldDropIndex && oldShardKey) {
|
||||
// Update context for DropIndexCommand's event count calculation:
|
||||
// - After reshard: shardKeySpec = new shard key (collection still sharded)
|
||||
// - After unshard: shardKeySpec = null (collection is now untracked, no shard key)
|
||||
const postActionCtx = {...ctx, shardKeySpec: targetShardKey};
|
||||
commands.push(
|
||||
new DropIndexCommand(
|
||||
params.getDbName(),
|
||||
params.getCollName(),
|
||||
params.getShardSet(),
|
||||
postActionCtx,
|
||||
oldShardKey,
|
||||
),
|
||||
new DropIndexCommand({
|
||||
dbName: params.getDbName(),
|
||||
collName: params.getCollName(),
|
||||
shardSet: params.getShardSet(),
|
||||
indexSpec: oldShardKey,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -269,7 +269,7 @@ function ensureDatabasesExist(writerDefs, mongos, fsmShards) {
|
||||
if (startState !== State.DATABASE_ABSENT && !createdDbs.has(w.dbName)) {
|
||||
createdDbs.add(w.dbName);
|
||||
mongos.getDB(w.dbName).dropDatabase();
|
||||
new CreateDatabaseCommand(w.dbName, w.collName, fsmShards).execute(mongos);
|
||||
new CreateDatabaseCommand({dbName: w.dbName, shardSet: fsmShards}).execute(mongos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,7 +7,7 @@ import {ChangeStreamWatchMode} from "jstests/libs/query/change_stream_util.js";
|
||||
*/
|
||||
export class CreateTimeseriesCollectionCommand extends Command {
|
||||
constructor({dbName, collName, timeField, metaField}) {
|
||||
super(dbName, collName, /* shardSet */ null, /* collectionCtx */ {});
|
||||
super({dbName, collName});
|
||||
this.timeField = timeField;
|
||||
this.metaField = metaField;
|
||||
}
|
||||
@ -81,7 +81,7 @@ export class TimeseriesInsertCommand extends Command {
|
||||
* If true, this event is only visible when ctx.rawData === true.
|
||||
*/
|
||||
constructor({insertNss, eventNss, insertDoc, expectedFullDocument, requiresRawData = false}) {
|
||||
super(insertNss.db, insertNss.coll, /* shardSet */ null, /* collectionCtx */ {});
|
||||
super({dbName: insertNss.db, collName: insertNss.coll});
|
||||
this.insertNss = insertNss;
|
||||
this.eventNss = eventNss;
|
||||
this.insertDoc = insertDoc;
|
||||
@ -139,7 +139,7 @@ export class TimeseriesInsertCommand extends Command {
|
||||
|
||||
export class FCVUpgradeCommand extends Command {
|
||||
constructor({timeseriesCollections} = {}) {
|
||||
super(/* dbName */ null, /* collName */ null, /* shardSet */ null, /* collectionCtx */ {});
|
||||
super();
|
||||
this.timeseriesCollections = timeseriesCollections;
|
||||
}
|
||||
|
||||
@ -185,7 +185,7 @@ export class FCVUpgradeCommand extends Command {
|
||||
|
||||
export class FCVDowngradeCommand extends Command {
|
||||
constructor({timeseriesCollections, targetFCV} = {}) {
|
||||
super(/* dbName */ null, /* collName */ null, /* shardSet */ null, /* collectionCtx */ {});
|
||||
super();
|
||||
this.timeseriesCollections = timeseriesCollections;
|
||||
this.targetFCV = targetFCV;
|
||||
}
|
||||
|
||||
@ -36,7 +36,7 @@ import {removeShard} from "jstests/sharding/libs/remove_shard_util.js";
|
||||
* The command auto-generates a unique _id and knows which change events it produces.
|
||||
*/
|
||||
function makeInsertCmd(dbName, collName) {
|
||||
return new InsertDocCommand(dbName, collName, /* shardSet */ null, /* collectionCtx */ {exists: true});
|
||||
return new InsertDocCommand({dbName, collName, collectionCtx: {exists: true}});
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -42,10 +42,6 @@ function makeNss(dbName, collName) {
|
||||
return {db: dbName, coll: collName};
|
||||
}
|
||||
|
||||
function createDbCmdOnShard(dbName, shard) {
|
||||
return new CreateDatabaseCommand(dbName, null, null, null, shard);
|
||||
}
|
||||
|
||||
function withChangeStreamTest(db, fn) {
|
||||
const cst = new ChangeStreamTest(db);
|
||||
try {
|
||||
@ -295,8 +291,11 @@ for (const [downgradeFCV, testHelper] of crossProduct(
|
||||
}
|
||||
|
||||
return db1Name == db2Name
|
||||
? [createDbCmdOnShard(db1Name, db1PrimaryShard)]
|
||||
: [createDbCmdOnShard(db1Name, db1PrimaryShard), createDbCmdOnShard(db2Name, db2PrimaryShard)];
|
||||
? [new CreateDatabaseCommand({dbName: db1Name, primaryShard: db1PrimaryShard})]
|
||||
: [
|
||||
new CreateDatabaseCommand({dbName: db1Name, primaryShard: db1PrimaryShard}),
|
||||
new CreateDatabaseCommand({dbName: db2Name, primaryShard: db2PrimaryShard}),
|
||||
];
|
||||
})();
|
||||
|
||||
const createTs1CollCmd = new CreateTimeseriesCollectionCommand({
|
||||
@ -621,8 +620,11 @@ for (const [downgradeFCV, testHelper] of crossProduct(
|
||||
}
|
||||
|
||||
return db1Name == db2Name
|
||||
? [createDbCmdOnShard(db1Name, db1PrimaryShard)]
|
||||
: [createDbCmdOnShard(db1Name, db1PrimaryShard), createDbCmdOnShard(db2Name, db2PrimaryShard)];
|
||||
? [new CreateDatabaseCommand({dbName: db1Name, primaryShard: db1PrimaryShard})]
|
||||
: [
|
||||
new CreateDatabaseCommand({dbName: db1Name, primaryShard: db1PrimaryShard}),
|
||||
new CreateDatabaseCommand({dbName: db2Name, primaryShard: db2PrimaryShard}),
|
||||
];
|
||||
})();
|
||||
|
||||
const createTs1CollCmd = new CreateTimeseriesCollectionCommand({
|
||||
|
||||
@ -84,7 +84,7 @@ describe("collection v2 strict whitebox", function () {
|
||||
});
|
||||
|
||||
beforeEach(function () {
|
||||
new CreateDatabaseCommand(dbName, null, null, null, st.shard0.shardName).execute(st.s);
|
||||
new CreateDatabaseCommand({dbName, primaryShard: st.shard0.shardName}).execute(st.s);
|
||||
});
|
||||
|
||||
afterEach(function () {
|
||||
@ -105,7 +105,13 @@ describe("collection v2 strict whitebox", function () {
|
||||
|
||||
it("sharded collection across two shards - initial placement", function () {
|
||||
coll = db[collName];
|
||||
new ShardCollectionCommand(dbName, collName, allShards, {exists: false}, {_id: 1}).execute(st.s);
|
||||
new ShardCollectionCommand({
|
||||
dbName,
|
||||
collName,
|
||||
shardSet: allShards,
|
||||
collectionCtx: {exists: false},
|
||||
shardKey: {_id: 1},
|
||||
}).execute(st.s);
|
||||
distributeCollectionDataOverShards(db, coll, {
|
||||
middle: {_id: 0},
|
||||
chunks: [
|
||||
@ -135,7 +141,7 @@ describe("collection v2 strict whitebox", function () {
|
||||
|
||||
it("unsharded collection - single shard cursor", function () {
|
||||
coll = db[collName];
|
||||
new CreateUntrackedCollectionCommand(dbName, collName, null, {exists: false}).execute(st.s);
|
||||
new CreateUntrackedCollectionCommand({dbName, collName}).execute(st.s);
|
||||
|
||||
const comment = "strict_unsharded";
|
||||
csTest = new ChangeStreamTest(db);
|
||||
@ -175,7 +181,7 @@ describe("collection v2 strict whitebox", function () {
|
||||
);
|
||||
|
||||
// Create DB to trigger DbAbsent -> DbPresent transition.
|
||||
new CreateDatabaseCommand(dbName, null, null, null, st.shard2.shardName).execute(st.s);
|
||||
new CreateDatabaseCommand({dbName, primaryShard: st.shard2.shardName}).execute(st.s);
|
||||
awaitLogMessageCodes(st.s, [kDbAbsentEvent], () => csTest.assertNoChange(csCursor), {
|
||||
[kDbAbsentEvent]: (attr) => assertExpectedShardsInLog(attr, 1),
|
||||
});
|
||||
@ -192,7 +198,12 @@ describe("collection v2 strict whitebox", function () {
|
||||
assert.commandWorked(db.adminCommand({split: coll.getFullName(), middle: {_id: 10}}));
|
||||
|
||||
const docs = [{_id: -11}, {_id: 0}, {_id: 11}];
|
||||
new InsertDocCommand(dbName, collName, allShards, {exists: true, shardKeySpec: {_id: 1}}, docs).execute(st.s);
|
||||
new InsertDocCommand({
|
||||
dbName,
|
||||
collName,
|
||||
collectionCtx: {exists: true, shardKeySpec: {_id: 1}},
|
||||
documents: docs,
|
||||
}).execute(st.s);
|
||||
|
||||
const comment = "strict_movechunk_lifecycle";
|
||||
csTest = new ChangeStreamTest(db);
|
||||
@ -280,7 +291,7 @@ describe("collection v2 strict whitebox", function () {
|
||||
const originalShardId = st.shard0.shardName;
|
||||
const targetShardId = st.shard1.shardName;
|
||||
|
||||
new CreateUntrackedCollectionCommand(dbName, collName, null, {exists: false}).execute(st.s);
|
||||
new CreateUntrackedCollectionCommand({dbName, collName}).execute(st.s);
|
||||
|
||||
const comment = "strict_move_primary";
|
||||
csTest = new ChangeStreamTest(db);
|
||||
@ -295,7 +306,7 @@ describe("collection v2 strict whitebox", function () {
|
||||
});
|
||||
assertOpenCursors(st, [originalShardId], /*expectedConfigCursor=*/ false, cursorCommentFilter(comment));
|
||||
|
||||
new MovePrimaryCommand(dbName, collName, null, null, targetShardId).execute(st.s);
|
||||
new MovePrimaryCommand({dbName, collName, targetShard: targetShardId}).execute(st.s);
|
||||
awaitLogMessageCodes(st.s, [kPlacementRefresh], () => csTest.assertNoChange(csCursor));
|
||||
assertOpenCursors(st, [targetShardId], /*expectedConfigCursor=*/ false, cursorCommentFilter(comment));
|
||||
});
|
||||
@ -309,8 +320,19 @@ describe("collection v2 strict whitebox", function () {
|
||||
{_id: 2, a: 2},
|
||||
];
|
||||
[
|
||||
new ShardCollectionCommand(dbName, collName, allShards, {exists: false}, {_id: 1}),
|
||||
new InsertDocCommand(dbName, collName, null, {exists: true, shardKeySpec: {_id: 1}}, docs),
|
||||
new ShardCollectionCommand({
|
||||
dbName,
|
||||
collName,
|
||||
shardSet: allShards,
|
||||
collectionCtx: {exists: false},
|
||||
shardKey: {_id: 1},
|
||||
}),
|
||||
new InsertDocCommand({
|
||||
dbName,
|
||||
collName,
|
||||
collectionCtx: {exists: true, shardKeySpec: {_id: 1}},
|
||||
documents: docs,
|
||||
}),
|
||||
].forEach((cmd) => cmd.execute(st.s));
|
||||
distributeCollectionDataOverShards(db, coll, {
|
||||
middle: {_id: 0},
|
||||
@ -343,7 +365,14 @@ describe("collection v2 strict whitebox", function () {
|
||||
const collCtx = {exists: true, shardKeySpec: {_id: 1}};
|
||||
const newShardKey = {a: 1};
|
||||
const chunkCount = 2;
|
||||
new ReshardCollectionCommand(dbName, collName, targetShards, collCtx, newShardKey, chunkCount).execute(st.s);
|
||||
new ReshardCollectionCommand({
|
||||
dbName,
|
||||
collName,
|
||||
shardSet: targetShards,
|
||||
collectionCtx: collCtx,
|
||||
shardKey: newShardKey,
|
||||
numInitialChunks: chunkCount,
|
||||
}).execute(st.s);
|
||||
awaitLogMessageCodes(st.s, [kPlacementRefresh], () => csTest.assertNoChange(csCursor));
|
||||
assertOpenCursors(
|
||||
st,
|
||||
|
||||
@ -87,7 +87,7 @@ describe("database v2 strict whitebox", function () {
|
||||
});
|
||||
|
||||
beforeEach(function () {
|
||||
new CreateDatabaseCommand(dbName, null, null, null, st.shard0.shardName).execute(st.s);
|
||||
new CreateDatabaseCommand({dbName, primaryShard: st.shard0.shardName}).execute(st.s);
|
||||
});
|
||||
|
||||
afterEach(function () {
|
||||
@ -108,7 +108,13 @@ describe("database v2 strict whitebox", function () {
|
||||
|
||||
it("sharded collection across two shards - initial placement", function () {
|
||||
coll = db[collName];
|
||||
new ShardCollectionCommand(dbName, collName, allShards, {exists: false}, {_id: 1}).execute(st.s);
|
||||
new ShardCollectionCommand({
|
||||
dbName,
|
||||
collName,
|
||||
shardSet: allShards,
|
||||
collectionCtx: {exists: false},
|
||||
shardKey: {_id: 1},
|
||||
}).execute(st.s);
|
||||
distributeCollectionDataOverShards(db, coll, {
|
||||
middle: {_id: 0},
|
||||
chunks: [
|
||||
@ -138,7 +144,7 @@ describe("database v2 strict whitebox", function () {
|
||||
|
||||
it("unsharded collection - single shard cursor", function () {
|
||||
coll = db[collName];
|
||||
new CreateUntrackedCollectionCommand(dbName, collName, null, {exists: false}).execute(st.s);
|
||||
new CreateUntrackedCollectionCommand({dbName, collName}).execute(st.s);
|
||||
|
||||
const comment = "db_strict_unsharded";
|
||||
csTest = new ChangeStreamTest(db);
|
||||
@ -178,7 +184,7 @@ describe("database v2 strict whitebox", function () {
|
||||
);
|
||||
|
||||
// Create DB to trigger DbAbsent -> DbPresent transition.
|
||||
new CreateDatabaseCommand(dbName, null, null, null, st.shard2.shardName).execute(st.s);
|
||||
new CreateDatabaseCommand({dbName, primaryShard: st.shard2.shardName}).execute(st.s);
|
||||
awaitLogMessageCodes(st.s, [kDbAbsentEvent], () => csTest.assertNoChange(csCursor), {
|
||||
[kDbAbsentEvent]: (attr) => assertExpectedShardsInLog(attr, 1),
|
||||
});
|
||||
@ -195,7 +201,12 @@ describe("database v2 strict whitebox", function () {
|
||||
assert.commandWorked(db.adminCommand({split: coll.getFullName(), middle: {_id: 10}}));
|
||||
|
||||
const docs = [{_id: -11}, {_id: 0}, {_id: 11}];
|
||||
new InsertDocCommand(dbName, collName, allShards, {exists: true, shardKeySpec: {_id: 1}}, docs).execute(st.s);
|
||||
new InsertDocCommand({
|
||||
dbName,
|
||||
collName,
|
||||
collectionCtx: {exists: true, shardKeySpec: {_id: 1}},
|
||||
documents: docs,
|
||||
}).execute(st.s);
|
||||
|
||||
const comment = "db_strict_movechunk_lifecycle";
|
||||
csTest = new ChangeStreamTest(db);
|
||||
@ -269,7 +280,7 @@ describe("database v2 strict whitebox", function () {
|
||||
const originalShardId = st.shard0.shardName;
|
||||
const targetShardId = st.shard1.shardName;
|
||||
|
||||
new CreateUntrackedCollectionCommand(dbName, collName, null, {exists: false}).execute(st.s);
|
||||
new CreateUntrackedCollectionCommand({dbName, collName}).execute(st.s);
|
||||
|
||||
const comment = "db_strict_move_primary";
|
||||
csTest = new ChangeStreamTest(db);
|
||||
@ -284,7 +295,7 @@ describe("database v2 strict whitebox", function () {
|
||||
});
|
||||
assertOpenCursors(st, [originalShardId], /*expectedConfigCursor=*/ false, cursorCommentFilter(comment));
|
||||
|
||||
new MovePrimaryCommand(dbName, collName, null, null, targetShardId).execute(st.s);
|
||||
new MovePrimaryCommand({dbName, collName, targetShard: targetShardId}).execute(st.s);
|
||||
awaitLogMessageCodes(st.s, [kPlacementRefresh], () => csTest.assertNoChange(csCursor));
|
||||
assertOpenCursors(st, [targetShardId], /*expectedConfigCursor=*/ false, cursorCommentFilter(comment));
|
||||
});
|
||||
@ -298,8 +309,19 @@ describe("database v2 strict whitebox", function () {
|
||||
{_id: 2, a: 2},
|
||||
];
|
||||
[
|
||||
new ShardCollectionCommand(dbName, collName, allShards, {exists: false}, {_id: 1}),
|
||||
new InsertDocCommand(dbName, collName, null, {exists: true, shardKeySpec: {_id: 1}}, docs),
|
||||
new ShardCollectionCommand({
|
||||
dbName,
|
||||
collName,
|
||||
shardSet: allShards,
|
||||
collectionCtx: {exists: false},
|
||||
shardKey: {_id: 1},
|
||||
}),
|
||||
new InsertDocCommand({
|
||||
dbName,
|
||||
collName,
|
||||
collectionCtx: {exists: true, shardKeySpec: {_id: 1}},
|
||||
documents: docs,
|
||||
}),
|
||||
].forEach((cmd) => cmd.execute(st.s));
|
||||
distributeCollectionDataOverShards(db, coll, {
|
||||
middle: {_id: 0},
|
||||
@ -332,7 +354,14 @@ describe("database v2 strict whitebox", function () {
|
||||
const collCtx = {exists: true, shardKeySpec: {_id: 1}};
|
||||
const newShardKey = {a: 1};
|
||||
const chunkCount = 2;
|
||||
new ReshardCollectionCommand(dbName, collName, targetShards, collCtx, newShardKey, chunkCount).execute(st.s);
|
||||
new ReshardCollectionCommand({
|
||||
dbName,
|
||||
collName,
|
||||
shardSet: targetShards,
|
||||
collectionCtx: collCtx,
|
||||
shardKey: newShardKey,
|
||||
numInitialChunks: chunkCount,
|
||||
}).execute(st.s);
|
||||
awaitLogMessageCodes(st.s, [kPlacementRefresh], () => csTest.assertNoChange(csCursor));
|
||||
// shard0 is still the DB primary, so the database targeter keeps a cursor there
|
||||
// even though no collection chunks reside on it after resharding.
|
||||
|
||||
@ -2,20 +2,27 @@
|
||||
* Tests that a change stream can resume correctly after rename invalidation.
|
||||
*
|
||||
* @tags: [
|
||||
* requires_sharding,
|
||||
* uses_change_streams,
|
||||
* assumes_balancer_off,
|
||||
* does_not_support_stepdowns,
|
||||
* featureFlagChangeStreamPreciseShardTargeting,
|
||||
* requires_sharding,
|
||||
* uses_change_streams,
|
||||
* ]
|
||||
*/
|
||||
import {describe, it, before, afterEach, after} from "jstests/libs/mochalite.js";
|
||||
import {ChangeStreamWatchMode} from "jstests/libs/query/change_stream_util.js";
|
||||
import {
|
||||
assertOpenCursors,
|
||||
ChangeStreamTest,
|
||||
ChangeStreamWatchMode,
|
||||
cursorCommentFilter,
|
||||
} from "jstests/libs/query/change_stream_util.js";
|
||||
import {ChangeStreamReader, ChangeStreamReadingMode} from "jstests/libs/util/change_stream/change_stream_reader.js";
|
||||
import {Writer} from "jstests/libs/util/change_stream/change_stream_writer.js";
|
||||
import {Connector} from "jstests/libs/util/change_stream/change_stream_connector.js";
|
||||
import {
|
||||
CreateDatabaseCommand,
|
||||
CreateUnsplittableCollectionCommand,
|
||||
DropCollectionCommand,
|
||||
InsertDocCommand,
|
||||
RenameToNonExistentSameDbCommand,
|
||||
} from "jstests/libs/util/change_stream/change_stream_commands.js";
|
||||
@ -64,15 +71,54 @@ describe("$changeStream", function () {
|
||||
// - Original name recreated by insert (lands on shard0)
|
||||
//
|
||||
// Expected events: [create, rename, invalidate, create, insert]
|
||||
// Pre-create the database (with primary on shard0) outside the writer so that the
|
||||
// target-collection stream below can open with deterministic placement (cursor on the
|
||||
// DB primary). enableSharding is not visible to change streams.
|
||||
new CreateDatabaseCommand({dbName: testDb, primaryShard: shards[0]._id}).execute(st.s);
|
||||
|
||||
const commands = [
|
||||
new CreateDatabaseCommand(testDb, testColl, [shards[0]], null),
|
||||
new CreateUnsplittableCollectionCommand(testDb, testColl, [shards[1]], {exists: false, isSharded: false}),
|
||||
new RenameToNonExistentSameDbCommand(testDb, testColl, shards, {exists: true, isSharded: false}),
|
||||
new InsertDocCommand(testDb, testColl, shards, {exists: false, isSharded: false}),
|
||||
new CreateUnsplittableCollectionCommand({
|
||||
dbName: testDb,
|
||||
collName: testColl,
|
||||
shardSet: [shards[1]],
|
||||
collectionCtx: {exists: false, isSharded: false},
|
||||
}),
|
||||
new RenameToNonExistentSameDbCommand({
|
||||
dbName: testDb,
|
||||
collName: testColl,
|
||||
shardSet: shards,
|
||||
collectionCtx: {exists: true, isSharded: false},
|
||||
// Keep testColl_renamed alive after the rename so a follow-up watch
|
||||
// can observe the v2 targeter placing its cursor on shard1. The
|
||||
// invalidate is triggered by the rename itself (rename-to-watched-
|
||||
// namespace), not by the manual drop done in afterEach cleanup.
|
||||
dropAfterRename: false,
|
||||
}),
|
||||
new InsertDocCommand({
|
||||
dbName: testDb,
|
||||
collName: testColl,
|
||||
collectionCtx: {exists: false, isSharded: false},
|
||||
}),
|
||||
];
|
||||
const expectedEvents = buildExpectedEvents(commands, ChangeStreamWatchMode.kCollection);
|
||||
const startTime = getCurrentClusterTime(st.s);
|
||||
|
||||
// Open a change stream on the rename target collection BEFORE running the writer to
|
||||
// observe v2 targeter retargeting from the DB primary to the non-primary shard.
|
||||
const targetColl = `${testColl}_renamed`; // matches RenameToNonExistentSameDbCommand
|
||||
const comment = "rename_target_retargeting";
|
||||
const csTest = new ChangeStreamTest(st.s.getDB(testDb));
|
||||
const targetCursor = csTest.startWatchingChanges({
|
||||
pipeline: [{$changeStream: {version: "v2", startAtOperationTime: startTime}}],
|
||||
collection: targetColl,
|
||||
aggregateOptions: {comment, cursor: {batchSize: 1}},
|
||||
});
|
||||
|
||||
// Initial placement: the target collection does not yet exist, but the DB is present
|
||||
// with primary on shard0. The v2 targeter falls back to the DB primary, so a single
|
||||
// cursor is opened on shard0 and no config-server placement watcher is needed.
|
||||
assertOpenCursors(st, [shards[0]._id], /*expectedConfigCursor=*/ false, cursorCommentFilter(comment));
|
||||
|
||||
// Execute commands via Writer.
|
||||
const writerName = "writer_rename_resume";
|
||||
Writer.run(st.s, writerName, commands);
|
||||
@ -83,7 +129,7 @@ describe("$changeStream", function () {
|
||||
dbName: testDb,
|
||||
collName: testColl,
|
||||
watchMode: ChangeStreamWatchMode.kCollection,
|
||||
readingMode: ChangeStreamReadingMode.kContinuous,
|
||||
readingMode: ChangeStreamReadingMode.kFetchOneAndResume,
|
||||
startAtClusterTime: startTime,
|
||||
numberOfEventsToRead: expectedEvents.length,
|
||||
instanceName: readerName,
|
||||
@ -93,5 +139,56 @@ describe("$changeStream", function () {
|
||||
// Build VerifierContext and run SingleReaderVerificationTestCase.
|
||||
const ctx = new VerifierContext({[readerName]: readerConfig}, {[readerName]: createMatcher(expectedEvents)});
|
||||
new SingleReaderVerificationTestCase(readerName).run(st.s, ctx);
|
||||
|
||||
// Wait for the writer so all events (including the trailing drop) have flushed.
|
||||
Connector.waitForDone(st.s, writerName);
|
||||
Writer.joinAll();
|
||||
|
||||
// Sanity check: with dropAfterRename: false, the rename target should still exist
|
||||
// on shard1 after the writer finishes. If this fails, an unexpected drop happened.
|
||||
const collsOnTargetDb = st.s.getDB(testDb).getCollectionNames();
|
||||
assert(
|
||||
collsOnTargetDb.includes(targetColl),
|
||||
`Expected ${targetColl} to still exist after writer; found collections: ${tojsononeline(collsOnTargetDb)}`,
|
||||
);
|
||||
|
||||
// A rename TO the watched namespace invalidates a collection-level change stream:
|
||||
// the cursor delivers the rename event and then an immediate `invalidate`.
|
||||
// See https://www.mongodb.com/docs/manual/reference/method/db.collection.renameCollection/
|
||||
// Receiving the rename here also implicitly proves the targeter retargeted from
|
||||
// shard0 to shard1.
|
||||
csTest.assertNextChangesEqual({
|
||||
cursor: targetCursor,
|
||||
expectedChanges: [
|
||||
{
|
||||
operationType: "rename",
|
||||
ns: {db: testDb, coll: testColl},
|
||||
to: {db: testDb, coll: targetColl},
|
||||
},
|
||||
{operationType: "invalidate"},
|
||||
],
|
||||
expectInvalidate: true,
|
||||
});
|
||||
|
||||
// Verify the v2 targeter places cursors on shard1 for testColl_renamed by opening
|
||||
// a fresh stream. The original cursor's post-rename placement is unobservable
|
||||
// because the rename-target invalidate closes its shard cursors before any
|
||||
// inspection window; a new cursor on the now-existing collection takes the
|
||||
// direct-target path and is observable.
|
||||
const verifyComment = "verify_post_rename_placement";
|
||||
const verifyCursor = csTest.startWatchingChanges({
|
||||
pipeline: [{$changeStream: {version: "v2"}}],
|
||||
collection: targetColl,
|
||||
aggregateOptions: {comment: verifyComment, cursor: {batchSize: 0}},
|
||||
});
|
||||
// Drive a getMore so shard cursors materialize and become idle.
|
||||
csTest.assertNoChange(verifyCursor);
|
||||
assertOpenCursors(st, [shards[1]._id], /*expectedConfigCursor=*/ false, cursorCommentFilter(verifyComment));
|
||||
|
||||
// Clean up the lingering testColl_renamed so afterEach's dropDatabase doesn't trip
|
||||
// on a surprise collection (the rename command no longer drops it for us).
|
||||
new DropCollectionCommand({dbName: testDb, collName: targetColl}).execute(st.s);
|
||||
|
||||
csTest.cleanUp();
|
||||
});
|
||||
});
|
||||
|
||||
@ -37,19 +37,19 @@ function buildCommands({dbName, collName, shards, shardingType, nonEmpty}) {
|
||||
const commands = [];
|
||||
let ctx = {exists: false, nonEmpty: false, shardKeySpec: null, isUnsplittable: false};
|
||||
|
||||
commands.push(new CreateDatabaseCommand(dbName, collName, shards, ctx));
|
||||
commands.push(new CreateDatabaseCommand({dbName, shardSet: shards}));
|
||||
|
||||
if (nonEmpty) {
|
||||
commands.push(new InsertDocCommand(dbName, collName, shards, ctx));
|
||||
commands.push(new InsertDocCommand({dbName, collName, collectionCtx: ctx}));
|
||||
ctx = {exists: true, nonEmpty: true, shardKeySpec: null, isUnsplittable: false};
|
||||
commands.push(new CreateIndexCommand(dbName, collName, shards, ctx, shardKey));
|
||||
commands.push(new CreateIndexCommand({dbName, collName, shardSet: shards, indexSpec: shardKey}));
|
||||
}
|
||||
|
||||
commands.push(new ShardCollectionCommand(dbName, collName, shards, ctx, shardKey));
|
||||
commands.push(new ShardCollectionCommand({dbName, collName, shardSet: shards, collectionCtx: ctx, shardKey}));
|
||||
|
||||
const shardedCtx = {exists: true, nonEmpty, shardKeySpec: shardKey, isUnsplittable: false};
|
||||
commands.push(new MoveChunkCommand(dbName, collName, shards, shardedCtx));
|
||||
commands.push(new UnshardCollectionCommand(dbName, collName, shards, shardedCtx));
|
||||
commands.push(new MoveChunkCommand({dbName, collName, shardSet: shards, collectionCtx: shardedCtx}));
|
||||
commands.push(new UnshardCollectionCommand({dbName, collName, shardSet: shards, collectionCtx: shardedCtx}));
|
||||
|
||||
return commands;
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user