SERVER-124271 Sort spill merge can read freed WT cursor memory after a WriteConflictException (#54035)
Co-authored-by: Gregory Noma <gregory.noma@gmail.com> GitOrigin-RevId: be7f2ad13b9e198d5cbcebc873e65a72c31398e7
This commit is contained in:
parent
00fbf91ff5
commit
2e622cf6b0
@ -128,7 +128,9 @@ public:
|
||||
_compareChecksums();
|
||||
|
||||
auto key = Key::deserializeForSorter(reader, _settings.first);
|
||||
_deferredValue.emplace(result.data() + reader.offset(), result.size() - reader.offset());
|
||||
// Eagerly deserialize the value so it owns its own memory. Otherwise, it could become
|
||||
// invalidated by a cursor reset.
|
||||
_deferredValue = Value::deserializeForSorter(reader, _settings.second);
|
||||
|
||||
return std::move(key);
|
||||
}
|
||||
@ -137,11 +139,10 @@ public:
|
||||
uassert(
|
||||
10896303, "Must precede getDeferredValue with nextWithDeferredValue", _deferredValue);
|
||||
|
||||
BufReader reader{_deferredValue->data(), static_cast<unsigned>(_deferredValue->size())};
|
||||
_deferredValue = boost::none;
|
||||
_consume();
|
||||
|
||||
return Value::deserializeForSorter(reader, _settings.second);
|
||||
auto value = std::move(*_deferredValue);
|
||||
_deferredValue = boost::none;
|
||||
return value;
|
||||
}
|
||||
|
||||
const Key& peek() override {
|
||||
@ -214,7 +215,7 @@ private:
|
||||
int64_t _end;
|
||||
bool _positioned = false;
|
||||
Iterator<Key, Value>::Settings _settings;
|
||||
boost::optional<std::span<const char>> _deferredValue;
|
||||
boost::optional<Value> _deferredValue;
|
||||
|
||||
// Checksum value that is updated with each read of a data object from a container. We can
|
||||
// compare this value with _originalChecksum to check for data corruption if and only if the
|
||||
|
||||
@ -38,12 +38,16 @@
|
||||
#include "mongo/db/sorter/sorter_test_utils.h"
|
||||
#include "mongo/db/storage/container.h"
|
||||
#include "mongo/db/storage/ident.h"
|
||||
#include "mongo/db/storage/key_format.h"
|
||||
#include "mongo/db/storage/record_store.h"
|
||||
#include "mongo/db/storage/record_store_test_harness.h"
|
||||
#include "mongo/db/storage/recovery_unit_noop.h"
|
||||
#include "mongo/db/storage/storage_engine.h"
|
||||
#include "mongo/db/storage/storage_options.h"
|
||||
#include "mongo/db/storage/write_unit_of_work.h"
|
||||
#include "mongo/unittest/death_test.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
#include "mongo/util/fail_point.h"
|
||||
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
@ -1376,6 +1380,87 @@ TEST(ContainerIteratorTest, RecoverCursorAfterAbandoningSnapshot) {
|
||||
EXPECT_FALSE(iter.more());
|
||||
}
|
||||
|
||||
class ContainerBasedSpillerWriteConflictTest : public ServiceContextMongoDTest {
|
||||
public:
|
||||
// TODO (SERVER-116165): Remove.
|
||||
RAIIServerParameterControllerForTest ffContainerWrites{"featureFlagContainerWrites", true};
|
||||
};
|
||||
|
||||
// Calls mergeSpills() with a deterministic WCE on the first merged write. Exercises SERVER-126155
|
||||
// and SERVER-124271.
|
||||
TEST_F(ContainerBasedSpillerWriteConflictTest, MergeSpillsSurvivesCursorResetUnderWCE) {
|
||||
auto opCtx = makeOperationContext();
|
||||
auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>(
|
||||
repl::ReplicationCoordinator::get(opCtx.get()));
|
||||
ASSERT(replCoord);
|
||||
replCoord->alwaysAllowWrites(true);
|
||||
|
||||
auto* storageEngine = getServiceContext()->getStorageEngine();
|
||||
auto tempRS = [&] {
|
||||
WriteUnitOfWork wuow(opCtx.get());
|
||||
auto rs = storageEngine->makeInternalRecordStore(
|
||||
opCtx.get(), storageEngine->generateNewInternalIdent(), KeyFormat::Long);
|
||||
wuow.commit();
|
||||
return rs;
|
||||
}();
|
||||
auto& container =
|
||||
std::get<std::reference_wrapper<IntegerKeyedContainer>>(tempRS->getContainer()).get();
|
||||
|
||||
SorterContainerStats stats{nullptr};
|
||||
auto& ru = *shard_role_details::getRecoveryUnit(opCtx.get());
|
||||
|
||||
ContainerBasedSpiller<IntWrapper, IntWrapper, IWComparator> spiller{
|
||||
*opCtx,
|
||||
ru,
|
||||
container,
|
||||
stats,
|
||||
boost::none,
|
||||
sorter::kLatestChecksumVersion,
|
||||
ContainerBasedSpiller<IntWrapper, IntWrapper, IWComparator>::SpillCallbacks{},
|
||||
/*batchSize=*/1,
|
||||
/*batchBytes=*/std::numeric_limits<int64_t>::max(),
|
||||
testSpillingMinAvailableDiskSpaceBytes};
|
||||
|
||||
std::vector<std::pair<IntWrapper, IntWrapper>> data{
|
||||
{10, 100},
|
||||
{40, 400}, // range 0
|
||||
{20, 200},
|
||||
{50, 500}, // range 1
|
||||
{30, 300},
|
||||
{60, 600}, // range 2
|
||||
};
|
||||
std::span<std::pair<IntWrapper, IntWrapper>> span{data};
|
||||
using SpillerSettings = Spiller<IntWrapper, IntWrapper, IWComparator>::Settings;
|
||||
spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(0, 2));
|
||||
spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(2, 2));
|
||||
spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(4, 2));
|
||||
ASSERT_EQ(spiller.iterators().size(), 3);
|
||||
|
||||
// Fires WCE on the first merged write.
|
||||
auto writeConflict = enableWriteConflictForWrites(
|
||||
FailPoint::ModeOptions{.mode = FailPoint::Mode::nTimes, .val = 1});
|
||||
|
||||
SorterStats sorterStats{nullptr};
|
||||
spiller.mergeSpills(SortOptions{},
|
||||
SpillerSettings{},
|
||||
sorterStats,
|
||||
IWComparator(ASC),
|
||||
/*numTargetedSpills=*/1,
|
||||
/*maxSpillsPerMerge=*/3);
|
||||
|
||||
ASSERT_EQ(spiller.iterators().size(), 1);
|
||||
const std::vector<std::pair<int, int>> expected{
|
||||
{10, 100}, {20, 200}, {30, 300}, {40, 400}, {50, 500}, {60, 600}};
|
||||
auto it = spiller.iterators()[0];
|
||||
for (const auto& [k, v] : expected) {
|
||||
ASSERT_TRUE(it->more());
|
||||
auto next = it->next();
|
||||
EXPECT_EQ(static_cast<int>(next.first), k);
|
||||
EXPECT_EQ(static_cast<int>(next.second), v);
|
||||
}
|
||||
EXPECT_FALSE(it->more());
|
||||
}
|
||||
|
||||
class ContainerBasedSpillerCallbackTest : public ServiceContextMongoDTest {
|
||||
public:
|
||||
// TODO (SERVER-116165): Remove.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user