SERVER-126201 Implement cleanup of chunkless collections (#54093)
GitOrigin-RevId: 9541bf8067ba029282a97c165cce220ea1f9aab2
This commit is contained in:
parent
9bb4967995
commit
576672878f
@ -2747,7 +2747,6 @@ mongo_cc_library(
|
||||
"//src/mongo/db/auth:auth_op_observer",
|
||||
"//src/mongo/db/shard_role/shard_catalog:catalog_helpers",
|
||||
"//src/mongo/db/shard_role/shard_catalog:catalog_impl",
|
||||
"//src/mongo/db/shard_role/shard_catalog:shard_catalog_history_cleanup",
|
||||
"//src/mongo/db/collection_crud",
|
||||
"//src/mongo/db/repl/dbcheck:health_log",
|
||||
"//src/mongo/db/repl/dbcheck:health_log_interface",
|
||||
|
||||
@ -453,6 +453,7 @@ mongo_cc_library(
|
||||
"//src/mongo/db/shard_role/shard_catalog:database_sharding_runtime.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:database_sharding_state_factory_shard.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:metadata_manager.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:shard_catalog_history_cleanup.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:shard_filtering_metadata_refresh.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:stale_shard_exception_handler.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:type_oplog_catalog_metadata_gen",
|
||||
@ -914,6 +915,7 @@ mongo_cc_unit_test(
|
||||
"//src/mongo/db/shard_role/shard_catalog:metadata_manager_test.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:op_observer_sharding_test.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:operation_sharding_state_test.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:shard_catalog_history_cleanup_test.cpp",
|
||||
"//src/mongo/db/shard_role/shard_catalog:shard_filtering_metadata_refresh_test.cpp",
|
||||
"//src/mongo/db/sharding_environment:range_arithmetic_test.cpp",
|
||||
"//src/mongo/db/sharding_environment:shard_local_test.cpp",
|
||||
|
||||
@ -214,21 +214,6 @@ mongo_cc_library(
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_library(
|
||||
name = "shard_catalog_history_cleanup",
|
||||
srcs = [
|
||||
"//src/mongo/db/shard_role/shard_catalog:shard_catalog_history_cleanup.cpp",
|
||||
],
|
||||
deps = [
|
||||
"//src/mongo/db:dbdirectclient",
|
||||
"//src/mongo/db:rw_concern_d",
|
||||
"//src/mongo/db:server_feature_flags",
|
||||
"//src/mongo/db:shard_role_api",
|
||||
"//src/mongo/db/repl:replica_set_aware_service",
|
||||
"//src/mongo/db/s:sharding_commands_d",
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_library(
|
||||
name = "catalog_helpers",
|
||||
srcs = [
|
||||
@ -488,7 +473,6 @@ mongo_cc_unit_test(
|
||||
"index_signature_test.cpp",
|
||||
"rename_collection_test.cpp",
|
||||
"set_multikey_metadata_oplog_helpers_test.cpp",
|
||||
"shard_catalog_history_cleanup_test.cpp",
|
||||
],
|
||||
tags = ["mongo_unittest_eighth_group"],
|
||||
deps = [
|
||||
@ -500,7 +484,6 @@ mongo_cc_unit_test(
|
||||
":collection_mock",
|
||||
":collection_options",
|
||||
":database_holder",
|
||||
":shard_catalog_history_cleanup",
|
||||
"//src/mongo/db:dbhelpers",
|
||||
"//src/mongo/db:index_key_validate",
|
||||
"//src/mongo/db:multitenancy",
|
||||
|
||||
@ -45,6 +45,16 @@ server_parameters:
|
||||
lte: 1
|
||||
redact: false
|
||||
|
||||
enableBackgroundCleanupOfShardCatalog:
|
||||
description: Enables background cleanup of the authoritative shard catalog
|
||||
set_at:
|
||||
- startup
|
||||
- runtime
|
||||
cpp_varname: gEnableBackgroundCleanupOfShardCatalog
|
||||
cpp_vartype: AtomicWord<bool>
|
||||
default: true
|
||||
redact: false
|
||||
|
||||
feature_flags:
|
||||
featureFlagPerformDeepCopyOfCollections:
|
||||
description: Forces the CollectionCatalog to always do a deep copy of collections
|
||||
|
||||
@ -31,17 +31,116 @@
|
||||
|
||||
#include "mongo/base/error_codes.h"
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/db/commands/feature_compatibility_version.h"
|
||||
#include "mongo/db/dbdirectclient.h"
|
||||
#include "mongo/db/global_catalog/type_chunk.h"
|
||||
#include "mongo/db/global_catalog/type_collection.h"
|
||||
#include "mongo/db/persistent_task_store.h"
|
||||
#include "mongo/db/pipeline/aggregate_command_gen.h"
|
||||
#include "mongo/db/repl/replication_coordinator.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/commit_collection_metadata_locally.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/database_sharding_runtime.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/server_parameters_gen.h"
|
||||
#include "mongo/db/shard_role/shard_role.h"
|
||||
#include "mongo/db/sharding_environment/sharding_feature_flags_gen.h"
|
||||
#include "mongo/db/topology/sharding_state.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
|
||||
|
||||
namespace mongo::shard_catalog_helper {
|
||||
|
||||
namespace {
|
||||
|
||||
void cleanupCollectionEntry(OperationContext* opCtx, const NamespaceString& nss) {
|
||||
auto acquisition = acquireCollection(
|
||||
opCtx,
|
||||
CollectionAcquisitionRequest::fromOpCtx(opCtx, nss, AcquisitionPrerequisites::kWrite),
|
||||
MODE_IX);
|
||||
|
||||
if (!acquisition.exists()) {
|
||||
// Collection got dropped, the drop coordinator will remove all the metadata on this shard.
|
||||
LOGV2_INFO(12620107,
|
||||
"Skipping collection as it got dropped and there shouldn't be any data present",
|
||||
"collection"_attr = nss);
|
||||
return;
|
||||
}
|
||||
|
||||
const auto isCriticalSectionActive =
|
||||
CollectionShardingRuntime::acquireShared(opCtx, nss)
|
||||
->getCriticalSectionSignal(ShardingMigrationCriticalSection::kWrite)
|
||||
.has_value();
|
||||
if (isCriticalSectionActive) {
|
||||
LOGV2_INFO(
|
||||
12620106, "Skipping collection as critical section is active", "collection"_attr = nss);
|
||||
// Do an early return since the critical section is active, meaning changes are being made
|
||||
// to the collection metadata. Bail out to prevent conflicts.
|
||||
return;
|
||||
}
|
||||
|
||||
PersistentTaskStore<ChunkType> chunkStore{NamespaceString::kConfigShardCatalogChunksNamespace};
|
||||
auto chunkCount =
|
||||
chunkStore.count(opCtx, BSON(ChunkType::collectionUUID() << acquisition.uuid()));
|
||||
if (chunkCount != 0) {
|
||||
LOGV2_INFO(
|
||||
12620104, "Skipping collection as it still holds chunks", "collection"_attr = nss);
|
||||
// Collection still has chunks in the shard.
|
||||
return;
|
||||
}
|
||||
|
||||
auto serializedNs = NamespaceStringUtil::serialize(nss, SerializationContext::stateDefault());
|
||||
PersistentTaskStore<CollectionType> collStore{
|
||||
NamespaceString::kConfigShardCatalogCollectionsNamespace};
|
||||
collStore.remove(opCtx, BSON(CollectionType::kNssFieldName << serializedNs));
|
||||
}
|
||||
|
||||
std::vector<NamespaceString> getStaleCollectionEntries(OperationContext* opCtx) {
|
||||
std::vector<NamespaceString> collections;
|
||||
|
||||
static constexpr auto kChunksFieldName = "collection_chunks"_sd;
|
||||
|
||||
AggregateCommandRequest aggRequest{NamespaceString::kConfigShardCatalogCollectionsNamespace};
|
||||
// Lookup collections that do not have any chunks on the durable catalog.
|
||||
aggRequest.setPipeline(
|
||||
{BSON("$lookup" << BSON("from" << NamespaceString::kConfigShardCatalogChunksNamespace.coll()
|
||||
<< "localField" << CollectionType::kUuidFieldName
|
||||
<< "foreignField" << ChunkType::collectionUUID() << "as"
|
||||
<< kChunksFieldName << "pipeline"
|
||||
<< BSON_ARRAY(BSON("$limit" << 1)))),
|
||||
BSON("$match" << BSON(kChunksFieldName << BSONArray())),
|
||||
BSON("$project" << BSON(CollectionType::kNssFieldName << 1))});
|
||||
|
||||
DBDirectClient client(opCtx);
|
||||
auto cursor = uassertStatusOKWithContext(
|
||||
DBClientCursor::fromAggregationRequest(
|
||||
&client, aggRequest, false /* secondaryOk */, true /* useExhaust */),
|
||||
"Failed to establish a cursor for aggregation");
|
||||
while (cursor->more()) {
|
||||
auto nssStringData =
|
||||
cursor->nextSafe().getField(CollectionType::kNssFieldName).valueStringData();
|
||||
auto nss = NamespaceStringUtil::deserialize(
|
||||
boost::none, nssStringData, SerializationContext::stateDefault());
|
||||
const auto isPrimaryDbShard =
|
||||
DatabaseShardingRuntime::acquireShared(opCtx, nss.dbName())->getDbPrimaryShard(opCtx) ==
|
||||
ShardingState::get(opCtx)->shardId();
|
||||
if (isPrimaryDbShard) {
|
||||
// The primary must always contain the collection entry in order to avoid having
|
||||
// tracked/untracked issues.
|
||||
continue;
|
||||
}
|
||||
collections.emplace_back(std::move(nss));
|
||||
}
|
||||
return collections;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
StorageEngine::TimestampMonitor::TimestampListener kShardCatalogHistoryCleanupTimestampListener(
|
||||
[](OperationContext* opCtx, const StorageEngine::TimestampMonitor::Timestamps& timestamp) {
|
||||
if (!gEnableBackgroundCleanupOfShardCatalog.loadRelaxed()) {
|
||||
LOGV2_DEBUG(12620105, 1, "Skipping cleanup of shard catalog");
|
||||
return;
|
||||
}
|
||||
|
||||
auto oldest = timestamp.oldest;
|
||||
auto* service = opCtx->getServiceContext();
|
||||
auto const shardingState = ShardingState::get(service);
|
||||
@ -56,15 +155,39 @@ StorageEngine::TimestampMonitor::TimestampListener kShardCatalogHistoryCleanupTi
|
||||
return;
|
||||
}
|
||||
|
||||
// We fix the FCV region here in order to ensure cleanup only works while FCV is fully 9.0
|
||||
FixedFCVRegion guard{opCtx};
|
||||
const auto fcvSnapshot = serverGlobalParams.featureCompatibility.acquireFCVSnapshot();
|
||||
if (!(feature_flags::gAuthoritativeShardsDDL.isEnabled(VersionContext::getDecoration(opCtx),
|
||||
fcvSnapshot) &&
|
||||
feature_flags::gAuthoritativeShardsCRUD.isEnabled(
|
||||
VersionContext::getDecoration(opCtx), fcvSnapshot))) {
|
||||
// The shard is not yet doing authoritative DDLs, as such it can skip the cleanup.
|
||||
return;
|
||||
}
|
||||
|
||||
auto shardId = shardingState->shardId();
|
||||
|
||||
PersistentTaskStore<ChunkType> chunkStore{
|
||||
NamespaceString::kConfigShardCatalogChunksNamespace};
|
||||
try {
|
||||
LOGV2_DEBUG(12620103,
|
||||
1,
|
||||
"Performing cleanup of stale chunks and collection entries",
|
||||
"oldestTimestamp"_attr = oldest);
|
||||
chunkStore.remove(opCtx,
|
||||
BSON(ChunkType::shard()
|
||||
<< BSON("$ne" << shardId.toString())
|
||||
<< ChunkType::onCurrentShardSince() << BSON("$lt" << oldest)));
|
||||
|
||||
const auto collectionsToRemove = getStaleCollectionEntries(opCtx);
|
||||
for (const auto& collNss : collectionsToRemove) {
|
||||
LOGV2_INFO(
|
||||
12620101,
|
||||
"Cleaning up potentially stale collection entry from durable shard catalog",
|
||||
"collection"_attr = collNss);
|
||||
cleanupCollectionEntry(opCtx, collNss);
|
||||
}
|
||||
} catch (const ExceptionFor<ErrorCodes::FailedToSatisfyReadPreference>&) {
|
||||
// Primary can be killed in the middle of the removal.
|
||||
return;
|
||||
@ -85,6 +208,7 @@ StorageEngine::TimestampMonitor::TimestampListener kShardCatalogHistoryCleanupTi
|
||||
return;
|
||||
}
|
||||
// Otherwise, re-throw the DBException
|
||||
throw;
|
||||
}
|
||||
});
|
||||
} // namespace mongo::shard_catalog_helper
|
||||
|
||||
@ -34,28 +34,40 @@
|
||||
#include "mongo/db/client.h"
|
||||
#include "mongo/db/dbdirectclient.h"
|
||||
#include "mongo/db/global_catalog/type_chunk.h"
|
||||
#include "mongo/db/global_catalog/type_collection.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/service_context_d_test_fixture.h"
|
||||
#include "mongo/db/sharding_environment/shard_server_test_fixture.h"
|
||||
#include "mongo/db/storage/storage_engine.h"
|
||||
#include "mongo/db/topology/sharding_state.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
#include "mongo/util/future.h"
|
||||
#include "mongo/util/future_util.h"
|
||||
#include "mongo/util/periodic_runner_factory.h"
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class ShardCatalogHistoryCleanupTest : public ServiceContextMongoDTest {
|
||||
namespace {
|
||||
static const ShardId kAnotherShardName{"anotherTestShard"};
|
||||
static const KeyPattern kKeyPattern{BSON("_id" << 1)};
|
||||
} // namespace
|
||||
|
||||
class ShardCatalogHistoryCleanupTest : public ShardServerTestFixture {
|
||||
public:
|
||||
void setUp() override {
|
||||
ServiceContextMongoDTest::setUp();
|
||||
ShardServerTestFixture::setUp();
|
||||
auto* svc = getServiceContext();
|
||||
|
||||
auto* storageEngine = svc->getStorageEngine();
|
||||
|
||||
storageEngine->startTimestampMonitor(
|
||||
{&shard_catalog_helper::kShardCatalogHistoryCleanupTimestampListener});
|
||||
|
||||
// Imagining the test is running on a primary in sharded cluster
|
||||
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
|
||||
auto* replCoord = repl::ReplicationCoordinator::get(getServiceContext());
|
||||
ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
|
||||
}
|
||||
|
||||
void waitForTimestampMonitorPass() {
|
||||
@ -72,112 +84,203 @@ public:
|
||||
pf.future.wait();
|
||||
timestampMonitor->removeListener(&listener);
|
||||
}
|
||||
|
||||
std::pair<CollectionType, std::vector<ChunkType>> makeCollectionAndChunks(
|
||||
const NamespaceString& nss,
|
||||
bool isCurrentlyOwned,
|
||||
bool isOutdatedButActive,
|
||||
bool isFullyOutdated,
|
||||
Timestamp currentTimestamp,
|
||||
Timestamp intermediateTimestamp,
|
||||
Timestamp staleTimestamp) {
|
||||
|
||||
auto collectionUUID = UUID::gen();
|
||||
auto epoch = OID::gen();
|
||||
auto placement = CollectionPlacement(1, 1);
|
||||
CollectionType collEntry{
|
||||
nss, epoch, staleTimestamp, Date_t::now(), collectionUUID, kKeyPattern};
|
||||
|
||||
std::vector<ChunkType> chunks;
|
||||
if (isCurrentlyOwned) {
|
||||
auto currentlyOwnedChunkType =
|
||||
ChunkType(collectionUUID,
|
||||
ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 100)),
|
||||
ChunkVersion(CollectionGeneration{epoch, currentTimestamp}, placement),
|
||||
kMyShardName);
|
||||
currentlyOwnedChunkType.setHistory({ChunkHistory(currentTimestamp, kMyShardName)});
|
||||
currentlyOwnedChunkType.setOnCurrentShardSince(currentTimestamp);
|
||||
chunks.emplace_back(std::move(currentlyOwnedChunkType));
|
||||
}
|
||||
if (isOutdatedButActive) {
|
||||
auto unownedButVisibleChunk =
|
||||
ChunkType(collectionUUID,
|
||||
ChunkRange(BSON("_id" << 100), BSON("_id" << 200)),
|
||||
ChunkVersion(CollectionGeneration{epoch, currentTimestamp}, placement),
|
||||
kAnotherShardName);
|
||||
unownedButVisibleChunk.setHistory(
|
||||
{ChunkHistory(intermediateTimestamp, kAnotherShardName),
|
||||
ChunkHistory(staleTimestamp, kMyShardName)});
|
||||
unownedButVisibleChunk.setOnCurrentShardSince(intermediateTimestamp);
|
||||
chunks.emplace_back(std::move(unownedButVisibleChunk));
|
||||
}
|
||||
if (isFullyOutdated) {
|
||||
auto staleChunk =
|
||||
ChunkType(collectionUUID,
|
||||
ChunkRange(BSON("_id" << 200), BSON("_id" << 300)),
|
||||
ChunkVersion(CollectionGeneration{epoch, currentTimestamp}, placement),
|
||||
kAnotherShardName);
|
||||
staleChunk.setHistory({ChunkHistory(staleTimestamp - 1, kAnotherShardName),
|
||||
ChunkHistory(staleTimestamp - 2, kMyShardName)});
|
||||
staleChunk.setOnCurrentShardSince(staleTimestamp - 1);
|
||||
|
||||
chunks.emplace_back(std::move(staleChunk));
|
||||
}
|
||||
|
||||
return {std::move(collEntry), std::move(chunks)};
|
||||
}
|
||||
void setupCollection(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool isCurrentlyOwned,
|
||||
bool isOutdatedButActive,
|
||||
bool isFullyOutdated,
|
||||
Timestamp currentTimestamp,
|
||||
Timestamp outdatedTimestamp,
|
||||
Timestamp staleTimestamp) {
|
||||
auto [coll, chunks] = makeCollectionAndChunks(nss,
|
||||
isCurrentlyOwned,
|
||||
isOutdatedButActive,
|
||||
isFullyOutdated,
|
||||
currentTimestamp,
|
||||
outdatedTimestamp,
|
||||
staleTimestamp);
|
||||
createTestCollection(opCtx, nss);
|
||||
auto uuid = *CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, nss);
|
||||
coll.setUuid(uuid);
|
||||
for (auto& chunk : chunks) {
|
||||
chunk.setCollectionUUID(uuid);
|
||||
}
|
||||
|
||||
DBDirectClient client{opCtx};
|
||||
client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace,
|
||||
coll.asShardCatalogType().toBSON());
|
||||
std::vector<BSONObj> chunkBsons;
|
||||
for (const auto& chunk : chunks) {
|
||||
chunkBsons.emplace_back(chunk.toConfigBSON());
|
||||
}
|
||||
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace, chunkBsons);
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(ShardCatalogHistoryCleanupTest, ShardCatalogHistoryCleanupCalledOnTimestampMonitorAdvance) {
|
||||
const ShardId kShardName{"testShard"};
|
||||
const ShardId kAnotherShardName{"anotherTestShard"};
|
||||
RAIIServerParameterControllerForTest ddlFlag{"featureFlagAuthoritativeShardsDDL", true};
|
||||
RAIIServerParameterControllerForTest crudFlag{"featureFlagAuthoritativeShardsCRUD", true};
|
||||
|
||||
// Imagining the test is running on a primary in sharded cluster
|
||||
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
|
||||
auto* replCoord = repl::ReplicationCoordinator::get(getServiceContext());
|
||||
ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
|
||||
|
||||
ShardingState::get(getServiceContext())
|
||||
->setRecoveryCompleted({OID::gen(),
|
||||
ClusterRole::ShardServer,
|
||||
ConnectionString(HostAndPort("dummy", 1)),
|
||||
kShardName});
|
||||
|
||||
auto opCtx = cc().makeOperationContext();
|
||||
|
||||
auto shardingState = ShardingState::get(getServiceContext());
|
||||
ASSERT_EQ(shardingState->shardId(), ShardId(kShardName));
|
||||
ASSERT(shardingState->enabled());
|
||||
auto opCtx = operationContext();
|
||||
|
||||
auto storageEngine = getServiceContext()->getStorageEngine();
|
||||
auto oldestTimestamp = storageEngine->getOldestTimestamp();
|
||||
auto oldTimestamp = Timestamp(3, 1);
|
||||
storageEngine->setOldestTimestamp(oldTimestamp, false /*force*/);
|
||||
auto staleTimestamp = storageEngine->getOldestTimestamp() + 20;
|
||||
auto oldestTimestamp = staleTimestamp + 10;
|
||||
auto currentTimestamp = oldestTimestamp + 10;
|
||||
|
||||
storageEngine->setOldestTimestamp(oldestTimestamp, false /*force*/);
|
||||
waitForTimestampMonitorPass();
|
||||
auto newTimestamp = Timestamp(4, 1);
|
||||
storageEngine->setOldestTimestamp(newTimestamp, false /*force*/);
|
||||
|
||||
// Filling kConfigShardCatalogChunksNamespace directly through DBClient to avoid
|
||||
// waitForMajorityConcern logic handling
|
||||
auto collectionUUID = UUID::gen();
|
||||
auto epoch = OID::gen();
|
||||
auto placement = CollectionPlacement(1, 1);
|
||||
auto [coll, chunks] = makeCollectionAndChunks(
|
||||
NamespaceStringUtil::deserialize(
|
||||
boost::none, "test.collection", SerializationContext::stateDefault()),
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
currentTimestamp,
|
||||
oldestTimestamp,
|
||||
staleTimestamp);
|
||||
|
||||
auto currentlyOwnedChunkType =
|
||||
ChunkType(collectionUUID,
|
||||
ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 100)),
|
||||
ChunkVersion(CollectionGeneration{epoch, oldTimestamp}, placement),
|
||||
kShardName);
|
||||
currentlyOwnedChunkType.setHistory({ChunkHistory(oldTimestamp, kShardName)});
|
||||
currentlyOwnedChunkType.setOnCurrentShardSince(oldTimestamp);
|
||||
auto outdatedPreviouslyOwnedChunkType =
|
||||
ChunkType(collectionUUID,
|
||||
ChunkRange(BSON("_id" << 100), BSON("_id" << 200)),
|
||||
ChunkVersion(CollectionGeneration{epoch, oldTimestamp}, placement),
|
||||
kAnotherShardName);
|
||||
outdatedPreviouslyOwnedChunkType.setHistory(
|
||||
{ChunkHistory(oldTimestamp, kAnotherShardName), ChunkHistory(oldestTimestamp, kShardName)});
|
||||
outdatedPreviouslyOwnedChunkType.setOnCurrentShardSince(oldTimestamp);
|
||||
auto notOutdatedPreviouslyOwnedChunkType =
|
||||
ChunkType(collectionUUID,
|
||||
ChunkRange(BSON("_id" << 200), BSON("_id" << 300)),
|
||||
ChunkVersion(CollectionGeneration{epoch, newTimestamp}, placement),
|
||||
kAnotherShardName);
|
||||
notOutdatedPreviouslyOwnedChunkType.setHistory(
|
||||
{ChunkHistory(newTimestamp, kAnotherShardName), ChunkHistory(oldestTimestamp, kShardName)});
|
||||
notOutdatedPreviouslyOwnedChunkType.setOnCurrentShardSince(newTimestamp);
|
||||
|
||||
DBDirectClient client(opCtx.get());
|
||||
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
currentlyOwnedChunkType.toConfigBSON());
|
||||
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
outdatedPreviouslyOwnedChunkType.toConfigBSON());
|
||||
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
notOutdatedPreviouslyOwnedChunkType.toConfigBSON());
|
||||
std::vector<BSONObj> chunksBSON;
|
||||
for (const auto& chunk : chunks) {
|
||||
chunksBSON.emplace_back(chunk.toConfigBSON());
|
||||
}
|
||||
DBDirectClient client(opCtx);
|
||||
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace, chunksBSON);
|
||||
client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace,
|
||||
coll.asShardCatalogType().toBSON());
|
||||
|
||||
waitForTimestampMonitorPass();
|
||||
waitForTimestampMonitorPass();
|
||||
|
||||
std::vector<ChunkType> chunks;
|
||||
size_t numChunks = 0;
|
||||
FindCommandRequest findRequest{NamespaceString::kConfigShardCatalogChunksNamespace};
|
||||
auto cursor = client.find(std::move(findRequest));
|
||||
while (cursor->more()) {
|
||||
BSONObj obj = cursor->nextSafe().getOwned();
|
||||
StatusWith<ChunkType> chunkRes = ChunkType::parseFromConfigBSON(obj, epoch, newTimestamp);
|
||||
ASSERT(chunkRes.getStatus().isOK());
|
||||
chunks.push_back(chunkRes.getValue());
|
||||
BSONObj obj = cursor->nextSafe();
|
||||
StatusWith<ChunkType> chunkRes =
|
||||
ChunkType::parseFromConfigBSON(obj, coll.getEpoch(), coll.getTimestamp());
|
||||
ASSERT_OK(chunkRes.getStatus());
|
||||
|
||||
const auto& chunk = chunkRes.getValue();
|
||||
|
||||
bool isCurrentlyOwned = chunk.getShard() == kMyShardName;
|
||||
bool isStillVisible = chunk.getOnCurrentShardSince() >= staleTimestamp;
|
||||
LOGV2_INFO(12620100,
|
||||
"Checking chunk",
|
||||
"chunk"_attr = obj,
|
||||
"isCurrentlyOwned"_attr = isCurrentlyOwned,
|
||||
"isStillVisible"_attr = isStillVisible);
|
||||
ASSERT_TRUE(isCurrentlyOwned || isStillVisible);
|
||||
numChunks++;
|
||||
}
|
||||
|
||||
ASSERT_EQ(chunks.size(), 2u);
|
||||
|
||||
// Note: Parsing from ChunkType overrode the chunk version, so we have to set it manually
|
||||
std::vector<ChunkType> expectedChunks = {currentlyOwnedChunkType,
|
||||
notOutdatedPreviouslyOwnedChunkType};
|
||||
for (auto& chunk : expectedChunks) {
|
||||
chunk.setVersion(ChunkVersion(CollectionGeneration(epoch, newTimestamp), placement));
|
||||
}
|
||||
|
||||
std::sort(
|
||||
expectedChunks.begin(),
|
||||
expectedChunks.end(),
|
||||
[](const ChunkType& lhs, const ChunkType& rhs) { return lhs.getRange() < rhs.getRange(); });
|
||||
std::sort(chunks.begin(), chunks.end(), [](const ChunkType& lhs, const ChunkType& rhs) {
|
||||
return lhs.getRange() < rhs.getRange();
|
||||
});
|
||||
|
||||
ASSERT(std::equal(chunks.begin(),
|
||||
chunks.end(),
|
||||
expectedChunks.begin(),
|
||||
expectedChunks.end(),
|
||||
[](const ChunkType& lhs, const ChunkType& rhs) {
|
||||
return lhs.getRange() == rhs.getRange();
|
||||
}));
|
||||
ASSERT_EQ(2, numChunks); // currently owned + unowned but visible
|
||||
}
|
||||
|
||||
TEST_F(ShardCatalogHistoryCleanupTest, DeletesStaleCollectionEntries) {
|
||||
RAIIServerParameterControllerForTest ddlFlag{"featureFlagAuthoritativeShardsDDL", true};
|
||||
RAIIServerParameterControllerForTest crudFlag{"featureFlagAuthoritativeShardsCRUD", true};
|
||||
|
||||
auto storageEngine = getServiceContext()->getStorageEngine();
|
||||
auto oldestTimestamp = storageEngine->getOldestTimestamp() + 20;
|
||||
auto oldTimestamp = oldestTimestamp + 1;
|
||||
storageEngine->setOldestTimestamp(oldTimestamp, false /*force*/);
|
||||
auto newTimestamp = oldTimestamp + 1;
|
||||
|
||||
const auto kStaleNss = NamespaceStringUtil::deserialize(
|
||||
boost::none, "test.stale_collection", SerializationContext::stateDefault());
|
||||
const auto kCurrentNss = NamespaceStringUtil::deserialize(
|
||||
boost::none, "test.valid_collection", SerializationContext::stateDefault());
|
||||
setupCollection(operationContext(),
|
||||
kStaleNss,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
newTimestamp,
|
||||
oldTimestamp,
|
||||
oldestTimestamp);
|
||||
setupCollection(operationContext(),
|
||||
kCurrentNss,
|
||||
true,
|
||||
true,
|
||||
false,
|
||||
newTimestamp,
|
||||
oldTimestamp,
|
||||
oldestTimestamp);
|
||||
|
||||
waitForTimestampMonitorPass();
|
||||
|
||||
DBDirectClient client{operationContext()};
|
||||
ASSERT_EQ(client.count(NamespaceString::kConfigShardCatalogCollectionsNamespace,
|
||||
BSON(CollectionType::kNssFieldName << kCurrentNss.toString_forTest())),
|
||||
1);
|
||||
ASSERT_EQ(client.count(NamespaceString::kConfigShardCatalogCollectionsNamespace,
|
||||
BSON(CollectionType::kNssFieldName << kStaleNss.toString_forTest())),
|
||||
0);
|
||||
const auto currentUuid = *CollectionCatalog::get(operationContext())
|
||||
->lookupUUIDByNSS(operationContext(), kCurrentNss);
|
||||
const auto staleUuid =
|
||||
*CollectionCatalog::get(operationContext())->lookupUUIDByNSS(operationContext(), kStaleNss);
|
||||
ASSERT_EQ(client.count(NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
BSON(ChunkType::collectionUUID() << staleUuid)),
|
||||
0);
|
||||
ASSERT_EQ(client.count(NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
BSON(ChunkType::collectionUUID() << currentUuid)),
|
||||
2);
|
||||
}
|
||||
} // namespace mongo
|
||||
|
||||
Loading…
Reference in New Issue
Block a user