SERVER-127232 Perform last index build spill before transition to load (#54108)
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> GitOrigin-RevId: 5bc778a8a8d6a763d89023dc28c087b13dfd988d
This commit is contained in:
parent
4c0895b375
commit
dda936f1df
@ -964,6 +964,8 @@ public:
|
||||
const OnSuppressedErrorFn& onSuppressedError = nullptr,
|
||||
const ShouldRelaxConstraintsFn& shouldRelaxConstraints = nullptr) final;
|
||||
|
||||
void done() final;
|
||||
|
||||
Status commit(OperationContext* opCtx,
|
||||
RecoveryUnit& ru,
|
||||
const CollectionPtr* collection,
|
||||
@ -1036,6 +1038,7 @@ private:
|
||||
MultikeyPaths _indexMultikeyPaths;
|
||||
|
||||
std::unique_ptr<Sorter> _sorter;
|
||||
std::unique_ptr<Iterator> _sortedIterator;
|
||||
ContainerWriteBehavior _containerWriteBehavior;
|
||||
// We start out with container::ExistingKeyPolicy::reject because it's not safe to write blindly
|
||||
// unless we know for certain that we're inserting something that is definitely not already in
|
||||
@ -1217,6 +1220,12 @@ Status BulkBuilderImpl::insert(OperationContext* opCtx,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void BulkBuilderImpl::done() {
|
||||
invariant(_sorter);
|
||||
tassert(12723200, "BulkBuilder::done called more than once", !_sortedIterator);
|
||||
_sortedIterator = _sorter->done();
|
||||
}
|
||||
|
||||
Status BulkBuilderImpl::commit(OperationContext* opCtx,
|
||||
RecoveryUnit& ru,
|
||||
const CollectionPtr* collection,
|
||||
@ -1232,10 +1241,11 @@ Status BulkBuilderImpl::commit(OperationContext* opCtx,
|
||||
const size_t keyBatchBytes) {
|
||||
uassert(
|
||||
ErrorCodes::BadValue, "onNKeysLoadedFnInterval must be >= 1", onNKeysLoadedFnInterval >= 1);
|
||||
tassert(12723201, "BulkBuilder::done must be called before commit", _sortedIterator);
|
||||
Timer timer;
|
||||
|
||||
_ns = entry->getNSSFromCatalog(opCtx);
|
||||
auto it = _sorter->done();
|
||||
auto it = std::move(_sortedIterator);
|
||||
|
||||
ProgressMeterHolder pm;
|
||||
{
|
||||
|
||||
@ -263,7 +263,14 @@ public:
|
||||
const ShouldRelaxConstraintsFn& shouldRelaxConstraints = nullptr) = 0;
|
||||
|
||||
/**
|
||||
* Call this when you are ready to finish your bulk work.
|
||||
* Signals to the builder that inserting has been completed. Must be called exactly once,
|
||||
* before commit().
|
||||
*/
|
||||
virtual void done() = 0;
|
||||
|
||||
/**
|
||||
* Commit the data that was inserted. done() must have been called first.
|
||||
*
|
||||
* @param dupsAllowed - If false and 'dupRecords' is not null, append with the RecordIds of
|
||||
* the uninserted duplicates.
|
||||
* @param yieldIterations - The number of iterations run before each yielding. Will not
|
||||
|
||||
@ -417,6 +417,85 @@ TEST_F(IndexAccessMethodBulkBuilder, CommitRejectsZeroInterval) {
|
||||
ErrorCodes::BadValue);
|
||||
}
|
||||
|
||||
TEST_F(IndexAccessMethodBulkBuilder, DoneCalledTwiceThrows) {
|
||||
ServiceContext::UniqueOperationContext opCtxRaii = cc().makeOperationContext();
|
||||
auto* opCtx = opCtxRaii.get();
|
||||
auto nss = NamespaceString::createNamespaceString_forTest(
|
||||
"IndexAccessMethodBulkBuilder.DoneCalledTwiceThrows");
|
||||
auto indexName = "a_1";
|
||||
auto indexSpec = BSON("name" << indexName << "key" << BSON("a" << 1) << "v"
|
||||
<< static_cast<int>(IndexDescriptor::IndexVersion::kV2));
|
||||
ASSERT_OK(createIndexFromSpec(opCtx, nss.ns_forTest(), indexSpec));
|
||||
|
||||
AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_X);
|
||||
auto indexEntry = autoColl->getIndexCatalog()->findIndexByName(opCtx, indexName);
|
||||
auto indexAccessMethod = indexEntry->accessMethod()->asSortedData();
|
||||
|
||||
auto bulk = indexAccessMethod->initiateBulk(opCtx,
|
||||
*autoColl,
|
||||
indexEntry,
|
||||
/*spiller=*/nullptr,
|
||||
/*maxMemoryUsageBytes=*/128 * 1024 * 1024,
|
||||
/*stateInfo=*/boost::none,
|
||||
nss.dbName(),
|
||||
ContainerWriteBehavior::kDoNotReplicate);
|
||||
ASSERT(bulk);
|
||||
|
||||
bulk->done();
|
||||
ASSERT_THROWS_WITH_CHECK(bulk->done(), DBException, [](const DBException& ex) {
|
||||
EXPECT_EQ(ex.code(), 12723200);
|
||||
assertionCount.tripwire.subtractAndFetch(1);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(IndexAccessMethodBulkBuilder, CommitWithoutDoneThrows) {
|
||||
ServiceContext::UniqueOperationContext opCtxRaii = cc().makeOperationContext();
|
||||
auto* opCtx = opCtxRaii.get();
|
||||
auto nss = NamespaceString::createNamespaceString_forTest(
|
||||
"IndexAccessMethodBulkBuilder.CommitWithoutDoneThrows");
|
||||
auto indexName = "a_1";
|
||||
auto indexSpec = BSON("name" << indexName << "key" << BSON("a" << 1) << "v"
|
||||
<< static_cast<int>(IndexDescriptor::IndexVersion::kV2));
|
||||
ASSERT_OK(createIndexFromSpec(opCtx, nss.ns_forTest(), indexSpec));
|
||||
|
||||
AutoGetCollection autoColl{opCtx, nss, LockMode::MODE_X};
|
||||
auto indexEntry = autoColl->getIndexCatalog()->findIndexByName(opCtx, indexName);
|
||||
auto indexAccessMethod = indexEntry->accessMethod()->asSortedData();
|
||||
|
||||
auto bulk = indexAccessMethod->initiateBulk(opCtx,
|
||||
*autoColl,
|
||||
indexEntry,
|
||||
/*spiller=*/nullptr,
|
||||
/*maxMemoryUsageBytes=*/128 * 1024 * 1024,
|
||||
/*stateInfo=*/boost::none,
|
||||
nss.dbName(),
|
||||
ContainerWriteBehavior::kDoNotReplicate);
|
||||
ASSERT(bulk);
|
||||
|
||||
ASSERT_THROWS_WITH_CHECK(bulk->commit(opCtx,
|
||||
*shard_role_details::getRecoveryUnit(opCtx),
|
||||
&*autoColl,
|
||||
indexEntry,
|
||||
/*dupsAllowed=*/true,
|
||||
/*yieldIterations=*/0,
|
||||
IndexAccessMethod::KeyHandlerFn{
|
||||
[](const CollectionPtr&, const key_string::View&) {
|
||||
return Status::OK();
|
||||
}},
|
||||
IndexAccessMethod::RecordIdHandlerFn{},
|
||||
IndexAccessMethod::YieldFn{},
|
||||
IndexAccessMethod::OnNKeysLoadedFn{[]() {
|
||||
}},
|
||||
/*onNKeysLoadedFnInterval=*/1,
|
||||
/*keyBatchSize=*/1,
|
||||
/*keyBatchBytes=*/1024),
|
||||
DBException,
|
||||
[](const DBException& ex) {
|
||||
EXPECT_EQ(ex.code(), 12723201);
|
||||
assertionCount.tripwire.subtractAndFetch(1);
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@ -1137,6 +1137,17 @@ Status MultiIndexBlock::dumpInsertsFromBulk(
|
||||
_phase == IndexBuildPhaseEnum::kCollectionScan ||
|
||||
_phase == IndexBuildPhaseEnum::kBulkLoad,
|
||||
idl::serialize(_phase));
|
||||
|
||||
// Finalize all builders (which may perform a final spill) before transitioning to the load
|
||||
// phase.
|
||||
try {
|
||||
for (auto&& index : _indexes) {
|
||||
index.bulk->done();
|
||||
}
|
||||
} catch (...) {
|
||||
return exceptionToStatus();
|
||||
}
|
||||
|
||||
_phase = IndexBuildPhaseEnum::kBulkLoad;
|
||||
|
||||
// Doesn't allow yielding when in a foreground index build.
|
||||
|
||||
@ -2745,6 +2745,78 @@ TEST_F(MultiIndexBlockTest, DoNotWriteStateToContainerOnSpillWhenNotResumable) {
|
||||
|
||||
indexer.abortIndexBuild(operationContext(), coll, MultiIndexBlock::kNoopOnCleanUpFn);
|
||||
}
|
||||
|
||||
TEST_F(MultiIndexBlockTest, AllSpillsDuringScanPersistScanPhase) {
|
||||
RAIIServerParameterControllerForTest ffContainerWrites{"featureFlagContainerWrites", true};
|
||||
RAIIServerParameterControllerForTest ffPDIB{"featureFlagPrimaryDrivenIndexBuilds", true};
|
||||
RAIIServerParameterControllerForTest ffResumable{"featureFlagResumablePrimaryDrivenIndexBuilds",
|
||||
true};
|
||||
RAIIServerParameterControllerForTest memUsage{"maxIndexBuildMemoryUsageMegabytes", 2};
|
||||
RAIIServerParameterControllerForTest iteratorsMemoryPct{"maxIteratorsMemoryUsagePercentage", 1};
|
||||
|
||||
promoteMockReplCoordToPrimary(getServiceContext());
|
||||
auto& observer = installResumeStateContainerObserver(operationContext());
|
||||
|
||||
auto& indexer = *getIndexer();
|
||||
AutoGetCollection autoColl{operationContext(), getNSS(), MODE_X};
|
||||
CollectionWriter coll{operationContext(), autoColl};
|
||||
|
||||
auto buildUUID = UUID::gen();
|
||||
indexer.setBuildUUID(buildUUID);
|
||||
indexer.setIndexBuildMethod(IndexBuildMethodEnum::kPrimaryDriven);
|
||||
indexer.setContainerWriteBehavior(ContainerWriteBehavior::kReplicate);
|
||||
indexer.setIsResumable(true);
|
||||
|
||||
{
|
||||
WriteUnitOfWork wuow{operationContext()};
|
||||
std::string val(64 * 1024, 'a');
|
||||
for (auto i = 0; i < 300; ++i) {
|
||||
ASSERT_OK(
|
||||
Helpers::insert(operationContext(), *autoColl, BSON("_id" << i << "a" << val)));
|
||||
}
|
||||
wuow.commit();
|
||||
}
|
||||
|
||||
auto indexBuildInfo =
|
||||
IndexBuildInfo(BSON("key" << BSON("a" << 1) << "name"
|
||||
<< "a_1"
|
||||
<< "v" << static_cast<int>(IndexConfig::kLatestIndexVersion)),
|
||||
"index-1",
|
||||
*operationContext()->getServiceContext()->getStorageEngine());
|
||||
|
||||
ASSERT_OK(indexer.init(operationContext(),
|
||||
coll,
|
||||
{indexBuildInfo},
|
||||
MultiIndexBlock::kNoopOnInitFn,
|
||||
MultiIndexBlock::InitMode::SteadyState,
|
||||
boost::none));
|
||||
|
||||
auto indexBuildIdent = ident::generateNewIndexBuildIdent(buildUUID);
|
||||
|
||||
ASSERT_OK(indexer.insertAllDocumentsInCollection(operationContext(), getNSS()));
|
||||
EXPECT_GE(observer.countInsertsForIdent(indexBuildIdent) +
|
||||
observer.countUpdatesForIdent(indexBuildIdent),
|
||||
1);
|
||||
|
||||
// Every write must persist that the index build is in the scan phase.
|
||||
auto check = [&](const std::vector<ResumeStateContainerInsertObserver::Op>& ops) {
|
||||
for (const auto& op : ops) {
|
||||
if (op.ident != indexBuildIdent) {
|
||||
continue;
|
||||
}
|
||||
auto info = ResumeIndexInfo::parse(BSONObj(op.value.data()),
|
||||
IDLParserContext("ResumeIndexInfo"));
|
||||
EXPECT_EQ(IndexBuildPhaseEnum::kCollectionScan, info.getPhase());
|
||||
EXPECT_EQ(buildUUID, info.getBuildUUID());
|
||||
}
|
||||
};
|
||||
check(observer.inserts);
|
||||
check(observer.updates);
|
||||
|
||||
indexer.abortIndexBuild(operationContext(), coll, MultiIndexBlock::kNoopOnCleanUpFn);
|
||||
}
|
||||
|
||||
|
||||
TEST_F(MultiIndexBlockTest, LoadWritesResumeStatePeriodicallyForPrimaryDrivenBuild) {
|
||||
RAIIServerParameterControllerForTest ffContainerWrites{"featureFlagContainerWrites", true};
|
||||
RAIIServerParameterControllerForTest ffPDIB{"featureFlagPrimaryDrivenIndexBuilds", true};
|
||||
|
||||
Loading…
Reference in New Issue
Block a user