diff --git a/src/mongo/db/sorter/container_based_spiller.h b/src/mongo/db/sorter/container_based_spiller.h index 8c1f02a58e5..b0645f55679 100644 --- a/src/mongo/db/sorter/container_based_spiller.h +++ b/src/mongo/db/sorter/container_based_spiller.h @@ -128,7 +128,9 @@ public: _compareChecksums(); auto key = Key::deserializeForSorter(reader, _settings.first); - _deferredValue.emplace(result.data() + reader.offset(), result.size() - reader.offset()); + // Eagerly deserialize the value so it owns its own memory. Otherwise, it could become + // invalidated by a cursor reset. + _deferredValue = Value::deserializeForSorter(reader, _settings.second); return std::move(key); } @@ -137,11 +139,10 @@ public: uassert( 10896303, "Must precede getDeferredValue with nextWithDeferredValue", _deferredValue); - BufReader reader{_deferredValue->data(), static_cast(_deferredValue->size())}; - _deferredValue = boost::none; _consume(); - - return Value::deserializeForSorter(reader, _settings.second); + auto value = std::move(*_deferredValue); + _deferredValue = boost::none; + return value; } const Key& peek() override { @@ -214,7 +215,7 @@ private: int64_t _end; bool _positioned = false; Iterator::Settings _settings; - boost::optional> _deferredValue; + boost::optional _deferredValue; // Checksum value that is updated with each read of a data object from a container. We can // compare this value with _originalChecksum to check for data corruption if and only if the diff --git a/src/mongo/db/sorter/container_based_spiller_test.cpp b/src/mongo/db/sorter/container_based_spiller_test.cpp index c073074844c..09d50414fe2 100644 --- a/src/mongo/db/sorter/container_based_spiller_test.cpp +++ b/src/mongo/db/sorter/container_based_spiller_test.cpp @@ -38,12 +38,16 @@ #include "mongo/db/sorter/sorter_test_utils.h" #include "mongo/db/storage/container.h" #include "mongo/db/storage/ident.h" +#include "mongo/db/storage/key_format.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/record_store_test_harness.h" #include "mongo/db/storage/recovery_unit_noop.h" +#include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage/storage_options.h" +#include "mongo/db/storage/write_unit_of_work.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/fail_point.h" #include #include @@ -1376,6 +1380,87 @@ TEST(ContainerIteratorTest, RecoverCursorAfterAbandoningSnapshot) { EXPECT_FALSE(iter.more()); } +class ContainerBasedSpillerWriteConflictTest : public ServiceContextMongoDTest { +public: + // TODO (SERVER-116165): Remove. + RAIIServerParameterControllerForTest ffContainerWrites{"featureFlagContainerWrites", true}; +}; + +// Calls mergeSpills() with a deterministic WCE on the first merged write. Exercises SERVER-126155 +// and SERVER-124271. +TEST_F(ContainerBasedSpillerWriteConflictTest, MergeSpillsSurvivesCursorResetUnderWCE) { + auto opCtx = makeOperationContext(); + auto replCoord = dynamic_cast( + repl::ReplicationCoordinator::get(opCtx.get())); + ASSERT(replCoord); + replCoord->alwaysAllowWrites(true); + + auto* storageEngine = getServiceContext()->getStorageEngine(); + auto tempRS = [&] { + WriteUnitOfWork wuow(opCtx.get()); + auto rs = storageEngine->makeInternalRecordStore( + opCtx.get(), storageEngine->generateNewInternalIdent(), KeyFormat::Long); + wuow.commit(); + return rs; + }(); + auto& container = + std::get>(tempRS->getContainer()).get(); + + SorterContainerStats stats{nullptr}; + auto& ru = *shard_role_details::getRecoveryUnit(opCtx.get()); + + ContainerBasedSpiller spiller{ + *opCtx, + ru, + container, + stats, + boost::none, + sorter::kLatestChecksumVersion, + ContainerBasedSpiller::SpillCallbacks{}, + /*batchSize=*/1, + /*batchBytes=*/std::numeric_limits::max(), + testSpillingMinAvailableDiskSpaceBytes}; + + std::vector> data{ + {10, 100}, + {40, 400}, // range 0 + {20, 200}, + {50, 500}, // range 1 + {30, 300}, + {60, 600}, // range 2 + }; + std::span> span{data}; + using SpillerSettings = Spiller::Settings; + spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(0, 2)); + spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(2, 2)); + spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(4, 2)); + ASSERT_EQ(spiller.iterators().size(), 3); + + // Fires WCE on the first merged write. + auto writeConflict = enableWriteConflictForWrites( + FailPoint::ModeOptions{.mode = FailPoint::Mode::nTimes, .val = 1}); + + SorterStats sorterStats{nullptr}; + spiller.mergeSpills(SortOptions{}, + SpillerSettings{}, + sorterStats, + IWComparator(ASC), + /*numTargetedSpills=*/1, + /*maxSpillsPerMerge=*/3); + + ASSERT_EQ(spiller.iterators().size(), 1); + const std::vector> expected{ + {10, 100}, {20, 200}, {30, 300}, {40, 400}, {50, 500}, {60, 600}}; + auto it = spiller.iterators()[0]; + for (const auto& [k, v] : expected) { + ASSERT_TRUE(it->more()); + auto next = it->next(); + EXPECT_EQ(static_cast(next.first), k); + EXPECT_EQ(static_cast(next.second), v); + } + EXPECT_FALSE(it->more()); +} + class ContainerBasedSpillerCallbackTest : public ServiceContextMongoDTest { public: // TODO (SERVER-116165): Remove.