SERVER-125865 Revert "Skip range deletions when transitioning from embedded to dedicated config servers" (#53170)
GitOrigin-RevId: 9c8c1a951a95235b3c84c976ca4c07d9aa8297aa
This commit is contained in:
parent
4db8d6aece
commit
5d5d678a7c
@ -33,11 +33,6 @@ export const ShardTransitionUtil = (function () {
|
||||
// ShardNotFound.
|
||||
return true;
|
||||
}
|
||||
if (!res.ok && res.code === ErrorCodes.RemoveShardDrainingInProgress) {
|
||||
// If orphanCleanupDelaySecs hasn't elapsed yet, the command will fail with
|
||||
// RemoveShardDrainingInProgress. Keep retrying until the delay elapses.
|
||||
return false;
|
||||
}
|
||||
assert.commandWorked(res);
|
||||
return res.state == "completed";
|
||||
},
|
||||
|
||||
@ -258,27 +258,21 @@ const newShardName = assert.commandWorked(st.s.adminCommand({addShard: newShardR
|
||||
assert.sameMembers(configPrimary.getCollection(indexedNs).getIndexKeys(), [{_id: 1}, {oldKey: 1}]);
|
||||
assert(configPrimary.getCollection("config.system.sessions").exists());
|
||||
|
||||
// Move away the final chunk.
|
||||
// Move away the final chunk, but block range deletion and verify this blocks the transition.
|
||||
assert.eq(1, getCatalogShardChunks(st.s).length, () => getCatalogShardChunks(st.s));
|
||||
const suspendRangeDeletionFp = configureFailPoint(st.configRS.getPrimary(), "suspendRangeDeletion");
|
||||
assert.commandWorked(st.s.adminCommand({moveChunk: "config.system.sessions", find: {_id: 0}, to: newShardName}));
|
||||
suspendRangeDeletionFp.wait();
|
||||
|
||||
// The config server owns no chunks now.
|
||||
// The config server owns no chunks, but must wait for its range deletions.
|
||||
assert.eq(0, getCatalogShardChunks(st.s).length, () => getCatalogShardChunks(st.s));
|
||||
removeRes = assert.commandWorked(st.s.adminCommand({transitionToDedicatedConfigServer: 1}));
|
||||
assert.eq("pendingDataCleanup", removeRes.state);
|
||||
assert.eq("waiting for data to be cleaned up", removeRes.msg);
|
||||
assert.eq(1, removeRes.pendingRangeDeletions);
|
||||
|
||||
// Poll until draining is complete (waits for orphanCleanupDelaySecs to elapse).
|
||||
assert.soon(
|
||||
() => {
|
||||
const statusRes = st.s.adminCommand({getTransitionToDedicatedConfigServerStatus: 1});
|
||||
if (!statusRes.ok && statusRes.code === ErrorCodes.RemoveShardDrainingInProgress) {
|
||||
// Keep retrying until orphanCleanupDelaySecs elapses.
|
||||
return false;
|
||||
}
|
||||
assert.commandWorked(statusRes);
|
||||
// Wait for ready to commit.
|
||||
return statusRes.state === "drainingComplete";
|
||||
},
|
||||
() => "transitionToDedicatedConfigServer draining did not reach drainingComplete",
|
||||
);
|
||||
suspendRangeDeletionFp.off();
|
||||
ShardTransitionUtil.waitForRangeDeletions(st.s);
|
||||
|
||||
// Start the final transition command. This will trigger locally dropping all tracked user
|
||||
// databases on the config server. Hang after removing one database and trigger a failover to
|
||||
@ -381,44 +375,69 @@ const newShardName = assert.commandWorked(st.s.adminCommand({addShard: newShardR
|
||||
assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {skey: 0}, to: newShardName}));
|
||||
ShardTransitionUtil.waitForRangeDeletions(st.s);
|
||||
|
||||
// Insert documents via mongos after collections have been fully drained and verify data goes
|
||||
// to the destination shard (not the config shard) and the transition can proceed.
|
||||
// Insert documents directly onto the config server after it has fully drained and verify we
|
||||
// can't complete the transition.
|
||||
|
||||
// Drained sharded collection - insert via mongos and verify it goes to newShard.
|
||||
assert.commandWorked(st.s.getCollection(ns).insert({x: 1}));
|
||||
assert.eq(0, st.configRS.getPrimary().getCollection(ns).countDocuments({x: 1}));
|
||||
assert.eq(1, newShardRS.getPrimary().getCollection(ns).countDocuments({x: 1}));
|
||||
assert.commandWorked(st.s.getCollection(ns).remove({x: 1}));
|
||||
// Drained sharded collection.
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection(ns).insert({x: 1}));
|
||||
removeRes = assert.commandWorked(st.s.adminCommand({transitionToDedicatedConfigServer: 1}));
|
||||
assert.eq("pendingDataCleanup", removeRes.state);
|
||||
assert.eq("waiting for data to be cleaned up", removeRes.msg);
|
||||
assert.eq(0, removeRes.pendingRangeDeletions, tojson(removeRes));
|
||||
assert.eq(ns, removeRes.firstNonEmptyCollection, tojson(removeRes));
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection(ns).remove({x: 1}));
|
||||
|
||||
// Drained unsharded collection - insert via mongos and verify it goes to newShard.
|
||||
assert.commandWorked(st.s.getCollection(unshardedNs).insert({y: 1}));
|
||||
assert.eq(0, st.configRS.getPrimary().getCollection(unshardedNs).countDocuments({y: 1}));
|
||||
assert.eq(1, newShardRS.getPrimary().getCollection(unshardedNs).countDocuments({y: 1}));
|
||||
assert.commandWorked(st.s.getCollection(unshardedNs).remove({y: 1}));
|
||||
// Drained unsharded collection.
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection(unshardedNs).insert({x: 1}));
|
||||
removeRes = assert.commandWorked(st.s.adminCommand({transitionToDedicatedConfigServer: 1}));
|
||||
assert.eq("pendingDataCleanup", removeRes.state);
|
||||
assert.eq("waiting for data to be cleaned up", removeRes.msg);
|
||||
assert.eq(0, removeRes.pendingRangeDeletions, tojson(removeRes));
|
||||
assert.eq(unshardedNs, removeRes.firstNonEmptyCollection, tojson(removeRes));
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection(unshardedNs).remove({x: 1}));
|
||||
|
||||
// Drained time series collection - insert via mongos and verify it goes to newShard.
|
||||
assert.commandWorked(st.s.getCollection(timeseriesShardedNs).insert({time: ISODate(), x: 1}));
|
||||
const timeseriesDB = st.s.getDB(timeseriesDbName);
|
||||
const bucketColl = getTimeseriesCollForDDLOps(timeseriesDB, timeseriesDB[timeseriesShardedCollName]);
|
||||
const bucketCollName = bucketColl.getFullName();
|
||||
assert.eq(0, st.configRS.getPrimary().getCollection(bucketCollName).countDocuments({}));
|
||||
assert.gte(newShardRS.getPrimary().getCollection(bucketCollName).countDocuments({}), 1);
|
||||
assert.commandWorked(st.s.getCollection(timeseriesShardedNs).remove({}));
|
||||
// Drained time series collection.
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection(timeseriesShardedNs).insert({time: ISODate()}));
|
||||
removeRes = assert.commandWorked(st.s.adminCommand({transitionToDedicatedConfigServer: 1}));
|
||||
assert.eq("pendingDataCleanup", removeRes.state);
|
||||
assert.eq("waiting for data to be cleaned up", removeRes.msg);
|
||||
assert.eq(0, removeRes.pendingRangeDeletions, tojson(removeRes));
|
||||
assert.eq(timeseriesShardedNs, removeRes.firstNonEmptyCollection, tojson(removeRes));
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection(timeseriesShardedNs).remove({}));
|
||||
|
||||
// New collection - insert via mongos and verify all go to newshard.
|
||||
const newColl = unshardedDbName + ".newColl";
|
||||
assert.commandWorked(st.s.getCollection(newColl).insert({x: 1}));
|
||||
assert.eq(0, st.configRS.getPrimary().getCollection(newColl).countDocuments({x: 1}));
|
||||
assert.eq(1, newShardRS.getPrimary().getCollection(newColl).countDocuments({x: 1}));
|
||||
assert.commandWorked(st.s.getCollection(newColl).remove({x: 1}));
|
||||
// Previously non-existent collection in a drained database, e.g. a temporary collection created
|
||||
// by an in-progress operation or a new untracked unsharded collection.
|
||||
assert.commandWorked(st.configRS.getPrimary().getDB(dbName)["newOrphanCollection"].insert({x: 1}));
|
||||
removeRes = assert.commandWorked(st.s.adminCommand({transitionToDedicatedConfigServer: 1}));
|
||||
assert.eq("pendingDataCleanup", removeRes.state);
|
||||
assert.eq("waiting for data to be cleaned up", removeRes.msg);
|
||||
assert.eq(0, removeRes.pendingRangeDeletions, tojson(removeRes));
|
||||
assert.eq(dbName + ".newOrphanCollection", removeRes.firstNonEmptyCollection, tojson(removeRes));
|
||||
assert.commandWorked(st.configRS.getPrimary().getDB(dbName)["newOrphanCollection"].remove({x: 1}));
|
||||
|
||||
// Logical sessions collection - insert via mongos and verify it goes to newShard.
|
||||
const sessionsNs = "config.system.sessions";
|
||||
const testSessionId = {id: UUID()};
|
||||
assert.commandWorked(st.s.getCollection(sessionsNs).insert({_id: testSessionId, lastUse: new Date()}));
|
||||
assert.eq(0, st.configRS.getPrimary().getCollection(sessionsNs).countDocuments({_id: testSessionId}));
|
||||
assert.eq(1, newShardRS.getPrimary().getCollection(sessionsNs).countDocuments({_id: testSessionId}));
|
||||
assert.commandWorked(st.s.getCollection(sessionsNs).remove({_id: testSessionId}));
|
||||
// Logical sessions collection.
|
||||
assert.commandWorked(
|
||||
st.configRS.getPrimary().adminCommand({_flushRoutingTableCacheUpdates: "config.system.sessions"}),
|
||||
);
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection("config.system.sessions").insert({x: 1}));
|
||||
removeRes = assert.commandWorked(st.s.adminCommand({transitionToDedicatedConfigServer: 1}));
|
||||
assert.eq("pendingDataCleanup", removeRes.state);
|
||||
assert.eq("waiting for data to be cleaned up", removeRes.msg);
|
||||
assert.eq(0, removeRes.pendingRangeDeletions, tojson(removeRes));
|
||||
assert.eq("config.system.sessions", removeRes.firstNonEmptyCollection, tojson(removeRes));
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection("config.system.sessions").remove({x: 1}));
|
||||
|
||||
// More than one collection.
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection(ns).insert({x: 1}));
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection(unshardedNs).insert({x: 1}));
|
||||
removeRes = assert.commandWorked(st.s.adminCommand({transitionToDedicatedConfigServer: 1}));
|
||||
assert.eq("pendingDataCleanup", removeRes.state);
|
||||
assert.eq("waiting for data to be cleaned up", removeRes.msg);
|
||||
assert.eq(0, removeRes.pendingRangeDeletions, tojson(removeRes));
|
||||
// Only the first non-empty collection found is in the response, so either ns or unshardedNs.
|
||||
assert.contains(removeRes.firstNonEmptyCollection, [ns, unshardedNs], tojson(removeRes));
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection(ns).remove({x: 1}));
|
||||
assert.commandWorked(st.configRS.getPrimary().getCollection(unshardedNs).remove({x: 1}));
|
||||
|
||||
// The transition has not removed the config server as a shard yet.
|
||||
assert.neq(null, st.s.getDB("config").shards.findOne({_id: "config"}));
|
||||
|
||||
@ -109,7 +109,7 @@ filters:
|
||||
- "direct_shard_connection_ddls_on_replicaset.js":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing-routing-and-topology
|
||||
- "transition_to_dedicated_config_server*.js":
|
||||
- "transition_to_dedicated_config_server.js":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing-routing-and-topology
|
||||
- "direct_shard_connection_ddls_during_promotion_to_*.js":
|
||||
|
||||
@ -12,13 +12,7 @@ import {moveDatabaseAndUnshardedColls} from "jstests/sharding/libs/move_database
|
||||
const dbName = "test";
|
||||
const collName = "collTest";
|
||||
const ns = dbName + "." + collName;
|
||||
const st = new ShardingTest({
|
||||
shards: 2,
|
||||
mongos: 1,
|
||||
config: 1,
|
||||
configShard: true,
|
||||
enableBalancer: true,
|
||||
});
|
||||
const st = new ShardingTest({shards: 2, mongos: 1, config: 1, configShard: true, enableBalancer: true});
|
||||
const adminDB = st.s.getDB("admin");
|
||||
const distributed_txn_insert_count = 10;
|
||||
|
||||
|
||||
@ -62,11 +62,6 @@ function removeShardOld(s, shardName, timeout) {
|
||||
// We should retry the operation when this happens.
|
||||
return kRetry;
|
||||
}
|
||||
if (shardName == "config" && res.code === ErrorCodes.RemoveShardDrainingInProgress) {
|
||||
// If orphanCleanupDelaySecs hasn't elapsed yet, the command will fail with
|
||||
// RemoveShardDrainingInProgress. Keep retrying until the delay elapses.
|
||||
return kRetry;
|
||||
}
|
||||
}
|
||||
assert.commandWorked(res);
|
||||
return res.state == "completed";
|
||||
|
||||
@ -1,236 +0,0 @@
|
||||
/**
|
||||
* Test transition to dedicated config server waits for orphanCleanupDelaySecs before dropping
|
||||
* local collections.
|
||||
* @tags: [
|
||||
* requires_fcv_83,
|
||||
* # This test disables the range deleter (disableResumableRangeDeleter: true) to test orphan
|
||||
* # cleanup delay. Stepdowns during moveChunk create range deletion tasks that can't be cleaned
|
||||
* # up, causing subsequent moveChunk retries to fail with "overlapping range deletion" errors.
|
||||
* does_not_support_stepdowns
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {after, afterEach, before, describe, it} from "jstests/libs/mochalite.js";
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
|
||||
function insertDocs(coll, numDocs) {
|
||||
const docs = [];
|
||||
for (let i = 0; i < numDocs; i++) {
|
||||
docs.push({_id: i, data: "x".repeat(10)});
|
||||
}
|
||||
assert.commandWorked(coll.insertMany(docs));
|
||||
}
|
||||
|
||||
describe("Test transition to dedicated config server waits for orphanCleanupDelaySecs", function () {
|
||||
const orphanCleanupDelaySecs = 3;
|
||||
|
||||
before(() => {
|
||||
jsTest.log.info(
|
||||
"Create sharded cluster 'stOrphanDelay' with embedded config server and orphanCleanupDelaySecs > 0",
|
||||
);
|
||||
this.st = new ShardingTest({
|
||||
name: jsTestName() + "_orphanDelay",
|
||||
shards: 2,
|
||||
rs: {nodes: 3},
|
||||
other: {
|
||||
enableBalancer: true,
|
||||
configShard: true,
|
||||
rsOptions: {
|
||||
setParameter: {
|
||||
orphanCleanupDelaySecs: orphanCleanupDelaySecs,
|
||||
// Disable range deleter to ensure range deletion tasks remain in DB
|
||||
// for the transition code to find and wait for orphanCleanupDelaySecs
|
||||
disableResumableRangeDeleter: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
after(() => {
|
||||
jsTest.log.info("Stop sharded cluster 'stOrphanDelay'");
|
||||
this.st.stop();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
const numShards = this.st.s.getDB("config").shards.count();
|
||||
if (numShards == 2) {
|
||||
jsTest.log.info("Stop transition to dedicated config server");
|
||||
const stopResult = this.st.s.adminCommand({stopTransitionToDedicatedConfigServer: 1});
|
||||
// May fail if transition already committed
|
||||
if (stopResult.ok) {
|
||||
// Check that config shard draining has stopped successfully
|
||||
const notDrainingShards = this.st.s.getDB("config").shards.find({"draining": true}).toArray();
|
||||
assert.eq(0, notDrainingShards.length);
|
||||
}
|
||||
} else {
|
||||
jsTest.log.info("Transition back to embedded config server");
|
||||
assert.commandWorked(this.st.s.adminCommand({transitionFromDedicatedConfigServer: 1}));
|
||||
}
|
||||
});
|
||||
|
||||
it("Test transition completes with delayed range deletion tasks after waiting", function () {
|
||||
const dbName = jsTestName() + "_delayed";
|
||||
const collName = "testColl";
|
||||
const ns = dbName + "." + collName;
|
||||
|
||||
assert.commandWorked(this.st.s.adminCommand({enableSharding: dbName, primaryShard: "config"}));
|
||||
assert.commandWorked(this.st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
|
||||
|
||||
const coll = this.st.s.getDB(dbName).getCollection(collName);
|
||||
insertDocs(coll, 100);
|
||||
|
||||
// Split and migrate a chunk to create range deletion tasks
|
||||
assert.soon(
|
||||
() => this.st.s.adminCommand({split: ns, middle: {_id: 50}}).ok,
|
||||
"split did not complete within the timeout",
|
||||
);
|
||||
assert.soon(
|
||||
() =>
|
||||
this.st.s.adminCommand({
|
||||
moveChunk: ns,
|
||||
find: {_id: 0},
|
||||
to: this.st.shard1.shardName,
|
||||
_waitForDelete: false,
|
||||
}).ok,
|
||||
"moveChunk did not complete within the timeout",
|
||||
);
|
||||
|
||||
// Move the other chunk as well
|
||||
assert.soon(
|
||||
() =>
|
||||
this.st.s.adminCommand({
|
||||
moveChunk: ns,
|
||||
find: {_id: 50},
|
||||
to: this.st.shard1.shardName,
|
||||
_waitForDelete: false,
|
||||
}).ok,
|
||||
"moveChunk did not complete within the timeout",
|
||||
);
|
||||
|
||||
// Move primary to non-config shard
|
||||
assert.soon(
|
||||
() => this.st.s.adminCommand({movePrimary: dbName, to: this.st.shard1.shardName}).ok,
|
||||
"movePrimary did not complete within the timeout",
|
||||
);
|
||||
|
||||
jsTest.log.info("Start transition to dedicated config server");
|
||||
assert.commandWorked(this.st.s.adminCommand({startTransitionToDedicatedConfigServer: 1}));
|
||||
this.st.configRS.awaitReplication();
|
||||
|
||||
jsTest.log.info("Wait for draining to complete");
|
||||
assert.soon(
|
||||
() => {
|
||||
const drainingStatus = this.st.s.adminCommand({getTransitionToDedicatedConfigServerStatus: 1});
|
||||
assert.commandWorked(drainingStatus);
|
||||
return "drainingComplete" == drainingStatus.state;
|
||||
},
|
||||
"getTransitionToDedicatedConfigServerStatus did not return 'drainingComplete' status within the timeout",
|
||||
120000,
|
||||
);
|
||||
|
||||
jsTest.log.info("Commit transition to dedicated config server (will wait for orphan cleanup delay if needed)");
|
||||
assert.soon(
|
||||
() => this.st.s.adminCommand({commitTransitionToDedicatedConfigServer: 1}).ok,
|
||||
"commitTransitionToDedicatedConfigServer did not complete within the timeout",
|
||||
);
|
||||
|
||||
// Verify config shard is no longer in config.shards
|
||||
const configShard = this.st.s.getDB("config").shards.findOne({_id: "config"});
|
||||
assert.eq(null, configShard, "Config shard should have been removed");
|
||||
|
||||
assert.commandWorked(this.st.s.getDB(dbName).dropDatabase());
|
||||
});
|
||||
|
||||
it("Test queries started before draining fail with QueryPlanKilled after collection drop", function () {
|
||||
const dbName = jsTestName() + "_qryKilled";
|
||||
const collName = "testColl";
|
||||
const ns = dbName + "." + collName;
|
||||
|
||||
assert.commandWorked(this.st.s.adminCommand({enableSharding: dbName, primaryShard: "config"}));
|
||||
assert.commandWorked(this.st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
|
||||
|
||||
// Insert enough data to require multiple batches (default batch size is 101)
|
||||
const coll = this.st.s.getDB(dbName).getCollection(collName);
|
||||
insertDocs(coll, 100);
|
||||
|
||||
this.st.configRS.awaitReplication();
|
||||
|
||||
// Open a cursor via mongos with secondary read preference
|
||||
// Secondary reads will fail with QueryPlanKilled when the collection is dropped.
|
||||
jsTest.log.info("Opening cursor via mongos with secondary read preference before draining starts");
|
||||
const cursor = this.st.s.getDB(dbName).getCollection(collName).find({}).readPref("secondary").batchSize(10);
|
||||
|
||||
// Fetch the first batch to establish the cursor
|
||||
const firstDoc = cursor.next();
|
||||
assert.neq(null, firstDoc, "Should have documents in the collection");
|
||||
const cursorId = cursor.getId();
|
||||
assert.neq(0, cursorId, "Cursor should be open with more documents to fetch");
|
||||
|
||||
// Move all chunks off the config shard
|
||||
assert.soon(
|
||||
() => this.st.s.adminCommand({split: ns, middle: {_id: 250}}).ok,
|
||||
"split did not complete within the timeout",
|
||||
);
|
||||
assert.soon(
|
||||
() =>
|
||||
this.st.s.adminCommand({
|
||||
moveChunk: ns,
|
||||
find: {_id: 0},
|
||||
to: this.st.shard1.shardName,
|
||||
_waitForDelete: false,
|
||||
}).ok,
|
||||
"moveChunk did not complete within the timeout",
|
||||
);
|
||||
assert.soon(
|
||||
() =>
|
||||
this.st.s.adminCommand({
|
||||
moveChunk: ns,
|
||||
find: {_id: 250},
|
||||
to: this.st.shard1.shardName,
|
||||
_waitForDelete: false,
|
||||
}).ok,
|
||||
"moveChunk did not complete within the timeout",
|
||||
);
|
||||
|
||||
// Move primary to non-config shard
|
||||
assert.soon(
|
||||
() => this.st.s.adminCommand({movePrimary: dbName, to: this.st.shard1.shardName}).ok,
|
||||
"movePrimary did not complete within the timeout",
|
||||
);
|
||||
|
||||
jsTest.log.info("Start transition to dedicated config server");
|
||||
assert.commandWorked(this.st.s.adminCommand({startTransitionToDedicatedConfigServer: 1}));
|
||||
this.st.configRS.awaitReplication();
|
||||
|
||||
jsTest.log.info("Wait for draining to complete");
|
||||
assert.soon(
|
||||
() => {
|
||||
const drainingStatus = this.st.s.adminCommand({getTransitionToDedicatedConfigServerStatus: 1});
|
||||
assert.commandWorked(drainingStatus);
|
||||
return "drainingComplete" == drainingStatus.state;
|
||||
},
|
||||
"getTransitionToDedicatedConfigServerStatus did not return 'drainingComplete' status within the timeout",
|
||||
120000,
|
||||
);
|
||||
|
||||
jsTest.log.info("Commit transition to dedicated config server");
|
||||
assert.soon(
|
||||
() => this.st.s.adminCommand({commitTransitionToDedicatedConfigServer: 1}).ok,
|
||||
"commitTransitionToDedicatedConfigServer did not complete within the timeout",
|
||||
);
|
||||
|
||||
const error = assert.throws(() => {
|
||||
while (cursor.hasNext()) {
|
||||
cursor.next();
|
||||
}
|
||||
});
|
||||
|
||||
assert.eq(ErrorCodes.QueryPlanKilled, error.code, "Expected QueryPlanKilled error but got: " + tojson(error));
|
||||
|
||||
jsTest.log.info("Transition back to embedded config server");
|
||||
assert.commandWorked(this.st.s.adminCommand({transitionFromDedicatedConfigServer: 1}));
|
||||
|
||||
assert.commandWorked(this.st.s.getDB(dbName).dropDatabase());
|
||||
});
|
||||
});
|
||||
@ -539,7 +539,6 @@ mongo_cc_library(
|
||||
"//src/mongo/db/repl/hello:hello_command",
|
||||
"//src/mongo/db/rss:replicated_storage_service",
|
||||
"//src/mongo/db/s:forwardable_operation_metadata",
|
||||
"//src/mongo/db/s:range_deletion_task",
|
||||
"//src/mongo/db/s:resharding_server_parameters_idl",
|
||||
"//src/mongo/db/shard_role",
|
||||
"//src/mongo/db/shard_role/ddl:coll_mod_command_idl",
|
||||
|
||||
@ -337,16 +337,6 @@ mongo_cc_library(
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_library(
|
||||
name = "range_deletion_task",
|
||||
srcs = ["range_deletion_task_gen"],
|
||||
deps = [
|
||||
"//src/mongo:base",
|
||||
"//src/mongo/db:server_base",
|
||||
"//src/mongo/s:common_s",
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_library(
|
||||
name = "sharding_runtime_d",
|
||||
srcs = [
|
||||
@ -378,6 +368,7 @@ mongo_cc_library(
|
||||
"random_migration_testing_utils.cpp",
|
||||
"range_deleter_service.cpp",
|
||||
"range_deleter_service_op_observer.cpp",
|
||||
"range_deletion_task_gen",
|
||||
"range_deletion_util.cpp",
|
||||
"ready_range_deletions_processor.cpp",
|
||||
"session_catalog_migration.cpp",
|
||||
@ -473,7 +464,6 @@ mongo_cc_library(
|
||||
":forwardable_operation_metadata",
|
||||
":local_resharding_operations_registry",
|
||||
":query_analysis_writer",
|
||||
":range_deletion_task",
|
||||
":resharding_server_parameters_idl",
|
||||
":sharding_coord_d",
|
||||
":transaction_coordinator",
|
||||
@ -1161,10 +1151,8 @@ mongo_cc_unit_test(
|
||||
"//src/mongo/db/s/balancer:move_unsharded_policy_test.cpp",
|
||||
"//src/mongo/db/s/config:initial_split_policy_test.cpp",
|
||||
"//src/mongo/db/s/config:sampling_based_initial_split_policy_test.cpp",
|
||||
"//src/mongo/db/topology:remove_shard_commit_coordinator_test.cpp",
|
||||
"//src/mongo/db/topology:sharding_catalog_manager_add_shard_test.cpp",
|
||||
"//src/mongo/db/topology:sharding_catalog_manager_remove_shard_test.cpp",
|
||||
"//src/mongo/db/topology:topology_change_helpers_test.cpp",
|
||||
"//src/mongo/db/topology/cluster_parameters:set_cluster_parameter_coordinator_test.cpp",
|
||||
"//src/mongo/db/topology/vector_clock:topology_time_ticker_test.cpp",
|
||||
"//src/mongo/db/topology/vector_clock:vector_clock_config_server_test.cpp",
|
||||
|
||||
@ -138,13 +138,19 @@ void RemoveShardCommitCoordinator::_setReplicaSetNameOnDocument(OperationContext
|
||||
void RemoveShardCommitCoordinator::_joinMigrationsAndCheckRangeDeletions(OperationContext* opCtx) {
|
||||
topology_change_helpers::joinMigrations(opCtx);
|
||||
// The config server may be added as a shard again, so we locally drop its drained
|
||||
// sharded collections to enable that without user intervention. We only wait for
|
||||
// orphanCleanupDelaySecs as a best effort since creation of latest non pending range deletion
|
||||
// task. Any ongoing queries on primary or secondary from older chunk metadata will throw with
|
||||
// QueryPlanKilled error and can be retried by the user.
|
||||
auto task = topology_change_helpers::getLatestNonProcessingRangeDeletionTask(opCtx);
|
||||
if (task) {
|
||||
topology_change_helpers::checkOrphanCleanupDelayElapsed(opCtx, *task);
|
||||
// sharded collections to enable that without user intervention. But we have to wait for
|
||||
// the range deleter to quiesce to give queries and stale routers time to discover the
|
||||
// migration, to match the usual probabilistic guarantees for migrations.
|
||||
auto pendingRangeDeletions = topology_change_helpers::getRangeDeletionCount(opCtx);
|
||||
if (pendingRangeDeletions > 0) {
|
||||
LOGV2(9782400,
|
||||
"removeShard: waiting for range deletions",
|
||||
"pendingRangeDeletions"_attr = pendingRangeDeletions);
|
||||
RemoveShardProgress progress(ShardDrainingStateEnum::kPendingDataCleanup);
|
||||
progress.setPendingRangeDeletions(pendingRangeDeletions);
|
||||
uasserted(
|
||||
RemoveShardDrainingInfo(progress),
|
||||
"Range deletions must complete before transitioning to a dedicated config server.");
|
||||
}
|
||||
}
|
||||
|
||||
@ -192,18 +198,11 @@ void RemoveShardCommitCoordinator::_dropLocalCollections(OperationContext* opCtx
|
||||
}
|
||||
|
||||
DBDirectClient client(opCtx);
|
||||
BSONObj sessionsResult;
|
||||
BSONObj result;
|
||||
if (!client.dropCollection(NamespaceString::kLogicalSessionsNamespace,
|
||||
ShardingCatalogClient::writeConcernLocalHavingUpstreamWaiter(),
|
||||
&sessionsResult)) {
|
||||
uassertStatusOK(getStatusFromCommandResult(sessionsResult));
|
||||
}
|
||||
|
||||
BSONObj rangeDeletionsResult;
|
||||
if (!client.dropCollection(NamespaceString::kRangeDeletionNamespace,
|
||||
ShardingCatalogClient::writeConcernLocalHavingUpstreamWaiter(),
|
||||
&rangeDeletionsResult)) {
|
||||
uassertStatusOK(getStatusFromCommandResult(rangeDeletionsResult));
|
||||
&result)) {
|
||||
uassertStatusOK(getStatusFromCommandResult(result));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,293 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/bson/timestamp.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/persistent_task_store.h"
|
||||
#include "mongo/db/s/range_deletion_task_gen.h"
|
||||
#include "mongo/db/sharding_environment/config_server_test_fixture.h"
|
||||
#include "mongo/db/sharding_environment/sharding_runtime_d_params_gen.h"
|
||||
#include "mongo/db/topology/remove_shard_draining_progress_gen.h"
|
||||
#include "mongo/db/topology/remove_shard_exception.h"
|
||||
#include "mongo/db/topology/topology_change_helpers.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
#include "mongo/util/duration.h"
|
||||
#include "mongo/util/uuid.h"
|
||||
|
||||
#include <boost/optional/optional.hpp>
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
class RemoveShardCommitCoordinatorTest : public ConfigServerTestFixture {
|
||||
public:
|
||||
RemoveShardCommitCoordinatorTest() : ConfigServerTestFixture(Options{}.useMockClock(true)) {}
|
||||
|
||||
protected:
|
||||
void setUp() override {
|
||||
setUpAndInitializeConfigDb();
|
||||
|
||||
// Advance the mock clock to a reasonable starting point
|
||||
// This ensures timestamps are valid and we have room to work with time differences
|
||||
clockSource()->reset(Date_t::fromMillisSinceEpoch(1000 * 1000));
|
||||
}
|
||||
|
||||
void tearDown() override {
|
||||
ConfigServerTestFixture::tearDown();
|
||||
}
|
||||
|
||||
void insertRangeDeletionTask(const UUID& migrationId,
|
||||
const NamespaceString& nss,
|
||||
const UUID& collUuid,
|
||||
const ChunkRange& range,
|
||||
CleanWhenEnum whenToClean,
|
||||
const Timestamp& timestamp,
|
||||
boost::optional<bool> pending = boost::none,
|
||||
boost::optional<bool> processing = boost::none) {
|
||||
RangeDeletionTask task(
|
||||
migrationId, nss, collUuid, ShardId("donorShard"), range, whenToClean);
|
||||
task.setTimestamp(timestamp);
|
||||
|
||||
if (pending) {
|
||||
task.setPending(*pending);
|
||||
}
|
||||
if (processing) {
|
||||
task.setProcessing(*processing);
|
||||
}
|
||||
|
||||
PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
|
||||
store.add(operationContext(), task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current time in seconds since epoch from the mock clock.
|
||||
*/
|
||||
uint32_t getCurrentTimeSecs() {
|
||||
return durationCount<Seconds>(clockSource()->now().toDurationSinceEpoch());
|
||||
}
|
||||
|
||||
int _originalOrphanCleanupDelaySecs;
|
||||
|
||||
const NamespaceString kTestNss =
|
||||
NamespaceString::createNamespaceString_forTest("testDb", "testColl");
|
||||
const ChunkRange kTestRange{BSON("_id" << 0), BSON("_id" << 100)};
|
||||
};
|
||||
|
||||
TEST_F(RemoveShardCommitCoordinatorTest, OrphanCleanupDelayCheckNoTasks) {
|
||||
orphanCleanupDelaySecs.store(60);
|
||||
|
||||
auto task =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_FALSE(task.has_value());
|
||||
}
|
||||
|
||||
TEST_F(RemoveShardCommitCoordinatorTest, OrphanCleanupDelayCheckElapsed) {
|
||||
orphanCleanupDelaySecs.store(60);
|
||||
|
||||
auto currentTimeSecs = getCurrentTimeSecs();
|
||||
Timestamp taskTimestamp(currentTimeSecs, 1);
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kDelayed,
|
||||
taskTimestamp,
|
||||
false /* pending */);
|
||||
|
||||
clockSource()->advance(Seconds(100));
|
||||
|
||||
auto task =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(task.has_value());
|
||||
ASSERT_DOES_NOT_THROW(
|
||||
topology_change_helpers::checkOrphanCleanupDelayElapsed(operationContext(), *task));
|
||||
}
|
||||
|
||||
TEST_F(RemoveShardCommitCoordinatorTest, OrphanCleanupDelayCheckNotElapsed) {
|
||||
orphanCleanupDelaySecs.store(60);
|
||||
|
||||
auto currentTimeSecs = getCurrentTimeSecs();
|
||||
Timestamp taskTimestamp(currentTimeSecs, 1);
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kDelayed,
|
||||
taskTimestamp,
|
||||
false /* pending */);
|
||||
|
||||
clockSource()->advance(Seconds(30));
|
||||
|
||||
auto task =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(task.has_value());
|
||||
ASSERT_THROWS_CODE(
|
||||
topology_change_helpers::checkOrphanCleanupDelayElapsed(operationContext(), *task),
|
||||
DBException,
|
||||
ErrorCodes::RemoveShardDrainingInProgress);
|
||||
}
|
||||
|
||||
TEST_F(RemoveShardCommitCoordinatorTest, OrphanCleanupDelayCheckUsesLatestTimestamp) {
|
||||
orphanCleanupDelaySecs.store(60);
|
||||
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
ChunkRange{BSON("_id" << 0), BSON("_id" << 50)},
|
||||
CleanWhenEnum::kDelayed,
|
||||
Timestamp(900, 1),
|
||||
false /* pending */);
|
||||
|
||||
auto currentTimeSecs = getCurrentTimeSecs();
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
ChunkRange{BSON("_id" << 50), BSON("_id" << 100)},
|
||||
CleanWhenEnum::kDelayed,
|
||||
Timestamp(currentTimeSecs, 1),
|
||||
false /* pending */);
|
||||
|
||||
clockSource()->advance(Seconds(30));
|
||||
|
||||
auto task =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(task.has_value());
|
||||
ASSERT_THROWS_CODE(
|
||||
topology_change_helpers::checkOrphanCleanupDelayElapsed(operationContext(), *task),
|
||||
DBException,
|
||||
ErrorCodes::RemoveShardDrainingInProgress);
|
||||
|
||||
clockSource()->advance(Seconds(40));
|
||||
|
||||
// Re-fetch task since we need the same task reference
|
||||
task = topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(task.has_value());
|
||||
ASSERT_DOES_NOT_THROW(
|
||||
topology_change_helpers::checkOrphanCleanupDelayElapsed(operationContext(), *task));
|
||||
}
|
||||
|
||||
TEST_F(RemoveShardCommitCoordinatorTest, OrphanCleanupDelayCheckSmallDelay) {
|
||||
orphanCleanupDelaySecs.store(0);
|
||||
|
||||
auto currentTimeSecs = getCurrentTimeSecs();
|
||||
Timestamp taskTimestamp(currentTimeSecs, 1);
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kDelayed,
|
||||
taskTimestamp,
|
||||
false /* pending */);
|
||||
|
||||
auto task =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(task.has_value());
|
||||
ASSERT_DOES_NOT_THROW(
|
||||
topology_change_helpers::checkOrphanCleanupDelayElapsed(operationContext(), *task));
|
||||
}
|
||||
|
||||
TEST_F(RemoveShardCommitCoordinatorTest, OrphanCleanupDelayCheckIncludesPendingTasks) {
|
||||
orphanCleanupDelaySecs.store(60);
|
||||
|
||||
auto currentTimeSecs = getCurrentTimeSecs();
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kDelayed,
|
||||
Timestamp(currentTimeSecs, 1),
|
||||
true /* pending */);
|
||||
|
||||
auto task =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(task.has_value());
|
||||
ASSERT_THROWS_CODE(
|
||||
topology_change_helpers::checkOrphanCleanupDelayElapsed(operationContext(), *task),
|
||||
DBException,
|
||||
ErrorCodes::RemoveShardDrainingInProgress);
|
||||
}
|
||||
|
||||
TEST_F(RemoveShardCommitCoordinatorTest, OrphanCleanupDelayCheckIgnoresProcessingTasks) {
|
||||
orphanCleanupDelaySecs.store(60);
|
||||
|
||||
auto currentTimeSecs = getCurrentTimeSecs();
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kNow, // Processing tasks have kNow
|
||||
Timestamp(currentTimeSecs, 1),
|
||||
boost::none /* pending not set */,
|
||||
true /* processing */);
|
||||
|
||||
auto task =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_FALSE(task.has_value());
|
||||
}
|
||||
|
||||
TEST_F(RemoveShardCommitCoordinatorTest, PendingDataCleanupStateUsedForOrphanCleanupDelay) {
|
||||
RemoveShardProgress progress(ShardDrainingStateEnum::kPendingDataCleanup);
|
||||
ASSERT_EQ(ShardDrainingStateEnum::kPendingDataCleanup, progress.getState());
|
||||
|
||||
RemoveShardDrainingInfo info(progress);
|
||||
ASSERT_EQ(ShardDrainingStateEnum::kPendingDataCleanup, info.getProgress().getState());
|
||||
|
||||
ASSERT_EQ(ErrorCodes::RemoveShardDrainingInProgress, RemoveShardDrainingInfo::code);
|
||||
}
|
||||
|
||||
TEST_F(RemoveShardCommitCoordinatorTest, OrphanCleanupDelayThrowsWithPendingDataCleanupState) {
|
||||
orphanCleanupDelaySecs.store(60);
|
||||
|
||||
auto currentTimeSecs = getCurrentTimeSecs();
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kDelayed,
|
||||
Timestamp(currentTimeSecs, 1),
|
||||
false /* pending */);
|
||||
|
||||
auto task =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(task.has_value());
|
||||
|
||||
try {
|
||||
topology_change_helpers::checkOrphanCleanupDelayElapsed(operationContext(), *task);
|
||||
FAIL("Expected RemoveShardDrainingInfo exception");
|
||||
} catch (const DBException& ex) {
|
||||
ASSERT_EQ(ErrorCodes::RemoveShardDrainingInProgress, ex.code());
|
||||
auto info = ex.extraInfo<RemoveShardDrainingInfo>();
|
||||
ASSERT(info);
|
||||
ASSERT_EQ(ShardDrainingStateEnum::kPendingDataCleanup, info->getProgress().getState());
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
@ -85,7 +85,3 @@ structs:
|
||||
type: namespacestring
|
||||
description: "First non empty collection found on the shard being removed."
|
||||
optional: true
|
||||
pendingRangeDeletionTask:
|
||||
type: object_owned
|
||||
description: "The range deletion task we are waiting on for ophanCleanupDelaySecs to elapse"
|
||||
optional: true
|
||||
|
||||
@ -108,7 +108,6 @@
|
||||
#include "mongo/db/sharding_environment/sharding_config_server_parameters_gen.h"
|
||||
#include "mongo/db/sharding_environment/sharding_feature_flags_gen.h"
|
||||
#include "mongo/db/sharding_environment/sharding_logging.h"
|
||||
#include "mongo/db/sharding_environment/sharding_runtime_d_params_gen.h"
|
||||
#include "mongo/db/tenant_id.h"
|
||||
#include "mongo/db/topology/add_shard_gen.h"
|
||||
#include "mongo/db/topology/cluster_parameters/cluster_server_parameter_cmds_gen.h"
|
||||
@ -808,14 +807,12 @@ RemoveShardProgress ShardingCatalogManager::checkDrainingProgress(OperationConte
|
||||
}
|
||||
|
||||
if (shardId == ShardId::kConfigServerId) {
|
||||
auto task = topology_change_helpers::getLatestNonProcessingRangeDeletionTask(opCtx);
|
||||
if (task) {
|
||||
try {
|
||||
topology_change_helpers::checkOrphanCleanupDelayElapsed(opCtx, *task);
|
||||
} catch (const ExceptionFor<ErrorCodes::RemoveShardDrainingInProgress>& ex) {
|
||||
const auto drainingProgress = ex.extraInfo<RemoveShardDrainingInfo>();
|
||||
return drainingProgress->getProgress();
|
||||
}
|
||||
// Wait for range deletions to complete
|
||||
auto pendingRangeDeletions = topology_change_helpers::getRangeDeletionCount(opCtx);
|
||||
if (pendingRangeDeletions > 0) {
|
||||
RemoveShardProgress progress(ShardDrainingStateEnum::kPendingDataCleanup);
|
||||
progress.setPendingRangeDeletions(pendingRangeDeletions);
|
||||
return progress;
|
||||
}
|
||||
}
|
||||
|
||||
@ -857,17 +854,17 @@ RemoveShardProgress ShardingCatalogManager::removeShard(OperationContext* opCtx,
|
||||
if (shardId == ShardId::kConfigServerId) {
|
||||
topology_change_helpers::joinMigrations(opCtx);
|
||||
// The config server may be added as a shard again, so we locally drop its drained
|
||||
// sharded collections to enable that without user intervention. We wait for
|
||||
// orphanCleanupDelaySecs as a best effort since creation of latest non pending range
|
||||
// deletion task.
|
||||
auto task = topology_change_helpers::getLatestNonProcessingRangeDeletionTask(opCtx);
|
||||
if (task) {
|
||||
try {
|
||||
topology_change_helpers::checkOrphanCleanupDelayElapsed(opCtx, *task);
|
||||
} catch (const ExceptionFor<ErrorCodes::RemoveShardDrainingInProgress>& ex) {
|
||||
const auto drainingProgress = ex.extraInfo<RemoveShardDrainingInfo>();
|
||||
return drainingProgress->getProgress();
|
||||
}
|
||||
// sharded collections to enable that without user intervention. But we have to wait for
|
||||
// the range deleter to quiesce to give queries and stale routers time to discover the
|
||||
// migration, to match the usual probabilistic guarantees for migrations.
|
||||
auto pendingRangeDeletions = topology_change_helpers::getRangeDeletionCount(opCtx);
|
||||
if (pendingRangeDeletions > 0) {
|
||||
LOGV2(7564600,
|
||||
"removeShard: waiting for range deletions",
|
||||
"pendingRangeDeletions"_attr = pendingRangeDeletions);
|
||||
RemoveShardProgress progress(ShardDrainingStateEnum::kPendingDataCleanup);
|
||||
progress.setPendingRangeDeletions(pendingRangeDeletions);
|
||||
return progress;
|
||||
}
|
||||
}
|
||||
|
||||
@ -947,14 +944,6 @@ RemoveShardProgress ShardingCatalogManager::removeShard(OperationContext* opCtx,
|
||||
uassertStatusOK(getStatusFromCommandResult(result));
|
||||
}
|
||||
}
|
||||
DBDirectClient client(opCtx);
|
||||
BSONObj rangeDeletionsResult;
|
||||
if (!client.dropCollection(
|
||||
NamespaceString::kRangeDeletionNamespace,
|
||||
ShardingCatalogClient::writeConcernLocalHavingUpstreamWaiter(),
|
||||
&rangeDeletionsResult)) {
|
||||
uassertStatusOK(getStatusFromCommandResult(rangeDeletionsResult));
|
||||
}
|
||||
}
|
||||
|
||||
// Draining is done, now finish removing the shard.
|
||||
|
||||
@ -92,7 +92,6 @@
|
||||
#include "mongo/db/sharding_environment/grid.h"
|
||||
#include "mongo/db/sharding_environment/shard_id.h"
|
||||
#include "mongo/db/sharding_environment/sharding_config_server_parameters_gen.h"
|
||||
#include "mongo/db/sharding_environment/sharding_runtime_d_params_gen.h"
|
||||
#include "mongo/db/tenant_id.h"
|
||||
#include "mongo/db/topology/add_shard_gen.h"
|
||||
#include "mongo/db/topology/cluster_parameters/cluster_server_parameter_common.h"
|
||||
@ -397,6 +396,28 @@ void setAddOrRemoveShardInProgressClusterParam(OperationContext* opCtx, bool new
|
||||
}
|
||||
}
|
||||
|
||||
boost::optional<RemoveShardProgress> checkCollectionsAreEmpty(
|
||||
OperationContext* opCtx, const std::vector<NamespaceString>& collections) {
|
||||
for (const auto& nss : collections) {
|
||||
AutoGetCollection autoColl(opCtx, nss, MODE_IS);
|
||||
if (!autoColl) {
|
||||
// Can't find the collection, so it must not have data.
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!autoColl->isEmpty(opCtx)) {
|
||||
LOGV2(9022300, "removeShard: found non-empty local collection", logAttrs(nss));
|
||||
RemoveShardProgress progress(ShardDrainingStateEnum::kPendingDataCleanup);
|
||||
progress.setFirstNonEmptyCollection(nss);
|
||||
progress.setPendingRangeDeletions(
|
||||
0); // Set this to 0 so that it is serialized in the response
|
||||
return {progress};
|
||||
}
|
||||
}
|
||||
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
void waitUntilReadyToBlockNewDDLCoordinators(OperationContext* opCtx) {
|
||||
const auto wouldJoinCoordinatorsBlock = [](OperationContext* opCtx) -> bool {
|
||||
// Check that all shards will be able to join ongoing DDLs quickly.
|
||||
@ -708,49 +729,6 @@ long long getRangeDeletionCount(OperationContext* opCtx) {
|
||||
return static_cast<long long>(store.count(opCtx, BSONObj()));
|
||||
}
|
||||
|
||||
boost::optional<RangeDeletionTask> getLatestNonProcessingRangeDeletionTask(
|
||||
OperationContext* opCtx) {
|
||||
AutoGetCollection collRangeDeletionLock(
|
||||
opCtx, NamespaceString::kRangeDeletionNamespace, MODE_S);
|
||||
DBDirectClient client(opCtx);
|
||||
|
||||
// Get latest non processing range deletion task scheduled for future cleanup
|
||||
// We include pending tasks to avoid a race condition where moveChunk commits
|
||||
// before marking the range deletion task as non-pending (SERVER-119117).
|
||||
FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
|
||||
findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << BSON("$ne" << true)));
|
||||
findCommand.setSort(BSON(RangeDeletionTask::kTimestampFieldName << -1));
|
||||
auto bsonDoc = client.findOne(std::move(findCommand));
|
||||
if (bsonDoc.isEmpty()) {
|
||||
return boost::none;
|
||||
}
|
||||
return RangeDeletionTask::parse(bsonDoc,
|
||||
IDLParserContext("getLatestNonProcessingRangeDeletionTask"));
|
||||
}
|
||||
|
||||
void checkOrphanCleanupDelayElapsed(OperationContext* opCtx, const RangeDeletionTask& task) {
|
||||
auto elapsedSec =
|
||||
getGlobalServiceContext()->getFastClockSource()->now().toMillisSinceEpoch() / 1000 -
|
||||
task.getTimestamp()->getSecs();
|
||||
// Note that in the normal range deletions workflow we begin waiting for
|
||||
// orphanCleanupDelaySecs after pending field is unset and it is marked as processing by the
|
||||
// range deleter service. Here the behavior is different and we wait since the time the task
|
||||
// was registered in DB.
|
||||
if (elapsedSec < orphanCleanupDelaySecs.load()) {
|
||||
LOGV2(1039900,
|
||||
"removeShard: waiting for orphanCleanupDelaySecs to complete",
|
||||
"elapsed"_attr = elapsedSec,
|
||||
"orphanCleanupDelaySecs"_attr = orphanCleanupDelaySecs.load());
|
||||
RemoveShardProgress progress(ShardDrainingStateEnum::kPendingDataCleanup);
|
||||
progress.setPendingRangeDeletionTask(task.toBSON());
|
||||
uasserted(RemoveShardDrainingInfo(progress),
|
||||
"The configured orphanCleanupDelaySecs must elapse before transitioning "
|
||||
"to a dedicated config server. Elapsed time: " +
|
||||
std::to_string(elapsedSec) + " seconds, orphanCleanupDelaySecs: " +
|
||||
std::to_string(orphanCleanupDelaySecs.load()));
|
||||
}
|
||||
}
|
||||
|
||||
void joinMigrations(OperationContext* opCtx) {
|
||||
// Join migrations to make sure there's no ongoing MigrationDestinationManager. New ones
|
||||
// will observe the draining state and abort before performing any work that could re-create
|
||||
@ -1680,10 +1658,26 @@ boost::optional<RemoveShardProgress> dropLocalCollectionsAndDatabases(
|
||||
// config server can transition back to catalog shard mode without requiring users to
|
||||
// manually drop them.
|
||||
|
||||
// First, verify all collections we would drop are empty. In normal operation, a
|
||||
// collection may still have data because of a sharded drop (which non-atomically
|
||||
// updates metadata before dropping user data). If this state persists, manual
|
||||
// intervention will be required to complete the transition, so we don't accidentally
|
||||
// delete real data.
|
||||
LOGV2(9022301, "Checking all local collections are empty", "shardId"_attr = shardName);
|
||||
|
||||
for (auto&& db : trackedDBs) {
|
||||
tassert(7783700,
|
||||
"Cannot drop admin or config database from the config server",
|
||||
!db.getDbName().isConfigDB() && !db.getDbName().isAdminDB());
|
||||
|
||||
auto collections = [&] {
|
||||
Lock::DBLock dbLock(opCtx, db.getDbName(), MODE_S);
|
||||
auto catalog = CollectionCatalog::get(opCtx);
|
||||
return catalog->getAllCollectionNamesFromDb(opCtx, db.getDbName());
|
||||
}();
|
||||
if (auto pendingDataCleanupState = checkCollectionsAreEmpty(opCtx, collections)) {
|
||||
return *pendingDataCleanupState;
|
||||
}
|
||||
}
|
||||
|
||||
// Now actually drop the databases; each request must either succeed or resolve into a
|
||||
@ -1698,6 +1692,14 @@ boost::optional<RemoveShardProgress> dropLocalCollectionsAndDatabases(
|
||||
hangAfterDroppingDatabaseInTransitionToDedicatedConfigServer.pauseWhileSet(opCtx);
|
||||
}
|
||||
|
||||
// Check if the sessions collection is empty. We defer dropping this collection to the caller
|
||||
// since it should only be dropped if featureFlagSessionsCollectionCoordinatorOnConfigServer is
|
||||
// disabled so the drop must be done in a fixed FCV region.
|
||||
if (auto pendingDataCleanupState =
|
||||
checkCollectionsAreEmpty(opCtx, {NamespaceString::kLogicalSessionsNamespace})) {
|
||||
return *pendingDataCleanupState;
|
||||
}
|
||||
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
|
||||
@ -45,7 +45,6 @@
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/repl/read_concern_level.h"
|
||||
#include "mongo/db/s/range_deletion_task_gen.h"
|
||||
#include "mongo/db/server_parameter.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/session/logical_session_id.h"
|
||||
@ -76,19 +75,6 @@ namespace topology_change_helpers {
|
||||
// Returns the count of range deletion tasks locally on the config server.
|
||||
long long getRangeDeletionCount(OperationContext* opCtx);
|
||||
|
||||
/**
|
||||
* Gets the latest non processing range deletion task scheduled for
|
||||
* future deletion. Used during transitionToDedicatedConfigServer to check for any pending delayed
|
||||
* range deletion tasks.
|
||||
*/
|
||||
boost::optional<RangeDeletionTask> getLatestNonProcessingRangeDeletionTask(OperationContext* opCtx);
|
||||
|
||||
/**
|
||||
* Checks if the orphan cleanup delay has elapsed. If not, throws a RemoveShardDrainingInfo
|
||||
* exception.
|
||||
*/
|
||||
void checkOrphanCleanupDelayElapsed(OperationContext* opCtx, const RangeDeletionTask& task);
|
||||
|
||||
// Calls ShardsvrJoinMigrations locally on the config server.
|
||||
void joinMigrations(OperationContext* opCtx);
|
||||
|
||||
|
||||
@ -1,261 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/topology/topology_change_helpers.h"
|
||||
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/bson/timestamp.h"
|
||||
#include "mongo/db/global_catalog/type_chunk.h"
|
||||
#include "mongo/db/global_catalog/type_shard.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/persistent_task_store.h"
|
||||
#include "mongo/db/s/range_deletion_task_gen.h"
|
||||
#include "mongo/db/sharding_environment/config_server_test_fixture.h"
|
||||
#include "mongo/db/sharding_environment/shard_id.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
#include "mongo/util/uuid.h"
|
||||
|
||||
#include <boost/optional/optional.hpp>
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
class TopologyChangeHelpersTest : public ConfigServerTestFixture {
|
||||
protected:
|
||||
void setUp() override {
|
||||
setUpAndInitializeConfigDb();
|
||||
setupShards({ShardType(kShardId.toString(), "host0:12345")});
|
||||
}
|
||||
|
||||
void insertRangeDeletionTask(const UUID& migrationId,
|
||||
const NamespaceString& nss,
|
||||
const UUID& collUuid,
|
||||
const ChunkRange& range,
|
||||
CleanWhenEnum whenToClean,
|
||||
const Timestamp& timestamp,
|
||||
boost::optional<bool> pending = boost::none,
|
||||
boost::optional<bool> processing = boost::none) {
|
||||
RangeDeletionTask task(
|
||||
migrationId, nss, collUuid, ShardId("donorShard"), range, whenToClean);
|
||||
task.setTimestamp(timestamp);
|
||||
|
||||
if (pending) {
|
||||
task.setPending(*pending);
|
||||
}
|
||||
if (processing) {
|
||||
task.setProcessing(*processing);
|
||||
}
|
||||
|
||||
PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
|
||||
store.add(operationContext(), task);
|
||||
}
|
||||
|
||||
const NamespaceString kTestNss =
|
||||
NamespaceString::createNamespaceString_forTest("testDb", "testColl");
|
||||
const ChunkRange kTestRange{BSON("_id" << 0), BSON("_id" << 100)};
|
||||
const ShardId kShardId{"shard0"};
|
||||
};
|
||||
|
||||
TEST_F(TopologyChangeHelpersTest, NoRangeDeletionTasks) {
|
||||
auto result =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_FALSE(result.has_value());
|
||||
}
|
||||
|
||||
TEST_F(TopologyChangeHelpersTest, OnlyPendingTasks) {
|
||||
Timestamp expectedTimestamp(100, 1);
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kDelayed,
|
||||
expectedTimestamp,
|
||||
true /* pending */);
|
||||
|
||||
auto result =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(result.has_value());
|
||||
ASSERT_EQ(*result->getTimestamp(), expectedTimestamp);
|
||||
}
|
||||
|
||||
TEST_F(TopologyChangeHelpersTest, OnlyProcessingTasks) {
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kDelayed,
|
||||
Timestamp(100, 1),
|
||||
boost::none /* pending not set */,
|
||||
true /* processing */);
|
||||
|
||||
auto result =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_FALSE(result.has_value());
|
||||
}
|
||||
|
||||
TEST_F(TopologyChangeHelpersTest, SingleNonPendingTask) {
|
||||
Timestamp expectedTimestamp(100, 1);
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kDelayed,
|
||||
expectedTimestamp,
|
||||
false /* pending */);
|
||||
|
||||
auto result =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(result.has_value());
|
||||
ASSERT_EQ(*result->getTimestamp(), expectedTimestamp);
|
||||
}
|
||||
|
||||
TEST_F(TopologyChangeHelpersTest, TaskWithNoPendingField) {
|
||||
Timestamp expectedTimestamp(200, 1);
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kDelayed,
|
||||
expectedTimestamp,
|
||||
boost::none /* pending not set */);
|
||||
|
||||
auto result =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(result.has_value());
|
||||
ASSERT_EQ(*result->getTimestamp(), expectedTimestamp);
|
||||
}
|
||||
|
||||
TEST_F(TopologyChangeHelpersTest, MultipleTasksReturnsLatest) {
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
ChunkRange{BSON("_id" << 0), BSON("_id" << 50)},
|
||||
CleanWhenEnum::kDelayed,
|
||||
Timestamp(100, 1),
|
||||
false /* pending */);
|
||||
|
||||
Timestamp latestTimestamp(300, 1);
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
ChunkRange{BSON("_id" << 50), BSON("_id" << 100)},
|
||||
CleanWhenEnum::kDelayed,
|
||||
latestTimestamp,
|
||||
false /* pending */);
|
||||
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
ChunkRange{BSON("_id" << 100), BSON("_id" << 150)},
|
||||
CleanWhenEnum::kDelayed,
|
||||
Timestamp(200, 1),
|
||||
false /* pending */);
|
||||
|
||||
auto result =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(result.has_value());
|
||||
ASSERT_EQ(*result->getTimestamp(), latestTimestamp);
|
||||
}
|
||||
|
||||
TEST_F(TopologyChangeHelpersTest, MixedTasksFiltersCorrectly) {
|
||||
Timestamp expectedTimestamp(500, 1);
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
ChunkRange{BSON("_id" << 0), BSON("_id" << 10)},
|
||||
CleanWhenEnum::kDelayed,
|
||||
expectedTimestamp,
|
||||
true /* pending */);
|
||||
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
ChunkRange{BSON("_id" << 10), BSON("_id" << 20)},
|
||||
CleanWhenEnum::kNow,
|
||||
Timestamp(400, 1),
|
||||
boost::none /* pending not set */,
|
||||
true /* processing */);
|
||||
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
ChunkRange{BSON("_id" << 30), BSON("_id" << 40)},
|
||||
CleanWhenEnum::kDelayed,
|
||||
Timestamp(350, 1),
|
||||
false /* pending */);
|
||||
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
ChunkRange{BSON("_id" << 40), BSON("_id" << 50)},
|
||||
CleanWhenEnum::kDelayed,
|
||||
Timestamp(200, 1),
|
||||
false /* pending */);
|
||||
|
||||
auto result =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(result.has_value());
|
||||
ASSERT_EQ(*result->getTimestamp(), expectedTimestamp);
|
||||
}
|
||||
|
||||
TEST_F(TopologyChangeHelpersTest, PendingFalseExplicitlySet) {
|
||||
Timestamp expectedTimestamp(150, 5);
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kDelayed,
|
||||
expectedTimestamp,
|
||||
false /* pending explicitly false */);
|
||||
|
||||
auto result =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(result.has_value());
|
||||
ASSERT_EQ(*result->getTimestamp(), expectedTimestamp);
|
||||
}
|
||||
|
||||
TEST_F(TopologyChangeHelpersTest, ProcessingFalseExplicitlySet) {
|
||||
Timestamp expectedTimestamp(175, 3);
|
||||
insertRangeDeletionTask(UUID::gen(),
|
||||
kTestNss,
|
||||
UUID::gen(),
|
||||
kTestRange,
|
||||
CleanWhenEnum::kDelayed,
|
||||
expectedTimestamp,
|
||||
false /* pending */,
|
||||
false /* processing */);
|
||||
|
||||
auto result =
|
||||
topology_change_helpers::getLatestNonProcessingRangeDeletionTask(operationContext());
|
||||
ASSERT_TRUE(result.has_value());
|
||||
ASSERT_EQ(*result->getTimestamp(), expectedTimestamp);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
Loading…
Reference in New Issue
Block a user