SERVER-125628 chunk operations should leave shard catalog metadata as non-authoritative (#52828)
GitOrigin-RevId: bd5b6bb73d77874a7a6cf7472541b68f8de27adf
This commit is contained in:
parent
811da367ec
commit
aaaf011cd4
@ -17,27 +17,10 @@ function getChunksMetadataFromGlobalCatalog(uuid) {
|
||||
return st.s.getDB("config").chunks.find({uuid}).toArray();
|
||||
}
|
||||
|
||||
// Helper: Validate that the collection metadata from the shard catalog matches expected.
|
||||
function validateCollectionMetadataFromShardCatalog(ns, shard, expectedCollMetadata) {
|
||||
const collMetadataFromShard = shard.getDB("config").getCollection("shard.catalog.collections").findOne({_id: ns});
|
||||
assert.docEq(expectedCollMetadata, collMetadataFromShard, "Mismatch in collection metadata for namespace: " + ns);
|
||||
}
|
||||
|
||||
// Helper: Validate that the chunks metadata from the shard catalog matches expected.
|
||||
function validateChunksFromShardCatalog(uuid, shard, expectedChunksMetadata) {
|
||||
const chunksMetadataFromShard = shard.getDB("config").getCollection("shard.catalog.chunks").find({uuid}).toArray();
|
||||
|
||||
assert.eq(
|
||||
expectedChunksMetadata.length,
|
||||
chunksMetadataFromShard.length,
|
||||
"Mismatch in number of chunks for uuid: " + uuid,
|
||||
);
|
||||
|
||||
expectedChunksMetadata.forEach((expectedChunk) => {
|
||||
const localChunk = chunksMetadataFromShard.find((c) => c._id.equals(expectedChunk._id));
|
||||
assert(localChunk, "Chunk " + expectedChunk._id + " missing locally on shard");
|
||||
assert.docEq(localChunk, expectedChunk, "Chunk metadata mismatch for " + expectedChunk._id);
|
||||
});
|
||||
// Helper: Assert zero metadata inconsistencies for the given collection
|
||||
function assertNoMetadataInconsistencies(coll) {
|
||||
const inconsistencies = coll.checkMetadataConsistency().toArray();
|
||||
assert.eq(0, inconsistencies.length, tojson(inconsistencies));
|
||||
}
|
||||
|
||||
{
|
||||
@ -102,12 +85,8 @@ function validateChunksFromShardCatalog(uuid, shard, expectedChunksMetadata) {
|
||||
session.endSession();
|
||||
}
|
||||
|
||||
// Validate collection metadata.
|
||||
validateCollectionMetadataFromShardCatalog(ns, st.shard0, globalCollMetadata);
|
||||
|
||||
// Validate chunks metadata.
|
||||
const globalChunksMetadata = getChunksMetadataFromGlobalCatalog(collUUID);
|
||||
validateChunksFromShardCatalog(collUUID, st.shard0, globalChunksMetadata);
|
||||
// Validate collection and chunk metadata consistency for the collection.
|
||||
assertNoMetadataInconsistencies(testColl);
|
||||
}
|
||||
|
||||
{
|
||||
@ -155,13 +134,8 @@ function validateChunksFromShardCatalog(uuid, shard, expectedChunksMetadata) {
|
||||
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
|
||||
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
|
||||
|
||||
// disable migrations
|
||||
assert.commandWorked(
|
||||
st.configRS
|
||||
.getPrimary()
|
||||
.adminCommand({_configsvrSetAllowMigrations: ns, allowMigrations: false, writeConcern: {w: "majority"}}),
|
||||
);
|
||||
|
||||
// The retryable-write uassert fires before the migrations-disabled check, so there's no need
|
||||
// to disable migrations here.
|
||||
assert.commandFailedWithCode(
|
||||
st.shard0.getDB(dbName).runCommand({_shardsvrFetchCollMetadata: ns, writeConcern: {w: "majority"}}),
|
||||
10303100,
|
||||
@ -241,11 +215,8 @@ function validateChunksFromShardCatalog(uuid, shard, expectedChunksMetadata) {
|
||||
session.endSession();
|
||||
}
|
||||
|
||||
// Validate metadata consistency.
|
||||
const globalChunksMetadata = getChunksMetadataFromGlobalCatalog(collUUID);
|
||||
|
||||
validateCollectionMetadataFromShardCatalog(ns, st.shard0, globalCollMetadata);
|
||||
validateChunksFromShardCatalog(collUUID, st.shard0, globalChunksMetadata);
|
||||
// Validate collection and chunk metadata consistency for the collection.
|
||||
assertNoMetadataInconsistencies(testColl);
|
||||
}
|
||||
|
||||
st.stop();
|
||||
|
||||
@ -0,0 +1,266 @@
|
||||
/**
|
||||
* Verifies that after each chunk operation (split, mergeChunks, mergeAllChunks, and moveRange), the
|
||||
* shard's filtering metadata for the collection matches what the config server recorded.
|
||||
*
|
||||
* @tags: [
|
||||
* featureFlagShardAuthoritativeCollMetadata,
|
||||
* does_not_support_stepdowns,
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {configureFailPointForRS} from "jstests/libs/fail_point_util.js";
|
||||
import {afterEach, after, before, beforeEach, describe, it} from "jstests/libs/mochalite.js";
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// Helper functions
|
||||
// ------------------------------------------------------------
|
||||
|
||||
/**
 * Pushes the `onCurrentShardSince` of the selected chunks outside the configsvr's snapshot
 * history window, so mergeAllChunksOnShard considers them eligible for merging.
 *
 * For every config.chunks document of `coll` that matches `extraQuery`, sets
 * `onCurrentShardSince` to the new timestamp and replaces `history` with a single entry that
 * carries the same timestamp and the chunk's current `shard`.
 *
 * @param mongoS connection on which the retryable-writes session is opened.
 * @param coll collection handle; only `getFullName()` is used, to look up the collection UUID.
 * @param extraQuery extra filter merged on top of `{uuid: <collection uuid>}`.
 * @param refTimestamp reference Timestamp; `refTimestamp.getTime()` is the base of the new value.
 * @param offsetInSeconds offset added to the reference time.
 */
function setOnCurrentShardSince(mongoS, coll, extraQuery, refTimestamp, offsetInSeconds) {
    const session = mongoS.startSession({retryWrites: true});
    try {
        const sessionConfigDB = session.getDatabase("config");

        // Fail with a readable message instead of a TypeError on `.uuid` when the collection is
        // not registered in the global catalog.
        const collEntry = sessionConfigDB.collections.findOne({_id: coll.getFullName()});
        assert(collEntry, `config.collections has no entry for ${coll.getFullName()}`);

        const query = Object.assign({uuid: collEntry.uuid}, extraQuery);
        const newValue = new Timestamp(refTimestamp.getTime() + offsetInSeconds, 0);

        sessionConfigDB.chunks.find(query).forEach((chunk) => {
            assert.commandWorked(
                // Pipeline-style update so that "$shard" resolves to the chunk's own shard field.
                sessionConfigDB.chunks.updateOne({_id: chunk._id}, [
                    {
                        $set: {
                            "onCurrentShardSince": newValue,
                            "history": [{validAfter: newValue, shard: "$shard"}],
                        },
                    },
                ]),
            );
        });
    } finally {
        // Always release the session, including when one of the assertions above throws.
        session.endSession();
    }
}
|
||||
|
||||
/**
 * Turns on the "overrideHistoryWindowInSecs" failpoint on every config server node, passing
 * `valueInSeconds` as the failpoint's `seconds` argument.
 */
function setHistoryWindowInSecs(st, valueInSeconds) {
    const configNodes = st.configRS.nodes;
    const failPointData = {seconds: valueInSeconds};
    configureFailPointForRS(configNodes, "overrideHistoryWindowInSecs", failPointData, "alwaysOn");
}
|
||||
|
||||
/**
 * Turns off the "overrideHistoryWindowInSecs" failpoint on every config server node.
 */
function resetHistoryWindowInSecs(st) {
    const configNodes = st.configRS.nodes;
    configureFailPointForRS(configNodes, "overrideHistoryWindowInSecs", {}, "off");
}
|
||||
|
||||
/**
 * Puts the CSR for `ns` on `shardConn` into the kNonAuthoritative state by clearing the
 * in-memory filtering metadata with the test-only internal command.
 *
 * The next versioned operation repopulates the CSR. With the
 * authoritative-collection-metadata feature flag on, that refresh may flip the CSR back to
 * kAuthoritative, so this only guarantees the starting state right before the next op.
 */
function forceCsrNonAuthoritative(shardConn, ns) {
    const clearMetadataCmd = {
        _internalClearCollectionShardingMetadata: ns,
        isAuthoritative: false,
    };
    assert.commandWorked(shardConn.adminCommand(clearMetadataCmd));
}
|
||||
|
||||
/**
 * Puts the CSR for `ns` on `shardConn` into the kAuthoritative state by running
 * _shardsvrFetchCollMetadata, which fetches the latest metadata from the config server and
 * installs it authoritatively on the shard.
 *
 * _shardsvrFetchCollMetadata requires migrations to be disabled, so migrations are switched off
 * through _configsvrSetAllowMigrations, the command is run, and migrations are switched back on.
 * Each toggle bumps the placement minor version; the chunk op under test triggers a refresh that
 * brings the CSR back in sync with config, so the transient bumps don't affect the post-op
 * assertion.
 *
 * @param st the ShardingTest fixture; used to reach the config server primary.
 * @param shardConn connection to the shard node whose CSR is being forced.
 * @param ns full namespace of the sharded collection.
 */
function forceCsrAuthoritative(st, shardConn, ns) {
    const setAllowMigrations = (allow) => {
        assert.commandWorked(
            st.configRS.getPrimary().adminCommand({
                _configsvrSetAllowMigrations: ns,
                allowMigrations: allow,
                writeConcern: {w: "majority"},
            }),
        );
    };

    setAllowMigrations(false);
    try {
        const session = shardConn.startSession({retryWrites: true});
        try {
            // The command is sent with an explicit lsid/txnNumber, i.e. as a retryable write.
            assert.commandWorked(
                session.getDatabase("admin").runCommand({
                    _shardsvrFetchCollMetadata: ns,
                    writeConcern: {w: "majority"},
                    lsid: session.getSessionId(),
                    txnNumber: NumberLong(1),
                }),
            );
        } finally {
            session.endSession();
        }
    } finally {
        // Re-enable migrations even when the fetch fails, so a failure here does not leave the
        // namespace with migrations disabled for whatever runs next against the same cluster.
        setAllowMigrations(true);
    }
}
|
||||
|
||||
/**
 * Asserts that the placement version held by the shard's CSR equals, in both the major and the
 * minor component, the highest `lastmod` among the chunks that config.chunks attributes to
 * `shardName`.
 *
 * The CSR version is read with getShardVersion directly against the shard primary: there is no
 * router-injected shard version, so the read itself does not trigger an implicit stale-config
 * refresh. The existing helper in jstests/libs/check_shard_filtering_metadata_helpers.js only
 * checks the major component, which is insufficient here because
 * split/mergeChunks/mergeAllChunks bump only the minor component.
 */
function assertCsrMatchesConfig(st, ns, shardConn, shardName) {
    const configDB = st.s.getDB("config");

    // Resolve the collection UUID from the global catalog.
    const collEntry = configDB.collections.findOne({_id: ns});
    assert(collEntry, `config.collections has no entry for ${ns}`);

    // Highest-versioned chunk owned by the shard, according to the config server.
    const ownedChunksNewestFirst = configDB.chunks
        .find({uuid: collEntry.uuid, shard: shardName})
        .sort({lastmod: -1})
        .limit(1)
        .toArray();
    const topChunk = ownedChunksNewestFirst[0];
    assert(topChunk, `config.chunks has no chunk for ${ns} on ${shardName}`);

    // Version currently installed in the shard's CSR.
    const getShardVersionCmd = {getShardVersion: ns, fullMetadata: true};
    const shardVersionRes = assert.commandWorked(shardConn.adminCommand(getShardVersionCmd));
    assert(
        shardVersionRes.metadata && shardVersionRes.metadata.shardVersion,
        `getShardVersion returned no shardVersion for ${ns} on ${shardName}: ${tojson(shardVersionRes)}`,
    );
    const csrShardVersion = shardVersionRes.metadata.shardVersion;

    // Compare the two components separately so a failure says which one diverged.
    assert.eq(
        csrShardVersion.t,
        topChunk.lastmod.t,
        `Major version mismatch for ${ns} on ${shardName}: csr=${tojson(csrShardVersion)} configTop=${tojson(topChunk.lastmod)}`,
    );
    assert.eq(
        csrShardVersion.i,
        topChunk.lastmod.i,
        `Minor version mismatch for ${ns} on ${shardName}: csr=${tojson(csrShardVersion)} configTop=${tojson(topChunk.lastmod)}`,
    );
}
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// Test cases
|
||||
// ------------------------------------------------------------
|
||||
|
||||
describe("CSR health after chunk ops", function () {
    // One two-shard cluster (single-node replica sets) is shared by every case in the suite.
    before(() => {
        const st = new ShardingTest({
            mongos: 1,
            shards: 2,
            rs: {nodes: 1},
        });

        this.st = st;
        this.shard0Name = st.shard0.shardName;
        this.shard1Name = st.shard1.shardName;
        this.shard0Primary = st.rs0.getPrimary();
        this.shard1Primary = st.rs1.getPrimary();

        this.dbCounter = 0;
    });

    after(() => {
        this.st.stop();
    });

    beforeEach(() => {
        // Every case gets its own database so that earlier chunk ops cannot contaminate later
        // cases.
        this.dbName = `csrHealthDb_${this.dbCounter++}`;
        this.collName = "coll";
        this.ns = `${this.dbName}.${this.collName}`;

        const mongos = this.st.s;
        assert.commandWorked(mongos.adminCommand({enableSharding: this.dbName, primaryShard: this.shard0Name}));
        assert.commandWorked(mongos.adminCommand({shardCollection: this.ns, key: {x: 1}}));

        this.coll = mongos.getDB(this.dbName)[this.collName];
    });

    afterEach(() => {
        assert.commandWorked(this.st.s.getDB(this.dbName).dropDatabase());
    });

    // Splits the collection under test at the given value of the shard key `x`.
    const splitAt = (middle) => {
        assert.commandWorked(this.st.s.adminCommand({split: this.ns, middle: {x: middle}}));
    };

    // Post-op checks for each of the two shards.
    const assertShard0CsrMatchesConfig = () => {
        assertCsrMatchesConfig(this.st, this.ns, this.shard0Primary, this.shard0Name);
    };
    const assertShard1CsrMatchesConfig = () => {
        assertCsrMatchesConfig(this.st, this.ns, this.shard1Primary, this.shard1Name);
    };

    // Every chunk op is exercised from two starting states: CSR forced to kAuthoritative through
    // _shardsvrFetchCollMetadata, and CSR cleared to kNonAuthoritative through the test-only
    // _internalClearCollectionShardingMetadata command. In both cases the CSR must match config
    // once the chunk op has completed.
    ["Authoritative", "NonAuthoritative"].forEach((startingState) => {
        const forceShardCsrStartingState = (shardConn, ns) => {
            switch (startingState) {
                case "Authoritative":
                    forceCsrAuthoritative(this.st, shardConn, ns);
                    break;
                case "NonAuthoritative":
                    forceCsrNonAuthoritative(shardConn, ns);
                    break;
            }
        };

        it(`split leaves donor CSR matching config [${startingState}]`, () => {
            forceShardCsrStartingState(this.shard0Primary, this.ns);

            // The collection starts as one chunk (-inf, +inf) on shard0; splitting at x=50 bumps
            // the minor version on shard0.
            splitAt(50);

            assertShard0CsrMatchesConfig();
        });

        it(`mergeChunks leaves donor CSR matching config [${startingState}]`, () => {
            // Create two adjacent chunks on shard0 around x=50, then merge them back together.
            splitAt(50);

            forceShardCsrStartingState(this.shard0Primary, this.ns);

            const mergeCmd = {
                mergeChunks: this.ns,
                bounds: [{x: MinKey}, {x: MaxKey}],
            };
            assert.commandWorked(this.st.s.adminCommand(mergeCmd));

            assertShard0CsrMatchesConfig();
        });

        it(`mergeAllChunks leaves shard CSR matching config [${startingState}]`, () => {
            // Create several adjacent chunks on shard0.
            [10, 20, 30, 40, 50].forEach((middle) => splitAt(middle));

            // mergeAllChunksOnShard skips chunks that are still inside the snapshot history
            // window, so make the window negative and rewrite onCurrentShardSince to make every
            // chunk eligible.
            setHistoryWindowInSecs(this.st, -10 * 60);
            try {
                setOnCurrentShardSince(this.st.s, this.coll, {shard: this.shard0Name}, new Timestamp(100, 0), 0);

                forceShardCsrStartingState(this.shard0Primary, this.ns);

                const mergeAllCmd = {
                    mergeAllChunksOnShard: this.ns,
                    shard: this.shard0Name,
                };
                assert.commandWorked(this.st.s.adminCommand(mergeAllCmd));

                assertShard0CsrMatchesConfig();
            } finally {
                resetHistoryWindowInSecs(this.st);
            }
        });

        it(`moveRange leaves donor and recipient CSRs matching config [${startingState}]`, () => {
            // Only the donor is forced into the starting state. The recipient owns no chunks of
            // this collection yet, so its CSR has nothing to install or clear. (For
            // 'Authoritative', _shardsvrFetchCollMetadata would tassert on the recipient since
            // there are no owned chunks to persist.) The migration itself installs the
            // recipient's CSR on commit.
            forceShardCsrStartingState(this.shard0Primary, this.ns);

            const moveRangeCmd = {
                moveRange: this.ns,
                min: {x: 50},
                toShard: this.shard1Name,
                // Wait for orphan range deletion so the donor's post-cleanup CSR is observable
                // rather than a transient pre-cleanup snapshot.
                _waitForDelete: true,
            };
            assert.commandWorked(this.st.s.adminCommand(moveRangeCmd));

            assertShard0CsrMatchesConfig();
            assertShard1CsrMatchesConfig();
        });
    });
});
|
||||
@ -45,6 +45,8 @@
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/shard_filtering_metadata_refresh.h"
|
||||
#include "mongo/db/sharding_environment/client/shard.h"
|
||||
#include "mongo/db/sharding_environment/grid.h"
|
||||
#include "mongo/db/topology/shard_registry.h"
|
||||
@ -100,6 +102,19 @@ public:
|
||||
"invalid namespace specified for request",
|
||||
ns().isValid());
|
||||
|
||||
// Because this is a non-authoritative update, we must mark the CSR metadata as
|
||||
// kNonAuthoritative so that the following refresh will fetch the metadata from the
|
||||
// config server. Leaving it kAuthoritative would short-circuit the refresh against the
|
||||
// durable shard catalog and keep the CSR pinned to the pre-merge version.
|
||||
// This must be done before starting the operation to ensure the CSR is left as
|
||||
// kNonAuthoritative in case of an unexpected failure.
|
||||
// TODO (SERVER-125786) The clearFilteringMetadata_nonAuthoritative should go away once
|
||||
// mergeAllChunks becomes authoritative.
|
||||
{
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireExclusive(opCtx, ns());
|
||||
scopedCsr->clearFilteringMetadata_nonAuthoritative(opCtx);
|
||||
}
|
||||
|
||||
ConfigSvrCommitMergeAllChunksOnShard configSvrCommitMergeAllChunksOnShard(ns());
|
||||
configSvrCommitMergeAllChunksOnShard.setDbName(DatabaseName::kAdmin);
|
||||
configSvrCommitMergeAllChunksOnShard.setShard(request().getShard());
|
||||
@ -120,8 +135,16 @@ public:
|
||||
|
||||
uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(swCommandResponse));
|
||||
|
||||
return MergeAllChunksOnShardResponse::parse(swCommandResponse.getValue().response,
|
||||
IDL_PARSER_CONTEXT);
|
||||
auto response = MergeAllChunksOnShardResponse::parse(
|
||||
swCommandResponse.getValue().response, IDL_PARSER_CONTEXT);
|
||||
|
||||
// Update the shard catalog filtering metadata to reflect the new shard
|
||||
// version produced by the config server merge.
|
||||
uassertStatusOK(
|
||||
FilteringMetadataCache::get(opCtx)->onCollectionPlacementVersionMismatch(
|
||||
opCtx, ns(), response.getShardVersion()));
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
@ -49,6 +49,7 @@
|
||||
#include "mongo/db/s/chunk_operation_precondition_checks.h"
|
||||
#include "mongo/db/server_options.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/shard_filtering_metadata_refresh.h"
|
||||
#include "mongo/db/sharding_environment/grid.h"
|
||||
#include "mongo/db/sharding_environment/sharding_feature_flags_gen.h"
|
||||
@ -192,6 +193,19 @@ public:
|
||||
return metadata;
|
||||
}();
|
||||
|
||||
// Because this is a non-authoritative update, we must mark the CSR metadata as
|
||||
// kNonAuthoritative so that the following refresh will fetch the metadata from the
|
||||
// config server. Leaving it kAuthoritative would short-circuit the refresh against the
|
||||
// durable shard catalog and keep the CSR pinned to the pre-merge version.
|
||||
// This must be done before starting the operation to ensure the CSR is left as
|
||||
// kNonAuthoritative in case of an unexpected failure.
|
||||
// TODO (SERVER-125784) The clearFilteringMetadata_nonAuthoritative should go away once
|
||||
// merge becomes authoritative.
|
||||
{
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireExclusive(opCtx, ns());
|
||||
scopedCsr->clearFilteringMetadata_nonAuthoritative(opCtx);
|
||||
}
|
||||
|
||||
auto const shardingState = ShardingState::get(opCtx);
|
||||
|
||||
ConfigSvrMergeChunks configRequest{
|
||||
@ -218,6 +232,8 @@ public:
|
||||
}
|
||||
return boost::none;
|
||||
}();
|
||||
|
||||
// Update the shard catalog filtering metadata to reflect the new shard version.
|
||||
uassertStatusOK(
|
||||
FilteringMetadataCache::get(opCtx)->onCollectionPlacementVersionMismatch(
|
||||
opCtx, nss, std::move(chunkVersionReceived)));
|
||||
|
||||
@ -177,6 +177,19 @@ Status splitChunk(OperationContext* opCtx,
|
||||
}
|
||||
}
|
||||
|
||||
// Because this is a non-authoritative update, we must mark the CSR metadata as
|
||||
// kNonAuthoritative so that the following refresh will fetch the metadata from the
|
||||
// config server. Leaving it kAuthoritative would short-circuit the refresh against the
|
||||
// durable shard catalog and keep the CSR pinned to the pre-split version.
|
||||
// This must be done before starting the operation to ensure the CSR is left as
|
||||
// kNonAuthoritative in case of an unexpected failure.
|
||||
// TODO (SERVER-125785) The clearFilteringMetadata_nonAuthoritative should go away once
|
||||
// splitChunk becomes authoritative.
|
||||
{
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireExclusive(opCtx, nss);
|
||||
scopedCsr->clearFilteringMetadata_nonAuthoritative(opCtx);
|
||||
}
|
||||
|
||||
// Commit the split to the config server.
|
||||
auto request = SplitChunkRequest(nss,
|
||||
shardName,
|
||||
@ -209,6 +222,8 @@ Status splitChunk(OperationContext* opCtx,
|
||||
}
|
||||
return boost::none;
|
||||
}();
|
||||
|
||||
// Update the shard catalog filtering metadata to reflect the new shard version.
|
||||
uassertStatusOK(FilteringMetadataCache::get(opCtx)->onCollectionPlacementVersionMismatch(
|
||||
opCtx, nss, chunkVersionReceived));
|
||||
|
||||
|
||||
@ -313,7 +313,7 @@ void commitDropCollectionLocally(OperationContext* opCtx,
|
||||
clearShardCatalogCacheForDroppedCollection(opCtx, nss, uuid);
|
||||
}
|
||||
|
||||
void commitCreateCollectionLocally(OperationContext* opCtx, const NamespaceString& nss) {
|
||||
void commitCollectionMetadataLocally(OperationContext* opCtx, const NamespaceString& nss) {
|
||||
auto coll = fetchCollection(opCtx, nss);
|
||||
auto ownedChunks = fetchOwnedChunks(opCtx, nss, coll);
|
||||
|
||||
|
||||
@ -64,12 +64,18 @@ void commitDropCollectionLocally(OperationContext* opCtx,
|
||||
const UUID& uuid);
|
||||
|
||||
/**
|
||||
* Fetches the collection metadata and owned chunks from the global catalog, persists them to the
|
||||
* shard catalog (config.shard.catalog.collections and config.shard.catalog.chunks), writes an
|
||||
* oplog entry to invalidate collection metadata on secondaries, and updates the in-memory
|
||||
* CollectionShardingRuntime (CSR) with the new routing information.
|
||||
* Performs the local persistence of up-to-date collection metadata and chunk information for a
|
||||
* sharded collection on the shard. Specifically:
|
||||
* 1. Removes any existing chunk entries for the specified collection from the shard catalog
|
||||
* (config.shard.catalog.chunks).
|
||||
* 2. Fetches the latest collection metadata and owned chunk entries from the global catalog.
|
||||
* 3. Persists the collection metadata and owned chunks to the shard catalog collections and
|
||||
* chunks namespaces (config.shard.catalog.collections and config.shard.catalog.chunks).
|
||||
* 4. Writes an oplog entry to invalidate collection metadata on secondaries.
|
||||
* 5. Updates the in-memory CollectionShardingRuntime (CSR) to reflect the new filtering
|
||||
* information.
|
||||
*/
|
||||
void commitCreateCollectionLocally(OperationContext* opCtx, const NamespaceString& nss);
|
||||
void commitCollectionMetadataLocally(OperationContext* opCtx, const NamespaceString& nss);
|
||||
|
||||
/**
|
||||
* Fetches the collection metadata from the global catalog, removes any existing chunk entries for
|
||||
|
||||
@ -201,7 +201,7 @@ TEST_F(CommitCollectionMetadataLocallyTest, CreateCollectionPersistsCollectionAn
|
||||
auto [collType, chunks] = makeCollectionMetadata(3);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
|
||||
shard_catalog_commit::commitCreateCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss);
|
||||
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 3);
|
||||
@ -218,7 +218,7 @@ TEST_F(CommitCollectionMetadataLocallyTest, CreateCollectionUpdatesCSR) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(2);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
|
||||
shard_catalog_commit::commitCreateCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss);
|
||||
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireShared(operationContext(), kTestNss);
|
||||
auto metadata = scopedCsr->getCurrentMetadataIfKnown();
|
||||
@ -231,8 +231,8 @@ TEST_F(CommitCollectionMetadataLocallyTest, CreateCollectionIsIdempotent) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(2);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
|
||||
shard_catalog_commit::commitCreateCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCreateCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss);
|
||||
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 2);
|
||||
@ -242,7 +242,7 @@ TEST_F(CommitCollectionMetadataLocallyTest, CreateCollectionReplacesStaleChunksO
|
||||
// First pass: persist the initial chunks for the collection.
|
||||
auto [collType, chunksPass1] = makeCollectionMetadata(2);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunksPass1);
|
||||
shard_catalog_commit::commitCreateCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss);
|
||||
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 2);
|
||||
|
||||
@ -253,7 +253,7 @@ TEST_F(CommitCollectionMetadataLocallyTest, CreateCollectionReplacesStaleChunksO
|
||||
chunk.setName(OID::gen());
|
||||
}
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunksPass2);
|
||||
shard_catalog_commit::commitCreateCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss);
|
||||
|
||||
// Only the second-pass chunks should remain; the first-pass rows must be deleted, not appended.
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 2);
|
||||
|
||||
@ -95,7 +95,7 @@ public:
|
||||
Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor());
|
||||
newOpCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
shard_catalog_commit::commitCreateCollectionLocally(newOpCtx.get(), ns());
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(newOpCtx.get(), ns());
|
||||
}
|
||||
|
||||
LOGV2_INFO(
|
||||
|
||||
@ -28,15 +28,15 @@
|
||||
*/
|
||||
|
||||
#include "mongo/db/auth/authorization_session.h"
|
||||
#include "mongo/db/cancelable_operation_context.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/dbdirectclient.h"
|
||||
#include "mongo/db/dbhelpers.h"
|
||||
#include "mongo/db/generic_argument_util.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharded_ddl_commands_gen.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/commit_collection_metadata_locally.h"
|
||||
#include "mongo/db/sharding_environment/grid.h"
|
||||
#include "mongo/db/topology/sharding_state.h"
|
||||
#include "mongo/db/topology/vector_clock/vector_clock.h"
|
||||
#include "mongo/db/transaction/transaction_participant.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
|
||||
@ -55,9 +55,10 @@ public:
|
||||
}
|
||||
|
||||
std::string help() const override {
|
||||
return "Internal command. This command aims to fetch collection and chunks metadata, for a "
|
||||
"specific namespace, from the global catalog and persist it locally in the "
|
||||
"shard catalog";
|
||||
return "Internal command. Fetches collection and chunk metadata for a specific namespace "
|
||||
"from the global catalog, persists it locally in the shard catalog, installs "
|
||||
"it authoritatively on this node's in-memory CollectionShardingRuntime, and "
|
||||
"invalidates the collection metadata on secondaries.";
|
||||
}
|
||||
|
||||
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
|
||||
@ -73,13 +74,13 @@ public:
|
||||
using InvocationBase::InvocationBase;
|
||||
|
||||
void typedRun(OperationContext* opCtx) {
|
||||
// Ensure shard is ready to accept sharded commands
|
||||
// Ensure shard is ready to accept sharded commands.
|
||||
ShardingState::get(opCtx)->assertCanAcceptShardedCommands();
|
||||
|
||||
// Ensure interruption on step down/up
|
||||
// Ensure interruption on step down/up.
|
||||
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
// Check command write concern
|
||||
// Check command write concern.
|
||||
CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName,
|
||||
opCtx->getWriteConcern());
|
||||
|
||||
@ -95,9 +96,22 @@ public:
|
||||
"_shardsvrFetchCollMetadata can only run when migrations are disabled",
|
||||
!sharding_ddl_util::checkAllowMigrations(opCtx, nss));
|
||||
|
||||
auto collAndChunks = _fetchCollectionAndChunks(opCtx, nss);
|
||||
// Use an AlternativeClientRegion to perform the shard catalog writes outside the
|
||||
// retryable write session. The shard catalog commit contains its own idempotency
|
||||
// logic, and running inside the parent session would conflict with the dummy write
|
||||
// we issue below to mark the txn on secondaries.
|
||||
{
|
||||
auto newClient = getGlobalServiceContext()->getService()->makeClient(
|
||||
"ShardsvrFetchCollMetadata");
|
||||
AlternativeClientRegion acr(newClient);
|
||||
auto newOpCtx = CancelableOperationContext(
|
||||
cc().makeOperationContext(),
|
||||
opCtx->getCancellationToken(),
|
||||
Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor());
|
||||
newOpCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
_persistMetadataLocally(opCtx, nss, collAndChunks);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(newOpCtx.get(), nss);
|
||||
}
|
||||
|
||||
LOGV2_INFO(10140202, "Persisted metadata locally on shard", "ns"_attr = nss);
|
||||
|
||||
@ -128,74 +142,6 @@ public:
|
||||
ResourcePattern::forClusterResource(request().getDbName().tenantId()),
|
||||
ActionType::internal));
|
||||
}
|
||||
|
||||
auto _fetchCollectionAndChunks(OperationContext* opCtx, const NamespaceString& nss)
|
||||
-> std::pair<CollectionType, std::vector<ChunkType>> {
|
||||
|
||||
const auto readConcern = [&]() -> repl::ReadConcernArgs {
|
||||
const auto vcTime = VectorClock::get(opCtx)->getTime();
|
||||
return {vcTime.configTime(), repl::ReadConcernLevel::kSnapshotReadConcern};
|
||||
}();
|
||||
|
||||
return Grid::get(opCtx)->catalogClient()->getCollectionAndChunks(
|
||||
opCtx, nss, ChunkVersion::IGNORED(), readConcern);
|
||||
}
|
||||
|
||||
void _persistMetadataLocally(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const std::pair<CollectionType, std::vector<ChunkType>>& collAndChunks) {
|
||||
auto newClient =
|
||||
opCtx->getServiceContext()->getService()->makeClient("ShardsvrFetchCollMetadata");
|
||||
AlternativeClientRegion acr(newClient);
|
||||
auto newOpCtx =
|
||||
CancelableOperationContext(cc().makeOperationContext(),
|
||||
opCtx->getCancellationToken(),
|
||||
Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
|
||||
auto newOpCtxPtr = newOpCtx.get();
|
||||
|
||||
DBDirectClient dbClient(newOpCtxPtr);
|
||||
|
||||
// Persist Collection Metadata
|
||||
write_ops::UpdateCommandRequest collUpdateReq(
|
||||
NamespaceString::kConfigShardCatalogCollectionsNamespace);
|
||||
{
|
||||
write_ops::UpdateOpEntry entry;
|
||||
const auto serializedNs =
|
||||
NamespaceStringUtil::serialize(nss, SerializationContext::stateDefault());
|
||||
entry.setQ(BSON(CollectionType::kNssFieldName << serializedNs));
|
||||
entry.setU(collAndChunks.first.toBSON());
|
||||
entry.setUpsert(true);
|
||||
entry.setMulti(false);
|
||||
collUpdateReq.setUpdates({std::move(entry)});
|
||||
}
|
||||
|
||||
collUpdateReq.setWriteConcern(defaultMajorityWriteConcern());
|
||||
write_ops::checkWriteErrors(dbClient.update(collUpdateReq));
|
||||
|
||||
// Persist Chunk Metadata
|
||||
const auto chunks = collAndChunks.second;
|
||||
if (chunks.empty()) {
|
||||
LOGV2_INFO(10303101, "No chunk metadata to persist", "ns"_attr = nss);
|
||||
return;
|
||||
}
|
||||
write_ops::UpdateCommandRequest chunkUpdateReq(
|
||||
NamespaceString::kConfigShardCatalogChunksNamespace);
|
||||
std::vector<write_ops::UpdateOpEntry> chunkUpdates;
|
||||
chunkUpdates.reserve(chunks.size());
|
||||
|
||||
for (const auto& chunk : chunks) {
|
||||
write_ops::UpdateOpEntry entry;
|
||||
entry.setQ(BSON(ChunkType::name() << chunk.getName()));
|
||||
entry.setU(chunk.toConfigBSON());
|
||||
entry.setUpsert(true);
|
||||
entry.setMulti(false);
|
||||
chunkUpdates.push_back(std::move(entry));
|
||||
}
|
||||
chunkUpdateReq.setUpdates(std::move(chunkUpdates));
|
||||
chunkUpdateReq.setWriteConcern(defaultMajorityWriteConcern());
|
||||
write_ops::checkWriteErrors(dbClient.update(chunkUpdateReq));
|
||||
}
|
||||
};
|
||||
};
|
||||
MONGO_REGISTER_COMMAND(ShardsvrFetchCollMetadataCommand).forShard();
|
||||
|
||||
Loading…
Reference in New Issue
Block a user