// NOTE(review): this span is a unified git diff of
// src/mongo/db/sorter/container_based_spiller_test.cpp whose line breaks have been collapsed; it is
// not standalone C++. Several template argument lists also look stripped (e.g. after
// `dynamic_cast`, `std::get`, `std::unique_ptr`, `std::numeric_limits`, `static_cast`) -- confirm
// against the real patch before applying.
//
// Hunk 1 (@@ -1384,35 +1384,52 @@): adds a protected setUp() to
// ContainerBasedSpillerWriteConflictTest that creates the OperationContext, calls
// alwaysAllowWrites(true) on the replication coordinator, and creates an internal record store
// (KeyFormat::Long) inside a committed WriteUnitOfWork. It also adds the opCtx(), container() and
// ru() accessors plus a `stats` member, and removes the equivalent per-test setup from
// MergeSpillsSurvivesCursorResetUnderWCE, which now passes *opCtx(), ru() and container().
//
// Hunk 2 (@@ -1461,6 +1478,84 @@): adds MergeSpillsRemoveSurvivesWCE. The test spills three
// two-element ranges, sets armOnNextOnSpill just before mergeSpills() so that the onSpill callback
// enables the write-conflict failpoint (Mode::nTimes, val = 1), then asserts that the failpoint's
// times-entered count reached wceCountBefore + 1 and that the single remaining iterator yields
// the six pairs in ascending key order.
//
// NOTE(review): `writeConflict` is only assigned inside onSpill when armOnNextOnSpill is true, yet
// `(*writeConflict)` is dereferenced unconditionally after mergeSpills(). If onSpill does not fire
// during the merge the pointer is still empty -- consider asserting it is non-null first.
diff --git a/src/mongo/db/sorter/container_based_spiller_test.cpp b/src/mongo/db/sorter/container_based_spiller_test.cpp index 09d50414fe2..9f67f927dd5 100644 --- a/src/mongo/db/sorter/container_based_spiller_test.cpp +++ b/src/mongo/db/sorter/container_based_spiller_test.cpp @@ -1384,35 +1384,52 @@ class ContainerBasedSpillerWriteConflictTest : public ServiceContextMongoDTest { public: // TODO (SERVER-116165): Remove. RAIIServerParameterControllerForTest ffContainerWrites{"featureFlagContainerWrites", true}; + +protected: + void setUp() override { + ServiceContextMongoDTest::setUp(); + _opCtx = makeOperationContext(); + auto* replCoord = dynamic_cast( + repl::ReplicationCoordinator::get(_opCtx.get())); + ASSERT(replCoord); + replCoord->alwaysAllowWrites(true); + + auto* storageEngine = getServiceContext()->getStorageEngine(); + { + WriteUnitOfWork wuow(_opCtx.get()); + _tempRS = storageEngine->makeInternalRecordStore( + _opCtx.get(), storageEngine->generateNewInternalIdent(), KeyFormat::Long); + wuow.commit(); + } + _container = + &std::get>(_tempRS->getContainer()).get(); + } + + OperationContext* opCtx() { + return _opCtx.get(); + } + IntegerKeyedContainer& container() { + return *_container; + } + RecoveryUnit& ru() { + return *shard_role_details::getRecoveryUnit(_opCtx.get()); + } + + SorterContainerStats stats{nullptr}; + +private: + ServiceContext::UniqueOperationContext _opCtx; + std::unique_ptr _tempRS; + IntegerKeyedContainer* _container = nullptr; }; // Calls mergeSpills() with a deterministic WCE on the first merged write. Exercises SERVER-126155 // and SERVER-124271. 
TEST_F(ContainerBasedSpillerWriteConflictTest, MergeSpillsSurvivesCursorResetUnderWCE) { - auto opCtx = makeOperationContext(); - auto replCoord = dynamic_cast( - repl::ReplicationCoordinator::get(opCtx.get())); - ASSERT(replCoord); - replCoord->alwaysAllowWrites(true); - - auto* storageEngine = getServiceContext()->getStorageEngine(); - auto tempRS = [&] { - WriteUnitOfWork wuow(opCtx.get()); - auto rs = storageEngine->makeInternalRecordStore( - opCtx.get(), storageEngine->generateNewInternalIdent(), KeyFormat::Long); - wuow.commit(); - return rs; - }(); - auto& container = - std::get>(tempRS->getContainer()).get(); - - SorterContainerStats stats{nullptr}; - auto& ru = *shard_role_details::getRecoveryUnit(opCtx.get()); - ContainerBasedSpiller spiller{ - *opCtx, - ru, - container, + *opCtx(), + ru(), + container(), stats, boost::none, sorter::kLatestChecksumVersion, @@ -1461,6 +1478,84 @@ TEST_F(ContainerBasedSpillerWriteConflictTest, MergeSpillsSurvivesCursorResetUnd EXPECT_FALSE(it->more()); } +// Verifies that the writeConflictRetry block in mergeSpills_remove correctly retries when the +// storage engine throws a WCE on the first deletion. +TEST_F(ContainerBasedSpillerWriteConflictTest, MergeSpillsRemoveSurvivesWCE) { + using Spiller = ContainerBasedSpiller; + + std::unique_ptr writeConflict; + FailPoint::EntryCountT wceCountBefore = 0; + // onSpill fires on every spill() call as well as inside mergeSpills(); only arm during the + // merge so the nTimes=1 token is consumed by mergeSpills_remove rather than a later spill's + // insert. + bool armOnNextOnSpill = false; + + Spiller::SpillCallbacks callbacks{ + .onSpill = + [&] { + if (!armOnNextOnSpill) { + return; + } + // _runOnSpill() fires between mergeSpills_insert and mergeSpills_remove. Arming + // here guarantees the nTimes=1 token is consumed by the first remove() call. 
+ writeConflict = enableWriteConflictForWrites( + FailPoint::ModeOptions{.mode = FailPoint::Mode::nTimes, .val = 1}); + wceCountBefore = writeConflict->initialTimesEntered(); + armOnNextOnSpill = false; + }, + }; + + Spiller spiller{*opCtx(), + ru(), + container(), + stats, + boost::none, + sorter::kLatestChecksumVersion, + std::move(callbacks), + /*batchSize=*/1, + /*batchBytes=*/std::numeric_limits::max(), + testSpillingMinAvailableDiskSpaceBytes}; + + std::vector> data{ + {10, 100}, + {40, 400}, // range 0 + {20, 200}, + {50, 500}, // range 1 + {30, 300}, + {60, 600}, // range 2 + }; + std::span> span{data}; + using SpillerSettings = Spiller::Settings; + spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(0, 2)); + spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(2, 2)); + spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(4, 2)); + ASSERT_EQ(spiller.iterators().size(), 3); + + SorterStats sorterStats{nullptr}; + armOnNextOnSpill = true; + spiller.mergeSpills(SortOptions{}, + SpillerSettings{}, + sorterStats, + IWComparator(ASC), + /*numTargetedSpills=*/1, + /*maxSpillsPerMerge=*/3); + + ASSERT_EQ((*writeConflict)->waitForTimesEntered(wceCountBefore + 1), wceCountBefore + 1) + << "Expected exactly one WCE to fire inside mergeSpills_remove"; + + ASSERT_EQ(spiller.iterators().size(), 1); + const std::vector> expected{ + {10, 100}, {20, 200}, {30, 300}, {40, 400}, {50, 500}, {60, 600}}; + auto it = spiller.iterators()[0]; + for (const auto& [k, v] : expected) { + ASSERT_TRUE(it->more()); + auto next = it->next(); + EXPECT_EQ(static_cast(next.first), k); + EXPECT_EQ(static_cast(next.second), v); + } + EXPECT_FALSE(it->more()); +} + class ContainerBasedSpillerCallbackTest : public ServiceContextMongoDTest { public: // TODO (SERVER-116165): Remove.