SERVER-126200 Remove chunkless references in the authoritative shard catalog (#54004)
GitOrigin-RevId: 9f40ff9934f17e685c57eea190330327c380aadd
This commit is contained in:
parent
231399c6e6
commit
11d71e49c2
@ -788,12 +788,14 @@ const internalCommandsMap = {
|
||||
testname: "_shardsvrCommitRefineCollectionShardKey",
|
||||
command: {
|
||||
_shardsvrCommitRefineCollectionShardKey: "test.x",
|
||||
primaryShardId: "",
|
||||
},
|
||||
},
|
||||
_shardsvrCommitCollModCollectionMetadata: {
|
||||
testname: "_shardsvrCommitCollModCollectionMetadata",
|
||||
command: {
|
||||
_shardsvrCommitCollModCollectionMetadata: "test.x",
|
||||
primaryShardId: "",
|
||||
},
|
||||
},
|
||||
_shardsvrCommitDropCollectionMetadata: {
|
||||
@ -804,11 +806,17 @@ const internalCommandsMap = {
|
||||
testname: "_shardsvrCommitCreateCollectionMetadata",
|
||||
command: {
|
||||
_shardsvrCommitCreateCollectionMetadata: "test.x",
|
||||
primaryShardId: "",
|
||||
},
|
||||
},
|
||||
_shardsvrCommitRenameCollectionMetadata: {
|
||||
testname: "_shardsvrCommitRenameCollectionMetadata",
|
||||
command: {_shardsvrCommitRenameCollectionMetadata: "", fromNss: "test.x", toNss: "test.y", primaryShardId: ""},
|
||||
command: {
|
||||
_shardsvrCommitRenameCollectionMetadata: "",
|
||||
fromNss: "test.x",
|
||||
toNss: "test.y",
|
||||
primaryShardId: "",
|
||||
},
|
||||
},
|
||||
_shardsvrSetAllowMigrations: {
|
||||
testname: "_shardsvrSetAllowMigrations",
|
||||
|
||||
@ -424,8 +424,7 @@ describe("Authoritative collection metadata vs DDLs", function () {
|
||||
describe("convertToCapped", function () {
|
||||
// For a convertToCapped on a tracked unsplittable collection: no node must retain chunks
|
||||
// for the original UUID, the data shard must hold the new UUID's entry with real chunks,
|
||||
// the DB primary must hold the new UUID's entry (chunkless if it is not the data shard),
|
||||
// and any other shard must have no entry for the collection.
|
||||
// the DB primary must hold the new UUID's entry, and any other shard must have no entry for the collection.
|
||||
function assertShardCatalogAfterConvertToCapped(
|
||||
ns,
|
||||
{dataShardRs, primaryRs, newUuid, originalUuid, unsplittableKey},
|
||||
@ -452,8 +451,7 @@ describe("Authoritative collection metadata vs DDLs", function () {
|
||||
});
|
||||
|
||||
if (primaryRs !== dataShardRs) {
|
||||
// Primary is not the data shard: it owns no real chunks but must carry a chunkless
|
||||
// placeholder so disk recovery recognize the collection as tracked.
|
||||
// Primary is not the data shard: it owns no real chunks but must carry the collection entry so disk recovery recognize the collection as tracked.
|
||||
primaryRs.nodes.forEach((node) => {
|
||||
const meta = getShardCatalogCollMetadata(node, ns);
|
||||
assert.neq(null, meta, `${node.host}: chunkless primary is missing collection metadata`);
|
||||
@ -464,11 +462,7 @@ describe("Authoritative collection metadata vs DDLs", function () {
|
||||
);
|
||||
|
||||
const shardChunks = getShardCatalogChunks(node, newUuid);
|
||||
assert.eq(
|
||||
1,
|
||||
shardChunks.length,
|
||||
`${node.host}: chunkless primary must carry exactly one placeholder chunk`,
|
||||
);
|
||||
assert.eq(0, shardChunks.length, `${node.host}: chunkless primary must not have any chunks`);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -145,7 +145,6 @@ void commitToShardCatalog(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const std::vector<ShardId>& participantsOwningChunks,
|
||||
const ShardId& primaryShard,
|
||||
bool isPrimaryOwningChunks,
|
||||
GetSessionFn&& getSession,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token) {
|
||||
@ -153,20 +152,14 @@ void commitToShardCatalog(OperationContext* opCtx,
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<ShardId> involvedShards = participantsOwningChunks;
|
||||
if (isPrimaryOwningChunks) {
|
||||
involvedShards.push_back(primaryShard);
|
||||
auto shardIds = participantsOwningChunks;
|
||||
if (std::find(shardIds.begin(), shardIds.end(), primaryShard) == shardIds.end()) {
|
||||
shardIds.emplace_back(primaryShard);
|
||||
}
|
||||
|
||||
const auto session = getSession();
|
||||
sharding_ddl_util::commitCollModCollectionMetadataToShardCatalog(
|
||||
opCtx, nss, involvedShards, session, executor, token);
|
||||
|
||||
// The DB primary shard must always know that a collection is tracked, even when it does not own
|
||||
// any chunks.
|
||||
if (!isPrimaryOwningChunks) {
|
||||
shard_catalog_commit::commitChunklessCollectionLocally(opCtx, nss);
|
||||
}
|
||||
opCtx, nss, shardIds, session, executor, token);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
@ -427,7 +420,6 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
|
||||
_collInfo->nsForTargeting,
|
||||
_shardingInfo->participantsOwningChunks,
|
||||
_shardingInfo->primaryShard,
|
||||
_shardingInfo->isPrimaryOwningChunks,
|
||||
[&] { return getNewSession(opCtx); },
|
||||
executor,
|
||||
token);
|
||||
|
||||
@ -410,17 +410,19 @@ ExecutorFuture<void> ConvertToCappedCoordinator::_runImpl(
|
||||
// Install the new incarnation of the collection in the shard catalog.
|
||||
if (isAuthoritative) {
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::commitCreateCollectionMetadataToShardCatalog(
|
||||
opCtx, nss(), {*(_doc.getDataShard())}, session, executor, token);
|
||||
|
||||
// The DB primary shard must always know that the collection is tracked,
|
||||
// even when it does not own any chunks. Persist a chunkless placeholder
|
||||
// locally so that disk recovery can tell a chunkless-tracked collection
|
||||
// apart from an untracked one.
|
||||
const auto primaryShardId = ShardingState::get(opCtx)->shardId();
|
||||
if (primaryShardId != *_doc.getDataShard()) {
|
||||
shard_catalog_commit::commitChunklessCollectionLocally(opCtx, nss());
|
||||
}
|
||||
// even when it does not own any chunks.
|
||||
std::set<ShardId> involvedShards;
|
||||
involvedShards.emplace(ShardingState::get(opCtx)->shardId());
|
||||
involvedShards.emplace(*(_doc.getDataShard()));
|
||||
|
||||
sharding_ddl_util::commitCreateCollectionMetadataToShardCatalog(
|
||||
opCtx,
|
||||
nss(),
|
||||
{involvedShards.begin(), involvedShards.end()},
|
||||
session,
|
||||
executor,
|
||||
token);
|
||||
}
|
||||
|
||||
// Checkpoint the configTime to ensure that, in the case of a stepdown/crash,
|
||||
|
||||
@ -2319,19 +2319,13 @@ void CreateCollectionCoordinator::_commitOnShardCatalog(
|
||||
const auto& cm = uassertStatusOK(
|
||||
Grid::get(opCtx)->catalogCache()->getCollectionPlacementInfoWithRefresh(opCtx, nss()));
|
||||
cm.getAllShardIds(&involvedShards);
|
||||
// The DB primary shard must always know that a collection is tracked, even when it does not
|
||||
// own any chunks.
|
||||
involvedShards.emplace(ShardingState::get(opCtx)->shardId());
|
||||
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::commitCreateCollectionMetadataToShardCatalog(
|
||||
opCtx, nss(), {involvedShards.begin(), involvedShards.end()}, session, executor, token);
|
||||
|
||||
// The DB primary shard must always know that a collection is tracked, even when it does not
|
||||
// own any chunks. We persist a placeholder chunk locally so that (1) disk recovery can
|
||||
// distinguish a chunkless-tracked collection from an untracked one without special-case
|
||||
// logic, and (2) CheckMetadataConsistency can verify the DB primary always has an entry.
|
||||
const auto primaryShardId = ShardingState::get(opCtx)->shardId();
|
||||
if (involvedShards.find(primaryShardId) == involvedShards.end()) {
|
||||
shard_catalog_commit::commitChunklessCollectionLocally(opCtx, nss());
|
||||
}
|
||||
}
|
||||
|
||||
void CreateCollectionCoordinator::_setPostCommitMetadata(
|
||||
|
||||
@ -342,20 +342,12 @@ ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_runImpl(
|
||||
|
||||
if (_doc.getAuthoritativeMetadataAccessLevel() >=
|
||||
AuthoritativeMetadataAccessLevelEnum::kWritesAllowed) {
|
||||
const auto involvedShards = getShardsWithDataForCollection(opCtx, nss());
|
||||
// The DB primary shard must always know that a collection is tracked, even when
|
||||
// it does not own any chunks.
|
||||
const auto involvedShards = getDataShardsAndDbPrimaryShard(opCtx, nss());
|
||||
const auto session = getNewSession(opCtx);
|
||||
sharding_ddl_util::commitRefineCollectionShardKeyToShardCatalog(
|
||||
opCtx, nss(), involvedShards, session, executor, token);
|
||||
|
||||
// The DB primary shard must always know that a collection is tracked, even when
|
||||
// it does not own any chunks. We persist a placeholder chunk locally so that
|
||||
// disk recovery can distinguish a chunkless-tracked collection from an
|
||||
// untracked one.
|
||||
const auto primaryShardId = ShardingState::get(opCtx)->shardId();
|
||||
if (std::find(involvedShards.begin(), involvedShards.end(), primaryShardId) ==
|
||||
involvedShards.end()) {
|
||||
shard_catalog_commit::commitChunklessCollectionLocally(opCtx, nss());
|
||||
}
|
||||
}
|
||||
|
||||
// Checkpoint the configTime to ensure that, in the case of a stepdown, the new
|
||||
|
||||
@ -1104,6 +1104,11 @@ commands:
|
||||
namespace: type
|
||||
type: namespacestring
|
||||
api_version: ""
|
||||
fields:
|
||||
primaryShardId:
|
||||
type: shard_id
|
||||
description: Primary shard identifier, used to determine whether the shard needs to durably persist collection metadata or not.
|
||||
optional: false
|
||||
reply_type: OkReply
|
||||
|
||||
_shardsvrCommitCollModCollectionMetadata:
|
||||
@ -1114,6 +1119,11 @@ commands:
|
||||
namespace: type
|
||||
type: namespacestring
|
||||
api_version: ""
|
||||
fields:
|
||||
primaryShardId:
|
||||
type: shard_id
|
||||
description: Primary shard identifier, used to determine whether the shard needs to durably persist collection metadata or not.
|
||||
optional: false
|
||||
reply_type: OkReply
|
||||
|
||||
_shardsvrCommitDropCollectionMetadata:
|
||||
@ -1138,6 +1148,11 @@ commands:
|
||||
namespace: type
|
||||
type: namespacestring
|
||||
api_version: ""
|
||||
fields:
|
||||
primaryShardId:
|
||||
type: shard_id
|
||||
description: Primary shard identifier, used to determine whether the shard needs to durably persist collection metadata or not.
|
||||
optional: false
|
||||
reply_type: OkReply
|
||||
|
||||
_shardsvrCommitRenameCollectionMetadata:
|
||||
|
||||
@ -893,7 +893,7 @@ void commitRefineCollectionShardKeyToShardCatalog(
|
||||
const OperationSessionInfo& osi,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token) {
|
||||
ShardsvrCommitRefineCollectionShardKey request(nss);
|
||||
ShardsvrCommitRefineCollectionShardKey request(nss, ShardingState::get(opCtx)->shardId());
|
||||
request.setDbName(DatabaseName::kAdmin);
|
||||
|
||||
generic_argument_util::setMajorityWriteConcern(request);
|
||||
@ -913,7 +913,7 @@ void commitCollModCollectionMetadataToShardCatalog(
|
||||
const OperationSessionInfo& osi,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token) {
|
||||
ShardsvrCommitCollModCollectionMetadata request(nss);
|
||||
ShardsvrCommitCollModCollectionMetadata request(nss, ShardingState::get(opCtx)->shardId());
|
||||
request.setDbName(DatabaseName::kAdmin);
|
||||
|
||||
generic_argument_util::setMajorityWriteConcern(request);
|
||||
@ -954,7 +954,7 @@ void commitCreateCollectionMetadataToShardCatalog(
|
||||
const OperationSessionInfo& osi,
|
||||
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
|
||||
const CancellationToken& token) {
|
||||
ShardsvrCommitCreateCollectionMetadata request(nss);
|
||||
ShardsvrCommitCreateCollectionMetadata request(nss, ShardingState::get(opCtx)->shardId());
|
||||
request.setDbName(DatabaseName::kAdmin);
|
||||
|
||||
generic_argument_util::setMajorityWriteConcern(request);
|
||||
|
||||
@ -293,38 +293,6 @@ void clearShardCatalogCacheForDroppedCollection(OperationContext* opCtx,
|
||||
|
||||
} // namespace
|
||||
|
||||
void commitRefineShardKeyLocally(OperationContext* opCtx, const NamespaceString& nss) {
|
||||
auto coll = fetchCollection(opCtx, nss);
|
||||
auto ownedChunks = fetchOwnedChunks(opCtx, nss, coll);
|
||||
|
||||
// Write to `config.shard.catalog.(collections|chunks)` to insert collection metadata.
|
||||
writeCollectionMetadataLocally(opCtx, nss, coll.asShardCatalogType(), ownedChunks);
|
||||
|
||||
// Delete stale chunks from config.shard.catalog.chunks whose shard key bounds do not match the
|
||||
// refined key pattern. This can occur when the shard catalog has an out-of-date view of the
|
||||
// owned chunk ranges (e.g., due to splits or merges).
|
||||
// TODO (SERVER-121709): Evaluate if this holds once merge/split are authoritative.
|
||||
const int numKeyFields = coll.getKeyPattern().toBSON().nFields();
|
||||
{
|
||||
DBDirectClient dbClient(opCtx);
|
||||
executeLocalDelete(
|
||||
dbClient,
|
||||
NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
BSON(ChunkType::collectionUUID()
|
||||
<< coll.getUuid() << "$expr"
|
||||
<< BSON("$ne" << BSON_ARRAY(BSON("$size" << BSON("$objectToArray" << "$min"))
|
||||
<< numKeyFields))),
|
||||
true /* multi */);
|
||||
}
|
||||
|
||||
// Write an oplog 'c' entry to invalidate collection metadata on secondaries.
|
||||
invalidateCollectionMetadataOnSecondaries(
|
||||
opCtx, nss, coll.getUuid(), false /* forDroppedCollection */);
|
||||
|
||||
// Update this node CSR with collection metadata and chunks.
|
||||
updateShardCatalogCache(opCtx, nss, coll, ownedChunks);
|
||||
}
|
||||
|
||||
void commitDropCollectionLocally(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
const UUID& uuid) {
|
||||
@ -480,34 +448,5 @@ void commitCollectionMetadataLocally(OperationContext* opCtx,
|
||||
opCtx, nss, coll.getUuid(), false /* forDroppedCollection */);
|
||||
}
|
||||
|
||||
void commitChunklessCollectionLocally(OperationContext* opCtx, const NamespaceString& nss) {
|
||||
auto coll = fetchCollection(opCtx, nss);
|
||||
|
||||
// Drop all existing chunk entries for this collection to start from a clean slate. This removes
|
||||
// both real chunks and any prior chunkless placeholder.
|
||||
deleteCollectionChunksMetadataLocally(opCtx, coll.getUuid());
|
||||
|
||||
// This shard does not own any chunks, but we still need the CSS to know the collection is
|
||||
// tracked. Persist a single placeholder chunk so that disk recovery can distinguish a
|
||||
// chunkless-tracked collection from an untracked one without special-case logic.
|
||||
auto range = ChunkRange(coll.getKeyPattern().globalMin(), coll.getKeyPattern().globalMax());
|
||||
ChunkType placeholder(coll.getUuid(),
|
||||
std::move(range),
|
||||
ChunkVersion({coll.getEpoch(), coll.getTimestamp()}, {1, 0}),
|
||||
kChunklessPlaceholderShardId);
|
||||
placeholder.setName(OID::gen());
|
||||
std::vector<ChunkType> placeholderChunks{std::move(placeholder)};
|
||||
|
||||
// Write the collection document and the placeholder chunk to the shard catalog.
|
||||
writeCollectionMetadataLocally(opCtx, nss, coll.asShardCatalogType(), placeholderChunks);
|
||||
|
||||
// Write an oplog 'c' entry to invalidate collection metadata on secondaries.
|
||||
invalidateCollectionMetadataOnSecondaries(
|
||||
opCtx, nss, coll.getUuid(), false /* forDroppedCollection */);
|
||||
|
||||
// Update this node CSR with chunkless tracked metadata.
|
||||
updateShardCatalogCache(opCtx, nss, coll, placeholderChunks);
|
||||
}
|
||||
|
||||
} // namespace shard_catalog_commit
|
||||
} // namespace mongo
|
||||
|
||||
@ -38,20 +38,6 @@ MONGO_MOD_PARENT_PRIVATE;
|
||||
namespace mongo {
|
||||
namespace shard_catalog_commit {
|
||||
|
||||
/**
|
||||
* Shard id for the placeholder chunk when a collection is tracked on a shard but owns no real
|
||||
* chunks (see commitCreateCollectionChunklessLocally).
|
||||
*/
|
||||
inline const ShardId kChunklessPlaceholderShardId{"__chunkless_placeholder__"};
|
||||
|
||||
/**
|
||||
* Fetches the latest collection metadata and owned chunks from the global catalog, persists them
|
||||
* to the shard catalog (config.shard.catalog.collections and config.shard.catalog.chunks), removes
|
||||
* any stale chunks whose shard key bounds don't match the refined key pattern, writes an oplog
|
||||
* entry to invalidate collection metadata on secondaries, and updates the in-memory
|
||||
* CollectionShardingRuntime (CSR) with the new routing information.
|
||||
*/
|
||||
void commitRefineShardKeyLocally(OperationContext* opCtx, const NamespaceString& nss);
|
||||
|
||||
/**
|
||||
* Deletes the collection and chunk metadata from the shard catalog
|
||||
@ -99,14 +85,5 @@ void commitCollectionMetadataLocally(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
bool isDbPrimaryShard = false);
|
||||
|
||||
/**
|
||||
* Fetches the collection metadata from the global catalog, removes any existing chunk entries for
|
||||
* this collection UUID, persists a chunkless placeholder chunk and the updated collection document
|
||||
* to the shard catalog, writes an oplog entry to invalidate secondaries, and refreshes the
|
||||
* in-memory CollectionShardingRuntime. Used for shards that participate in a tracked collection
|
||||
* but own no chunks.
|
||||
*/
|
||||
void commitChunklessCollectionLocally(OperationContext* opCtx, const NamespaceString& nss);
|
||||
|
||||
} // namespace shard_catalog_commit
|
||||
} // namespace mongo
|
||||
|
||||
@ -184,7 +184,7 @@ TEST_F(CommitCollectionMetadataLocallyTest, RefineShardKeyPersistsCollectionAndC
|
||||
auto [collType, chunks] = makeCollectionMetadata(3);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
|
||||
shard_catalog_commit::commitRefineShardKeyLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss, true);
|
||||
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 3);
|
||||
@ -201,7 +201,7 @@ TEST_F(CommitCollectionMetadataLocallyTest, RefineShardKeyUpdatesCSR) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(2);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
|
||||
shard_catalog_commit::commitRefineShardKeyLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss, true);
|
||||
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireShared(operationContext(), kTestNss);
|
||||
auto metadata = scopedCsr->getCurrentMetadataIfKnown();
|
||||
@ -214,8 +214,8 @@ TEST_F(CommitCollectionMetadataLocallyTest, RefineShardKeyIsIdempotent) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(2);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
|
||||
shard_catalog_commit::commitRefineShardKeyLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitRefineShardKeyLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss, true);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss, true);
|
||||
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 2);
|
||||
@ -301,7 +301,7 @@ TEST_F(CommitCollectionMetadataLocallyTest, ChunklessCollectionPersistsTokenToDi
|
||||
auto [collType, chunks] = makeCollectionMetadata(0);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, {});
|
||||
|
||||
shard_catalog_commit::commitChunklessCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss, true);
|
||||
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
|
||||
@ -309,18 +309,15 @@ TEST_F(CommitCollectionMetadataLocallyTest, ChunklessCollectionPersistsTokenToDi
|
||||
ASSERT_EQ(collDocs.size(), 1u);
|
||||
ASSERT_EQ(UUID::fromCDR(collDocs[0].getField("uuid").uuid()), collType.getUuid());
|
||||
|
||||
// The placeholder chunk is persisted so the token survives restarts.
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 1);
|
||||
auto chunkDocs = findLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace);
|
||||
ASSERT_EQ(UUID::fromCDR(chunkDocs[0].getField(ChunkType::collectionUUID.name()).uuid()),
|
||||
collType.getUuid());
|
||||
// No chunk should be stored in the local chunks for a chunkless collection on the dbPrimary.
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 0);
|
||||
}
|
||||
|
||||
TEST_F(CommitCollectionMetadataLocallyTest, ChunklessCollectionUpdatesCSR) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(0);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, {});
|
||||
|
||||
shard_catalog_commit::commitChunklessCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss, true);
|
||||
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireShared(operationContext(), kTestNss);
|
||||
auto metadata = scopedCsr->getCurrentMetadataIfKnown();
|
||||
@ -335,23 +332,21 @@ TEST_F(CommitCollectionMetadataLocallyTest, ChunklessCollectionIsIdempotent) {
|
||||
auto [collType, _] = makeCollectionMetadata(0);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, {});
|
||||
|
||||
shard_catalog_commit::commitChunklessCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitChunklessCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss, true);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss, true);
|
||||
|
||||
// Repeated calls must not accumulate placeholder rows; each call generates a fresh OID, so
|
||||
// this only holds if the helper deletes the prior placeholder before inserting.
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 0);
|
||||
}
|
||||
|
||||
TEST_F(CommitCollectionMetadataLocallyTest, RefineShardKeyChunklessPersistsCollectionWithNewEpoch) {
|
||||
// Seed a chunkless tracked collection at (epoch1, ts1).
|
||||
auto [collType1, _] = makeCollectionMetadata(0);
|
||||
mockCatalogClient()->setCollectionMetadata(collType1, {});
|
||||
shard_catalog_commit::commitChunklessCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss, true);
|
||||
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 0);
|
||||
|
||||
// Simulate a refine to (epoch2, ts2) on the same UUID with an extended key pattern.
|
||||
const OID epoch2 = OID::gen();
|
||||
@ -361,18 +356,13 @@ TEST_F(CommitCollectionMetadataLocallyTest, RefineShardKeyChunklessPersistsColle
|
||||
kTestNss, epoch2, ts2, Date_t::now(), collType1.getUuid(), newKeyPattern};
|
||||
mockCatalogClient()->setCollectionMetadata(collType2, {});
|
||||
|
||||
shard_catalog_commit::commitChunklessCollectionLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss, true);
|
||||
|
||||
// The collection doc reflects the new triple and exactly one placeholder chunk persists.
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 0);
|
||||
|
||||
auto collDocs = findLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace);
|
||||
ASSERT_BSONOBJ_EQ(collDocs[0].getObjectField("key"), newKeyPattern);
|
||||
|
||||
auto chunkDocs = findLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace);
|
||||
ASSERT_EQ(UUID::fromCDR(chunkDocs[0].getField(ChunkType::collectionUUID.name()).uuid()),
|
||||
collType1.getUuid());
|
||||
}
|
||||
|
||||
TEST_F(CommitCollectionMetadataLocallyTest, RefineShardKeyRemovesStaleChunks) {
|
||||
@ -400,7 +390,7 @@ TEST_F(CommitCollectionMetadataLocallyTest, RefineShardKeyRemovesStaleChunks) {
|
||||
}
|
||||
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
shard_catalog_commit::commitRefineShardKeyLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss, true);
|
||||
|
||||
// The stale chunks should have been removed and replaced with the correct ones.
|
||||
auto chunkDocs = findLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace);
|
||||
@ -415,7 +405,7 @@ TEST_F(CommitCollectionMetadataLocallyTest, DropCollectionDeletesMetadata) {
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
|
||||
// First commit the metadata so there's something to drop.
|
||||
shard_catalog_commit::commitRefineShardKeyLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogCollectionsNamespace), 1);
|
||||
ASSERT_EQ(countLocalDocs(NamespaceString::kConfigShardCatalogChunksNamespace), 3);
|
||||
|
||||
@ -430,7 +420,7 @@ TEST_F(CommitCollectionMetadataLocallyTest, DropCollectionClearsCSR) {
|
||||
auto [collType, chunks] = makeCollectionMetadata(2);
|
||||
mockCatalogClient()->setCollectionMetadata(collType, chunks);
|
||||
|
||||
shard_catalog_commit::commitRefineShardKeyLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss);
|
||||
|
||||
{
|
||||
auto scopedCsr = CollectionShardingRuntime::acquireShared(operationContext(), kTestNss);
|
||||
@ -456,7 +446,7 @@ TEST_F(CommitCollectionMetadataLocallyTest, DropCollectionIsNoOpOnEmptyCatalog)
|
||||
TEST_F(CommitCollectionMetadataLocallyTest, DropCollectionOnlyDeletesTargetCollection) {
|
||||
auto [collType1, chunks1] = makeCollectionMetadata(2);
|
||||
mockCatalogClient()->setCollectionMetadata(collType1, chunks1);
|
||||
shard_catalog_commit::commitRefineShardKeyLocally(operationContext(), kTestNss);
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(operationContext(), kTestNss);
|
||||
|
||||
// Insert a second collection's data directly.
|
||||
auto otherNss = NamespaceString::createNamespaceString_forTest("TestDB", "OtherColl");
|
||||
|
||||
@ -58,17 +58,13 @@ StorageEngine::TimestampMonitor::TimestampListener kShardCatalogHistoryCleanupTi
|
||||
|
||||
auto shardId = shardingState->shardId();
|
||||
|
||||
// TODO(SERVER-126200): Remove reference to ChunklessPlaceholder.
|
||||
PersistentTaskStore<ChunkType> chunkStore{
|
||||
NamespaceString::kConfigShardCatalogChunksNamespace};
|
||||
try {
|
||||
chunkStore.remove(
|
||||
opCtx,
|
||||
BSON(ChunkType::shard()
|
||||
<< BSON("$nin" << BSON_ARRAY(
|
||||
shardId.toString()
|
||||
<< shard_catalog_commit::kChunklessPlaceholderShardId.toString()))
|
||||
<< ChunkType::onCurrentShardSince() << BSON("$lt" << oldest)));
|
||||
chunkStore.remove(opCtx,
|
||||
BSON(ChunkType::shard()
|
||||
<< BSON("$ne" << shardId.toString())
|
||||
<< ChunkType::onCurrentShardSince() << BSON("$lt" << oldest)));
|
||||
} catch (const ExceptionFor<ErrorCodes::FailedToSatisfyReadPreference>&) {
|
||||
// Primary can be killed in the middle of the removal.
|
||||
return;
|
||||
|
||||
@ -77,7 +77,6 @@ public:
|
||||
TEST_F(ShardCatalogHistoryCleanupTest, ShardCatalogHistoryCleanupCalledOnTimestampMonitorAdvance) {
|
||||
const ShardId kShardName{"testShard"};
|
||||
const ShardId kAnotherShardName{"anotherTestShard"};
|
||||
const ShardId kChunklessPlaceholderShardId{"__chunkless_placeholder__"};
|
||||
|
||||
// Imagining the test is running on a primary in sharded cluster
|
||||
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
|
||||
@ -133,13 +132,6 @@ TEST_F(ShardCatalogHistoryCleanupTest, ShardCatalogHistoryCleanupCalledOnTimesta
|
||||
notOutdatedPreviouslyOwnedChunkType.setHistory(
|
||||
{ChunkHistory(newTimestamp, kAnotherShardName), ChunkHistory(oldestTimestamp, kShardName)});
|
||||
notOutdatedPreviouslyOwnedChunkType.setOnCurrentShardSince(newTimestamp);
|
||||
// Note: in reality placeholder would always store MINKEY, MAXKEY range, but in this unit test
|
||||
// we can disregard it
|
||||
auto placehoderOwnedChunkType =
|
||||
ChunkType(collectionUUID,
|
||||
ChunkRange(BSON("_id" << 300), BSON("_id" << MAXKEY)),
|
||||
ChunkVersion(CollectionGeneration{epoch, Timestamp(0, 0)}, placement),
|
||||
kChunklessPlaceholderShardId);
|
||||
|
||||
DBDirectClient client(opCtx.get());
|
||||
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
@ -148,8 +140,6 @@ TEST_F(ShardCatalogHistoryCleanupTest, ShardCatalogHistoryCleanupCalledOnTimesta
|
||||
outdatedPreviouslyOwnedChunkType.toConfigBSON());
|
||||
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
notOutdatedPreviouslyOwnedChunkType.toConfigBSON());
|
||||
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
placehoderOwnedChunkType.toConfigBSON());
|
||||
|
||||
waitForTimestampMonitorPass();
|
||||
waitForTimestampMonitorPass();
|
||||
@ -164,11 +154,11 @@ TEST_F(ShardCatalogHistoryCleanupTest, ShardCatalogHistoryCleanupCalledOnTimesta
|
||||
chunks.push_back(chunkRes.getValue());
|
||||
}
|
||||
|
||||
ASSERT_EQ(chunks.size(), 3u);
|
||||
ASSERT_EQ(chunks.size(), 2u);
|
||||
|
||||
// Note: Parsing from ChunkType overrode the chunk version, so we have to set it manually
|
||||
std::vector<ChunkType> expectedChunks = {
|
||||
currentlyOwnedChunkType, notOutdatedPreviouslyOwnedChunkType, placehoderOwnedChunkType};
|
||||
std::vector<ChunkType> expectedChunks = {currentlyOwnedChunkType,
|
||||
notOutdatedPreviouslyOwnedChunkType};
|
||||
for (auto& chunk : expectedChunks) {
|
||||
chunk.setVersion(ChunkVersion(CollectionGeneration(epoch, newTimestamp), placement));
|
||||
}
|
||||
|
||||
@ -999,21 +999,11 @@ TEST_F(AuthoritativeRefreshFixture, TrackedCollectionWithNoChunksOnDiskRecovered
|
||||
const Timestamp timestamp(Date_t::now());
|
||||
CollectionType collType{kTestNss, epoch, timestamp, Date_t::now(), uuid, kShardKeyPattern};
|
||||
|
||||
auto keyPattern = KeyPattern(kShardKeyPattern);
|
||||
auto range = ChunkRange(keyPattern.globalMin(), keyPattern.globalMax());
|
||||
ChunkType placeholder(uuid,
|
||||
std::move(range),
|
||||
ChunkVersion({epoch, timestamp}, {1, 0}),
|
||||
shard_catalog_commit::kChunklessPlaceholderShardId);
|
||||
placeholder.setName(OID::gen());
|
||||
|
||||
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace);
|
||||
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace);
|
||||
{
|
||||
DBDirectClient client(opCtx);
|
||||
client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON());
|
||||
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace,
|
||||
placeholder.toConfigBSON());
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
@ -95,7 +95,10 @@ public:
|
||||
Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor());
|
||||
newOpCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(newOpCtx.get(), ns());
|
||||
bool isPrimaryDbShard =
|
||||
ShardingState::get(newOpCtx.get())->shardId() == request().getPrimaryShardId();
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(
|
||||
newOpCtx.get(), ns(), isPrimaryDbShard);
|
||||
}
|
||||
|
||||
LOGV2_INFO(12591501,
|
||||
|
||||
@ -95,7 +95,10 @@ public:
|
||||
Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor());
|
||||
newOpCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(newOpCtx.get(), ns());
|
||||
bool isPrimaryDbShard =
|
||||
ShardingState::get(newOpCtx.get())->shardId() == request().getPrimaryShardId();
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(
|
||||
newOpCtx.get(), ns(), isPrimaryDbShard);
|
||||
}
|
||||
|
||||
LOGV2_INFO(
|
||||
|
||||
@ -94,7 +94,10 @@ public:
|
||||
Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor());
|
||||
newOpCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
|
||||
|
||||
shard_catalog_commit::commitRefineShardKeyLocally(newOpCtx.get(), nss);
|
||||
bool isPrimaryDbShard =
|
||||
ShardingState::get(newOpCtx.get())->shardId() == request().getPrimaryShardId();
|
||||
shard_catalog_commit::commitCollectionMetadataLocally(
|
||||
newOpCtx.get(), nss, isPrimaryDbShard);
|
||||
}
|
||||
|
||||
LOGV2_INFO(10281504,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user