SERVER-121209 setAllowChunkOperations (AKA setAllowMigrations V2) (#53927)
GitOrigin-RevId: c7f29974616fcf6dccc49e3f9fbfe0f5eaffa7fa
This commit is contained in:
parent
1da2e825aa
commit
876d27d03e
@ -257,6 +257,11 @@ const internalCommandsMap = {
|
||||
testname: "_configsvrRunRestore",
|
||||
command: {_configsvrRunRestore: 1},
|
||||
},
|
||||
_configsvrSetAllowChunkOperations: {
|
||||
testname: "_configsvrSetAllowChunkOperations",
|
||||
command: {_configsvrSetAllowChunkOperations: ns, allowChunkOperations: true},
|
||||
writeConcern: {w: "majority"},
|
||||
},
|
||||
_configsvrSetAllowMigrations: {
|
||||
testname: "_configsvrSetAllowMigrations",
|
||||
command: {
|
||||
@ -818,6 +823,10 @@ const internalCommandsMap = {
|
||||
primaryShardId: "",
|
||||
},
|
||||
},
|
||||
_shardsvrSetAllowChunkOperations: {
|
||||
testname: "_shardsvrSetAllowChunkOperations",
|
||||
command: {_shardsvrSetAllowChunkOperations: "db.collection", allowChunkOperations: true},
|
||||
},
|
||||
_shardsvrSetAllowMigrations: {
|
||||
testname: "_shardsvrSetAllowMigrations",
|
||||
command: {_shardsvrSetAllowMigrations: "db.collection", allowMigrations: true},
|
||||
|
||||
@ -120,6 +120,7 @@ let viewsCommandTests = {
|
||||
_configsvrResetPlacementHistory: {skip: isAnInternalCommand},
|
||||
_configsvrReshardCollection: {skip: isAnInternalCommand},
|
||||
_configsvrRunRestore: {skip: isAnInternalCommand},
|
||||
_configsvrSetAllowChunkOperations: {skip: isAnInternalCommand},
|
||||
_configsvrSetAllowMigrations: {skip: isAnInternalCommand},
|
||||
_configsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_configsvrSetUserWriteBlockMode: {skip: isAnInternalCommand},
|
||||
@ -216,6 +217,7 @@ let viewsCommandTests = {
|
||||
_shardsvrReshardRecipientInitialize: {skip: isAnInternalCommand},
|
||||
_shardsvrReshardRecipientClone: {skip: isAnInternalCommand},
|
||||
_shardsvrReshardRecipientCriticalSectionStarted: {skip: isAnInternalCommand},
|
||||
_shardsvrSetAllowChunkOperations: {skip: isAnInternalCommand},
|
||||
_shardsvrSetAllowMigrations: {skip: isAnInternalCommand},
|
||||
_shardsvrResolveView: {skip: isAnInternalCommand},
|
||||
_shardsvrRunSearchIndexCommand: {skip: isAnInternalCommand},
|
||||
|
||||
@ -115,6 +115,7 @@ const wcCommandsTests = {
|
||||
_configsvrResetPlacementHistory: {skip: "internal command"},
|
||||
_configsvrReshardCollection: {skip: "internal command"},
|
||||
_configsvrRunRestore: {skip: "internal command"},
|
||||
_configsvrSetAllowChunkOperations: {skip: "internal command"},
|
||||
_configsvrSetAllowMigrations: {skip: "internal command"},
|
||||
_configsvrSetClusterParameter: {skip: "internal command"},
|
||||
_configsvrSetUserWriteBlockMode: {skip: "internal command"},
|
||||
@ -206,6 +207,7 @@ const wcCommandsTests = {
|
||||
_shardsvrReshardRecipientCriticalSectionStarted: {skip: "internal command"},
|
||||
_shardsvrResolveView: {skip: "internal command"},
|
||||
_shardsvrRunSearchIndexCommand: {skip: "internal command"},
|
||||
_shardsvrSetAllowChunkOperations: {skip: "internal command"},
|
||||
_shardsvrSetAllowMigrations: {skip: "internal command"},
|
||||
_shardsvrSetClusterParameter: {skip: "internal command"},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: "internal command"},
|
||||
@ -3339,6 +3341,7 @@ const wcTimeseriesCommandsTests = {
|
||||
_configsvrResetPlacementHistory: {skip: "internal command"},
|
||||
_configsvrReshardCollection: {skip: "internal command"},
|
||||
_configsvrRunRestore: {skip: "internal command"},
|
||||
_configsvrSetAllowChunkOperations: {skip: "internal command"},
|
||||
_configsvrSetAllowMigrations: {skip: "internal command"},
|
||||
_configsvrSetClusterParameter: {skip: "internal command"},
|
||||
_configsvrSetUserWriteBlockMode: {skip: "internal command"},
|
||||
@ -3432,6 +3435,7 @@ const wcTimeseriesCommandsTests = {
|
||||
_shardsvrReshardRecipientCriticalSectionStarted: {skip: "internal command"},
|
||||
_shardsvrResolveView: {skip: "internal command"},
|
||||
_shardsvrRunSearchIndexCommand: {skip: "internal command"},
|
||||
_shardsvrSetAllowChunkOperations: {skip: "internal command"},
|
||||
_shardsvrSetAllowMigrations: {skip: "internal command"},
|
||||
_shardsvrSetClusterParameter: {skip: "internal command"},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: "internal command"},
|
||||
|
||||
@ -66,6 +66,7 @@ const allCommands = {
|
||||
_configsvrResetPlacementHistory: {skip: isAnInternalCommand},
|
||||
_configsvrReshardCollection: {skip: isAnInternalCommand},
|
||||
_configsvrRunRestore: {skip: isAnInternalCommand},
|
||||
_configsvrSetAllowChunkOperations: {skip: isAnInternalCommand},
|
||||
_configsvrSetAllowMigrations: {skip: isAnInternalCommand},
|
||||
_configsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_configsvrSetUserWriteBlockMode: {skip: isAnInternalCommand},
|
||||
@ -155,6 +156,7 @@ const allCommands = {
|
||||
_shardsvrCommitCollModCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitDropCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitCreateCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrSetAllowChunkOperations: {skip: isAnInternalCommand},
|
||||
_shardsvrSetAllowMigrations: {skip: isAnInternalCommand},
|
||||
_shardsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: isAnInternalCommand},
|
||||
|
||||
@ -57,6 +57,7 @@ const allCommands = {
|
||||
_configsvrResetPlacementHistory: {skip: isPrimaryOnly},
|
||||
_configsvrReshardCollection: {skip: isPrimaryOnly},
|
||||
_configsvrRunRestore: {skip: isPrimaryOnly},
|
||||
_configsvrSetAllowChunkOperations: {skip: isPrimaryOnly},
|
||||
_configsvrSetAllowMigrations: {skip: isPrimaryOnly},
|
||||
_configsvrSetClusterParameter: {skip: isPrimaryOnly},
|
||||
_configsvrSetUserWriteBlockMode: {skip: isPrimaryOnly},
|
||||
@ -141,6 +142,7 @@ const allCommands = {
|
||||
_shardsvrCommitCollModCollectionMetadata: {skip: isPrimaryOnly},
|
||||
_shardsvrCommitDropCollectionMetadata: {skip: isPrimaryOnly},
|
||||
_shardsvrCommitCreateCollectionMetadata: {skip: isPrimaryOnly},
|
||||
_shardsvrSetAllowChunkOperations: {skip: isPrimaryOnly},
|
||||
_shardsvrSetAllowMigrations: {skip: isPrimaryOnly},
|
||||
_shardsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: isPrimaryOnly},
|
||||
|
||||
@ -61,6 +61,7 @@ const allCommands = {
|
||||
_configsvrResetPlacementHistory: {skip: isAnInternalCommand},
|
||||
_configsvrReshardCollection: {skip: isAnInternalCommand},
|
||||
_configsvrRunRestore: {skip: isAnInternalCommand},
|
||||
_configsvrSetAllowChunkOperations: {skip: isAnInternalCommand},
|
||||
_configsvrSetAllowMigrations: {skip: isAnInternalCommand},
|
||||
_configsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_configsvrSetUserWriteBlockMode: {skip: isAnInternalCommand},
|
||||
@ -150,6 +151,7 @@ const allCommands = {
|
||||
_shardsvrCommitDropCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitCreateCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrCommitRenameCollectionMetadata: {skip: isAnInternalCommand},
|
||||
_shardsvrSetAllowChunkOperations: {skip: isAnInternalCommand},
|
||||
_shardsvrSetAllowMigrations: {skip: isAnInternalCommand},
|
||||
_shardsvrSetClusterParameter: {skip: isAnInternalCommand},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: isAnInternalCommand},
|
||||
|
||||
@ -1040,6 +1040,7 @@ const allTestCases = {
|
||||
_configsvrResetPlacementHistory: {skip: "TODO"},
|
||||
_configsvrReshardCollection: {skip: "TODO"},
|
||||
_configsvrRunRestore: {skip: "TODO"},
|
||||
_configsvrSetAllowChunkOperations: {skip: "internal command"},
|
||||
_configsvrSetAllowMigrations: {skip: "TODO"},
|
||||
_configsvrSetClusterParameter: {skip: "TODO"},
|
||||
_configsvrSetUserWriteBlockMode: {skip: "TODO"},
|
||||
@ -1232,6 +1233,7 @@ const allTestCases = {
|
||||
],
|
||||
},
|
||||
_shardsvrRunSearchIndexCommand: {skip: "TODO"},
|
||||
_shardsvrSetAllowChunkOperations: {skip: "internal command"},
|
||||
_shardsvrSetAllowMigrations: {skip: "TODO"},
|
||||
_shardsvrSetClusterParameter: {skip: "TODO"},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: "TODO"},
|
||||
|
||||
@ -87,4 +87,6 @@ export const commandsAddedToMongodSinceLastLTS = [
|
||||
"_shardsvrCommitDropCollectionMetadata",
|
||||
"_shardsvrCommitRenameCollectionMetadata",
|
||||
"_shardsvrSplitChunk",
|
||||
"_shardsvrSetAllowChunkOperations",
|
||||
"_configsvrSetAllowChunkOperations",
|
||||
];
|
||||
|
||||
@ -11,6 +11,11 @@ import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_collection_util.js";
|
||||
import {ShardVersioningUtil} from "jstests/sharding/libs/shard_versioning_util.js";
|
||||
|
||||
// _shardsvrBeginMigrationBlockingOperation and _shardsvrEndMigrationBlockingOperation internally
|
||||
// send commands with an OSI, which conflicts with the implicit session created by the shell
|
||||
// (tassert 10090100)
|
||||
TestData.disableImplicitSessions = true;
|
||||
|
||||
const st = new ShardingTest({shards: {rs0: {nodes: 3}}});
|
||||
const replicaSet = new ReplSetTest({nodes: 1});
|
||||
replicaSet.startSet();
|
||||
|
||||
@ -115,6 +115,7 @@ let testCases = {
|
||||
_configsvrResetPlacementHistory: {skip: "internal command"},
|
||||
_configsvrReshardCollection: {skip: "internal command"},
|
||||
_configsvrRunRestore: {skip: "internal command"},
|
||||
_configsvrSetAllowChunkOperations: {skip: "internal command"},
|
||||
_configsvrSetAllowMigrations: {skip: "internal command"},
|
||||
_configsvrSetClusterParameter: {skip: "internal command"},
|
||||
_configsvrSetUserWriteBlockMode: {skip: "internal command"},
|
||||
@ -208,6 +209,7 @@ let testCases = {
|
||||
_shardsvrReshardRecipientCriticalSectionStarted: {skip: "internal command"},
|
||||
_shardsvrResolveView: {skip: "internal command"},
|
||||
_shardsvrRunSearchIndexCommand: {skip: "internal command"},
|
||||
_shardsvrSetAllowChunkOperations: {skip: "internal command"},
|
||||
_shardsvrSetAllowMigrations: {skip: "internal command"},
|
||||
_shardsvrSetClusterParameter: {skip: "internal command"},
|
||||
_shardsvrSetUserWriteBlockMode: {skip: "internal command"},
|
||||
|
||||
@ -89,11 +89,12 @@ let awaitShellToRefineCollectionShardKey = startParallelShell(() => {
|
||||
hangBeforeCommitFailPoint.wait();
|
||||
|
||||
// Verify that 'config.collections' has not been updated since we haven't committed the transaction,
|
||||
// except for the 'allowMigrations' property which is updated by the
|
||||
// except for the 'allowMigrations' and 'allowChunkOperations' property which are updated by the
|
||||
// RefineCollectionShardKeyCoordinator before the commit phase.
|
||||
let newCollArr = mongos.getCollection(kConfigCollections).find({_id: kNsName}).toArray();
|
||||
newCollArr.forEach((element) => {
|
||||
delete element["allowMigrations"];
|
||||
delete element["allowChunkOperations"];
|
||||
});
|
||||
assert.sameMembers(oldCollArr, newCollArr);
|
||||
|
||||
|
||||
@ -4,6 +4,7 @@
|
||||
//
|
||||
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
|
||||
|
||||
const st = new ShardingTest({shards: 1});
|
||||
const mongos = st.s0;
|
||||
@ -13,6 +14,8 @@ const kCollName = "foo";
|
||||
const kNsName = kDbName + "." + kCollName;
|
||||
const kCachedCollectionsNs = "config.cache.collections";
|
||||
const kCacheChunksNs = "config.cache.chunks." + kNsName;
|
||||
const kAuthoritativeCollectionsNs = "config.shard.catalog.collections";
|
||||
const kAuthoritativeChunksNs = "config.shard.catalog.chunks";
|
||||
const oldKeyDoc = {
|
||||
a: 1,
|
||||
b: 1,
|
||||
@ -24,6 +27,8 @@ const newKeyDoc = {
|
||||
d: 1,
|
||||
};
|
||||
|
||||
const isAuthoritativeMetadata = FeatureFlagUtil.isEnabled(st.s.getDB("admin"), "featureFlagAuthoritativeShardsDDL");
|
||||
|
||||
assert.commandWorked(mongos.adminCommand({enableSharding: kDbName}));
|
||||
assert.commandWorked(mongos.adminCommand({shardCollection: kNsName, key: oldKeyDoc}));
|
||||
assert.commandWorked(mongos.getCollection(kNsName).createIndex(newKeyDoc));
|
||||
@ -38,6 +43,9 @@ assert.commandWorked(mongos.adminCommand({split: kNsName, middle: {a: 5, b: 5}})
|
||||
|
||||
// Flush the routing table cache and verify that 'config.cache.chunks.db.foo' is as expected
|
||||
// before refineCollectionShardKey.
|
||||
// TODO (SERVER-125785): with isAuthoritativeMetadata = true and authoritative split, this
|
||||
// _flushRoutingTableCacheUpdates can be removed and check
|
||||
// kAuthoritativeCollectionsNs/kCacheChunksNs instead of kCachedCollectionsNs/kCacheChunksNs.
|
||||
assert.commandWorked(shard.adminCommand({_flushRoutingTableCacheUpdates: kNsName}));
|
||||
|
||||
let collEntry = shard.getCollection(kCachedCollectionsNs).findOne({_id: kNsName});
|
||||
@ -55,18 +63,28 @@ assert.commandWorked(mongos.adminCommand({refineCollectionShardKey: kNsName, key
|
||||
// refineCollectionShardKey doesn't block for each shard to refresh, so wait until the cached
|
||||
// information is fully up to date.
|
||||
assert.soon(() => {
|
||||
let newCollEntry = shard.getCollection(kCachedCollectionsNs).findOne({_id: kNsName});
|
||||
const kCollectionsNs = isAuthoritativeMetadata ? kAuthoritativeCollectionsNs : kCachedCollectionsNs;
|
||||
let newCollEntry = shard.getCollection(kCollectionsNs).findOne({_id: kNsName});
|
||||
return newCollEntry.epoch != collEntry.epoch && !newCollEntry.refreshing;
|
||||
});
|
||||
|
||||
// Verify that 'config.cache.chunks.db.foo' is as expected after refineCollectionShardKey.
|
||||
chunkArr = shard.getCollection(kCacheChunksNs).find({}).sort({min: 1}).toArray();
|
||||
// Verify that the chunks collection is as expected after refineCollectionShardKey.
|
||||
let minFieldName;
|
||||
if (isAuthoritativeMetadata) {
|
||||
const uuid = shard.getCollection(kAuthoritativeCollectionsNs).findOne({_id: kNsName}).uuid;
|
||||
chunkArr = shard.getCollection(kAuthoritativeChunksNs).find({uuid: uuid}).sort({min: 1}).toArray();
|
||||
minFieldName = "min";
|
||||
} else {
|
||||
chunkArr = shard.getCollection(kCacheChunksNs).find({}).sort({min: 1}).toArray();
|
||||
minFieldName = "_id";
|
||||
}
|
||||
|
||||
assert.eq(3, chunkArr.length);
|
||||
assert.eq({a: MinKey, b: MinKey, c: MinKey, d: MinKey}, chunkArr[0]._id);
|
||||
assert.eq({a: MinKey, b: MinKey, c: MinKey, d: MinKey}, chunkArr[0][minFieldName]);
|
||||
assert.eq({a: 0, b: 0, c: MinKey, d: MinKey}, chunkArr[0].max);
|
||||
assert.eq({a: 0, b: 0, c: MinKey, d: MinKey}, chunkArr[1]._id);
|
||||
assert.eq({a: 0, b: 0, c: MinKey, d: MinKey}, chunkArr[1][minFieldName]);
|
||||
assert.eq({a: 5, b: 5, c: MinKey, d: MinKey}, chunkArr[1].max);
|
||||
assert.eq({a: 5, b: 5, c: MinKey, d: MinKey}, chunkArr[2]._id);
|
||||
assert.eq({a: 5, b: 5, c: MinKey, d: MinKey}, chunkArr[2][minFieldName]);
|
||||
assert.eq({a: MaxKey, b: MaxKey, c: MaxKey, d: MaxKey}, chunkArr[2].max);
|
||||
|
||||
st.stop();
|
||||
|
||||
@ -377,7 +377,11 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
|
||||
sharding_ddl_util::getCollectionUUID(opCtx, _collInfo->nsForTargeting);
|
||||
_doc.setCollUUID(collUUID);
|
||||
sharding_ddl_util::stopMigrations(
|
||||
opCtx, _collInfo->nsForTargeting, collUUID, getNewSession(opCtx));
|
||||
opCtx,
|
||||
_collInfo->nsForTargeting,
|
||||
collUUID,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
_doc.getAuthoritativeMetadataAccessLevel());
|
||||
}
|
||||
})();
|
||||
})
|
||||
@ -449,8 +453,9 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
|
||||
if (_collInfo->isTracked) {
|
||||
try {
|
||||
if (!_firstExecution && _collInfo->isSharded) {
|
||||
bool allowMigrations = sharding_ddl_util::checkAllowMigrations(
|
||||
opCtx, _collInfo->nsForTargeting);
|
||||
bool allowMigrations =
|
||||
sharding_ddl_util::checkAllowMigrationsOnConfigServer(
|
||||
opCtx, _collInfo->nsForTargeting);
|
||||
if (_result.is_initialized() && allowMigrations) {
|
||||
// The command finished and we have the response. Return it.
|
||||
return;
|
||||
@ -542,12 +547,20 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
|
||||
|
||||
const auto collUUID = _doc.getCollUUID();
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx, _collInfo->nsForTargeting, collUUID, getNewSession(opCtx));
|
||||
opCtx,
|
||||
_collInfo->nsForTargeting,
|
||||
collUUID,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
_doc.getAuthoritativeMetadataAccessLevel());
|
||||
} catch (DBException& ex) {
|
||||
if (!_isRetriableErrorForDDLCoordinator(ex.toStatus())) {
|
||||
const auto collUUID = _doc.getCollUUID();
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx, _collInfo->nsForTargeting, collUUID, getNewSession(opCtx));
|
||||
opCtx,
|
||||
_collInfo->nsForTargeting,
|
||||
collUUID,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
_doc.getAuthoritativeMetadataAccessLevel());
|
||||
}
|
||||
throw;
|
||||
}
|
||||
|
||||
@ -0,0 +1,173 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
|
||||
#include "mongo/base/error_codes.h"
|
||||
#include "mongo/base/string_data.h"
|
||||
#include "mongo/db/auth/action_type.h"
|
||||
#include "mongo/db/auth/authorization_session.h"
|
||||
#include "mongo/db/auth/resource_pattern.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/dbdirectclient.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharded_ddl_commands_gen.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharding_catalog_manager.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h"
|
||||
#include "mongo/db/sharding_environment/grid.h"
|
||||
#include "mongo/db/sharding_environment/sharding_feature_flags_gen.h"
|
||||
#include "mongo/db/topology/cluster_role.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include <boost/move/utility_core.hpp>
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
|
||||
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
class ConfigsvrSetAllowChunkOperationsCommand final
|
||||
: public TypedCommand<ConfigsvrSetAllowChunkOperationsCommand> {
|
||||
public:
|
||||
using Request = ConfigsvrSetAllowChunkOperations;
|
||||
|
||||
class Invocation final : public InvocationBase {
|
||||
public:
|
||||
using InvocationBase::InvocationBase;
|
||||
|
||||
void typedRun(OperationContext* opCtx) {
|
||||
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
const NamespaceString& nss = ns();
|
||||
|
||||
uassert(ErrorCodes::IllegalOperation,
|
||||
"_configsvrSetAllowChunkOperations can only be run on config servers",
|
||||
serverGlobalParams.clusterRole.has(ClusterRole::ConfigServer));
|
||||
CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName,
|
||||
opCtx->getWriteConcern());
|
||||
|
||||
uassert(12120911,
|
||||
"_configsvrSetAllowChunkOperations should only run with AuthoritativeShardsDDL "
|
||||
"enabled",
|
||||
sharding_ddl_util::getGrantedAuthoritativeMetadataAccessLevel(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot()) !=
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
|
||||
{
|
||||
// Use ACR to have a thread holding the session while we do the metadata updates so
|
||||
// we can serialize concurrent requests to setAllowChunkOperations (i.e. a stepdown
|
||||
// happens and the new primary sends a setAllowChunkOperations with the same
|
||||
// sessionId). We could think about weakening the serialization guarantee in the
|
||||
// future because the replay protection comes from the oplog write with a specific
|
||||
// txnNumber. Using ACR also prevents having deadlocks with the shutdown thread
|
||||
// because the cancellation of the new operation context is linked to the parent
|
||||
// one.
|
||||
auto newClient =
|
||||
opCtx->getServiceContext()->getService()->makeClient("SetAllowChunkOperations");
|
||||
AlternativeClientRegion acr(newClient);
|
||||
auto executor =
|
||||
Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
|
||||
auto newOpCtxPtr = CancelableOperationContext(
|
||||
cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
|
||||
|
||||
AuthorizationSession::get(newOpCtxPtr.get()->getClient())
|
||||
->grantInternalAuthorization();
|
||||
newOpCtxPtr->setWriteConcern(opCtx->getWriteConcern());
|
||||
|
||||
// Set the operation context read concern level to local for reads into the config
|
||||
// database.
|
||||
repl::ReadConcernArgs::get(newOpCtxPtr.get()) =
|
||||
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
|
||||
|
||||
const auto allowChunkOperations = request().getAllowChunkOperations();
|
||||
const auto& collectionUUID = request().getCollectionUUID();
|
||||
|
||||
ShardingCatalogManager::get(newOpCtxPtr.get())
|
||||
->setAllowChunkOperations(
|
||||
newOpCtxPtr.get(), nss, collectionUUID, allowChunkOperations);
|
||||
}
|
||||
|
||||
// Since we no write happened on this txnNumber, we need to make a dummy write to
|
||||
// protect against older requests with old txnNumbers.
|
||||
DBDirectClient client(opCtx);
|
||||
client.update(NamespaceString::kServerConfigurationNamespace,
|
||||
BSON("_id" << "setAllowChunkOperationsStats"),
|
||||
BSON("$inc" << BSON("count" << 1)),
|
||||
true /* upsert */,
|
||||
false /* multi */);
|
||||
}
|
||||
|
||||
private:
|
||||
NamespaceString ns() const override {
|
||||
return request().getCommandParameter();
|
||||
}
|
||||
|
||||
bool supportsWriteConcern() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
void doCheckAuthorization(OperationContext* opCtx) const override {
|
||||
uassert(ErrorCodes::Unauthorized,
|
||||
"Unauthorized",
|
||||
AuthorizationSession::get(opCtx->getClient())
|
||||
->isAuthorizedForActionsOnResource(
|
||||
ResourcePattern::forClusterResource(request().getDbName().tenantId()),
|
||||
ActionType::internal));
|
||||
}
|
||||
};
|
||||
|
||||
bool skipApiVersionCheck() const override {
|
||||
// Internal command (server to server).
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string help() const override {
|
||||
return "Internal command, which is exported by the sharding config server. Do not call "
|
||||
"directly. Sets the allowChunkOperations flag on the specified collection.";
|
||||
}
|
||||
|
||||
bool adminOnly() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
|
||||
return AllowedOnSecondary::kNever;
|
||||
}
|
||||
|
||||
bool supportsRetryableWrite() const final {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
MONGO_REGISTER_COMMAND(ConfigsvrSetAllowChunkOperationsCommand).forShard();
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
@ -273,8 +273,12 @@ void DropCollectionCoordinator::_freezeMigrations(
|
||||
|
||||
if (_doc.getCollInfo()) {
|
||||
const auto collUUID = _doc.getCollInfo()->getUuid();
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::stopMigrations(opCtx, nss(), collUUID, session);
|
||||
sharding_ddl_util::stopMigrations(
|
||||
opCtx,
|
||||
nss(),
|
||||
collUUID,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
_doc.getAuthoritativeMetadataAccessLevel());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -499,7 +499,11 @@ ExecutorFuture<void> DropDatabaseCoordinator::_runImpl(
|
||||
// before persisting its identity on the recovery doc (This condition won't be
|
||||
// reinforced in case of a stepdown)
|
||||
sharding_ddl_util::stopMigrations(
|
||||
opCtx, nss, coll.getUuid(), getNewSession(opCtx));
|
||||
opCtx,
|
||||
nss,
|
||||
coll.getUuid(),
|
||||
[&] { return getNewSession(opCtx); },
|
||||
_doc.getAuthoritativeMetadataAccessLevel());
|
||||
|
||||
auto newStateDoc = _doc;
|
||||
newStateDoc.setCollInfo(coll);
|
||||
|
||||
@ -139,8 +139,13 @@ ExecutorFuture<void> DropIndexesCoordinator::_cleanupOnAbort(
|
||||
auto* opCtx = opCtxHolder.get();
|
||||
|
||||
// Ensure migrations are resumed before terminating the coordinator.
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::resumeMigrations(opCtx, nss(), boost::none /* uuid */, session);
|
||||
// TODO (SERVER-127439): take AuthoritativeMetadataAccessLevelEnum from _doc.
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx,
|
||||
nss(),
|
||||
boost::none /* uuid */,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
|
||||
return Status::OK();
|
||||
});
|
||||
@ -153,8 +158,13 @@ ExecutorFuture<void> mongo::DropIndexesCoordinator::_runImpl(
|
||||
.then(_buildPhaseHandler(
|
||||
Phase::kFreezeMigrations,
|
||||
[this, anchor = shared_from_this(), executor, token](OperationContext* opCtx) {
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::stopMigrations(opCtx, nss(), boost::none, session);
|
||||
// TODO (SERVER-127439): take AuthoritativeMetadataAccessLevelEnum from _doc.
|
||||
sharding_ddl_util::stopMigrations(
|
||||
opCtx,
|
||||
nss(),
|
||||
boost::none,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
}))
|
||||
.then(_buildPhaseHandler(
|
||||
Phase::kDropIndexes,
|
||||
@ -165,7 +175,13 @@ ExecutorFuture<void> mongo::DropIndexesCoordinator::_runImpl(
|
||||
Phase::kResumeMigrations,
|
||||
[this, anchor = shared_from_this(), executor, token](OperationContext* opCtx) {
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::resumeMigrations(opCtx, nss(), boost::none, session);
|
||||
// TODO (SERVER-127439): take AuthoritativeMetadataAccessLevelEnum from _doc.
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx,
|
||||
nss(),
|
||||
boost::none,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
}))
|
||||
.onError([this, anchor = shared_from_this()](const Status& status) {
|
||||
if (_doc.getPhase() < Phase::kFreezeMigrations) {
|
||||
|
||||
@ -138,19 +138,33 @@ void MigrationBlockingOperationCoordinator::_throwIfCleaningUp(WithLock) {
|
||||
void MigrationBlockingOperationCoordinator::_recoverIfNecessary(WithLock lk,
|
||||
OperationContext* opCtx,
|
||||
bool isBeginOperation) {
|
||||
if (!_needsRecovery || !_getExternalState()->checkAllowMigrations(opCtx, nss())) {
|
||||
_needsRecovery = false;
|
||||
if (!_needsRecovery) {
|
||||
return;
|
||||
}
|
||||
|
||||
const bool allowMigrations =
|
||||
_getExternalState()->checkAllowMigrationsOnConfigServer(opCtx, nss());
|
||||
|
||||
tassert(10644530,
|
||||
"If there is a state document on disk and migrations are not "
|
||||
"blocked, then there must be only one operation.",
|
||||
_operations.size() == 1);
|
||||
_operations.size() == 1 || !allowMigrations);
|
||||
|
||||
if (isBeginOperation) {
|
||||
// If checkAllowMigrationsOnConfigServer returned false, that doesn't guarantee that migrations
|
||||
// are consistently disabled across all shards, nor that migrations have been drained, so call
|
||||
// again allowMigrations(false).
|
||||
// If checkAllowMigrationsOnConfigServer returned true, it doesn't guarantee that migrations are
|
||||
// enabled consistently either, but the coordinator will always call allowMigrations(true)
|
||||
// before finishing, so it's ok to continue.
|
||||
if (isBeginOperation || !allowMigrations) {
|
||||
try {
|
||||
_getExternalState()->allowMigrations(opCtx, nss(), false);
|
||||
// TODO (SERVER-127440): take AuthoritativeMetadataAccessLevelEnum from _doc.
|
||||
_getExternalState()->allowMigrations(
|
||||
opCtx,
|
||||
nss(),
|
||||
false,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
_needsRecovery = false;
|
||||
return;
|
||||
} catch (const DBException& e) {
|
||||
@ -204,7 +218,13 @@ void MigrationBlockingOperationCoordinator::beginOperation(OperationContext* opC
|
||||
ScopeGuard removeStateDocumentGuard(
|
||||
[&] { ensureFulfilledPromise(lock, _beginCleanupPromise); });
|
||||
hangBeforeBlockingMigrations.pauseWhileSet();
|
||||
_getExternalState()->allowMigrations(opCtx, nss(), false);
|
||||
// TODO (SERVER-127440): take AuthoritativeMetadataAccessLevelEnum from _doc.
|
||||
_getExternalState()->allowMigrations(
|
||||
opCtx,
|
||||
nss(),
|
||||
false,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
removeStateDocumentGuard.dismiss();
|
||||
}
|
||||
|
||||
@ -239,7 +259,13 @@ void MigrationBlockingOperationCoordinator::endOperation(OperationContext* opCtx
|
||||
|
||||
if (_operations.empty()) {
|
||||
hangBeforeAllowingMigrations.pauseWhileSet();
|
||||
_getExternalState()->allowMigrations(opCtx, nss(), true);
|
||||
// TODO (SERVER-127440): take AuthoritativeMetadataAccessLevelEnum from _doc.
|
||||
_getExternalState()->allowMigrations(
|
||||
opCtx,
|
||||
nss(),
|
||||
true,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
|
||||
hangBeforeFulfillingPromise.pauseWhileSet();
|
||||
ensureFulfilledPromise(lock, _beginCleanupPromise);
|
||||
|
||||
@ -236,11 +236,12 @@ ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_runImpl(
|
||||
|
||||
// Stop migrations during most of the execution of the coordinator to guarantee a
|
||||
// stable placement.
|
||||
{
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::stopMigrations(
|
||||
opCtx, nss(), _request.getCollectionUUID(), session);
|
||||
}
|
||||
sharding_ddl_util::stopMigrations(
|
||||
opCtx,
|
||||
nss(),
|
||||
_request.getCollectionUUID(),
|
||||
[&] { return getNewSession(opCtx); },
|
||||
_doc.getAuthoritativeMetadataAccessLevel());
|
||||
|
||||
const auto& ns = nss();
|
||||
auto opts = [&] {
|
||||
@ -371,11 +372,12 @@ ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_runImpl(
|
||||
[this, token, anchor = shared_from_this(), executor](auto* opCtx) {
|
||||
notifyChangeStreamsOnRefineCollectionShardKeyComplete(
|
||||
opCtx, nss(), _doc.getNewShardKey(), _doc.getOldKey().get(), *_doc.getUuid());
|
||||
{
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::resumeMigrations(opCtx, nss(), boost::none, session);
|
||||
}
|
||||
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx,
|
||||
nss(),
|
||||
boost::none,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
_doc.getAuthoritativeMetadataAccessLevel());
|
||||
logRefineCollectionShardKey(opCtx, nss(), "end", BSONObj());
|
||||
}))
|
||||
.then([this, anchor = shared_from_this(), executor] {
|
||||
@ -437,8 +439,12 @@ ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_cleanupOnAbort(
|
||||
}
|
||||
|
||||
if (_doc.getPhase() >= Phase::kRemoteIndexValidation) {
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::resumeMigrations(opCtx, nss(), boost::none, session);
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx,
|
||||
nss(),
|
||||
boost::none,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
_doc.getAuthoritativeMetadataAccessLevel());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -779,17 +779,26 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
|
||||
|
||||
// Block migrations on involved collections.
|
||||
try {
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::stopMigrations(
|
||||
opCtx, fromNss, _doc.getSourceUUID(), session);
|
||||
// TODO (SERVER-127441): take AuthoritativeMetadataAccessLevelEnum from _doc
|
||||
opCtx,
|
||||
fromNss,
|
||||
_doc.getSourceUUID(),
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
} catch (ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
|
||||
// stopMigrations is allowed to fail when the source collection is not tracked
|
||||
// by the sharding catalog.
|
||||
}
|
||||
|
||||
try {
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::stopMigrations(opCtx, toNss, _doc.getTargetUUID(), session);
|
||||
// TODO (SERVER-127441): take AuthoritativeMetadataAccessLevelEnum from _doc.
|
||||
sharding_ddl_util::stopMigrations(
|
||||
opCtx,
|
||||
toNss,
|
||||
_doc.getTargetUUID(),
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
} catch (ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
|
||||
// stopMigrations is allowed to fail when the target collection doesn't exist or
|
||||
// is not tracked by the sharding catalog.
|
||||
|
||||
@ -527,6 +527,19 @@ structs:
|
||||
optional: true
|
||||
description: "The expected collection timestamp."
|
||||
|
||||
SetAllowChunkOperationsFields:
|
||||
description: "Fields of _shardsvrSetAllowChunkOperations and _configsvrsvrSetAllowChunkOperations."
|
||||
strict: false
|
||||
fields:
|
||||
allowChunkOperations:
|
||||
type: bool
|
||||
description: "If false, chunk operations (including migrations) should be stopped and disabled."
|
||||
optional: false
|
||||
collectionUUID:
|
||||
type: uuid
|
||||
description: "The uuid of the collection."
|
||||
optional: true
|
||||
|
||||
commands:
|
||||
_shardsvrCreateCollection:
|
||||
command_name: _shardsvrCreateCollection
|
||||
@ -1190,3 +1203,27 @@ commands:
|
||||
description: Primary shard identifier, used to determine whether the shard needs to durably persist collection metadata or not.
|
||||
optional: false
|
||||
reply_type: OkReply
|
||||
|
||||
_shardsvrSetAllowChunkOperations:
|
||||
command_name: _shardsvrSetAllowChunkOperations
|
||||
cpp_name: ShardsvrSetAllowChunkOperations
|
||||
description: "Internal setAllowChunkOperations command for a shard."
|
||||
strict: false
|
||||
namespace: type
|
||||
type: namespacestring
|
||||
api_version: ""
|
||||
reply_type: OkReply
|
||||
chained_structs:
|
||||
SetAllowChunkOperationsFields: SetAllowChunkOperationsFields
|
||||
|
||||
_configsvrSetAllowChunkOperations:
|
||||
command_name: _configsvrSetAllowChunkOperations
|
||||
cpp_name: ConfigsvrSetAllowChunkOperations
|
||||
description: "Internal setAllowChunkOperations command for the config server."
|
||||
strict: false
|
||||
namespace: type
|
||||
type: namespacestring
|
||||
api_version: ""
|
||||
reply_type: OkReply
|
||||
chained_structs:
|
||||
SetAllowChunkOperationsFields: SetAllowChunkOperationsFields
|
||||
|
||||
@ -412,12 +412,21 @@ public:
|
||||
/**
|
||||
* In a transaction, sets the 'allowMigrations' to the requested state and bumps the collection
|
||||
* version.
|
||||
* TODO (SERVER-89169): Remove this function.
|
||||
*/
|
||||
void setAllowMigrationsAndBumpOneChunk(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const boost::optional<UUID>& collectionUUID,
|
||||
bool allowMigrations);
|
||||
|
||||
/**
|
||||
* Sets the 'allowChunkOperations' field to the requested state.
|
||||
*/
|
||||
void setAllowChunkOperations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const boost::optional<UUID>& collectionUUID,
|
||||
bool allowChunkOperations);
|
||||
|
||||
/**
|
||||
* Bump the minor version of the newest chunk on each shard
|
||||
*/
|
||||
|
||||
@ -2366,6 +2366,56 @@ void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk(
|
||||
// will own chunks for this collection.
|
||||
}
|
||||
|
||||
void ShardingCatalogManager::setAllowChunkOperations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const boost::optional<UUID>& collectionUUID,
|
||||
bool allowChunkOperations) {
|
||||
// Mark opCtx as interruptible to ensure that all reads and writes to the metadata
|
||||
// collections under the exclusive _kChunkOpLock happen on the same term.
|
||||
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
// Take _kChunkOpLock in exclusive mode to prevent concurrent chunk operations.
|
||||
Lock::ExclusiveLock lk(opCtx, _kChunkOpLock);
|
||||
|
||||
const auto cm = uassertStatusOK(
|
||||
RoutingInformationCache::get(opCtx)->getCollectionPlacementInfoWithRefresh(opCtx, nss));
|
||||
uassert(ErrorCodes::NamespaceNotSharded,
|
||||
str::stream() << "Collection '" << nss.toStringForErrorMsg() << "' is not sharded",
|
||||
cm.isSharded());
|
||||
|
||||
uassert(ErrorCodes::InvalidUUID,
|
||||
str::stream() << "Collection uuid " << collectionUUID
|
||||
<< " in the request does not match the current uuid " << cm.getUUID()
|
||||
<< " for ns " << nss.toStringForErrorMsg(),
|
||||
!collectionUUID || collectionUUID == cm.getUUID());
|
||||
|
||||
write_ops::UpdateCommandRequest updateCollOp(NamespaceString::kConfigsvrCollectionsNamespace);
|
||||
updateCollOp.setUpdates([&] {
|
||||
write_ops::UpdateOpEntry entry;
|
||||
const auto update = allowChunkOperations
|
||||
? BSON("$unset" << BSON(CollectionType::kAllowChunkOperationsFieldName << ""))
|
||||
: BSON("$set" << BSON(CollectionType::kAllowChunkOperationsFieldName << false));
|
||||
|
||||
BSONObj query = BSON(CollectionType::kNssFieldName << NamespaceStringUtil::serialize(
|
||||
nss, SerializationContext::stateDefault()));
|
||||
if (collectionUUID) {
|
||||
query = query.addFields(BSON(CollectionType::kUuidFieldName << *collectionUUID));
|
||||
}
|
||||
entry.setQ(query);
|
||||
entry.setU(update);
|
||||
entry.setMulti(false);
|
||||
return std::vector<write_ops::UpdateOpEntry>{entry};
|
||||
}());
|
||||
|
||||
DBDirectClient client(opCtx);
|
||||
const auto result = client.update(updateCollOp);
|
||||
write_ops::checkWriteErrors(result);
|
||||
|
||||
uassert(ErrorCodes::ConflictingOperationInProgress,
|
||||
str::stream() << "Expected to match one doc but matched " << result.getN(),
|
||||
result.getN() == 1);
|
||||
}
|
||||
|
||||
void ShardingCatalogManager::bumpCollectionMinorVersionInTxn(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
TxnNumber txnNumber) const {
|
||||
|
||||
@ -66,19 +66,22 @@ bool ShardingCoordinatorExternalStateImpl::isTrackedTimeseries(
|
||||
}
|
||||
}
|
||||
|
||||
void ShardingCoordinatorExternalStateImpl::allowMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool allowMigrations) {
|
||||
void ShardingCoordinatorExternalStateImpl::allowMigrations(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool allowMigrations,
|
||||
std::function<OperationSessionInfo()> osiGetter,
|
||||
AuthoritativeMetadataAccessLevelEnum authoritativeState) {
|
||||
if (allowMigrations) {
|
||||
sharding_ddl_util::resumeMigrations(opCtx, nss, boost::none);
|
||||
sharding_ddl_util::resumeMigrations(opCtx, nss, boost::none, osiGetter, authoritativeState);
|
||||
} else {
|
||||
sharding_ddl_util::stopMigrations(opCtx, nss, boost::none);
|
||||
sharding_ddl_util::stopMigrations(opCtx, nss, boost::none, osiGetter, authoritativeState);
|
||||
}
|
||||
}
|
||||
|
||||
bool ShardingCoordinatorExternalStateImpl::checkAllowMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss) {
|
||||
return sharding_ddl_util::checkAllowMigrations(opCtx, nss);
|
||||
bool ShardingCoordinatorExternalStateImpl::checkAllowMigrationsOnConfigServer(
|
||||
OperationContext* opCtx, const NamespaceString& nss) {
|
||||
return sharding_ddl_util::checkAllowMigrationsOnConfigServer(opCtx, nss);
|
||||
}
|
||||
|
||||
std::shared_ptr<ShardingCoordinatorExternalState>
|
||||
|
||||
@ -29,6 +29,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/db/global_catalog/ddl/sharding_coordinator_gen.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/util/modules.h"
|
||||
@ -46,8 +47,11 @@ public:
|
||||
const NamespaceString& bucketNss) const = 0;
|
||||
virtual void allowMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool allowMigrations) = 0;
|
||||
virtual bool checkAllowMigrations(OperationContext* opCtx, const NamespaceString& nss) = 0;
|
||||
bool allowMigrations,
|
||||
std::function<OperationSessionInfo()> osiGetter,
|
||||
AuthoritativeMetadataAccessLevelEnum authoritativeState) = 0;
|
||||
virtual bool checkAllowMigrationsOnConfigServer(OperationContext* opCtx,
|
||||
const NamespaceString& nss) = 0;
|
||||
|
||||
private:
|
||||
};
|
||||
@ -63,8 +67,11 @@ public:
|
||||
const NamespaceString& bucketNss) const override;
|
||||
void allowMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool allowMigrations) override;
|
||||
bool checkAllowMigrations(OperationContext* opCtx, const NamespaceString& nss) override;
|
||||
bool allowMigrations,
|
||||
std::function<OperationSessionInfo()> osiGetter,
|
||||
AuthoritativeMetadataAccessLevelEnum authoritativeState) override;
|
||||
bool checkAllowMigrationsOnConfigServer(OperationContext* opCtx,
|
||||
const NamespaceString& nss) override;
|
||||
};
|
||||
|
||||
class ShardingCoordinatorExternalStateFactory {
|
||||
|
||||
@ -50,15 +50,18 @@ bool ShardingCoordinatorExternalStateForTest::isTrackedTimeseries(
|
||||
return false;
|
||||
}
|
||||
|
||||
void ShardingCoordinatorExternalStateForTest::allowMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool allowMigrations) {
|
||||
void ShardingCoordinatorExternalStateForTest::allowMigrations(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool allowMigrations,
|
||||
std::function<OperationSessionInfo()> osiGenerator,
|
||||
AuthoritativeMetadataAccessLevelEnum authoritativeState) {
|
||||
allowMigrationsResponse.getNext();
|
||||
migrationsAllowed = allowMigrations;
|
||||
}
|
||||
|
||||
bool ShardingCoordinatorExternalStateForTest::checkAllowMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss) {
|
||||
bool ShardingCoordinatorExternalStateForTest::checkAllowMigrationsOnConfigServer(
|
||||
OperationContext* opCtx, const NamespaceString& nss) {
|
||||
migrationsAllowedResponse.getNext();
|
||||
return migrationsAllowed;
|
||||
}
|
||||
|
||||
@ -50,8 +50,11 @@ public:
|
||||
const NamespaceString& bucketNss) const override;
|
||||
void allowMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool allowMigrations) override;
|
||||
bool checkAllowMigrations(OperationContext* opCtx, const NamespaceString& nss) override;
|
||||
bool allowMigrations,
|
||||
std::function<OperationSessionInfo()> osiGenerator,
|
||||
AuthoritativeMetadataAccessLevelEnum authoritativeState) override;
|
||||
bool checkAllowMigrationsOnConfigServer(OperationContext* opCtx,
|
||||
const NamespaceString& nss) override;
|
||||
|
||||
MockCommandResponse allowMigrationsResponse;
|
||||
MockCommandResponse migrationsAllowedResponse;
|
||||
|
||||
@ -78,7 +78,6 @@
|
||||
#include "mongo/db/sharding_environment/sharding_feature_flags_gen.h"
|
||||
#include "mongo/db/sharding_environment/sharding_logging.h"
|
||||
#include "mongo/db/topology/cluster_parameters/sharding_cluster_parameters_gen.h"
|
||||
#include "mongo/db/topology/cluster_role.h"
|
||||
#include "mongo/db/topology/shard_registry.h"
|
||||
#include "mongo/db/topology/sharding_state.h"
|
||||
#include "mongo/db/topology/vector_clock/vector_clock.h"
|
||||
@ -215,17 +214,15 @@ void deleteCollection(OperationContext* opCtx,
|
||||
opCtx, std::move(transactionChain), writeConcern, osi, executor);
|
||||
}
|
||||
|
||||
void setAllowMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const boost::optional<UUID>& expectedCollectionUUID,
|
||||
const boost::optional<OperationSessionInfo>& osi,
|
||||
bool allowMigrations) {
|
||||
void setAllowMigrationsOnConfigServer(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const boost::optional<UUID>& expectedCollectionUUID,
|
||||
const OperationSessionInfo& osi,
|
||||
bool allowMigrations) {
|
||||
ConfigsvrSetAllowMigrations configsvrSetAllowMigrationsCmd(nss, allowMigrations);
|
||||
configsvrSetAllowMigrationsCmd.setCollectionUUID(expectedCollectionUUID);
|
||||
generic_argument_util::setMajorityWriteConcern(configsvrSetAllowMigrationsCmd);
|
||||
if (osi) {
|
||||
generic_argument_util::setOperationSessionInfo(configsvrSetAllowMigrationsCmd, *osi);
|
||||
}
|
||||
generic_argument_util::setOperationSessionInfo(configsvrSetAllowMigrationsCmd, osi);
|
||||
|
||||
const auto swSetAllowMigrationsResult =
|
||||
Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommand(
|
||||
@ -251,6 +248,79 @@ void setAllowMigrations(OperationContext* opCtx,
|
||||
}
|
||||
}
|
||||
|
||||
void setAllowChunkOperations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const boost::optional<UUID>& expectedCollectionUUID,
|
||||
std::function<OperationSessionInfo()> osiGenerator,
|
||||
bool allowChunkOperations) {
|
||||
{
|
||||
ConfigsvrSetAllowChunkOperations configsvrSetAllowChunkOperationsCmd(nss);
|
||||
configsvrSetAllowChunkOperationsCmd.setDbName(nss.dbName());
|
||||
configsvrSetAllowChunkOperationsCmd.setAllowChunkOperations(allowChunkOperations);
|
||||
configsvrSetAllowChunkOperationsCmd.setCollectionUUID(expectedCollectionUUID);
|
||||
generic_argument_util::setMajorityWriteConcern(configsvrSetAllowChunkOperationsCmd);
|
||||
generic_argument_util::setOperationSessionInfo(configsvrSetAllowChunkOperationsCmd,
|
||||
osiGenerator());
|
||||
|
||||
const auto swSetAllowChunkOperationsResult =
|
||||
Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommand(
|
||||
opCtx,
|
||||
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
|
||||
DatabaseName::kAdmin,
|
||||
configsvrSetAllowChunkOperationsCmd.toBSON(),
|
||||
Shard::RetryPolicy::kIdempotent);
|
||||
|
||||
// TODO (SERVER-127531) Review these catch clauses.
|
||||
try {
|
||||
uassertStatusOKWithContext(
|
||||
Shard::CommandResponse::getEffectiveStatus(swSetAllowChunkOperationsResult),
|
||||
str::stream() << "Error setting allowChunkOperations to " << allowChunkOperations
|
||||
<< " in the config server for collection "
|
||||
<< nss.toStringForErrorMsg());
|
||||
} catch (const ExceptionFor<ErrorCodes::NamespaceNotSharded>&) {
|
||||
// Collection no longer exists
|
||||
return;
|
||||
} catch (const ExceptionFor<ErrorCodes::ConflictingOperationInProgress>&) {
|
||||
// Collection metadata was concurrently dropped
|
||||
return;
|
||||
} catch (const ExceptionFor<ErrorCodes::ChunkMetadataInconsistency>&) {
|
||||
// Collection metadata has inconsistencies
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Use the router loop to target data-bearing shards. Shards containing no chunks of a
|
||||
// collection don't maintain any metadata for that collection, so we should not target them. By
|
||||
// using the shard protocol, we guarantee we are not missing any shard.
|
||||
sharding::router::CollectionRouter router(opCtx, nss);
|
||||
router.routeWithRoutingContext(
|
||||
"setAllowChunkOperations", [&](OperationContext* opCtx, RoutingContext& routingCtx) {
|
||||
const auto osi = osiGenerator();
|
||||
ShardsvrSetAllowChunkOperations shardsvrSetAllowChunkOperationsCmd(nss);
|
||||
shardsvrSetAllowChunkOperationsCmd.setDbName(nss.dbName());
|
||||
shardsvrSetAllowChunkOperationsCmd.setAllowChunkOperations(allowChunkOperations);
|
||||
shardsvrSetAllowChunkOperationsCmd.setCollectionUUID(expectedCollectionUUID);
|
||||
generic_argument_util::setMajorityWriteConcern(shardsvrSetAllowChunkOperationsCmd);
|
||||
generic_argument_util::setOperationSessionInfo(shardsvrSetAllowChunkOperationsCmd, osi);
|
||||
|
||||
const auto shardResponses = scatterGatherVersionedTargetByRoutingTable(
|
||||
opCtx,
|
||||
routingCtx,
|
||||
nss,
|
||||
shardsvrSetAllowChunkOperationsCmd.toBSON(),
|
||||
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
|
||||
Shard::RetryPolicy::kIdempotent,
|
||||
/*query=*/{},
|
||||
/*collation=*/{},
|
||||
/*letParameters=*/boost::none,
|
||||
/*runtimeConstants=*/boost::none);
|
||||
|
||||
for (const auto& response : shardResponses) {
|
||||
uassertStatusOK(AsyncRequestsSender::Response::getEffectiveStatus(response));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
Status possiblyTruncateErrorStatus(const Status& status) {
|
||||
@ -498,18 +568,28 @@ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadyTrackedWithOpt
|
||||
void stopMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const boost::optional<UUID>& expectedCollectionUUID,
|
||||
const boost::optional<OperationSessionInfo>& osi) {
|
||||
setAllowMigrations(opCtx, nss, expectedCollectionUUID, osi, false);
|
||||
std::function<OperationSessionInfo()> osiGenerator,
|
||||
AuthoritativeMetadataAccessLevelEnum authoritativeState) {
|
||||
if (authoritativeState != AuthoritativeMetadataAccessLevelEnum::kNone) {
|
||||
setAllowChunkOperations(opCtx, nss, expectedCollectionUUID, osiGenerator, false);
|
||||
} else {
|
||||
setAllowMigrationsOnConfigServer(opCtx, nss, expectedCollectionUUID, osiGenerator(), false);
|
||||
}
|
||||
}
|
||||
|
||||
void resumeMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const boost::optional<UUID>& expectedCollectionUUID,
|
||||
const boost::optional<OperationSessionInfo>& osi) {
|
||||
setAllowMigrations(opCtx, nss, expectedCollectionUUID, osi, true);
|
||||
std::function<OperationSessionInfo()> osiGenerator,
|
||||
AuthoritativeMetadataAccessLevelEnum authoritativeState) {
|
||||
if (authoritativeState != AuthoritativeMetadataAccessLevelEnum::kNone) {
|
||||
setAllowChunkOperations(opCtx, nss, expectedCollectionUUID, osiGenerator, true);
|
||||
} else {
|
||||
setAllowMigrationsOnConfigServer(opCtx, nss, expectedCollectionUUID, osiGenerator(), true);
|
||||
}
|
||||
}
|
||||
|
||||
bool checkAllowMigrations(OperationContext* opCtx, const NamespaceString& nss) {
|
||||
bool checkAllowMigrationsOnConfigServer(OperationContext* opCtx, const NamespaceString& nss) {
|
||||
auto collDoc =
|
||||
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
|
||||
opCtx,
|
||||
@ -527,7 +607,7 @@ bool checkAllowMigrations(OperationContext* opCtx, const NamespaceString& nss) {
|
||||
!collDoc.empty());
|
||||
|
||||
auto coll = CollectionType(collDoc[0]);
|
||||
return coll.getAllowMigrations();
|
||||
return coll.getAllowMigrations() && coll.getAllowChunkOperations();
|
||||
}
|
||||
|
||||
boost::optional<UUID> getCollectionUUID(OperationContext* opCtx,
|
||||
|
||||
@ -214,7 +214,8 @@ MONGO_MOD_NEEDS_REPLACEMENT void stopMigrations(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const boost::optional<UUID>& expectedCollectionUUID,
|
||||
const boost::optional<OperationSessionInfo>& osi = boost::none);
|
||||
std::function<OperationSessionInfo()> osiGenerator,
|
||||
AuthoritativeMetadataAccessLevelEnum authoritativeState);
|
||||
|
||||
/**
|
||||
* Resume migrations and balancing rounds for the given nss.
|
||||
@ -225,14 +226,19 @@ MONGO_MOD_NEEDS_REPLACEMENT void resumeMigrations(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const boost::optional<UUID>& expectedCollectionUUID,
|
||||
const boost::optional<OperationSessionInfo>& osi = boost::none);
|
||||
std::function<OperationSessionInfo()> osiGenerator,
|
||||
AuthoritativeMetadataAccessLevelEnum authoritativeState);
|
||||
|
||||
/**
|
||||
* Calls to the config server primary to get the collection document for the given nss.
|
||||
* Returns the value of the allowMigrations flag on the collection document.
|
||||
* Returns false if either the allowMigrations or allowChunkOperations is false, true otherwise.
|
||||
* NOTE: This function does not guarantee that migrations have been drained if it returns
|
||||
* false, nor it guarantees consistency between shards in the case of allowChunkOperations
|
||||
* (authoritative behavior). You must call either stopMigrations or resumeMigrations and wait for
|
||||
* them to return without exception to have such guarantees.
|
||||
*/
|
||||
MONGO_MOD_NEEDS_REPLACEMENT bool checkAllowMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss);
|
||||
MONGO_MOD_NEEDS_REPLACEMENT bool checkAllowMigrationsOnConfigServer(OperationContext* opCtx,
|
||||
const NamespaceString& nss);
|
||||
|
||||
/*
|
||||
* Returns the UUID of the collection (if exists) using the catalog. It does not provide any locking
|
||||
|
||||
@ -37,6 +37,7 @@
|
||||
#include "mongo/db/admission/execution_control/execution_admission_context.h"
|
||||
#include "mongo/db/dbdirectclient.h"
|
||||
#include "mongo/db/exec/document_value/document.h"
|
||||
#include "mongo/db/global_catalog/type_collection.h"
|
||||
#include "mongo/db/global_catalog/type_database_gen.h"
|
||||
#include "mongo/db/namespace_string_util.h"
|
||||
#include "mongo/db/persistent_task_store.h"
|
||||
@ -565,6 +566,7 @@ void ShardingRecoveryService::onReplicationRollback(
|
||||
})) {
|
||||
_resetInMemoryStates(opCtx);
|
||||
_recoverDatabaseShardingState(opCtx);
|
||||
_recoverAllowChunkOperations(opCtx);
|
||||
_recoverRecoverableCriticalSections(opCtx);
|
||||
}
|
||||
|
||||
@ -587,6 +589,7 @@ void ShardingRecoveryService::onConsistentDataAvailable(OperationContext* opCtx,
|
||||
|
||||
_resetInMemoryStates(opCtx);
|
||||
_recoverDatabaseShardingState(opCtx);
|
||||
_recoverAllowChunkOperations(opCtx);
|
||||
_recoverRecoverableCriticalSections(opCtx);
|
||||
}
|
||||
|
||||
@ -644,6 +647,20 @@ void ShardingRecoveryService::_recoverDatabaseShardingState(OperationContext* op
|
||||
LOGV2_DEBUG(9813602, 2, "Recovered the DatabaseShardingState from the shard catalog");
|
||||
}
|
||||
|
||||
void ShardingRecoveryService::_recoverAllowChunkOperations(OperationContext* opCtx) {
|
||||
LOGV2_DEBUG(12120909, 2, "Recovering setAllowChunkOperations from the shard catalog");
|
||||
|
||||
PersistentTaskStore<CollectionType> store(
|
||||
NamespaceString::kConfigShardCatalogCollectionsNamespace);
|
||||
store.forEach(opCtx, BSONObj{}, [&opCtx](const CollectionType& coll) {
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireExclusive(opCtx, coll.getNss());
|
||||
scopedCsr->setAllowChunkOperations(coll.getAllowChunkOperations());
|
||||
return true;
|
||||
});
|
||||
|
||||
LOGV2_DEBUG(12120910, 2, "Recovered setAllowChunkOperations from the shard catalog");
|
||||
}
|
||||
|
||||
void ShardingRecoveryService::_resetInMemoryStates(OperationContext* opCtx) {
|
||||
LOGV2_DEBUG(10371108, 2, "Resetting all in-memory sharding states");
|
||||
|
||||
|
||||
@ -207,6 +207,12 @@ private:
|
||||
*/
|
||||
void _recoverDatabaseShardingState(OperationContext* opCtx);
|
||||
|
||||
/**
|
||||
* This method is called when we have to recover the allowChunkOperations flag from disk to
|
||||
* memory (on startup or on rollback).
|
||||
*/
|
||||
void _recoverAllowChunkOperations(OperationContext* opCtx);
|
||||
|
||||
void onStartup(OperationContext* opCtx) final {}
|
||||
void onSetCurrentConfig(OperationContext* opCtx) final {}
|
||||
void onShutdown() final {}
|
||||
|
||||
@ -0,0 +1,233 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
|
||||
#include "mongo/base/error_codes.h"
|
||||
#include "mongo/db/auth/action_type.h"
|
||||
#include "mongo/db/auth/authorization_session.h"
|
||||
#include "mongo/db/auth/resource_pattern.h"
|
||||
#include "mongo/db/cancelable_operation_context.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/database_name.h"
|
||||
#include "mongo/db/dbdirectclient.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharded_ddl_commands_gen.h"
|
||||
#include "mongo/db/global_catalog/ddl/sharding_ddl_util.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/s/migration_source_manager.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/shard_role/lock_manager/exception_util.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/commit_collection_metadata_locally.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/operation_sharding_state.h"
|
||||
#include "mongo/db/sharding_environment/grid.h"
|
||||
#include "mongo/db/topology/sharding_state.h"
|
||||
#include "mongo/db/transaction/transaction_participant.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
|
||||
#include <string>
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
|
||||
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
class ShardsvrSetAllowChunkOperationsCommand final
|
||||
: public TypedCommand<ShardsvrSetAllowChunkOperationsCommand> {
|
||||
public:
|
||||
using Request = ShardsvrSetAllowChunkOperations;
|
||||
|
||||
bool skipApiVersionCheck() const override {
|
||||
// Internal command (server to server).
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string help() const override {
|
||||
return "Internal command. Do not call directly. Enable/disable chunk operations in a "
|
||||
"collection.";
|
||||
}
|
||||
|
||||
bool adminOnly() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
|
||||
return AllowedOnSecondary::kNever;
|
||||
}
|
||||
|
||||
bool supportsRetryableWrite() const final {
|
||||
return true;
|
||||
}
|
||||
|
||||
class Invocation final : public InvocationBase {
|
||||
public:
|
||||
using InvocationBase::InvocationBase;
|
||||
|
||||
void setAllowChunkOperations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
boost::optional<CollectionAcquisition> acq,
|
||||
bool allowChunkOperations,
|
||||
const boost::optional<UUID>& uuid) {
|
||||
shard_catalog_commit::commitSetAllowChunkOperationsLocally(
|
||||
opCtx, nss, allowChunkOperations, uuid);
|
||||
|
||||
if (allowChunkOperations) {
|
||||
// If we are setting chunk operations back on, there's nothing to drain.
|
||||
return;
|
||||
}
|
||||
|
||||
boost::optional<SharedSemiFuture<void>> waitForMigrationAbort;
|
||||
|
||||
{
|
||||
const auto scopedCsr = CollectionShardingRuntime::acquireShared(opCtx, nss);
|
||||
if (auto msm = MigrationSourceManager::get(*scopedCsr)) {
|
||||
waitForMigrationAbort.emplace(msm->abort());
|
||||
}
|
||||
}
|
||||
|
||||
if (waitForMigrationAbort) {
|
||||
// Release the collection acquisition at this point. If some chunk operation (or
|
||||
// migration) was in the commit phase, we have to release the acquisition so that it
|
||||
// can acquire the critical section to attempt to commit. The commit should always
|
||||
// fail, because by the time this command is invoked, the CSRS has already stored
|
||||
// "allowMigrations: false", so it should reject any commit attempt.
|
||||
acq.reset();
|
||||
waitForMigrationAbort->get(opCtx);
|
||||
}
|
||||
}
|
||||
|
||||
void typedRun(OperationContext* opCtx) {
|
||||
ShardingState::get(opCtx)->assertCanAcceptShardedCommands();
|
||||
|
||||
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName,
|
||||
opCtx->getWriteConcern());
|
||||
|
||||
uassert(12120900,
|
||||
"Expected to be called within a retryable write",
|
||||
TransactionParticipant::get(opCtx));
|
||||
|
||||
const auto nss = ns();
|
||||
|
||||
uassert(12120913,
|
||||
"_shardsvrSetAllowChunkOperations should only run with AuthoritativeShardsDDL "
|
||||
"enabled",
|
||||
sharding_ddl_util::getGrantedAuthoritativeMetadataAccessLevel(
|
||||
VersionContext::getDecoration(opCtx),
|
||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot()) !=
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
|
||||
uassert(12120901,
|
||||
"This command must be invoked following the shard protocol",
|
||||
OperationShardingState::isVersioned(opCtx, nss));
|
||||
|
||||
const auto allowChunkOperations = request().getAllowChunkOperations();
|
||||
const auto collectionUUID = request().getCollectionUUID();
|
||||
|
||||
{
|
||||
auto newClient = getGlobalServiceContext()->getService()->makeClient(
|
||||
"ShardsvrSetAllowChunkOperations");
|
||||
AlternativeClientRegion acr(newClient);
|
||||
auto newOpCtxContainer = CancelableOperationContext(
|
||||
cc().makeOperationContext(),
|
||||
opCtx->getCancellationToken(),
|
||||
Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor());
|
||||
auto* const newOpCtx = newOpCtxContainer.get();
|
||||
newOpCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
AuthorizationSession::get(newOpCtx->getClient())->grantInternalAuthorization();
|
||||
newOpCtx->setWriteConcern(opCtx->getWriteConcern());
|
||||
|
||||
// ScopedSetShardRole to transfer the shard version to the new opCtx.
|
||||
ScopedSetShardRole scopedSetShardRole(
|
||||
newOpCtx,
|
||||
nss,
|
||||
OperationShardingState::get(opCtx).getShardVersion(nss),
|
||||
OperationShardingState::get(opCtx).getDbVersion(nss.dbName()));
|
||||
|
||||
// While holding an acquisition, write conflicts are likely, so wrap everything
|
||||
// inside a writeConflictRetry.
|
||||
// TODO (SERVER-127438): remove this outer writeConflictRetry loop.
|
||||
writeConflictRetry(newOpCtx, "ShardsvrSetAllowChunkOperations", nss, [&] {
|
||||
// This command is invoked following the shard version protocol, so it is
|
||||
// necessary to acquire the target collection to enforce it. Additionally,
|
||||
// holding this acquisition guarantees serialization with the critical section
|
||||
// (both the blocking writes CS and the blocking reads CS), so it is guaranteed
|
||||
// that no chunk operation commits while this acquisition is held.
|
||||
boost::optional<CollectionAcquisition> acq = acquireCollection(
|
||||
newOpCtx,
|
||||
CollectionAcquisitionRequest::fromOpCtx(
|
||||
newOpCtx, nss, AcquisitionPrerequisites::OperationType::kWrite),
|
||||
MODE_IX);
|
||||
|
||||
setAllowChunkOperations(
|
||||
newOpCtx, nss, std::move(acq), allowChunkOperations, collectionUUID);
|
||||
});
|
||||
}
|
||||
|
||||
LOGV2_INFO(12120902,
|
||||
"setAllowChunkOperations finished",
|
||||
"ns"_attr = nss,
|
||||
"allowChunkOperations"_attr = allowChunkOperations);
|
||||
|
||||
// Since no write happened on this txnNumber, we need to make a dummy write so that
|
||||
// secondaries can be aware of this txn.
|
||||
DBDirectClient client(opCtx);
|
||||
client.update(NamespaceString::kServerConfigurationNamespace,
|
||||
BSON("_id" << Request::kCommandName),
|
||||
BSON("$inc" << BSON("count" << 1)),
|
||||
true /* upsert */,
|
||||
false /* multi */);
|
||||
}
|
||||
|
||||
private:
|
||||
NamespaceString ns() const override {
|
||||
return request().getCommandParameter();
|
||||
}
|
||||
|
||||
bool supportsWriteConcern() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
void doCheckAuthorization(OperationContext* opCtx) const override {
|
||||
uassert(ErrorCodes::Unauthorized,
|
||||
"Unauthorized",
|
||||
AuthorizationSession::get(opCtx->getClient())
|
||||
->isAuthorizedForActionsOnResource(
|
||||
ResourcePattern::forClusterResource(request().getDbName().tenantId()),
|
||||
ActionType::internal));
|
||||
}
|
||||
};
|
||||
};
|
||||
MONGO_REGISTER_COMMAND(ShardsvrSetAllowChunkOperationsCommand).forShard();
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
@ -280,18 +280,19 @@ ExecutorFuture<void> TimeseriesUpgradeDowngradeCoordinator::_runImpl(
|
||||
|
||||
// Freeze migrations on both namespaces. One of them will be the tracked namespace
|
||||
// and the other will be a no-op since it doesn't exist in the sharding catalog.
|
||||
{
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::stopMigrations(opCtx, originalNss(), boost::none, session);
|
||||
}
|
||||
{
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::stopMigrations(
|
||||
opCtx,
|
||||
originalNss().makeTimeseriesBucketsNamespace(),
|
||||
boost::none,
|
||||
session);
|
||||
}
|
||||
// TODO (SERVER-126811): take AuthoritativeMetadataAccessLevelEnum from _doc.
|
||||
sharding_ddl_util::stopMigrations(
|
||||
opCtx,
|
||||
originalNss(),
|
||||
boost::none,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
sharding_ddl_util::stopMigrations(
|
||||
opCtx,
|
||||
originalNss().makeTimeseriesBucketsNamespace(),
|
||||
boost::none,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
}))
|
||||
.then(_buildPhaseHandler(
|
||||
Phase::kAcquireCriticalSection,
|
||||
@ -517,18 +518,19 @@ ExecutorFuture<void> TimeseriesUpgradeDowngradeCoordinator::_runImpl(
|
||||
|
||||
// Resume migrations on both namespaces. One of them will be the tracked namespace
|
||||
// and the other will be a no-op since it doesn't exist in the sharding catalog.
|
||||
{
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::resumeMigrations(opCtx, originalNss(), boost::none, session);
|
||||
}
|
||||
{
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx,
|
||||
originalNss().makeTimeseriesBucketsNamespace(),
|
||||
boost::none,
|
||||
session);
|
||||
}
|
||||
// TODO (SERVER-126811): take AuthoritativeMetadataAccessLevelEnum from _doc.
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx,
|
||||
originalNss(),
|
||||
boost::none,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx,
|
||||
originalNss().makeTimeseriesBucketsNamespace(),
|
||||
boost::none,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
}))
|
||||
.then([this, anchor = shared_from_this()] {
|
||||
auto opCtxHolder = makeOperationContext();
|
||||
@ -625,18 +627,19 @@ ExecutorFuture<void> TimeseriesUpgradeDowngradeCoordinator::_cleanupOnAbort(
|
||||
|
||||
// Resume migrations on both namespaces. One of them will be the tracked namespace
|
||||
// and the other will be a no-op since it doesn't exist in the sharding catalog.
|
||||
{
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::resumeMigrations(opCtx, originalNss(), boost::none, session);
|
||||
}
|
||||
{
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx,
|
||||
originalNss().makeTimeseriesBucketsNamespace(),
|
||||
boost::none,
|
||||
session);
|
||||
}
|
||||
// TODO (SERVER-126811): take AuthoritativeMetadataAccessLevelEnum from _doc.
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx,
|
||||
originalNss(),
|
||||
boost::none,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
sharding_ddl_util::resumeMigrations(
|
||||
opCtx,
|
||||
originalNss().makeTimeseriesBucketsNamespace(),
|
||||
boost::none,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -96,6 +96,7 @@ public:
|
||||
// Make field names accessible.
|
||||
static constexpr auto kEpochFieldName = kPre22CompatibleEpochFieldName;
|
||||
|
||||
using GlobalCatalogCollectionTypeBase::kAllowChunkOperationsFieldName;
|
||||
using GlobalCatalogCollectionTypeBase::kAllowMigrationsFieldName;
|
||||
using GlobalCatalogCollectionTypeBase::kDefaultCollationFieldName;
|
||||
using GlobalCatalogCollectionTypeBase::kDefragmentationPhaseFieldName;
|
||||
@ -187,6 +188,10 @@ public:
|
||||
GlobalCatalogCollectionTypeBase::setAllowMigrations(false);
|
||||
}
|
||||
|
||||
bool getAllowChunkOperations() const {
|
||||
return GlobalCatalogCollectionTypeBase::getAllowChunkOperations().get_value_or(true);
|
||||
}
|
||||
|
||||
// TODO SERVER-61033: remove after permitMigrations have been merge with allowMigrations.
|
||||
bool getPermitMigrations() const {
|
||||
return GlobalCatalogCollectionTypeBase::getPermitMigrations().get_value_or(true);
|
||||
|
||||
@ -117,6 +117,15 @@ structs:
|
||||
almost all DDL operations in order to guarantee that the set of
|
||||
shards, which comprise a collection will not change.
|
||||
|
||||
If the flag is not present it defaults to true."
|
||||
optional: true
|
||||
allowChunkOperations:
|
||||
type: bool
|
||||
description:
|
||||
"Whether this collection allows running chunk operations, including migrations.
|
||||
It is required by almost all authoritative DDL operations in order to guarantee
|
||||
mutual exclusion between chunk operations and regular DDL operations.
|
||||
|
||||
If the flag is not present it defaults to true."
|
||||
optional: true
|
||||
|
||||
|
||||
@ -591,6 +591,7 @@ mongo_cc_library(
|
||||
"//src/mongo/db/global_catalog/ddl:configsvr_reset_placement_history_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:configsvr_run_restore_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:configsvr_run_restore_gen",
|
||||
"//src/mongo/db/global_catalog/ddl:configsvr_set_allow_chunk_operations_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:configsvr_set_allow_migrations_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:configsvr_split_chunk_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:convert_to_capped_coordinator.cpp",
|
||||
@ -660,6 +661,7 @@ mongo_cc_library(
|
||||
"//src/mongo/db/global_catalog/ddl:shardsvr_rename_collection_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:shardsvr_rename_collection_participant_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:shardsvr_rename_index_metadata_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:shardsvr_set_allow_chunk_operations_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:shardsvr_set_allow_migrations_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:shardsvr_split_chunk_command.cpp",
|
||||
"//src/mongo/db/global_catalog/ddl:shardsvr_timeseries_upgrade_downgrade_participant_command.cpp",
|
||||
|
||||
@ -615,7 +615,7 @@ private:
|
||||
// warning is emitted.
|
||||
for (const auto& collType : collections) {
|
||||
if (!collType.getAllowBalance() || !collType.getAllowMigrations() ||
|
||||
!collType.getPermitMigrations()) {
|
||||
!collType.getPermitMigrations() || !collType.getAllowChunkOperations()) {
|
||||
auto matchStage = BSON("$match" << BSON(ChunkType::collectionUUID()
|
||||
<< collType.getUuid() << ChunkType::shard()
|
||||
<< BSON("$in" << drainingShardNameArray)));
|
||||
|
||||
@ -323,11 +323,14 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
|
||||
|
||||
// Atomically (still under the CSR lock held above) check whether migrations are allowed and
|
||||
// register the MigrationSourceManager on the CSR. This ensures that interruption due to the
|
||||
// change of allowMigrations to false will properly serialise and not allow any new MSMs to
|
||||
// be running after the change.
|
||||
// change of allowMigrations or allowChunkOperations to false will properly serialize and
|
||||
// not allow any new MSMs to be running after the change.
|
||||
uassert(ErrorCodes::ConflictingOperationInProgress,
|
||||
"Collection is undergoing changes so moveChunk is not allowed.",
|
||||
metadata.allowMigrations());
|
||||
fmt::format("Collection is undergoing changes so moveChunk is not allowed. "
|
||||
"allowMigrations: {}, allowChunkOperations: {}",
|
||||
metadata.allowMigrations(),
|
||||
scopedCsr->allowChunkOperations()),
|
||||
metadata.allowMigrations() && scopedCsr->allowChunkOperations());
|
||||
|
||||
_scopedRegisterer.emplace(this, *scopedCsr);
|
||||
|
||||
|
||||
@ -381,10 +381,10 @@ void ReshardingCoordinator::_stopMigrations(
|
||||
// We must actively stop existing migrations and prevent new from starting to prevent
|
||||
// migrations from racing with resharding to acquire the critical section.
|
||||
auto opCtx = _makeOperationContext();
|
||||
_reshardingCoordinatorExternalState->stopMigrations(opCtx.get(),
|
||||
_coordinatorDoc.getSourceNss(),
|
||||
_coordinatorDoc.getSourceUUID(),
|
||||
_getNewSession(opCtx.get()));
|
||||
_reshardingCoordinatorExternalState->stopMigrations(
|
||||
opCtx.get(), _coordinatorDoc.getSourceNss(), _coordinatorDoc.getSourceUUID(), [&] {
|
||||
return _getNewSession(opCtx.get());
|
||||
});
|
||||
|
||||
resharding::tellAllParticipantsToJoinMigrations(opCtx.get(),
|
||||
_getNewSession(opCtx.get()),
|
||||
@ -405,8 +405,10 @@ void ReshardingCoordinator::_resumeMigrations(OperationContext* opCtx,
|
||||
|
||||
auto collectionUUID =
|
||||
abortReason ? _coordinatorDoc.getSourceUUID() : _coordinatorDoc.getReshardingUUID();
|
||||
_reshardingCoordinatorExternalState->resumeMigrations(
|
||||
opCtx, _coordinatorDoc.getSourceNss(), collectionUUID, _getNewSession(opCtx));
|
||||
_reshardingCoordinatorExternalState->resumeMigrations(opCtx,
|
||||
_coordinatorDoc.getSourceNss(),
|
||||
collectionUUID,
|
||||
[&] { return _getNewSession(opCtx); });
|
||||
}
|
||||
|
||||
ExecutorFuture<void> ReshardingCoordinator::_initializeCoordinator(
|
||||
|
||||
@ -654,18 +654,30 @@ void ReshardingCoordinatorExternalStateImpl::verifyFinalCollection(
|
||||
"recipientDocumentsFinal"_attr = numDocsTemporary);
|
||||
}
|
||||
|
||||
void ReshardingCoordinatorExternalStateImpl::stopMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& expectedCollectionUUID,
|
||||
const OperationSessionInfo& osi) {
|
||||
sharding_ddl_util::stopMigrations(opCtx, nss, expectedCollectionUUID, osi);
|
||||
void ReshardingCoordinatorExternalStateImpl::stopMigrations(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& expectedCollectionUUID,
|
||||
std::function<OperationSessionInfo()> osiGenerator) {
|
||||
// TODO (SERVER-127443): take AuthoritativeMetadataAccessLevelEnum from coordinator document.
|
||||
sharding_ddl_util::stopMigrations(opCtx,
|
||||
nss,
|
||||
expectedCollectionUUID,
|
||||
osiGenerator,
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
}
|
||||
|
||||
void ReshardingCoordinatorExternalStateImpl::resumeMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& expectedCollectionUUID,
|
||||
const OperationSessionInfo& osi) {
|
||||
sharding_ddl_util::resumeMigrations(opCtx, nss, expectedCollectionUUID, osi);
|
||||
void ReshardingCoordinatorExternalStateImpl::resumeMigrations(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& expectedCollectionUUID,
|
||||
std::function<OperationSessionInfo()> osiGenerator) {
|
||||
// TODO (SERVER-127443): take AuthoritativeMetadataAccessLevelEnum from coordinator document.
|
||||
sharding_ddl_util::resumeMigrations(opCtx,
|
||||
nss,
|
||||
expectedCollectionUUID,
|
||||
osiGenerator,
|
||||
AuthoritativeMetadataAccessLevelEnum::kNone);
|
||||
}
|
||||
|
||||
std::unique_ptr<CausalityBarrier> ReshardingCoordinatorExternalStateImpl::buildCausalityBarrier(
|
||||
|
||||
@ -154,7 +154,7 @@ public:
|
||||
virtual void stopMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& expectedCollectionUUID,
|
||||
const OperationSessionInfo& osi) = 0;
|
||||
std::function<OperationSessionInfo()> osiGenerator) = 0;
|
||||
|
||||
/**
|
||||
* To be called on completion (both success and abort) to unset allowMigrations, re-enabling
|
||||
@ -163,7 +163,7 @@ public:
|
||||
virtual void resumeMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& expectedCollectionUUID,
|
||||
const OperationSessionInfo& osi) = 0;
|
||||
std::function<OperationSessionInfo()> osiGenerator) = 0;
|
||||
/**
|
||||
* Builds a CausalityBarrier for the given participant shards, which is used to perform a no-op
|
||||
* retryable write on each shard.
|
||||
@ -239,12 +239,12 @@ public:
|
||||
void stopMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& expectedCollectionUUID,
|
||||
const OperationSessionInfo& osi) override;
|
||||
std::function<OperationSessionInfo()> osiGenerator) override;
|
||||
|
||||
void resumeMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& expectedCollectionUUID,
|
||||
const OperationSessionInfo& osi) override;
|
||||
std::function<OperationSessionInfo()> osiGenerator) override;
|
||||
|
||||
std::unique_ptr<CausalityBarrier> buildCausalityBarrier(
|
||||
std::vector<ShardId> participants,
|
||||
|
||||
@ -270,7 +270,7 @@ public:
|
||||
void stopMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID&,
|
||||
const OperationSessionInfo&) override {
|
||||
std::function<OperationSessionInfo()>) override {
|
||||
DBDirectClient client(opCtx);
|
||||
client.update(NamespaceString::kConfigsvrCollectionsNamespace,
|
||||
BSON(CollectionType::kNssFieldName << NamespaceStringUtil::serialize(
|
||||
@ -284,7 +284,7 @@ public:
|
||||
void resumeMigrations(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID&,
|
||||
const OperationSessionInfo&) override {
|
||||
std::function<OperationSessionInfo()>) override {
|
||||
DBDirectClient client(opCtx);
|
||||
client.update(NamespaceString::kConfigsvrCollectionsNamespace,
|
||||
BSON(CollectionType::kNssFieldName << NamespaceStringUtil::serialize(
|
||||
|
||||
@ -122,13 +122,13 @@ public:
|
||||
void stopMigrations(OperationContext*,
|
||||
const NamespaceString&,
|
||||
const UUID&,
|
||||
const OperationSessionInfo&) override {
|
||||
std::function<OperationSessionInfo()>) override {
|
||||
MONGO_UNREACHABLE;
|
||||
}
|
||||
void resumeMigrations(OperationContext*,
|
||||
const NamespaceString&,
|
||||
const UUID&,
|
||||
const OperationSessionInfo&) override {
|
||||
std::function<OperationSessionInfo()>) override {
|
||||
MONGO_UNREACHABLE;
|
||||
}
|
||||
std::unique_ptr<CausalityBarrier> buildCausalityBarrier(
|
||||
|
||||
@ -493,6 +493,8 @@ void CollectionShardingRuntime::clearFilteringMetadataForDroppedCollection_autho
|
||||
OperationContext* opCtx, const UUID& collectionUuid) {
|
||||
_clearFilteringMetadata(opCtx, /* collIsDropped */ true);
|
||||
_authoritativeState = AuthoritativeState::kAuthoritative;
|
||||
// Since we are dropping the collection, allowChunkOperations should be reset to `true`.
|
||||
_allowChunkOperations = true;
|
||||
}
|
||||
|
||||
Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx,
|
||||
@ -818,6 +820,14 @@ void CollectionShardingRuntime::setNonAuthoritative() {
|
||||
_authoritativeState = AuthoritativeState::kNonAuthoritative;
|
||||
}
|
||||
|
||||
bool CollectionShardingRuntime::allowChunkOperations() const {
|
||||
return _allowChunkOperations;
|
||||
}
|
||||
|
||||
void CollectionShardingRuntime::setAllowChunkOperations(bool allowChunkOperations) {
|
||||
_allowChunkOperations = allowChunkOperations;
|
||||
}
|
||||
|
||||
CollectionCriticalSection::CollectionCriticalSection(OperationContext* opCtx,
|
||||
NamespaceString nss,
|
||||
BSONObj reason)
|
||||
|
||||
@ -238,6 +238,10 @@ public:
|
||||
* Calls to clearFilteringMetadata + clears the _metadataManager object.
|
||||
*/
|
||||
void clearFilteringMetadataForDroppedCollection_nonAuthoritative(OperationContext* opCtx);
|
||||
/**
|
||||
* Calls to clearFilteringMetadata + clears the _metadataManager object. Also resets the
|
||||
* _allowChunkOperations flag (i.e. sets it to `true`).
|
||||
*/
|
||||
void clearFilteringMetadataForDroppedCollection_authoritative(OperationContext* opCtx,
|
||||
const UUID& collectionUuid);
|
||||
|
||||
@ -355,6 +359,9 @@ public:
|
||||
void setCollectionRecoverer(std::shared_ptr<CollectionCacheRecoverer> recoverer);
|
||||
std::shared_ptr<CollectionCacheRecoverer> getCollectionCacheRecoverer() const;
|
||||
|
||||
bool allowChunkOperations() const;
|
||||
void setAllowChunkOperations(bool allowChunkOperations);
|
||||
|
||||
private:
|
||||
friend class CollectionShardingRuntimeTest;
|
||||
|
||||
@ -467,6 +474,8 @@ private:
|
||||
// Tracks the fact that concurrent recovery of the collection's sharding metadata is taking
|
||||
// place by a concurrent thread handling a shard version mismatch
|
||||
std::shared_ptr<CollectionCacheRecoverer> _collectionRecoverer;
|
||||
|
||||
bool _allowChunkOperations{true};
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@ -41,11 +41,12 @@
|
||||
#include "mongo/db/shard_role/shard_catalog/catalog_raii.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_metadata.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/database_sharding_runtime.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/operation_sharding_state.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/type_oplog_catalog_metadata_gen.h"
|
||||
#include "mongo/db/sharding_environment/grid.h"
|
||||
#include "mongo/db/storage/write_unit_of_work.h"
|
||||
#include "mongo/db/topology/sharding_state.h"
|
||||
#include "mongo/db/versioning_protocol/stale_exception.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
|
||||
|
||||
@ -91,15 +92,21 @@ bool doesShardOwnChunks(OperationContext* opCtx,
|
||||
return !chunkList.empty();
|
||||
}
|
||||
|
||||
write_ops::UpdateOpEntry makeUpsertEntry(BSONObj filter, BSONObj replacement) {
|
||||
write_ops::UpdateOpEntry makeUpdateEntry(BSONObj filter, BSONObj replacement) {
|
||||
write_ops::UpdateOpEntry entry;
|
||||
entry.setQ(std::move(filter));
|
||||
entry.setU(std::move(replacement));
|
||||
entry.setUpsert(true);
|
||||
entry.setUpsert(false);
|
||||
entry.setMulti(false);
|
||||
return entry;
|
||||
}
|
||||
|
||||
write_ops::UpdateOpEntry makeUpsertEntry(BSONObj filter, BSONObj replacement) {
|
||||
write_ops::UpdateOpEntry entry = makeUpdateEntry(std::move(filter), std::move(replacement));
|
||||
entry.setUpsert(true);
|
||||
return entry;
|
||||
}
|
||||
|
||||
void executeLocalUpdates(DBDirectClient& dbClient,
|
||||
const NamespaceString& nss,
|
||||
std::vector<write_ops::UpdateOpEntry> updates) {
|
||||
@ -122,6 +129,24 @@ void executeLocalDelete(DBDirectClient& dbClient,
|
||||
write_ops::checkWriteErrors(dbClient.remove(std::move(req)));
|
||||
}
|
||||
|
||||
void writeCollectionAllowChunkOperationsLocally(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool allowChunkOperations) {
|
||||
const auto serializedNs =
|
||||
NamespaceStringUtil::serialize(nss, SerializationContext::stateDefault());
|
||||
|
||||
DBDirectClient dbClient(opCtx);
|
||||
|
||||
executeLocalUpdates(
|
||||
dbClient,
|
||||
NamespaceString::kConfigShardCatalogCollectionsNamespace,
|
||||
{makeUpdateEntry(
|
||||
BSON(CollectionType::kNssFieldName << serializedNs),
|
||||
allowChunkOperations
|
||||
? BSON("$unset" << BSON(CollectionType::kAllowChunkOperationsFieldName << ""))
|
||||
: BSON("$set" << BSON(CollectionType::kAllowChunkOperationsFieldName << false)))});
|
||||
}
|
||||
|
||||
void writeCollectionMetadataLocally(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const ShardCatalogCollectionTypeBase& coll,
|
||||
@ -242,6 +267,34 @@ void invalidateCollectionMetadataOnSecondaries(OperationContext* opCtx,
|
||||
});
|
||||
}
|
||||
|
||||
void setAllowChunkOperationsOnSecondaries(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& uuid,
|
||||
bool allowChunkOperations) {
|
||||
repl::MutableOplogEntry oplogEntry;
|
||||
oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
|
||||
oplogEntry.setVersionContextIfHasOperationFCV(VersionContext::getDecoration(opCtx));
|
||||
oplogEntry.setNss(nss);
|
||||
oplogEntry.setUuid(uuid);
|
||||
auto entry = SetAllowChunkOperationsOplogEntry{std::string(nss.coll()), allowChunkOperations};
|
||||
oplogEntry.setObject(entry.toBSON());
|
||||
oplogEntry.setOpTime(OplogSlot());
|
||||
oplogEntry.setWallClockTime(opCtx->fastClockSource().now());
|
||||
|
||||
writeConflictRetry(opCtx, "setAllowChunkOperations", NamespaceString::kRsOplogNamespace, [&] {
|
||||
AutoGetOplogFastPath oplogWrite(opCtx, OplogAccessMode::kWrite);
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
repl::OpTime opTime = repl::logOp(opCtx, &oplogEntry);
|
||||
uassert(12120908,
|
||||
str::stream() << "Failed to create new oplog entry for "
|
||||
"setAllowChunkOperations with opTime: "
|
||||
<< oplogEntry.getOpTime().toString() << ": "
|
||||
<< redact(oplogEntry.toBSON()),
|
||||
!opTime.isNull());
|
||||
wuow.commit();
|
||||
});
|
||||
}
|
||||
|
||||
void updateShardCatalogCache(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const CollectionType& coll,
|
||||
@ -281,6 +334,7 @@ void updateShardCatalogCache(OperationContext* opCtx,
|
||||
|
||||
scopedCsr->setFilteringMetadata_authoritative(
|
||||
opCtx, std::move(ownedMetadata), CollectionShardingRuntime::NoRoutingTableAs::kUntracked);
|
||||
scopedCsr->setAllowChunkOperations(coll.getAllowChunkOperations());
|
||||
}
|
||||
|
||||
void clearShardCatalogCacheForDroppedCollection(OperationContext* opCtx,
|
||||
@ -448,5 +502,97 @@ void commitCollectionMetadataLocally(OperationContext* opCtx,
|
||||
opCtx, nss, coll.getUuid(), false /* forDroppedCollection */);
|
||||
}
|
||||
|
||||
void commitSetAllowChunkOperationsLocally(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool allowChunkOperations,
|
||||
const boost::optional<UUID>& uuid) {
|
||||
const auto [metadata, isAuthoritative] = [&]() {
|
||||
const auto csr = CollectionShardingRuntime::acquireShared(opCtx, nss);
|
||||
return std::make_pair(csr->getCurrentMetadataIfKnown(),
|
||||
csr->getAuthoritativeState() ==
|
||||
CollectionShardingRuntime::AuthoritativeState::kAuthoritative);
|
||||
}();
|
||||
|
||||
if (isAuthoritative) {
|
||||
// This function is always be called after having passed a shard version check. If the
|
||||
// installed metadata is authoritative, then it can't be unknown by this point, unless there
|
||||
// was a concurrent invalidation. The StaleConfigInfo will cause a refresh in case the
|
||||
// metadata is unknown.
|
||||
// TODO (SERVER-127444): investigate if this can be a tassert once there can't be concurrent
|
||||
// flushes.
|
||||
uassert(StaleConfigInfo(nss,
|
||||
*OperationShardingState::get(opCtx).getShardVersion(nss),
|
||||
boost::none /* wantedVersion */,
|
||||
ShardingState::get(opCtx)->shardId()),
|
||||
fmt::format("commitSetAllowChunkOperationsLocally: collection metadata for {} is "
|
||||
"not currently known and needs to be recovered",
|
||||
nss.toStringForErrorMsg()),
|
||||
metadata);
|
||||
|
||||
uassert(
|
||||
ErrorCodes::InvalidUUID,
|
||||
fmt::format(
|
||||
"Collection uuid {} in the request does not match the current uuid {} for ns {}",
|
||||
uuid->toString(),
|
||||
metadata->hasRoutingTable() ? metadata->getUUID().toString() : "(no routing table)",
|
||||
nss.toStringForErrorMsg()),
|
||||
!uuid || (metadata->hasRoutingTable() && uuid == metadata->getUUID()));
|
||||
|
||||
writeCollectionAllowChunkOperationsLocally(opCtx, nss, allowChunkOperations);
|
||||
|
||||
// Write an oplog 'c' entry to send the new value to secondaries.
|
||||
setAllowChunkOperationsOnSecondaries(opCtx, nss, metadata->getUUID(), allowChunkOperations);
|
||||
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireExclusive(opCtx, nss);
|
||||
scopedCsr->setAllowChunkOperations(allowChunkOperations);
|
||||
|
||||
LOGV2(12120905,
|
||||
"commitSetAllowChunkOperationsLocally authoritatively",
|
||||
"nss"_attr = nss,
|
||||
"allowChunkOperations"_attr = allowChunkOperations);
|
||||
} else {
|
||||
const auto coll = fetchCollection(opCtx, nss);
|
||||
|
||||
uassert(
|
||||
ErrorCodes::InvalidUUID,
|
||||
fmt::format(
|
||||
"Collection uuid {} in the request does not match the current uuid {} for ns {}",
|
||||
uuid->toString(),
|
||||
coll.getUuid().toString(),
|
||||
nss.toStringForErrorMsg()),
|
||||
!uuid || uuid == coll.getUuid());
|
||||
|
||||
const auto chunks = fetchOwnedChunks(opCtx, nss, coll);
|
||||
tassert(12120907,
|
||||
"Retrieved allowChunkOperations from CSRS doesn't match",
|
||||
coll.getAllowChunkOperations() == allowChunkOperations);
|
||||
|
||||
// The CSR is non-authoritative at this point, so the durable shard catalog may still hold
|
||||
// rows written during a prior authoritative phase that nothing has cleaned up since. Drop
|
||||
// the older chunks before upserting the currently-owned ones so it doesn't end up
|
||||
// coexisting with real chunk rows for the same UUID.
|
||||
DBDirectClient dbClient(opCtx);
|
||||
executeLocalDelete(dbClient,
|
||||
NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
BSON(ChunkType::collectionUUID() << coll.getUuid()),
|
||||
true /* multi */);
|
||||
|
||||
writeCollectionMetadataLocally(opCtx, nss, coll.asShardCatalogType(), chunks);
|
||||
|
||||
// Write an oplog 'c' entry to invalidate collection metadata on secondaries.
|
||||
invalidateCollectionMetadataOnSecondaries(
|
||||
opCtx, nss, coll.getUuid(), /*forDropCollection=*/false);
|
||||
// Write an oplog 'c' entry to send the new allowChunkOperations value to secondaries.
|
||||
setAllowChunkOperationsOnSecondaries(opCtx, nss, metadata->getUUID(), allowChunkOperations);
|
||||
|
||||
updateShardCatalogCache(opCtx, nss, coll, chunks);
|
||||
|
||||
LOGV2(12120906,
|
||||
"commitSetAllowChunkOperationsLocally non-authoritatively",
|
||||
"nss"_attr = nss,
|
||||
"allowChunkOperations"_attr = allowChunkOperations);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace shard_catalog_commit
|
||||
} // namespace mongo
|
||||
|
||||
@ -85,5 +85,13 @@ void commitCollectionMetadataLocally(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool isDbPrimaryShard = false);
|
||||
|
||||
/**
|
||||
* Commits the allowChunkOperations flag to the shard catalog (config.shard.catalog.collections).
|
||||
*/
|
||||
void commitSetAllowChunkOperationsLocally(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool allowChunkOperations,
|
||||
const boost::optional<UUID>& uuid);
|
||||
|
||||
} // namespace shard_catalog_commit
|
||||
} // namespace mongo
|
||||
|
||||
@ -94,7 +94,7 @@ public:
|
||||
// Assert that migrations are disabled.
|
||||
uassert(10140200,
|
||||
"_shardsvrFetchCollMetadata can only run when migrations are disabled",
|
||||
!sharding_ddl_util::checkAllowMigrations(opCtx, nss));
|
||||
!sharding_ddl_util::checkAllowMigrationsOnConfigServer(opCtx, nss));
|
||||
|
||||
// Use an AlternativeClientRegion to perform the shard catalog writes outside the
|
||||
// retryable write session. The shard catalog commit contains its own idempotency
|
||||
|
||||
@ -754,7 +754,18 @@ void ShardServerOpObserver::onInvalidateCollectionMetadata(OperationContext* opC
|
||||
|
||||
void ShardServerOpObserver::onSetAllowChunkOperations(OperationContext* opCtx,
|
||||
const repl::OplogEntry& op) {
|
||||
// TODO (SERVER-121209): implement.
|
||||
tassert(12120912,
|
||||
"setAllowChunkOperations oplog entry is missing the collection UUID",
|
||||
op.getUuid());
|
||||
|
||||
const auto nss =
|
||||
CommandHelpers::parseNsCollectionRequired(op.getNss().dbName(), op.getObject());
|
||||
|
||||
const auto entry = SetAllowChunkOperationsOplogEntry::parse(
|
||||
op.getObject(), IDLParserContext("SetAllowChunkOperationsOplogEntryContext"));
|
||||
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireExclusive(opCtx, nss);
|
||||
scopedCsr->setAllowChunkOperations(entry.getAllowChunkOperations());
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
Loading…
Reference in New Issue
Block a user