SERVER-112119 hang OplogCapMaintainerThread during initial sampling, not after (#42726)

GitOrigin-RevId: e1e5c9586d607a8aaa8f71bf8c4babe4d888c287
This commit is contained in:
adelinexchen 2025-10-16 17:33:38 +11:00 committed by MongoDB Bot
parent e08e44656e
commit 58fa16f921
8 changed files with 131 additions and 81 deletions

View File

@ -8,6 +8,23 @@
import {kDefaultWaitForFailPointTimeout} from "jstests/libs/fail_point_util.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
// Checks if a set of entries all exist or all do not exist in the oplog
// if allExist == true, I want to check that all entries exist in oplog
// if allExist == false, I want to check all entries do not exist in oplog
function entryInOplog(allExist, entries, oplog) {
const cursor = oplog.find({ns: "test.markers"});
while (cursor.hasNext()) {
const entry = cursor.next();
jsTest.log.info("Checking " + tojson(entry));
entries.forEach((id) => {
if (id == entry.o["_id"]) {
return false;
}
});
}
return true;
}
// Constants for replica set and test configuration
const oplogSizeMB = 1; // Small oplog size in MB
const longString = "a".repeat(450 * 1024); // Large document size (~500KB)
@ -23,7 +40,6 @@ const rst = new ReplSetTest({
setParameter: {
logComponentVerbosity: tojson({storage: 1}),
minOplogTruncationPoints: 2,
"failpoint.hangOplogCapMaintainerThread": tojson({mode: "alwaysOn"}),
internalQueryExecYieldPeriodMS: 86400000, // Disable yielding
},
},
@ -42,7 +58,13 @@ rst.stopSet(null, true);
jsTest.log.info("Replica set stopped for restart.");
clearRawMongoProgramOutput();
rst.startSet(null, true); // Restart replica set
rst.startSet({
restart: true,
setParameter: {
"failpoint.hangDuringOplogSampling": tojson({mode: "alwaysOn"}),
"oplogSamplingAsyncEnabled": true,
},
}); // Restart replica set
const restartedPrimary = rst.getPrimary();
const restartedPrimaryOplog = restartedPrimary.getDB("local").getCollection("oplog.rs");
jsTest.log.info("Replica set restarted.");
@ -50,7 +72,7 @@ jsTest.log.info("Replica set restarted.");
// // Verify that the oplog cap maintainer thread is paused.
assert.commandWorked(
restartedPrimary.adminCommand({
waitForFailPoint: "hangOplogCapMaintainerThread",
waitForFailPoint: "hangDuringOplogSampling",
timesEntered: 1,
maxTimeMS: kDefaultWaitForFailPointTimeout,
}),
@ -78,20 +100,32 @@ const secondInsertTimestamp =
.operationTime;
jsTest.log.info("Second insert timestamp: " + tojson(secondInsertTimestamp));
// Check inserts exists
assert.soon(() => {
let foundCount = 0;
const cursor = restartedPrimaryOplog.find({ns: "test.markers"});
while (cursor.hasNext()) {
const entry = cursor.next();
jsTest.log.info("Checking " + tojson(entry));
largeDocIDs.forEach((id) => {
if (id == entry.o["_id"]) {
foundCount++;
}
});
}
return foundCount == 2;
});
// Take a checkpoint
restartedPrimary.getDB("admin").runCommand({fsync: 1});
// Verify truncate marker creation resumes post-startup
checkLog.containsJson(restartedPrimary, 8423403); // Log ID for startup finished
// Fetch server status and verify truncation metrics
let serverStatus = restartedPrimary.getDB("admin").runCommand({serverStatus: 1});
const truncationCount = serverStatus.oplogTruncation.truncateCount;
// Resume oplog truncate marker creation
jsTest.log.info("Resuming oplog truncate marker creation.");
assert.commandWorked(restartedPrimary.adminCommand(
{configureFailPoint: "hangOplogCapMaintainerThread", mode: "off"}));
assert.commandWorked(
restartedPrimary.adminCommand({configureFailPoint: "hangDuringOplogSampling", mode: "off"}));
// Verify truncate markers are created and logged
checkLog.containsJson(restartedPrimary, 22382); // Log ID: Oplog truncate markers calculated
@ -100,23 +134,24 @@ checkLog.containsJson(restartedPrimary, 22382); // Log ID: Oplog truncate marke
for (let i = 0; i < 50; i++) {
coll.insert({_id: nextId++, longString: longString});
}
restartedPrimary.getDB("admin").runCommand({fsync: 1});
// Wait for truncation to occur
// Verify large documents inserted during intial sampling are eventually truncated from the oplog
assert.soon(() => {
serverStatus = restartedPrimary.getDB("admin").runCommand({serverStatus: 1});
return serverStatus.oplogTruncation.truncateCount > truncationCount;
const cursor = restartedPrimaryOplog.find({ns: "test.markers"});
while (cursor.hasNext()) {
const entry = cursor.next();
jsTest.log.info("Checking " + tojson(entry));
largeDocIDs.forEach((id) => {
if (id == entry.o["_id"]) {
return false;
}
});
}
return true;
});
// Verify large documents were truncated from the oplog
const cursor = restartedPrimaryOplog.find({ns: "test.markers"});
while (cursor.hasNext()) {
const entry = cursor.next();
jsTest.log.info("Checking " + tojson(entry));
largeDocIDs.forEach((id) => {
assert.neq(id, entry.o["_id"], "Unexpected _id entry in oplog.");
});
}
jsTest.log.info("Test complete. Stopping replica set.");
rst.stopSet();

View File

@ -842,10 +842,10 @@ public:
ASSERT_FALSE(truncateMarkers.isEmpty());
// Confirm neither the whole marker or partial marker are expired.
const auto numWholeMarkersBefore = truncateMarkers.numMarkers_forTest();
const auto numWholeMarkersBefore = truncateMarkers.numMarkers();
assertNoExpiredMarker(truncateMarkers);
truncateMarkers.createPartialMarkerIfNecessary(opCtx());
const auto numWholeMarkersAfter = truncateMarkers.numMarkers_forTest();
const auto numWholeMarkersAfter = truncateMarkers.numMarkers();
ASSERT_EQ(numWholeMarkersBefore, numWholeMarkersAfter);
expirePreImage(kPreImage1, expireAfterSeconds);

View File

@ -274,15 +274,15 @@ public:
return _creationMethod;
}
//
// The following methods are public only for use in tests.
//
size_t numMarkers_forTest() const {
size_t numMarkers() const {
stdx::lock_guard<stdx::mutex> lk(_markersMutex);
return _markers.size();
}
//
// The following methods are public only for use in tests.
//
int64_t currentBytes_forTest() const {
return _currentBytes.load();
}

View File

@ -274,14 +274,14 @@ void createNewMarkerTest(CollectionMarkersTest* fixture, std::string collectionN
{
auto opCtx = fixture->getClient()->makeOperationContext();
ASSERT_EQ(0U, testMarkers->numMarkers_forTest());
ASSERT_EQ(0U, testMarkers->numMarkers());
// Inserting a record smaller than 'minBytesPerMarker' shouldn't create a new collection
// marker.
auto insertedRecordId = fixture->insertWithSpecificTimestampAndRecordId(
opCtx.get(), collNs, *testMarkers, 99, Timestamp(1, 1), RecordId(1, 1));
ASSERT_EQ(insertedRecordId, RecordId(1, 1));
ASSERT_EQ(0U, testMarkers->numMarkers_forTest());
ASSERT_EQ(0U, testMarkers->numMarkers());
ASSERT_EQ(1, testMarkers->currentRecords_forTest());
ASSERT_EQ(99, testMarkers->currentBytes_forTest());
@ -290,7 +290,7 @@ void createNewMarkerTest(CollectionMarkersTest* fixture, std::string collectionN
insertedRecordId = fixture->insertWithSpecificTimestampAndRecordId(
opCtx.get(), collNs, *testMarkers, 51, Timestamp(1, 2), RecordId(1, 2));
ASSERT_EQ(insertedRecordId, RecordId(1, 2));
ASSERT_EQ(1U, testMarkers->numMarkers_forTest());
ASSERT_EQ(1U, testMarkers->numMarkers());
ASSERT_EQ(0, testMarkers->currentRecords_forTest());
ASSERT_EQ(0, testMarkers->currentBytes_forTest());
@ -300,7 +300,7 @@ void createNewMarkerTest(CollectionMarkersTest* fixture, std::string collectionN
insertedRecordId = fixture->insertWithSpecificTimestampAndRecordId(
opCtx.get(), collNs, *testMarkers, 50, Timestamp(1, 3), RecordId(1, 3));
ASSERT_EQ(insertedRecordId, RecordId(1, 3));
ASSERT_EQ(1U, testMarkers->numMarkers_forTest());
ASSERT_EQ(1U, testMarkers->numMarkers());
ASSERT_EQ(1, testMarkers->currentRecords_forTest());
ASSERT_EQ(50, testMarkers->currentBytes_forTest());
@ -309,7 +309,7 @@ void createNewMarkerTest(CollectionMarkersTest* fixture, std::string collectionN
insertedRecordId = fixture->insertWithSpecificTimestampAndRecordId(
opCtx.get(), collNs, *testMarkers, 50, Timestamp(1, 4), RecordId(1, 4));
ASSERT_EQ(insertedRecordId, RecordId(1, 4));
ASSERT_EQ(2U, testMarkers->numMarkers_forTest());
ASSERT_EQ(2U, testMarkers->numMarkers());
ASSERT_EQ(0, testMarkers->currentRecords_forTest());
ASSERT_EQ(0, testMarkers->currentBytes_forTest());
@ -318,7 +318,7 @@ void createNewMarkerTest(CollectionMarkersTest* fixture, std::string collectionN
insertedRecordId = fixture->insertWithSpecificTimestampAndRecordId(
opCtx.get(), collNs, *testMarkers, 101, Timestamp(1, 5), RecordId(1, 5));
ASSERT_EQ(insertedRecordId, RecordId(1, 5));
ASSERT_EQ(3U, testMarkers->numMarkers_forTest());
ASSERT_EQ(3U, testMarkers->numMarkers());
ASSERT_EQ(0, testMarkers->currentRecords_forTest());
ASSERT_EQ(0, testMarkers->currentBytes_forTest());
}
@ -344,11 +344,11 @@ void ascendingOrderTest(CollectionMarkersTest* fixture, std::string collectionNa
{
auto opCtx = fixture->getClient()->makeOperationContext();
ASSERT_EQ(0U, testMarkers->numMarkers_forTest());
ASSERT_EQ(0U, testMarkers->numMarkers());
auto insertedRecordId = fixture->insertWithSpecificTimestampAndRecordId(
opCtx.get(), collNs, *testMarkers, 50, Timestamp(2, 2), RecordId(2, 2));
ASSERT_EQ(insertedRecordId, RecordId(2, 2));
ASSERT_EQ(0U, testMarkers->numMarkers_forTest());
ASSERT_EQ(0U, testMarkers->numMarkers());
ASSERT_EQ(1, testMarkers->currentRecords_forTest());
ASSERT_EQ(50, testMarkers->currentBytes_forTest());
@ -357,7 +357,7 @@ void ascendingOrderTest(CollectionMarkersTest* fixture, std::string collectionNa
insertedRecordId = fixture->insertWithSpecificTimestampAndRecordId(
opCtx.get(), collNs, *testMarkers, 50, Timestamp(2, 1), RecordId(2, 1));
ASSERT_EQ(insertedRecordId, RecordId(2, 1));
ASSERT_EQ(1U, testMarkers->numMarkers_forTest());
ASSERT_EQ(1U, testMarkers->numMarkers());
ASSERT_EQ(0, testMarkers->currentRecords_forTest());
ASSERT_EQ(0, testMarkers->currentBytes_forTest());
@ -367,7 +367,7 @@ void ascendingOrderTest(CollectionMarkersTest* fixture, std::string collectionNa
insertedRecordId = fixture->insertWithSpecificTimestampAndRecordId(
opCtx.get(), collNs, *testMarkers, 100, Timestamp(1, 1), RecordId(1, 1));
ASSERT_EQ(insertedRecordId, RecordId(1, 1));
ASSERT_EQ(1U, testMarkers->numMarkers_forTest());
ASSERT_EQ(1U, testMarkers->numMarkers());
ASSERT_EQ(1, testMarkers->currentRecords_forTest());
ASSERT_EQ(100, testMarkers->currentBytes_forTest());
@ -376,7 +376,7 @@ void ascendingOrderTest(CollectionMarkersTest* fixture, std::string collectionNa
insertedRecordId = fixture->insertWithSpecificTimestampAndRecordId(
opCtx.get(), collNs, *testMarkers, 50, Timestamp(2, 3), RecordId(2, 3));
ASSERT_EQ(insertedRecordId, RecordId(2, 3));
ASSERT_EQ(2U, testMarkers->numMarkers_forTest());
ASSERT_EQ(2U, testMarkers->numMarkers());
ASSERT_EQ(0, testMarkers->currentRecords_forTest());
ASSERT_EQ(0, testMarkers->currentBytes_forTest());
}

View File

@ -151,13 +151,13 @@ TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeSampleAndU
auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
ASSERT(oplogTruncateMarkers);
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers());
// Continue finishing the initial scan / sample
oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
// Confirm that some truncate markers were generated.
ASSERT_LT(0U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_LT(0U, oplogTruncateMarkers->numMarkers());
} // namespace repl
// In async mode, during startup but before sampling finishes,
@ -230,7 +230,7 @@ TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeSampling)
oplogTruncateMarkers->getMarkersCreationMethod());
// Confirm that some truncate markers were generated.
ASSERT_GTE(oplogTruncateMarkers->getCreationProcessingTime().count(), 0);
auto truncateMarkersBefore = oplogTruncateMarkers->numMarkers_forTest();
auto truncateMarkersBefore = oplogTruncateMarkers->numMarkers();
ASSERT_GT(truncateMarkersBefore, 0U);
ASSERT_GT(oplogTruncateMarkers->currentBytes_forTest(), 0);
}
@ -249,7 +249,7 @@ TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeCreateOplo
auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
ASSERT(oplogTruncateMarkers);
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}

View File

@ -117,6 +117,8 @@ public:
truncateMarkers->getCreationProcessingTime().count());
builder.append("processingMethod", "scanning");
}
builder.appendNumber("truncateMarkersCount",
static_cast<long long>(truncateMarkers->numMarkers()));
}
}

View File

@ -37,6 +37,7 @@
#include "mongo/db/transaction_resources.h"
#include "mongo/logv2/log.h"
#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/fail_point.h"
#include <memory>
@ -46,7 +47,8 @@ namespace mongo {
namespace {
const double kNumMSInHour = 1000 * 60 * 60;
}
MONGO_FAIL_POINT_DEFINE(hangDuringOplogSampling);
} // namespace
std::shared_ptr<OplogTruncateMarkers> OplogTruncateMarkers::createEmptyOplogTruncateMarkers(
RecordStore& rs) {
@ -88,6 +90,7 @@ std::shared_ptr<OplogTruncateMarkers> OplogTruncateMarkers::sampleAndUpdate(Oper
// We need to read the whole oplog, override the recoveryUnit's oplogVisibleTimestamp.
ScopedOplogVisibleTimestamp scopedOplogVisibleTimestamp(
shard_role_details::getRecoveryUnit(opCtx), boost::none);
std::unique_ptr<CollectionTruncateMarkers::CollectionIterator> iterator;
if (gOplogSamplingAsyncEnabled && gOplogSamplingAsyncYieldIntervalMs >= 0) {
iterator = std::make_unique<YieldableCollectionIterator>(
@ -95,6 +98,14 @@ std::shared_ptr<OplogTruncateMarkers> OplogTruncateMarkers::sampleAndUpdate(Oper
} else {
iterator = std::make_unique<UnyieldableCollectionIterator>(opCtx, &rs);
}
if (MONGO_unlikely(hangDuringOplogSampling.shouldFail())) {
LOGV2(11211900,
"Hanging the oplog cap maintainer thread during intial sampling due "
"to fail point");
hangDuringOplogSampling.pauseWhileSet(opCtx);
}
auto initialSetOfMarkers = CollectionTruncateMarkers::createFromCollectionIterator(
opCtx,
*iterator,
@ -109,6 +120,8 @@ std::shared_ptr<OplogTruncateMarkers> OplogTruncateMarkers::sampleAndUpdate(Oper
numTruncateMarkersToKeep);
LOGV2(22382,
"Record store oplog processing finished",
"markersCount"_attr = initialSetOfMarkers.markers.size(),
"markerCreationMethod"_attr = toString(initialSetOfMarkers.methodUsed),
"duration"_attr = duration_cast<Milliseconds>(initialSetOfMarkers.timeTaken));
LOGV2(
10621110, "Initial set of markers created.", "Oplog size (in bytes)"_attr = rs.dataSize());

View File

@ -172,19 +172,19 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_CreateNewMarker) {
oplogTruncateMarkers->setMinBytesPerMarker(100);
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers());
// Inserting a record smaller than 'minBytesPerTruncateMarker' shouldn't create a new oplog
// truncate marker.
insertOplog(1, 99);
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(99, oplogTruncateMarkers->currentBytes_forTest());
// Inserting another record such that their combined size exceeds
// 'minBytesPerTruncateMarker' should cause a new truncate marker to be created.
insertOplog(2, 51);
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
@ -192,7 +192,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_CreateNewMarker) {
// one exceed 'minBytesPerTruncateMarker' shouldn't cause a new truncate marker to be
// created because we've started filling a new truncate marker.
insertOplog(3, 50);
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(50, oplogTruncateMarkers->currentBytes_forTest());
@ -200,14 +200,14 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_CreateNewMarker) {
// one is exactly equal to 'minBytesPerTruncateMarker' should cause a new truncate marker to
// be created.
insertOplog(4, 50);
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
// Inserting a single record that exceeds 'minBytesPerTruncateMarker' should cause a new
// truncate marker to be created.
insertOplog(5, 101);
ASSERT_EQ(3U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(3U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
@ -236,7 +236,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_Truncate) {
insertOplog(t, size);
}
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(size, oplogTruncateMarkers->currentBytes_forTest());
@ -247,7 +247,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_Truncate) {
ASSERT_EQ(0, rs->dataSize());
ASSERT_EQ(0, rs->numRecords());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
@ -275,7 +275,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_UpdateRecord) {
auto obj2 = insertOplog(2, 50);
storage.oplogDiskLocRegister(opCtx, Timestamp{1, 2}, true);
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(50, oplogTruncateMarkers->currentBytes_forTest());
@ -311,7 +311,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_UpdateRecord) {
TimestampedBSONObj update2 = {BSON("$set" << changed2), {}};
ASSERT_OK(storage.updateSingleton(opCtx, oplogNs, BSON("ts" << obj2["ts"]), update2));
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(50, oplogTruncateMarkers->currentBytes_forTest());
}
@ -350,7 +350,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_CappedTruncateAfter) {
ASSERT_EQ(9, rs->numRecords());
ASSERT_EQ(2600, rs->dataSize());
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(3, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(300, oplogTruncateMarkers->currentBytes_forTest());
}
@ -370,7 +370,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_CappedTruncateAfter) {
ASSERT_EQ(7, rs->numRecords());
ASSERT_EQ(2350, rs->dataSize());
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(50, oplogTruncateMarkers->currentBytes_forTest());
}
@ -388,7 +388,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_CappedTruncateAfter) {
ASSERT_EQ(5, rs->numRecords());
ASSERT_EQ(1950, rs->dataSize());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(3, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(750, oplogTruncateMarkers->currentBytes_forTest());
}
@ -402,7 +402,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_CappedTruncateAfter) {
recovery.truncateOplogToTimestamp(opCtx, Timestamp(1, 3));
ASSERT_EQ(3, rs->numRecords());
ASSERT_EQ(1400, rs->dataSize());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(200, oplogTruncateMarkers->currentBytes_forTest());
}
@ -414,7 +414,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_CappedTruncateAfter) {
recovery.truncateOplogToTimestamp(opCtx, Timestamp(1, 2));
ASSERT_EQ(2, rs->numRecords());
ASSERT_EQ(1200, rs->dataSize());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
@ -425,7 +425,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_CappedTruncateAfter) {
recovery.truncateOplogToTimestamp(opCtx, Timestamp(1, 1));
ASSERT_EQ(1, rs->numRecords());
ASSERT_EQ(400, rs->dataSize());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(400, oplogTruncateMarkers->currentBytes_forTest());
}
@ -458,7 +458,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(3, rs->numRecords());
ASSERT_EQ(330, rs->dataSize());
ASSERT_EQ(3U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(3U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
@ -476,7 +476,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(3, rs->numRecords());
ASSERT_EQ(330, rs->dataSize());
ASSERT_EQ(3U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(3U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
@ -492,7 +492,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(2, rs->numRecords());
ASSERT_EQ(230, rs->dataSize());
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
@ -504,7 +504,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(5, rs->numRecords());
ASSERT_EQ(550, rs->dataSize());
ASSERT_EQ(4U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(4U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(50, oplogTruncateMarkers->currentBytes_forTest());
}
@ -520,7 +520,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(2, rs->numRecords());
ASSERT_EQ(190, rs->dataSize());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(50, oplogTruncateMarkers->currentBytes_forTest());
}
@ -536,7 +536,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(2, rs->numRecords());
ASSERT_EQ(190, rs->dataSize());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(50, oplogTruncateMarkers->currentBytes_forTest());
}
@ -549,7 +549,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(4, rs->numRecords());
ASSERT_EQ(500, rs->dataSize());
ASSERT_EQ(3U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(3U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
@ -563,7 +563,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(3, rs->numRecords());
ASSERT_EQ(360, rs->dataSize());
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
@ -575,7 +575,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(5, rs->numRecords());
ASSERT_EQ(660, rs->dataSize());
ASSERT_EQ(3U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(3U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
@ -590,7 +590,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(2, rs->numRecords());
ASSERT_EQ(300, rs->dataSize());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
@ -603,7 +603,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(3, rs->numRecords());
ASSERT_EQ(390, rs->dataSize());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(90, oplogTruncateMarkers->currentBytes_forTest());
}
@ -617,7 +617,7 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
ASSERT_EQ(1, rs->numRecords());
ASSERT_EQ(90, rs->dataSize());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(90, oplogTruncateMarkers->currentBytes_forTest());
}
@ -640,16 +640,16 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_AscendingOrder) {
oplogTruncateMarkers->setMinBytesPerMarker(100);
{
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers());
insertOplog(2, 2, 50); // Timestamp(2, 2)
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(50, oplogTruncateMarkers->currentBytes_forTest());
// Inserting a record that has a smaller RecordId than the previously inserted record should
// be able to create a new truncate marker when no truncate markers already exist.
insertOplog(2, 1, 50); // Timestamp(2, 1)
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
@ -657,14 +657,14 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_AscendingOrder) {
// truncate marker's last record shouldn't cause a new truncate marker to be created, even
// if the size of the inserted record exceeds 'minBytesPerTruncateMarker'.
insertOplog(1, 100); // Timestamp(1, 1)
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(1, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(100, oplogTruncateMarkers->currentBytes_forTest());
// Inserting a record that has a larger RecordId than the most recently created truncate
// marker's last record should then cause a new truncate marker to be created.
insertOplog(2, 3, 50); // Timestamp(2, 3)
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(2U, oplogTruncateMarkers->numMarkers());
ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
@ -704,7 +704,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_NoMarkersGeneratedFromScanning)
ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::Scanning,
oplogTruncateMarkers->getMarkersCreationMethod());
ASSERT_GTE(oplogTruncateMarkers->getCreationProcessingTime().count(), 0);
auto numMarkers = oplogTruncateMarkers->numMarkers_forTest();
auto numMarkers = oplogTruncateMarkers->numMarkers();
ASSERT_EQ(numMarkers, 0U);
// A forced scan over the RecordStore should force the 'currentBytes' to be accurate in the
@ -752,7 +752,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_Duplicates) {
ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::Sampling,
oplogTruncateMarkers->getMarkersCreationMethod());
ASSERT_GTE(oplogTruncateMarkers->getCreationProcessingTime().count(), 0);
auto truncateMarkersBefore = oplogTruncateMarkers->numMarkers_forTest();
auto truncateMarkersBefore = oplogTruncateMarkers->numMarkers();
ASSERT_GT(truncateMarkersBefore, 0U);
ASSERT_GT(oplogTruncateMarkers->currentBytes_forTest(), 0);
@ -761,7 +761,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_Duplicates) {
advanceStableTimestamp(Timestamp(1, 4));
auto mayTruncateUpTo = RecordId(engine->getPinnedOplog().asULL());
oplog_truncation::reclaimOplog(opCtx, *rs, mayTruncateUpTo);
ASSERT_EQ(truncateMarkersBefore, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(truncateMarkersBefore, oplogTruncateMarkers->numMarkers());
// Reduce the oplog size to ensure we create a truncate marker and truncate on the next
// insert.
@ -782,7 +782,7 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_Duplicates) {
advanceStableTimestamp(Timestamp(1, 6));
mayTruncateUpTo = RecordId(engine->getPinnedOplog().asULL());
oplog_truncation::reclaimOplog(opCtx, *rs, mayTruncateUpTo);
ASSERT_EQ(1, oplogTruncateMarkers->numMarkers_forTest());
ASSERT_EQ(1, oplogTruncateMarkers->numMarkers());
// The original oplog should have rolled over and the size and count should be accurate.
ASSERT_EQ(1, wtRS->numRecords());