SERVER-126451 Test deleting entries from sorter table with a write conflict (#54159)
GitOrigin-RevId: 812e9013c48bcc1c0fed5c8d1da26c2d790559d2
This commit is contained in:
parent
0722eba8f4
commit
bac4b3272d
@ -1384,35 +1384,52 @@ class ContainerBasedSpillerWriteConflictTest : public ServiceContextMongoDTest {
|
||||
public:
|
||||
// TODO (SERVER-116165): Remove.
|
||||
RAIIServerParameterControllerForTest ffContainerWrites{"featureFlagContainerWrites", true};
|
||||
|
||||
protected:
|
||||
void setUp() override {
|
||||
ServiceContextMongoDTest::setUp();
|
||||
_opCtx = makeOperationContext();
|
||||
auto* replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>(
|
||||
repl::ReplicationCoordinator::get(_opCtx.get()));
|
||||
ASSERT(replCoord);
|
||||
replCoord->alwaysAllowWrites(true);
|
||||
|
||||
auto* storageEngine = getServiceContext()->getStorageEngine();
|
||||
{
|
||||
WriteUnitOfWork wuow(_opCtx.get());
|
||||
_tempRS = storageEngine->makeInternalRecordStore(
|
||||
_opCtx.get(), storageEngine->generateNewInternalIdent(), KeyFormat::Long);
|
||||
wuow.commit();
|
||||
}
|
||||
_container =
|
||||
&std::get<std::reference_wrapper<IntegerKeyedContainer>>(_tempRS->getContainer()).get();
|
||||
}
|
||||
|
||||
OperationContext* opCtx() {
|
||||
return _opCtx.get();
|
||||
}
|
||||
IntegerKeyedContainer& container() {
|
||||
return *_container;
|
||||
}
|
||||
RecoveryUnit& ru() {
|
||||
return *shard_role_details::getRecoveryUnit(_opCtx.get());
|
||||
}
|
||||
|
||||
SorterContainerStats stats{nullptr};
|
||||
|
||||
private:
|
||||
ServiceContext::UniqueOperationContext _opCtx;
|
||||
std::unique_ptr<RecordStore> _tempRS;
|
||||
IntegerKeyedContainer* _container = nullptr;
|
||||
};
|
||||
|
||||
// Calls mergeSpills() with a deterministic WCE on the first merged write. Exercises SERVER-126155
|
||||
// and SERVER-124271.
|
||||
TEST_F(ContainerBasedSpillerWriteConflictTest, MergeSpillsSurvivesCursorResetUnderWCE) {
|
||||
auto opCtx = makeOperationContext();
|
||||
auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>(
|
||||
repl::ReplicationCoordinator::get(opCtx.get()));
|
||||
ASSERT(replCoord);
|
||||
replCoord->alwaysAllowWrites(true);
|
||||
|
||||
auto* storageEngine = getServiceContext()->getStorageEngine();
|
||||
auto tempRS = [&] {
|
||||
WriteUnitOfWork wuow(opCtx.get());
|
||||
auto rs = storageEngine->makeInternalRecordStore(
|
||||
opCtx.get(), storageEngine->generateNewInternalIdent(), KeyFormat::Long);
|
||||
wuow.commit();
|
||||
return rs;
|
||||
}();
|
||||
auto& container =
|
||||
std::get<std::reference_wrapper<IntegerKeyedContainer>>(tempRS->getContainer()).get();
|
||||
|
||||
SorterContainerStats stats{nullptr};
|
||||
auto& ru = *shard_role_details::getRecoveryUnit(opCtx.get());
|
||||
|
||||
ContainerBasedSpiller<IntWrapper, IntWrapper, IWComparator> spiller{
|
||||
*opCtx,
|
||||
ru,
|
||||
container,
|
||||
*opCtx(),
|
||||
ru(),
|
||||
container(),
|
||||
stats,
|
||||
boost::none,
|
||||
sorter::kLatestChecksumVersion,
|
||||
@ -1461,6 +1478,84 @@ TEST_F(ContainerBasedSpillerWriteConflictTest, MergeSpillsSurvivesCursorResetUnd
|
||||
EXPECT_FALSE(it->more());
|
||||
}
|
||||
|
||||
// Verifies that the writeConflictRetry block in mergeSpills_remove correctly retries when the
|
||||
// storage engine throws a WCE on the first deletion.
|
||||
TEST_F(ContainerBasedSpillerWriteConflictTest, MergeSpillsRemoveSurvivesWCE) {
|
||||
using Spiller = ContainerBasedSpiller<IntWrapper, IntWrapper, IWComparator>;
|
||||
|
||||
std::unique_ptr<FailPointEnableBlock> writeConflict;
|
||||
FailPoint::EntryCountT wceCountBefore = 0;
|
||||
// onSpill fires on every spill() call as well as inside mergeSpills(); only arm during the
|
||||
// merge so the nTimes=1 token is consumed by mergeSpills_remove rather than a later spill's
|
||||
// insert.
|
||||
bool armOnNextOnSpill = false;
|
||||
|
||||
Spiller::SpillCallbacks callbacks{
|
||||
.onSpill =
|
||||
[&] {
|
||||
if (!armOnNextOnSpill) {
|
||||
return;
|
||||
}
|
||||
// _runOnSpill() fires between mergeSpills_insert and mergeSpills_remove. Arming
|
||||
// here guarantees the nTimes=1 token is consumed by the first remove() call.
|
||||
writeConflict = enableWriteConflictForWrites(
|
||||
FailPoint::ModeOptions{.mode = FailPoint::Mode::nTimes, .val = 1});
|
||||
wceCountBefore = writeConflict->initialTimesEntered();
|
||||
armOnNextOnSpill = false;
|
||||
},
|
||||
};
|
||||
|
||||
Spiller spiller{*opCtx(),
|
||||
ru(),
|
||||
container(),
|
||||
stats,
|
||||
boost::none,
|
||||
sorter::kLatestChecksumVersion,
|
||||
std::move(callbacks),
|
||||
/*batchSize=*/1,
|
||||
/*batchBytes=*/std::numeric_limits<int64_t>::max(),
|
||||
testSpillingMinAvailableDiskSpaceBytes};
|
||||
|
||||
std::vector<std::pair<IntWrapper, IntWrapper>> data{
|
||||
{10, 100},
|
||||
{40, 400}, // range 0
|
||||
{20, 200},
|
||||
{50, 500}, // range 1
|
||||
{30, 300},
|
||||
{60, 600}, // range 2
|
||||
};
|
||||
std::span<std::pair<IntWrapper, IntWrapper>> span{data};
|
||||
using SpillerSettings = Spiller::Settings;
|
||||
spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(0, 2));
|
||||
spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(2, 2));
|
||||
spiller.spill(SortOptions{}, SpillerSettings{}, span.subspan(4, 2));
|
||||
ASSERT_EQ(spiller.iterators().size(), 3);
|
||||
|
||||
SorterStats sorterStats{nullptr};
|
||||
armOnNextOnSpill = true;
|
||||
spiller.mergeSpills(SortOptions{},
|
||||
SpillerSettings{},
|
||||
sorterStats,
|
||||
IWComparator(ASC),
|
||||
/*numTargetedSpills=*/1,
|
||||
/*maxSpillsPerMerge=*/3);
|
||||
|
||||
ASSERT_EQ((*writeConflict)->waitForTimesEntered(wceCountBefore + 1), wceCountBefore + 1)
|
||||
<< "Expected exactly one WCE to fire inside mergeSpills_remove";
|
||||
|
||||
ASSERT_EQ(spiller.iterators().size(), 1);
|
||||
const std::vector<std::pair<int, int>> expected{
|
||||
{10, 100}, {20, 200}, {30, 300}, {40, 400}, {50, 500}, {60, 600}};
|
||||
auto it = spiller.iterators()[0];
|
||||
for (const auto& [k, v] : expected) {
|
||||
ASSERT_TRUE(it->more());
|
||||
auto next = it->next();
|
||||
EXPECT_EQ(static_cast<int>(next.first), k);
|
||||
EXPECT_EQ(static_cast<int>(next.second), v);
|
||||
}
|
||||
EXPECT_FALSE(it->more());
|
||||
}
|
||||
|
||||
class ContainerBasedSpillerCallbackTest : public ServiceContextMongoDTest {
|
||||
public:
|
||||
// TODO (SERVER-116165): Remove.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user