SERVER-122486 SERVER-123719 Make createCollection commit authoritative (#49675)
GitOrigin-RevId: df3f3805a49d5ec44a0207d2f7ab44bc7f6f4bc9
This commit is contained in:
parent
c5b0a52ee7
commit
115d041ce5
@ -1852,6 +1852,7 @@ tasks:
|
||||
|
||||
- <<: *gen_task_template
|
||||
name: sharding_gen
|
||||
# TODO(SERVER-122088): Remove "requires_large_host_aubsan" tag.
|
||||
tags:
|
||||
[
|
||||
"assigned_to_jira_team_server_cluster_scalability",
|
||||
@ -1860,6 +1861,7 @@ tasks:
|
||||
"sharding",
|
||||
"common",
|
||||
"requires_replicated_fast_count_recovery",
|
||||
"requires_large_host_aubsan",
|
||||
]
|
||||
commands:
|
||||
- func: "generate resmoke tasks"
|
||||
|
||||
@ -805,6 +805,12 @@ const internalCommandsMap = {
|
||||
testname: "_shardsvrCommitDropCollectionMetadata",
|
||||
command: {_shardsvrCommitDropCollectionMetadata: "test.x", collectionUUID: UUID()},
|
||||
},
|
||||
_shardsvrCommitCreateCollectionMetadata: {
|
||||
testname: "_shardsvrCommitCreateCollectionMetadata",
|
||||
command: {
|
||||
_shardsvrCommitCreateCollectionMetadata: "test.x",
|
||||
},
|
||||
},
|
||||
_shardsvrSetAllowMigrations: {
|
||||
testname: "_shardsvrSetAllowMigrations",
|
||||
command: {_shardsvrSetAllowMigrations: "db.collection", allowMigrations: true},
|
||||
|
||||
@ -201,6 +201,7 @@ let viewsCommandTests = {
|
||||
_shardsvrRefineCollectionShardKey: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitRefineCollectionShardKey: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitDropCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitCreateCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrRenameCollection: {skip: isAnInternalCommand},
|
||||
_shardsvrRenameCollectionParticipant: {skip: isAnInternalCommand},
|
||||
_shardsvrRenameCollectionParticipantUnblock: {skip: isAnInternalCommand},
|
||||
|
||||
@ -189,6 +189,7 @@ const wcCommandsTests = {
|
||||
_shardsvrRefineCollectionShardKey: {skip: "internal command"},
|
||||
_shardsvrCommitRefineCollectionShardKey: {skip: "internal command"},
|
||||
_shardsvrCommitDropCollectionMetadata: {skip: "internal command"},
|
||||
_shardsvrCommitCreateCollectionMetadata: {skip: "internal command"},
|
||||
_shardsvrRenameCollection: {skip: "internal command"},
|
||||
_shardsvrRenameCollectionParticipant: {skip: "internal command"},
|
||||
_shardsvrRenameCollectionParticipantUnblock: {skip: "internal command"},
|
||||
@ -3446,6 +3447,7 @@ const wcTimeseriesViewsCommandsTests = {
|
||||
_shardsvrRefineCollectionShardKey: {skip: "internal command"},
|
||||
_shardsvrCommitRefineCollectionShardKey: {skip: "internal command"},
|
||||
_shardsvrCommitDropCollectionMetadata: {skip: "internal command"},
|
||||
_shardsvrCommitCreateCollectionMetadata: {skip: "internal command"},
|
||||
_shardsvrRenameCollection: {skip: "internal command"},
|
||||
_shardsvrRenameCollectionParticipant: {skip: "internal command"},
|
||||
_shardsvrRenameCollectionParticipantUnblock: {skip: "internal command"},
|
||||
|
||||
@ -298,9 +298,10 @@ function verifyCommitOpEntriesOnShards(expectedOpEntryTemplates, shards, orderSt
|
||||
ns: {$in: namespaces},
|
||||
op: {$in: ["c", "n"]},
|
||||
// Discard entries related to
|
||||
// authoritative db metadata management.
|
||||
// authoritative metadata management.
|
||||
"o.dropDatabaseMetadata": {$exists: false},
|
||||
"o.createDatabaseMetadata": {$exists: false},
|
||||
"o.invalidateCollectionMetadata": {$exists: false},
|
||||
})
|
||||
.sort({ts: -1})
|
||||
.limit(expectedOpEntryTemplates.length)
|
||||
|
||||
@ -1,43 +0,0 @@
|
||||
// Tests that authoritative metadata recovery works as expected on a very basic test.
|
||||
//
|
||||
// TODO SERVER-122509: We can remove this once we've comprehensively made more DDLs authoritative.
|
||||
// This is just a sanity check for now.
|
||||
//
|
||||
// @tags: [
|
||||
// featureFlagShardAuthoritativeCollMetadata
|
||||
// ]
|
||||
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
|
||||
const DOC = {x: 1};
|
||||
|
||||
let st = new ShardingTest({shards: 1});
|
||||
let s = st.s;
|
||||
let testDb = st.getDB("test");
|
||||
assert.commandWorked(s.adminCommand({enableSharding: "test", primaryShard: st.shard0.shardName}));
|
||||
assert.commandWorked(s.adminCommand({shardCollection: "test.test", key: {_id: 1}}));
|
||||
testDb.test.insertOne(DOC);
|
||||
|
||||
// At this point the collection is potentially non-authoritative on the shard. Let's copy the contents
|
||||
// of the global catalog to the collection and force the CSR to become authoritative pending a recovery.
|
||||
const colls = st.config.collections.find({}).toArray();
|
||||
const chunks = st.config.chunks.find({}).toArray();
|
||||
|
||||
const shardPrimary = st.rs0.getPrimary();
|
||||
const shardConfigDb = shardPrimary.getDB("config");
|
||||
shardConfigDb["shard.catalog.collections"].insertMany(colls);
|
||||
shardConfigDb["shard.catalog.chunks"].insertMany(chunks);
|
||||
|
||||
// At this point the collection is not marked as authoritative, force a clear and set the internal flag
|
||||
// to authoritative in order to force authoritative recovery.
|
||||
assert.commandWorked(
|
||||
shardPrimary.adminCommand({
|
||||
_internalClearCollectionShardingMetadata: "test.test",
|
||||
isAuthoritative: true,
|
||||
}),
|
||||
);
|
||||
|
||||
const result = testDb.test.findOne({}, {_id: 0});
|
||||
assert.docEq(result, DOC);
|
||||
|
||||
st.stop();
|
||||
@ -152,6 +152,7 @@ const allCommands = {
|
||||
_shardsvrRefineCollectionShardKey: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitRefineCollectionShardKey: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitDropCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitCreateCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrSetAllowMigrations: {skip: isAnInternalCommand},
|
||||
_shardsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: isAnInternalCommand},
|
||||
|
||||
@ -139,6 +139,7 @@ const allCommands = {
|
||||
_shardsvrRefineCollectionShardKey: {skip: isPrimaryOnly},
|
||||
_shardsvrCommitRefineCollectionShardKey: {skip: isPrimaryOnly},
|
||||
_shardsvrCommitDropCollectionMetadata: {skip: isPrimaryOnly},
|
||||
_shardsvrCommitCreateCollectionMetadata: {skip: isPrimaryOnly},
|
||||
_shardsvrSetAllowMigrations: {skip: isPrimaryOnly},
|
||||
_shardsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: isPrimaryOnly},
|
||||
|
||||
@ -148,6 +148,7 @@ const allCommands = {
|
||||
_shardsvrRefineCollectionShardKey: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitRefineCollectionShardKey: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitDropCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitCreateCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrSetAllowMigrations: {skip: isAnInternalCommand},
|
||||
_shardsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: isAnInternalCommand},
|
||||
|
||||
@ -1113,6 +1113,7 @@ const allTestCases = {
|
||||
_shardsvrRefineCollectionShardKey: {skip: "TODO"},
|
||||
_shardsvrCommitRefineCollectionShardKey: {skip: "internal command"},
|
||||
_shardsvrCommitDropCollectionMetadata: {skip: "internal command"},
|
||||
_shardsvrCommitCreateCollectionMetadata: {skip: "internal command"},
|
||||
_shardsvrRenameCollection: {skip: "TODO"},
|
||||
_shardsvrRenameCollectionParticipant: {skip: "TODO"},
|
||||
_shardsvrRenameCollectionParticipantUnblock: {skip: "TODO"},
|
||||
|
||||
@ -8,10 +8,18 @@
|
||||
* ]
|
||||
*/
|
||||
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
|
||||
const st = new ShardingTest({shards: 1, config: 1});
|
||||
|
||||
// TODO (SERVER-124050): Investigate failure in this test when authoritative shards are enabled.
|
||||
if (FeatureFlagUtil.isPresentAndEnabled(st.s, "ShardAuthoritativeCollMetadata")) {
|
||||
jsTestLog("Skipping test because featureFlagShardAuthoritativeCollMetadata is enabled");
|
||||
st.stop();
|
||||
quit();
|
||||
}
|
||||
|
||||
const kDbName = "testDb";
|
||||
const kCollName = "testColl";
|
||||
const mongosTestColl = st.s.getCollection(kDbName + "." + kCollName);
|
||||
|
||||
@ -76,6 +76,7 @@ export const commandsAddedToMongodSinceLastLTS = [
|
||||
"_shardsvrTimeseriesUpgradeDowngradeCommit",
|
||||
"persistenceProviderProperties",
|
||||
"_shardsvrCommitRefineCollectionShardKey",
|
||||
"_shardsvrCommitCreateCollectionMetadata",
|
||||
"_internalClearCollectionShardingMetadata",
|
||||
"_shardsvrCommitDropCollectionMetadata",
|
||||
];
|
||||
|
||||
@ -192,6 +192,7 @@ let testCases = {
|
||||
_shardsvrRefineCollectionShardKey: {skip: "internal command"},
|
||||
_shardsvrCommitRefineCollectionShardKey: {skip: "internal command"},
|
||||
_shardsvrCommitDropCollectionMetadata: {skip: "internal command"},
|
||||
_shardsvrCommitCreateCollectionMetadata: {skip: "internal command"},
|
||||
_shardsvrRenameCollection: {skip: "internal command"},
|
||||
_shardsvrRenameCollectionParticipant: {skip: "internal command"},
|
||||
_shardsvrRenameCollectionParticipantUnblock: {skip: "internal command"},
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
* Test basic retryable write without errors by checking that the resulting collection after the
|
||||
* retry is as expected and it does not create additional oplog entries.
|
||||
*/
|
||||
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
|
||||
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
|
||||
@ -121,10 +122,23 @@ function handleSessionsCollection(mainConn, priConn, configConn) {
|
||||
createSessionsCollection(configConn);
|
||||
createdCollection = true;
|
||||
}
|
||||
// If we are in a config shard and we dropped and recreated the sessions collection then, as
|
||||
// part of the create coordinator, we ran a transaction which will mess with server status
|
||||
// counts.
|
||||
return TestData.configShard && mainConn != priConn && createdCollection;
|
||||
|
||||
let extraCollectionWrites = 0;
|
||||
if (createdCollection && mainConn != priConn) {
|
||||
// If we are in a config shard and we dropped and recreated the sessions collection then,
|
||||
// as part of the create coordinator, we ran a transaction which will mess with server
|
||||
// status counts.
|
||||
if (TestData.configShard) {
|
||||
extraCollectionWrites += 1;
|
||||
}
|
||||
// When shard authoritative collection metadata is enabled, the create coordinator commits
|
||||
// metadata to the shard catalog via a retryable write (_shardsvrCommitCreateCollection
|
||||
// Metadata), adding another write to config.transactions on the shard.
|
||||
if (FeatureFlagUtil.isPresentAndEnabled(priConn, "ShardAuthoritativeCollMetadata")) {
|
||||
extraCollectionWrites += 1;
|
||||
}
|
||||
}
|
||||
return extraCollectionWrites;
|
||||
}
|
||||
|
||||
function runTests(mainConn, priConn, configConn) {
|
||||
@ -154,7 +168,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
let testDBPri = priConn.getDB("test");
|
||||
assert.eq(2, testDBPri.user.find().itcount());
|
||||
|
||||
let createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
let extraCollectionWrites = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
|
||||
let retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
|
||||
assert.eq(result.ok, retryResult.ok);
|
||||
@ -172,7 +186,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
newStatus.transactions,
|
||||
1 /* newCommands */,
|
||||
2 /* newStatements */,
|
||||
createdCollectionInTxn ? 2 : 1 /* newCollectionWrites */,
|
||||
1 + extraCollectionWrites /* newCollectionWrites */,
|
||||
);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
@ -202,7 +216,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
|
||||
assert.eq(3, testDBPri.user.find().itcount());
|
||||
|
||||
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
extraCollectionWrites = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
|
||||
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
|
||||
assert.eq(result.ok, retryResult.ok);
|
||||
@ -228,7 +242,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
newStatus.transactions,
|
||||
1 /* newCommands */,
|
||||
3 /* newStatements */,
|
||||
createdCollectionInTxn ? 4 : 3 /* newCollectionWrites */,
|
||||
3 + extraCollectionWrites /* newCollectionWrites */,
|
||||
);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
@ -261,7 +275,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
assert.eq(1, testDBPri.user.find({x: 1}).itcount());
|
||||
assert.eq(1, testDBPri.user.find({y: 1}).itcount());
|
||||
|
||||
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
extraCollectionWrites = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
|
||||
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
|
||||
assert.eq(result.ok, retryResult.ok);
|
||||
@ -281,7 +295,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
newStatus.transactions,
|
||||
1 /* newCommands */,
|
||||
2 /* newStatements */,
|
||||
createdCollectionInTxn ? 3 : 2 /* newCollectionWrites */,
|
||||
2 + extraCollectionWrites /* newCollectionWrites */,
|
||||
);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
@ -305,7 +319,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
updateOplogEntries = oplog.find({ns: "test.user", op: "u"}).itcount();
|
||||
assert.eq({_id: 60, x: 1}, testDBPri.user.findOne({_id: 60}));
|
||||
|
||||
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
extraCollectionWrites = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
|
||||
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
|
||||
|
||||
@ -322,7 +336,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
newStatus.transactions,
|
||||
1 /* newCommands */,
|
||||
1 /* newStatements */,
|
||||
createdCollectionInTxn ? 2 : 1 /* newCollectionWrites */,
|
||||
1 + extraCollectionWrites /* newCollectionWrites */,
|
||||
);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
@ -345,7 +359,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
let oplogEntries = oplog.find({ns: "test.user", op: "u"}).itcount();
|
||||
assert.eq({_id: 60, x: 2}, testDBPri.user.findOne({_id: 60}));
|
||||
|
||||
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
extraCollectionWrites = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
|
||||
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
|
||||
|
||||
@ -361,7 +375,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
newStatus.transactions,
|
||||
1 /* newCommands */,
|
||||
1 /* newStatements */,
|
||||
createdCollectionInTxn ? 2 : 1 /* newCollectionWrites */,
|
||||
1 + extraCollectionWrites /* newCollectionWrites */,
|
||||
);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
@ -384,7 +398,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
oplogEntries = oplog.find({ns: "test.user", op: "u"}).itcount();
|
||||
assert.eq({_id: 60, x: 3}, testDBPri.user.findOne({_id: 60}));
|
||||
|
||||
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
extraCollectionWrites = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
|
||||
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
|
||||
|
||||
@ -400,7 +414,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
newStatus.transactions,
|
||||
1 /* newCommands */,
|
||||
1 /* newStatements */,
|
||||
createdCollectionInTxn ? 2 : 1 /* newCollectionWrites */,
|
||||
1 + extraCollectionWrites /* newCollectionWrites */,
|
||||
);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
@ -424,7 +438,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
oplogEntries = oplog.find({ns: "test.user", op: "d"}).itcount();
|
||||
let docCount = testDBPri.user.find().itcount();
|
||||
|
||||
createdCollectionInTxn = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
extraCollectionWrites = handleSessionsCollection(mainConn, priConn, configConn);
|
||||
|
||||
retryResult = assert.commandWorked(testDBMain.runCommand(cmd));
|
||||
|
||||
@ -440,7 +454,7 @@ function runTests(mainConn, priConn, configConn) {
|
||||
newStatus.transactions,
|
||||
1 /* newCommands */,
|
||||
1 /* newStatements */,
|
||||
createdCollectionInTxn ? 2 : 1 /* newCollectionWrites */,
|
||||
1 + extraCollectionWrites /* newCollectionWrites */,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@ -90,6 +90,7 @@
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_catalog_helper.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_uuid_mismatch.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/commit_collection_metadata_locally.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/shard_filtering_metadata_refresh.h"
|
||||
#include "mongo/db/shard_role/transaction_resources.h"
|
||||
#include "mongo/db/sharding_environment/client/shard.h"
|
||||
@ -1137,12 +1138,23 @@ void exitCriticalSectionsOnCoordinator(OperationContext* opCtx,
|
||||
? originalNss.getTimeseriesViewNamespace()
|
||||
: originalNss;
|
||||
|
||||
const bool clearFilteringMetadata = !feature_flags::gShardAuthoritativeCollMetadata.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot());
|
||||
|
||||
std::unique_ptr<ShardingRecoveryService::BeforeReleasingCustomAction> actionPtr;
|
||||
if (clearFilteringMetadata) {
|
||||
actionPtr = std::make_unique<ShardingRecoveryService::FilteringMetadataClearer>();
|
||||
} else {
|
||||
actionPtr = std::make_unique<ShardingRecoveryService::NoCustomAction>();
|
||||
}
|
||||
|
||||
ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection(
|
||||
opCtx,
|
||||
mainNss.makeTimeseriesBucketsNamespace(),
|
||||
critSecReason,
|
||||
defaultMajorityWriteConcernDoNotUse(),
|
||||
ShardingRecoveryService::FilteringMetadataClearer(),
|
||||
*actionPtr,
|
||||
throwIfReasonDiffers);
|
||||
|
||||
ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection(
|
||||
@ -1150,7 +1162,7 @@ void exitCriticalSectionsOnCoordinator(OperationContext* opCtx,
|
||||
mainNss,
|
||||
critSecReason,
|
||||
defaultMajorityWriteConcernDoNotUse(),
|
||||
ShardingRecoveryService::FilteringMetadataClearer(),
|
||||
*actionPtr,
|
||||
throwIfReasonDiffers);
|
||||
}
|
||||
|
||||
@ -1484,9 +1496,22 @@ void CreateCollectionCoordinator::_exitCriticalSectionOnShards(
|
||||
unblockCRUDOperationsRequest.setBlockType(CriticalSectionBlockTypeEnum::kUnblock);
|
||||
unblockCRUDOperationsRequest.setReason(_critSecReason);
|
||||
unblockCRUDOperationsRequest.setThrowIfReasonDiffers(throwIfReasonDiffers);
|
||||
|
||||
// When shards are authoritative, there is no need to clear the filtering metadata upon
|
||||
// releasing the critical section; the commit phase is responsible for updating the shard
|
||||
// catalog (both durable and in-memory) with current information on both primary and secondary
|
||||
// nodes.
|
||||
bool isDDLAuthoritative = feature_flags::gShardAuthoritativeCollMetadata.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot());
|
||||
if (isDDLAuthoritative) {
|
||||
unblockCRUDOperationsRequest.setClearCollMetadata(false);
|
||||
}
|
||||
|
||||
generic_argument_util::setMajorityWriteConcern(unblockCRUDOperationsRequest);
|
||||
generic_argument_util::setOperationSessionInfo(unblockCRUDOperationsRequest,
|
||||
getNewSession(opCtx));
|
||||
|
||||
auto opts = std::make_shared<async_rpc::AsyncRPCOptions<ShardsvrParticipantBlock>>(
|
||||
**executor, token, unblockCRUDOperationsRequest);
|
||||
sharding_ddl_util::sendAuthenticatedCommandToShards(opCtx, opts, shardIds);
|
||||
@ -1578,7 +1603,8 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
|
||||
.then(_buildPhaseHandler(
|
||||
Phase::kCommitOnShardingCatalog,
|
||||
[this, token, executor = executor, anchor = shared_from_this()](auto* opCtx) {
|
||||
_commitOnShardingCatalog(opCtx, executor, token);
|
||||
_commitOnGlobalCatalog(opCtx, executor, token);
|
||||
_commitOnShardCatalog(opCtx, executor, token);
|
||||
}))
|
||||
.then(_buildPhaseHandler(
|
||||
Phase::kSetPostCommitMetadata,
|
||||
@ -2213,7 +2239,7 @@ void CreateCollectionCoordinator::_notifyChangeStreamReadersOnPlacementChanged(
|
||||
}
|
||||
|
||||
|
||||
void CreateCollectionCoordinator::_commitOnShardingCatalog(
|
||||
void CreateCollectionCoordinator::_commitOnGlobalCatalog(
|
||||
OperationContext* opCtx,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token) {
|
||||
@ -2322,6 +2348,39 @@ void CreateCollectionCoordinator::_commitOnShardingCatalog(
|
||||
VectorClockMutable::get(opCtx)->waitForDurable().get(opCtx);
|
||||
}
|
||||
|
||||
void CreateCollectionCoordinator::_commitOnShardCatalog(
|
||||
OperationContext* opCtx,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token) {
|
||||
if (!feature_flags::gShardAuthoritativeCollMetadata.isEnabled(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||
return;
|
||||
}
|
||||
|
||||
// At this point, metadata is published in the global catalog. The shard catalog commit is
|
||||
// protected from concurrent migrations because the critical section is still held on all
|
||||
// involved shards.
|
||||
|
||||
std::set<ShardId> involvedShards;
|
||||
const auto& cm = uassertStatusOK(
|
||||
Grid::get(opCtx)->catalogCache()->getCollectionPlacementInfoWithRefresh(opCtx, nss()));
|
||||
cm.getAllShardIds(&involvedShards);
|
||||
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::commitCreateCollectionMetadataToShardCatalog(
|
||||
opCtx, nss(), {involvedShards.begin(), involvedShards.end()}, session, executor, token);
|
||||
|
||||
// The DB primary shard must always know that a collection is tracked, even when it does not
|
||||
// own any chunks. We persist a placeholder chunk locally so that (1) disk recovery can
|
||||
// distinguish a chunkless-tracked collection from an untracked one without special-case
|
||||
// logic, and (2) CheckMetadataConsistency can verify the DB primary always has an entry.
|
||||
const auto primaryShardId = ShardingState::get(opCtx)->shardId();
|
||||
if (involvedShards.find(primaryShardId) == involvedShards.end()) {
|
||||
shard_catalog_commit::commitCreateCollectionChunklessLocally(opCtx, nss());
|
||||
}
|
||||
}
|
||||
|
||||
void CreateCollectionCoordinator::_setPostCommitMetadata(
|
||||
OperationContext* opCtx,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
|
||||
@ -199,9 +199,15 @@ private:
|
||||
const CancellationToken& token);
|
||||
|
||||
// Commits the create collection operation to the global catalog within a transaction.
|
||||
void _commitOnShardingCatalog(OperationContext* opCtx,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token);
|
||||
void _commitOnGlobalCatalog(OperationContext* opCtx,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token);
|
||||
|
||||
|
||||
// Commits the create collection operation to the shard catalog with a command.
|
||||
void _commitOnShardCatalog(OperationContext* opCtx,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token);
|
||||
|
||||
// Ensure that the change stream event gets emitted and install the new filtering metadata.
|
||||
void _setPostCommitMetadata(OperationContext* opCtx,
|
||||
|
||||
@ -1080,3 +1080,13 @@ commands:
|
||||
type: uuid
|
||||
description: "UUID of the collection being dropped"
|
||||
reply_type: OkReply
|
||||
|
||||
_shardsvrCommitCreateCollectionMetadata:
|
||||
command_name: _shardsvrCommitCreateCollectionMetadata
|
||||
cpp_name: ShardsvrCommitCreateCollectionMetadata
|
||||
description: "Internal command to create collection metadata in the shard's catalog."
|
||||
strict: false
|
||||
namespace: type
|
||||
type: namespacestring
|
||||
api_version: ""
|
||||
reply_type: OkReply
|
||||
|
||||
@ -926,6 +926,26 @@ void commitDropCollectionMetadataToShardCatalog(
|
||||
sendAuthenticatedCommandToShards(opCtx, opts, shardIds);
|
||||
}
|
||||
|
||||
void commitCreateCollectionMetadataToShardCatalog(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const std::vector<ShardId>& shardIds,
|
||||
const OperationSessionInfo& osi,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token) {
|
||||
ShardsvrCommitCreateCollectionMetadata request(nss);
|
||||
request.setDbName(DatabaseName::kAdmin);
|
||||
|
||||
generic_argument_util::setMajorityWriteConcern(request);
|
||||
generic_argument_util::setOperationSessionInfo(request, osi);
|
||||
|
||||
auto opts =
|
||||
std::make_shared<async_rpc::AsyncRPCOptions<ShardsvrCommitCreateCollectionMetadata>>(
|
||||
**executor, token, std::move(request));
|
||||
|
||||
sendAuthenticatedCommandToShards(opCtx, opts, shardIds);
|
||||
}
|
||||
|
||||
AuthoritativeMetadataAccessLevelEnum getGrantedAuthoritativeMetadataAccessLevel(
|
||||
const VersionContext& vCtx, const ServerGlobalParams::FCVSnapshot& snapshot) {
|
||||
const bool isAuthoritativeDDLEnabled =
|
||||
|
||||
@ -405,6 +405,18 @@ MONGO_MOD_PRIVATE void commitDropCollectionMetadataToShardCatalog(
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token);
|
||||
|
||||
/**
|
||||
* Commits a shardCollection operation to the shard catalog by sending the command
|
||||
* `_shardsvrCommitCreateCollectionMetadata` to all given shards.
|
||||
*/
|
||||
MONGO_MOD_PRIVATE void commitCreateCollectionMetadataToShardCatalog(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const std::vector<ShardId>& shardIds,
|
||||
const OperationSessionInfo& osi,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token);
|
||||
|
||||
/**
|
||||
* Based on the FCV, get the where the DDL needs to act accordingly to the database
|
||||
* authoritativeness.
|
||||
|
||||
@ -713,6 +713,7 @@ mongo_cc_library(
|
||||
"//src/mongo/db/shard_role/shard_catalog:internal_clear_collection_sharding_metadata_gen",
|
||||
"//src/mongo/db/shard_role/shard_catalog:shardsvr_cleanup_structured_encryption_data_command.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:shardsvr_clone_authoritative_metadata_command.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:shardsvr_commit_create_collection_metadata_command.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:shardsvr_commit_drop_collection_metadata_command.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:shardsvr_commit_refine_collection_shard_key_command.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:shardsvr_compact_structured_encryption_data_command.cpp",
|
||||
|
||||
@ -312,5 +312,45 @@ void commitDropCollectionLocally(OperationContext* opCtx,
|
||||
clearShardCatalogCacheForDroppedCollection(opCtx, nss, uuid);
|
||||
}
|
||||
|
||||
void commitCreateCollectionLocally(OperationContext* opCtx, const NamespaceString& nss) {
|
||||
auto coll = fetchCollection(opCtx, nss);
|
||||
auto ownedChunks = fetchOwnedChunks(opCtx, nss, coll);
|
||||
|
||||
// Write to `config.shard.catalog.(collections|chunks)` to insert collection metadata.
|
||||
writeCollectionMetadataLocally(opCtx, nss, coll, ownedChunks);
|
||||
|
||||
// Write an oplog 'c' entry to invalidate collection metadata on secondaries.
|
||||
invalidateCollectionMetadataOnSecondaries(
|
||||
opCtx, nss, coll.getUuid(), false /* forDroppedCollection */);
|
||||
|
||||
// Update this node CSR with collection metadata and chunks.
|
||||
updateShardCatalogCache(opCtx, nss, coll, ownedChunks);
|
||||
}
|
||||
|
||||
void commitCreateCollectionChunklessLocally(OperationContext* opCtx, const NamespaceString& nss) {
|
||||
auto coll = fetchCollection(opCtx, nss);
|
||||
|
||||
// This shard does not own any chunks, but we still need the CSS to know the collection is
|
||||
// tracked. Persist a single placeholder chunk so that disk recovery can distinguish a
|
||||
// chunkless-tracked collection from an untracked one without special-case logic.
|
||||
auto range = ChunkRange(coll.getKeyPattern().globalMin(), coll.getKeyPattern().globalMax());
|
||||
ChunkType placeholder(coll.getUuid(),
|
||||
std::move(range),
|
||||
ChunkVersion({coll.getEpoch(), coll.getTimestamp()}, {1, 0}),
|
||||
kChunklessPlaceholderShardId);
|
||||
placeholder.setName(OID::gen());
|
||||
std::vector<ChunkType> placeholderChunks{std::move(placeholder)};
|
||||
|
||||
// Write the collection document and the placeholder chunk to the shard catalog.
|
||||
writeCollectionMetadataLocally(opCtx, nss, coll, placeholderChunks);
|
||||
|
||||
// Write an oplog 'c' entry to invalidate collection metadata on secondaries.
|
||||
invalidateCollectionMetadataOnSecondaries(
|
||||
opCtx, nss, coll.getUuid(), false /* forDroppedCollection */);
|
||||
|
||||
// Update this node CSR with chunkless tracked metadata.
|
||||
updateShardCatalogCache(opCtx, nss, coll, placeholderChunks);
|
||||
}
|
||||
|
||||
} // namespace shard_catalog_commit
|
||||
} // namespace mongo
|
||||
|
||||
@ -31,12 +31,19 @@
|
||||
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/sharding_environment/shard_id.h"
|
||||
#include "mongo/util/modules.h"
|
||||
|
||||
|
||||
MONGO_MOD_PARENT_PRIVATE;
|
||||
namespace mongo {
|
||||
namespace shard_catalog_commit {
|
||||
|
||||
/**
|
||||
* Shard id for the placeholder chunk when a collection is tracked on a shard but owns no real
|
||||
* chunks (see commitCreateCollectionChunklessLocally).
|
||||
*/
|
||||
inline const ShardId kChunklessPlaceholderShardId{"__chunkless_placeholder__"};
|
||||
|
||||
/**
|
||||
* Fetches the latest collection metadata and owned chunks from the global catalog, persists them
|
||||
* to the shard catalog (config.shard.catalog.collections and config.shard.catalog.chunks), removes
|
||||
@ -56,5 +63,23 @@ void commitDropCollectionLocally(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& uuid);
|
||||
|
||||
/**
|
||||
* Fetches the collection metadata and owned chunks from the global catalog, persists them to the
|
||||
* shard catalog (config.shard.catalog.collections and config.shard.catalog.chunks), writes an
|
||||
* oplog entry to invalidate collection metadata on secondaries, and updates the in-memory
|
||||
* CollectionShardingRuntime (CSR) with the new routing information.
|
||||
*/
|
||||
void commitCreateCollectionLocally(OperationContext* opCtx, const NamespaceString& nss);
|
||||
|
||||
/**
|
||||
* Fetches the collection metadata from the global catalog (without chunks), persists only the
|
||||
* collection document to the shard catalog (config.shard.catalog.collections), writes an oplog
|
||||
* entry to invalidate collection metadata on secondaries, and updates the in-memory
|
||||
* CollectionShardingRuntime (CSR) with a chunkless tracked metadata. This is used for shards that
|
||||
* participate on a tracked collection but do not own any chunks (e.g., the DB primary shard after
|
||||
* create collection).
|
||||
*/
|
||||
void commitCreateCollectionChunklessLocally(OperationContext* opCtx, const NamespaceString& nss);
|
||||
|
||||
} // namespace shard_catalog_commit
|
||||
} // namespace mongo
|
||||
|
||||
@ -197,6 +197,79 @@ TEST_F(CommitCollectionMetadataLocallyTest, RefineShardKeyIsIdempotent) {
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 2);
|
||||
}
|
||||
|
||||
TEST_F(CommitCollectionMetadataLocallyTest, CreateCollectionPersistsCollectionAndChunks) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(3);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
|
||||
shard_catalog_commit::commitCreateCollectionLocally(operationContext(), kTestNss);
|
||||
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 3);
|
||||
|
||||
auto collDocs = findLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace);
|
||||
ASSERT_EQ(collDocs.size(), 1u);
|
||||
ASSERT_EQ(UUID::fromCDR(collDocs[0].getField("uuid").uuid()), collType.getUuid());
|
||||
|
||||
auto chunkDocs = findLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace);
|
||||
ASSERT_EQ(chunkDocs.size(), 3u);
|
||||
}
|
||||
|
||||
TEST_F(CommitCollectionMetadataLocallyTest, CreateCollectionUpdatesCSR) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(2);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
|
||||
shard_catalog_commit::commitCreateCollectionLocally(operationContext(), kTestNss);
|
||||
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireShared(operationContext(), kTestNss);
|
||||
auto metadata = scopedCsr->getCurrentMetadataIfKnown();
|
||||
ASSERT_TRUE(metadata);
|
||||
ASSERT_TRUE(metadata->isSharded());
|
||||
ASSERT_EQ(metadata->getChunkManager()->getUUID(), collType.getUuid());
|
||||
}
|
||||
|
||||
TEST_F(CommitCollectionMetadataLocallyTest, CreateCollectionIsIdempotent) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(2);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
|
||||
shard_catalog_commit::commitCreateCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCreateCollectionLocally(operationContext(), kTestNss);
|
||||
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 2);
|
||||
}
|
||||
|
||||
TEST_F(CommitCollectionMetadataLocallyTest, CreateCollectionChunklessPersistsTokenToDisk) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(0);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, {});
|
||||
|
||||
shard_catalog_commit::commitCreateCollectionChunklessLocally(operationContext(), kTestNss);
|
||||
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
|
||||
auto collDocs = findLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace);
|
||||
ASSERT_EQ(collDocs.size(), 1u);
|
||||
ASSERT_EQ(UUID::fromCDR(collDocs[0].getField("uuid").uuid()), collType.getUuid());
|
||||
|
||||
// The placeholder chunk is persisted so the token survives restarts.
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 1);
|
||||
auto chunkDocs = findLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace);
|
||||
ASSERT_EQ(UUID::fromCDR(chunkDocs[0].getField(ChunkType::collectionUUID.name()).uuid()),
|
||||
collType.getUuid());
|
||||
}
|
||||
|
||||
TEST_F(CommitCollectionMetadataLocallyTest, CreateCollectionChunklessUpdatesCSR) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(0);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, {});
|
||||
|
||||
shard_catalog_commit::commitCreateCollectionChunklessLocally(operationContext(), kTestNss);
|
||||
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireShared(operationContext(), kTestNss);
|
||||
auto metadata = scopedCsr->getCurrentMetadataIfKnown();
|
||||
ASSERT_TRUE(metadata);
|
||||
ASSERT_TRUE(metadata->isSharded());
|
||||
ASSERT_FALSE(metadata->getShardPlacementVersion().isSet());
|
||||
}
|
||||
|
||||
TEST_F(CommitCollectionMetadataLocallyTest, RefineShardKeyRemovesStaleChunks) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(2);
|
||||
|
||||
|
||||
@ -32,9 +32,11 @@
|
||||
#include "mongo/db/database_name.h"
|
||||
#include "mongo/db/dbdirectclient.h"
|
||||
#include "mongo/db/global_catalog/chunk_manager.h"
|
||||
#include "mongo/db/global_catalog/type_chunk.h"
|
||||
#include "mongo/db/global_catalog/type_collection.h"
|
||||
#include "mongo/db/repl/optime.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/commit_collection_metadata_locally.h"
|
||||
#include "mongo/db/sharding_environment/shard_server_test_fixture.h"
|
||||
#include "mongo/db/sharding_environment/sharding_statistics.h"
|
||||
#include "mongo/db/topology/vector_clock/vector_clock.h"
|
||||
@ -907,5 +909,47 @@ TEST_F(AuthoritativeRefreshFixture, CriticalSectionExitedWithExternalMetadataSki
|
||||
ASSERT_EQ(stats.getIntField("versionResolvedBeforeRecovery"), 1);
|
||||
}
|
||||
|
||||
TEST_F(AuthoritativeRefreshFixture, TrackedCollectionWithNoChunksOnDiskRecoveredCorrectly) {
|
||||
RAIIServerParameterControllerForTest featureFlag("featureFlagShardAuthoritativeCollMetadata",
|
||||
true);
|
||||
auto* opCtx = operationContext();
|
||||
|
||||
const UUID uuid = UUID::gen();
|
||||
const OID epoch = OID::gen();
|
||||
const Timestamp timestamp(Date_t::now());
|
||||
CollectionType collType{kTestNss, epoch, timestamp, Date_t::now(), uuid, kShardKeyPattern};
|
||||
|
||||
auto keyPattern = KeyPattern(kShardKeyPattern);
|
||||
auto range = ChunkRange(keyPattern.globalMin(), keyPattern.globalMax());
|
||||
ChunkType placeholder(uuid,
|
||||
std::move(range),
|
||||
ChunkVersion({epoch, timestamp}, {1, 0}),
|
||||
shard_catalog_commit::kChunklessPlaceholderShardId);
|
||||
placeholder.setName(OID::gen());
|
||||
|
||||
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace);
|
||||
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace);
|
||||
{
|
||||
DBDirectClient client(opCtx);
|
||||
client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON());
|
||||
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
placeholder.toConfigBSON());
|
||||
}
|
||||
|
||||
{
|
||||
auto csr = CollectionShardingRuntime::acquireExclusive(opCtx, kTestNss);
|
||||
csr->clearFilteringMetadata_authoritative(opCtx);
|
||||
}
|
||||
|
||||
auto status = onShardVersionMismatch(opCtx, kTestNss, boost::none);
|
||||
ASSERT_OK(status);
|
||||
|
||||
auto csr = CollectionShardingRuntime::acquireShared(opCtx, kTestNss);
|
||||
auto metadataOpt = csr->getCurrentMetadataIfKnown();
|
||||
ASSERT_TRUE(metadataOpt.has_value());
|
||||
ASSERT_TRUE(metadataOpt->isSharded());
|
||||
ASSERT_FALSE(metadataOpt->getShardPlacementVersion().isSet());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
|
||||
@ -0,0 +1,136 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/auth/authorization_session.h"
|
||||
#include "mongo/db/cancelable_operation_context.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/dbdirectclient.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharded_ddl_commands_gen.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/commit_collection_metadata_locally.h"
|
||||
#include "mongo/db/sharding_environment/grid.h"
|
||||
#include "mongo/db/topology/sharding_state.h"
|
||||
#include "mongo/db/transaction/transaction_participant.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
class ShardsvrCommitCreateCollectionMetadataCommand final
|
||||
: public TypedCommand<ShardsvrCommitCreateCollectionMetadataCommand> {
|
||||
public:
|
||||
using Request = ShardsvrCommitCreateCollectionMetadata;
|
||||
|
||||
bool skipApiVersionCheck() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string help() const override {
|
||||
return "Internal command to create collection metadata in the shard's catalog";
|
||||
}
|
||||
|
||||
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
|
||||
return Command::AllowedOnSecondary::kNever;
|
||||
}
|
||||
|
||||
bool supportsRetryableWrite() const final {
|
||||
return true;
|
||||
}
|
||||
|
||||
class Invocation final : public InvocationBase {
|
||||
public:
|
||||
using InvocationBase::InvocationBase;
|
||||
|
||||
void typedRun(OperationContext* opCtx) {
|
||||
ShardingState::get(opCtx)->assertCanAcceptShardedCommands();
|
||||
|
||||
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName,
|
||||
opCtx->getWriteConcern());
|
||||
|
||||
uassert(12248600,
|
||||
"expected to be called within a retryable write",
|
||||
TransactionParticipant::get(opCtx));
|
||||
|
||||
// Use an AlternativeClientRegion to perform the shard catalog writes outside the
|
||||
// retryable write session. The shard catalog commit contains its own idempotency
|
||||
// logic, and running inside the parent session would conflict with the dummy write.
|
||||
{
|
||||
auto newClient = getGlobalServiceContext()->getService()->makeClient(
|
||||
"ShardsvrCommitCreateCollectionMetadata");
|
||||
AlternativeClientRegion acr(newClient);
|
||||
auto newOpCtx = CancelableOperationContext(
|
||||
cc().makeOperationContext(),
|
||||
opCtx->getCancellationToken(),
|
||||
Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor());
|
||||
newOpCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
shard_catalog_commit::commitCreateCollectionLocally(newOpCtx.get(), ns());
|
||||
}
|
||||
|
||||
LOGV2_INFO(
|
||||
12248601, "Committed collection metadata locally on shard", "ns"_attr = ns());
|
||||
|
||||
// Since no write happened on this txnNumber, we need to make a dummy write so that
|
||||
// secondaries can be aware of this txn.
|
||||
DBDirectClient client(opCtx);
|
||||
client.update(NamespaceString::kServerConfigurationNamespace,
|
||||
BSON("_id" << Request::kCommandName),
|
||||
BSON("$inc" << BSON("count" << 1)),
|
||||
true /* upsert */,
|
||||
false /* multi */);
|
||||
}
|
||||
|
||||
private:
|
||||
NamespaceString ns() const override {
|
||||
return request().getCommandParameter();
|
||||
}
|
||||
|
||||
bool supportsWriteConcern() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
void doCheckAuthorization(OperationContext* opCtx) const override {
|
||||
uassert(ErrorCodes::Unauthorized,
|
||||
"Unauthorized",
|
||||
AuthorizationSession::get(opCtx->getClient())
|
||||
->isAuthorizedForActionsOnResource(
|
||||
ResourcePattern::forClusterResource(request().getDbName().tenantId()),
|
||||
ActionType::internal));
|
||||
}
|
||||
};
|
||||
};
|
||||
MONGO_REGISTER_COMMAND(ShardsvrCommitCreateCollectionMetadataCommand).forShard();
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
Loading…
Reference in New Issue
Block a user