SERVER-110649 Move maintainer thread to start/stop storage controls (#41152)

Co-authored-by: Gregory Wlodarek <gregory.wlodarek@mongodb.com>
GitOrigin-RevId: 5b4df9850057bcca0bf0a66783847d4b0df6657a
This commit is contained in:
clarissecheah 2025-09-11 20:54:03 +10:00 committed by MongoDB Bot
parent f1ef2c7dae
commit 1b098d48d8
14 changed files with 478 additions and 299 deletions

View File

@ -38,6 +38,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/storage/oplog_truncate_marker_parameters_gen.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/db/storage/storage_options.h"
@ -87,11 +88,11 @@ RecordStore* LocalOplogInfo::getRecordStore() const {
void LocalOplogInfo::setRecordStore(OperationContext* opCtx, RecordStore* rs) {
stdx::lock_guard<stdx::mutex> lk(_rsMutex);
_rs = rs;
// If the server was started in read-only mode or if we are restoring the node, skip
// calculating the oplog truncate markers. The OplogCapMaintainerThread does not get started
// in this instance.
// If the server was started in read-only mode, if we are restoring the node, or if async
// sampling is enabled, skip calculating the oplog truncate markers here.
bool needsTruncateMarkers = opCtx->getServiceContext()->userWritesAllowed() &&
!storageGlobalParams.repair && !repl::ReplSettings::shouldSkipOplogSampling();
!storageGlobalParams.repair && !repl::ReplSettings::shouldSkipOplogSampling() &&
!gOplogSamplingAsyncEnabled;
if (needsTruncateMarkers) {
_truncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
}

View File

@ -1968,18 +1968,10 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
shutdownChangeCollectionExpiredDocumentsRemover(serviceContext);
}
{
SectionScopedTimer scopedTimer(serviceContext->getFastClockSource(),
TimedSectionId::shutDownOplogCapMaintainer,
&shutdownTimeElapsedBuilder);
OplogCapMaintainerThread::get(serviceContext)->shutdown();
}
// We should always be able to acquire the global lock at shutdown.
//
// For a Windows service, dbexit does not call exit(), so we must leak the lock outside
// of this function to prevent any operations from running that need a lock.
//
LOGV2(4784929, "Acquiring the global lock for shutdown");
shard_role_details::getLocker(opCtx)->lockGlobal(opCtx, MODE_X);

View File

@ -1086,8 +1086,6 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx,
return;
}
_storage->initializeStorageControlsForReplication(opCtx->getServiceContext());
// We are expected to be able to transition out of the kConfigStartingUp state by the end
// of this function. Any uncaught exceptions here leave us in an invalid state and we will
// not be able to shut down by normal means, as clean shutdown assumes we can leave that state.

View File

@ -478,14 +478,6 @@ public:
*/
virtual bool supportsRecoveryTimestamp(ServiceContext* serviceCtx) const = 0;
/**
* Responsible for initializing independent processes for replication that manage
* and interact with the storage layer.
*
* Initializes the OplogCapMaintainerThread to control deletion of oplog truncate markers.
*/
virtual void initializeStorageControlsForReplication(ServiceContext* serviceCtx) const = 0;
/**
* Returns the stable timestamp that the storage engine recovered to on startup. If the
* recovery point was not stable, returns "none".

View File

@ -1509,18 +1509,6 @@ bool StorageInterfaceImpl::supportsRecoveryTimestamp(ServiceContext* serviceCtx)
return serviceCtx->getStorageEngine()->supportsRecoveryTimestamp();
}
void StorageInterfaceImpl::initializeStorageControlsForReplication(
ServiceContext* serviceCtx) const {
// The storage engine may support the use of OplogTruncateMarkers to more finely control
// oplog history deletion, in which case we need to start the thread to
// periodically execute deletion via oplog truncate markers. OplogTruncateMarkers are a
// replacement for capped collection deletion of the oplog collection history.
if (!ReplSettings::shouldSkipOplogSampling()) {
auto maintainerThread = OplogCapMaintainerThread::get(serviceCtx);
maintainerThread->start();
}
}
boost::optional<Timestamp> StorageInterfaceImpl::getRecoveryTimestamp(
ServiceContext* serviceCtx) const {
return serviceCtx->getStorageEngine()->getRecoveryTimestamp();

View File

@ -218,8 +218,6 @@ public:
bool supportsRecoveryTimestamp(ServiceContext* serviceCtx) const override;
void initializeStorageControlsForReplication(ServiceContext* serviceCtx) const override;
boost::optional<Timestamp> getRecoveryTimestamp(ServiceContext* serviceCtx) const override;
Timestamp getAllDurableTimestamp(ServiceContext* serviceCtx) const override;

View File

@ -369,8 +369,6 @@ public:
return false;
}
void initializeStorageControlsForReplication(ServiceContext* serviceCtx) const override {}
boost::optional<Timestamp> getRecoveryTimestamp(ServiceContext* serviceCtx) const override {
return boost::none;
}

View File

@ -350,6 +350,7 @@ mongo_cc_library(
"//src/mongo:base",
"//src/mongo/db:service_context",
"//src/mongo/db/commands:test_commands_enabled", # TODO(SERVER-93876): Remove.
"//src/mongo/db/storage:oplog_truncation",
],
)
@ -758,6 +759,19 @@ mongo_cc_unit_test(
],
)
# Unit test target built from oplog_async_sampling_test.cpp.
mongo_cc_unit_test(
name = "oplog_async_sampling_test",
srcs = [
"oplog_async_sampling_test.cpp",
],
tags = ["mongo_unittest_sixth_group"],
deps = [
"//src/mongo/db/repl:oplog_applier_impl_test_fixture",
"//src/mongo/db/repl:replication_recovery",
"//src/mongo/db/repl:storage_interface",
],
)
mongo_cc_library(
name = "sorted_data_interface_test_assert",
hdrs = [

View File

@ -178,6 +178,7 @@ void CollectionTruncateMarkers::setMinBytesPerMarker(int64_t size) {
void CollectionTruncateMarkers::initialSamplingFinished() {
stdx::lock_guard<stdx::mutex> lk(_markersMutex);
LOGV2_DEBUG(10167200, 2, "Initial sampling finished marked true.");
_initialSamplingFinished = true;
}

View File

@ -32,9 +32,12 @@
#include "mongo/base/status.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/checkpointer.h"
#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/oplog_cap_maintainer_thread.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/util/assert_util.h"
@ -91,6 +94,18 @@ void startStorageControls(ServiceContext* serviceContext, bool forTestOnly) {
Checkpointer::set(serviceContext, std::move(checkpointer));
}
// TODO SERVER-110729 Remove the replication dependencies here.
if (!forTestOnly &&
repl::ReplicationCoordinator::get(serviceContext)->getSettings().isReplSet() &&
!repl::ReplSettings::shouldSkipOplogSampling() &&
!storageGlobalParams.queryableBackupMode && !storageGlobalParams.repair &&
serviceContext->userWritesAllowed()) {
std::unique_ptr<OplogCapMaintainerThread> maintainerThread =
std::make_unique<OplogCapMaintainerThread>();
OplogCapMaintainerThread::set(serviceContext, std::move(maintainerThread));
OplogCapMaintainerThread::get(serviceContext)->go();
}
areControlsStarted = true;
}
@ -108,6 +123,11 @@ void stopStorageControls(ServiceContext* serviceContext, const Status& reason, b
if (checkpointer) {
checkpointer->shutdown(reason);
}
if (OplogCapMaintainerThread* maintainerThread =
OplogCapMaintainerThread::get(serviceContext);
maintainerThread) {
maintainerThread->shutdown(reason);
}
areControlsStarted = false;
} else {

View File

@ -0,0 +1,257 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/client.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/local_catalog/catalog_raii.h"
#include "mongo/db/local_catalog/index_catalog.h"
#include "mongo/db/local_catalog/index_catalog_entry.h"
#include "mongo/db/local_catalog/index_descriptor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_applier_impl_test_fixture.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/storage/oplog_truncation.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/duration.h"
#include <algorithm>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <utility>
#include <boost/container/vector.hpp>
#include <boost/move/utility_core.hpp>
#include <boost/none.hpp>
#include <boost/optional.hpp>
#include <fmt/format.h>
namespace mongo {
namespace repl {
const auto& oplogNs = NamespaceString::kRsOplogNamespace;
/**
 * Test fixture that creates the oplog while the "oplogSamplingAsyncEnabled" server parameter is
 * forced to true, and offers helpers for inserting fixed-size entries into that oplog.
 */
class AsyncOplogTruncationTest : public ServiceContextMongoDTest {
protected:
    /** Operation context created in setUp(); released again in tearDown(). */
    OperationContext* getOperationContext() {
        return _opCtx.get();
    }

    /** Storage interface owned by the fixture. */
    StorageInterface& getStorage() {
        return _storage;
    }

    /**
     * Builds a {ts, wall, str} document whose BSON size is exactly 'size' bytes.
     *
     * 'seconds' and 't' form the "ts" Timestamp, 't' is additionally used as the "wall" time in
     * milliseconds since the epoch, and "str" is padded with 'fill' until the size matches.
     * Asserts if 'size' is smaller than the document with an empty "str".
     */
    BSONObj makeBSONObjWithSize(unsigned int seconds, unsigned int t, int size, char fill = 'x') {
        const Timestamp ts{seconds, t};
        const Date_t wall = Date_t::fromMillisSinceEpoch(t);

        // Measure the document with an empty payload first to learn how much padding is needed.
        BSONObj objTemplate = BSON("ts" << ts << "wall" << wall << "str"
                                        << "");
        ASSERT_LTE(objTemplate.objsize(), size);

        const std::string padding(size - objTemplate.objsize(), fill);
        BSONObj obj = BSON("ts" << ts << "wall" << wall << "str" << padding);
        ASSERT_EQ(size, obj.objsize());
        return obj;
    }

    /** Same as above with the seconds component of the timestamp fixed to 1. */
    BSONObj makeBSONObjWithSize(unsigned int t, int size, char fill = 'x') {
        constexpr unsigned int kDefaultSeconds = 1;
        return makeBSONObjWithSize(kDefaultSeconds, t, size, fill);
    }

    /**
     * Inserts one document of exactly 'size' bytes into the oplog inside its own
     * WriteUnitOfWork and returns the inserted document.
     */
    BSONObj insertOplog(unsigned int seconds, unsigned int t, int size) {
        auto obj = makeBSONObjWithSize(seconds, t, size);

        AutoGetOplogFastPath oplogWrite(_opCtx.get(), OplogAccessMode::kWrite);
        const auto& oplog = oplogWrite.getCollection();

        // A single record with a default RecordId, paired with a default Timestamp.
        std::vector<Record> records{{RecordId(), RecordData(obj.objdata(), obj.objsize())}};
        std::vector<Timestamp> timestamps{Timestamp()};

        WriteUnitOfWork wuow(_opCtx.get());
        ASSERT_OK(internal::insertDocumentsForOplog(_opCtx.get(), oplog, &records, timestamps));
        wuow.commit();
        return obj;
    }

    /** Same as above with the seconds component of the timestamp fixed to 1. */
    BSONObj insertOplog(unsigned int t, int size) {
        constexpr unsigned int kDefaultSeconds = 1;
        return insertOplog(kDefaultSeconds, t, size);
    }

private:
    void setUp() override {
        ServiceContextMongoDTest::setUp();
        _opCtx = cc().makeOperationContext();

        auto service = getServiceContext();
        ReplicationCoordinator::set(service,
                                    std::make_unique<ReplicationCoordinatorMock>(service));

        // Async sampling is switched on only for the duration of oplog creation; the controller
        // goes out of scope when setUp() returns.
        RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
            "oplogSamplingAsyncEnabled", true);
        repl::createOplog(_opCtx.get());
    }

    void tearDown() override {
        // Release the operation context before the base fixture tears down.
        _opCtx.reset();
        ServiceContextMongoDTest::tearDown();
    }

    ServiceContext::UniqueOperationContext _opCtx;
    StorageInterfaceImpl _storage;
};
// TODO SERVER-101672 Re-enable these tests.
// NOTE: every statement in this test body is commented out, so the test currently passes without
// asserting anything. The disabled OplogTruncateMarkers_AsynchronousModeInProgressState test is
// also preserved inside this body as a comment; when re-enabling, restore it as its own TEST_F.
//
// In async mode, sampleAndUpdate is called separately from createOplogTruncateMarkers, and
// creates the initial set of markers.
TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeSampleAndUpdate) {
    // // Turn on async mode
    // RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
    //     "oplogSamplingAsyncEnabled", true);
    // auto opCtx = getOperationContext();
    // auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
    // // Populate oplog to force marker creation to occur
    // int realNumRecords = 4;
    // int realSizePerRecord = 1024 * 1024;
    // for (int i = 1; i <= realNumRecords; i++) {
    //     insertOplog(i, realSizePerRecord);
    // }
    // auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
    // ASSERT(oplogTruncateMarkers);
    // ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
    // // Continue finishing the initial scan / sample
    // oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
    // // Confirm that some truncate markers were generated.
    // ASSERT_LT(0U, oplogTruncateMarkers->numMarkers_forTest());
    // }
    //
    // // In async mode, during startup but before sampling finishes, creation method is
    // // InProgress. This should then resolve to either the Scanning or Sampling method once
    // // initial marker creation has finished.
    // TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeInProgressState) {
    // // Turn on async mode
    // RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
    //     "oplogSamplingAsyncEnabled", true);
    // auto opCtx = getOperationContext();
    // auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
    // // Populate oplog so that initial marker creation method is not EmptyCollection
    // insertOplog(1, 100);
    // // Note if in async mode, at this point we have not yet sampled.
    // auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
    // ASSERT(oplogTruncateMarkers);
    // // Confirm that we are in InProgress state since sampling/scanning has not begun.
    // ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::InProgress,
    //           oplogTruncateMarkers->getMarkersCreationMethod());
    // // Continue finishing the initial scan / sample
    // oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
    // // Check that the InProgress state has now been resolved.
    // ASSERT(oplogTruncateMarkers->getMarkersCreationMethod() ==
    //        CollectionTruncateMarkers::MarkersCreationMethod::Scanning);
}
// In async mode, we are still able to sample when expected, and some markers can be created.
// NOTE: the whole body is commented out, so this test currently passes without asserting
// anything.
TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeSampling) {
    // // Turn on async mode
    // RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
    //     "oplogSamplingAsyncEnabled", true);
    // auto opCtx = getOperationContext();
    // auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
    // auto wtRS = static_cast<WiredTigerRecordStore::Oplog*>(rs);
    // {
    //     // Before initializing the RecordStore, populate with a few records.
    //     insertOplog(1, 100);
    //     insertOplog(2, 100);
    //     insertOplog(3, 100);
    //     insertOplog(4, 100);
    // }
    // {
    //     // Force initialize the oplog truncate markers to use sampling by providing very
    //     // large, inaccurate sizes. This should cause us to over sample the records in the
    //     // oplog.
    //     ASSERT_OK(wtRS->updateSize(1024 * 1024 * 1024));
    //     wtRS->setNumRecords(1024 * 1024);
    //     wtRS->setDataSize(1024 * 1024 * 1024);
    // }
    // LocalOplogInfo::get(opCtx)->setRecordStore(opCtx, rs);
    // // Note if in async mode, at this point we have not yet sampled.
    // auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
    // ASSERT(oplogTruncateMarkers);
    // // Continue finishing the initial scan / sample
    // oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
    // ASSERT(oplogTruncateMarkers);
    // // Confirm that we can in fact sample
    // ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::Sampling,
    //           oplogTruncateMarkers->getMarkersCreationMethod());
    // // Confirm that some truncate markers were generated.
    // ASSERT_GTE(oplogTruncateMarkers->getCreationProcessingTime().count(), 0);
    // auto truncateMarkersBefore = oplogTruncateMarkers->numMarkers_forTest();
    // ASSERT_GT(truncateMarkersBefore, 0U);
    // ASSERT_GT(oplogTruncateMarkers->currentBytes_forTest(), 0);
}
// In async mode, markers are not created during createOplogTruncateMarkers (which instead
// returns empty OplogTruncateMarkers object).
// NOTE: the whole body is commented out, so this test currently passes without asserting
// anything.
TEST_F(AsyncOplogTruncationTest, OplogTruncateMarkers_AsynchronousModeCreateOplogTruncateMarkers) {
    // // Turn on async mode
    // RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
    //     "oplogSamplingAsyncEnabled", true);
    // auto opCtx = getOperationContext();
    // auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
    // // Note if in async mode, at this point we have not yet sampled.
    // auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
    // ASSERT(oplogTruncateMarkers);
    // ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());
    // ASSERT_EQ(0, oplogTruncateMarkers->currentRecords_forTest());
    // ASSERT_EQ(0, oplogTruncateMarkers->currentBytes_forTest());
}
} // namespace repl
} // namespace mongo

View File

@ -45,7 +45,6 @@
#include "mongo/db/storage/oplog_truncation.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/compiler.h"
#include "mongo/util/assert_util.h"
@ -61,7 +60,14 @@ namespace mongo {
namespace {
const auto getMaintainerThread = ServiceContext::declareDecoration<OplogCapMaintainerThread>();
const auto getMaintainerThread =
ServiceContext::declareDecoration<std::unique_ptr<OplogCapMaintainerThread>>();
// Cumulative amount of time spent truncating the oplog.
AtomicWord<int64_t> totalTimeTruncating;
// Cumulative number of truncates of the oplog.
AtomicWord<int64_t> truncateCount;
MONGO_FAIL_POINT_DEFINE(hangOplogCapMaintainerThread);
@ -114,8 +120,9 @@ public:
builder.append("oplogMinRetentionHours", oplogMinRetentionHours);
}
auto& capMaintainer = getMaintainerThread(opCtx->getServiceContext());
capMaintainer.appendStats(builder);
builder.append("totalTimeTruncatingMicros", totalTimeTruncating.load());
builder.append("truncateCount", truncateCount.load());
return builder.obj();
}
};
@ -127,17 +134,32 @@ auto oplogTruncateMarkersStats =
} // namespace
OplogCapMaintainerThread* OplogCapMaintainerThread::get(ServiceContext* serviceCtx) {
return &getMaintainerThread(serviceCtx);
auto& maintainerThread = getMaintainerThread(serviceCtx);
if (maintainerThread) {
return maintainerThread.get();
}
return nullptr;
}
// Convenience overload: looks the maintainer thread up through the ServiceContext that owns
// 'opCtx' and forwards to the ServiceContext overload of get().
OplogCapMaintainerThread* OplogCapMaintainerThread::get(OperationContext* opCtx) {
    auto* const owningServiceCtx = opCtx->getServiceContext();
    return get(owningServiceCtx);
}
// Installs 'oplogCapMaintainerThread' as the instance stored on 'serviceCtx', taking ownership
// of it. Any instance that is already installed must no longer be running, and the new instance
// must be non-null; both conditions are enforced with invariants.
void OplogCapMaintainerThread::set(
ServiceContext* serviceCtx,
std::unique_ptr<OplogCapMaintainerThread> oplogCapMaintainerThread) {
auto& maintainerThread = getMaintainerThread(serviceCtx);
// Replacing an existing instance is only permitted after it has stopped running.
if (maintainerThread) {
invariant(!maintainerThread->running(),
"Tried to reset the OplogCapMaintainerThread without shutting down the original "
"instance.");
}
invariant(oplogCapMaintainerThread);
// Move-assigning over the decoration's unique_ptr releases the previous instance, if any.
maintainerThread = std::move(oplogCapMaintainerThread);
}
bool OplogCapMaintainerThread::_deleteExcessDocuments(OperationContext* opCtx) {
// Maintaining the Oplog cap is crucial to the stability of the server so that we don't let the
// oplog grow unbounded. We mark the operation as having immediate priority to skip ticket
// acquisition and flow control.
ScopedAdmissionPriority<ExecutionAdmissionContext> priority(
opCtx, AdmissionContext::Priority::kExempt);
// A Global IX lock should be good enough to protect the oplog truncation from
// interruptions such as replication rollback. Database lock or collection lock is not
// needed. This improves concurrency if oplog truncation takes long time.
@ -188,8 +210,8 @@ bool OplogCapMaintainerThread::_deleteExcessDocuments(OperationContext* opCtx) {
oplog_truncation::reclaimOplog(opCtx, *rs, RecordId(mayTruncateUpTo.asULL()));
auto elapsedMicros = timer.micros();
_totalTimeTruncating.fetchAndAdd(elapsedMicros);
_truncateCount.fetchAndAdd(1);
totalTimeTruncating.fetchAndAdd(elapsedMicros);
truncateCount.fetchAndAdd(1);
auto elapsedMillis = elapsedMicros / 1000;
LOGV2(22402,
@ -203,48 +225,66 @@ bool OplogCapMaintainerThread::_deleteExcessDocuments(OperationContext* opCtx) {
return true;
}
void OplogCapMaintainerThread::start() {
massert(4204300, "OplogCapMaintainerThread already started", !_thread.joinable());
_thread = stdx::thread(&OplogCapMaintainerThread::_run, this);
}
void OplogCapMaintainerThread::_run() {
std::string name = std::string("OplogCapMaintainerThread-") +
toStringForLogging(NamespaceString::kRsOplogNamespace);
setThreadName(name);
LOGV2_DEBUG(
5295000, 1, "Oplog cap maintainer thread started and active", "threadName"_attr = name);
ThreadClient tc(name,
void OplogCapMaintainerThread::run() {
LOGV2(5295000, "Oplog cap maintainer thread started", "threadName"_attr = _name);
ThreadClient tc(_name,
getGlobalServiceContext()->getService(ClusterRole::ShardServer),
Client::noSession(),
ClientOperationKillableByStepdown{false});
ServiceContext::UniqueOperationContext opCtx;
boost::optional<ScopedAdmissionPriority<ExecutionAdmissionContext>> admissionPriority;
if (feature_flags::gOplogSamplingAsyncEnabled.isEnabled() && gOplogSamplingAsyncEnabled) {
{
stdx::lock_guard<stdx::mutex> lk(_opCtxMutex);
// Initialize the thread's opCtx.
_uniqueCtx.emplace(tc->makeOperationContext());
// Maintaining the Oplog cap is crucial to the stability of the server so that we don't let
// the oplog grow unbounded. We mark the operation as having immediate priority to skip
// ticket acquisition and flow control.
admissionPriority.emplace(_uniqueCtx->get(), AdmissionContext::Priority::kExempt);
}
ON_BLOCK_EXIT([&] {
stdx::lock_guard<stdx::mutex> lk(_opCtxMutex);
admissionPriority.reset();
_uniqueCtx.reset();
});
if (gOplogSamplingAsyncEnabled) {
try {
opCtx = tc->makeOperationContext();
boost::optional<AutoGetOplogFastPath> oplogRead;
RecordStore* rs = nullptr;
{
stdx::unique_lock<stdx::mutex> lk(_stateMutex);
if (_shuttingDown) {
return;
}
}
// Need the oplog to have been created first before we proceed.
do {
// Create the initial set of truncate markers as part of this thread before we
// attempt to delete excess markers. Ensure that the oplog has been created as part
// of restart before we attempt to create markers.
boost::optional<AutoGetOplogFastPath> oplogRead;
oplogRead.emplace(
opCtx.get(),
_uniqueCtx->get(),
OplogAccessMode::kRead,
Date_t::max(),
AutoGetOplogFastPathOptions{.skipRSTLLock = true,
.explicitIntent =
rss::consensus::IntentRegistry::Intent::Read});
const auto& oplog = oplogRead->getCollection();
if (oplog) {
rs = oplog->getRecordStore();
// Initial sampling and marker creation.
auto oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(
_uniqueCtx->get(), *oplog->getRecordStore());
invariant(oplogTruncateMarkers);
LocalOplogInfo::get(_uniqueCtx->get())
->setTruncateMarkers(std::move(oplogTruncateMarkers));
break;
}
// Wait a bit to give the oplog a chance to be created.
MONGO_IDLE_THREAD_BLOCK;
LOGV2_DEBUG(10621101, 1, "OplogCapMaintainerThread is idle");
@ -252,63 +292,64 @@ void OplogCapMaintainerThread::_run() {
oplogRead.reset();
sleepFor(Milliseconds(100));
LOGV2_DEBUG(10621109, 1, "OplogCapMaintainerThread is active");
} while (!rs);
// Initial sampling and marker creation.
auto oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx.get(), *rs);
invariant(oplogTruncateMarkers);
LocalOplogInfo::get(opCtx.get())->setTruncateMarkers(std::move(oplogTruncateMarkers));
} while (true);
} catch (const ExceptionFor<ErrorCategory::ShutdownError>& e) {
LOGV2_DEBUG(9468100,
1,
"Interrupted due to shutdown. OplogCapMaintainerThread Exiting!",
"error"_attr = e.what());
return;
} catch (const ExceptionFor<ErrorCodes::InterruptedDueToStorageChange>&) {
LOGV2_DEBUG(10167201,
1,
"Interrupted due to storage change. OplogCapMaintainerThread Exiting!");
return;
}
}
opCtx.reset();
while (true) {
// It's illegal to create a new opCtx while a thread already has one, so we reset the opCtx.
// Otherwise, it could lead to deadlocks in the production setup.
//
// For example, FCBIS requires switching storage engines. Before switching storage engines,
// we block the system from creating new opCtxs, kill all existing opCtxs, and wait for
// their destruction. If makeOperationContext() was called during this process, it could be
// blocked, which would in turn block the destruction of previous killed opCtx and block the
// FCBIS.
ON_BLOCK_EXIT([&] { opCtx.reset(); });
try {
opCtx = tc->makeOperationContext();
// We need this check since the first check to _shuttingDown is guarded by
// gOplogSamplingAsyncEnabled and we will never check this value if async is disabled.
{
stdx::unique_lock<stdx::mutex> lk(_stateMutex);
if (_shuttingDown) {
return;
}
}
try {
if (MONGO_unlikely(hangOplogCapMaintainerThread.shouldFail())) {
LOGV2(5095500, "Hanging the oplog cap maintainer thread due to fail point");
hangOplogCapMaintainerThread.pauseWhileSet(opCtx.get());
hangOplogCapMaintainerThread.pauseWhileSet(_uniqueCtx->get());
}
if (_deleteExcessDocuments(opCtx.get())) {
if (_deleteExcessDocuments(_uniqueCtx->get())) {
continue;
}
opCtx->sleepFor(Seconds(1)); // Back off in case there were problems deleting.
_uniqueCtx->get()->sleepFor(
Seconds(1)); // Back off in case there were problems deleting.
} catch (const ExceptionFor<ErrorCategory::ShutdownError>& e) {
LOGV2_DEBUG(9259900,
1,
"Interrupted due to shutdown. OplogCapMaintainerThread Exiting",
"error"_attr = e);
return;
} catch (const ExceptionFor<ErrorCodes::InterruptedDueToStorageChange>&) {
LOGV2_DEBUG(10167202,
1,
"Interrupted due to storage change. OplogCapMaintainerThread Exiting!");
return;
} catch (...) {
const auto& err = mongo::exceptionToStatus();
if (opCtx->checkForInterruptNoAssert().isOK()) {
if (_uniqueCtx->get()->checkForInterruptNoAssert().isOK()) {
LOGV2_FATAL_NOTRACE(
6761100, "Error in OplogCapMaintainerThread", "error"_attr = err);
}
// Since we make this operation unkillable by stepdown, the opCtx can't be interrupted
// by repl state transitions - stepdown, stepup, and rollback.
// It can only be interrupted by shutdown, killOp, or storage change
// (causes ErrorCodes::InterruptedDueToStorageChange) due to FCBIS. The shutdown case is
// Since we make this operation unkillable by stepdown, the opCtx can't be
// interrupted by repl state transitions - stepdown, stepup, and rollback. It can
// only be interrupted by shutdown, killOp, or storage change (causes
// ErrorCodes::InterruptedDueToStorageChange) due to FCBIS. The shutdown case is
// handled above. We reach here for the last two cases, and it's safe to continue.
LOGV2_DEBUG(9064301,
1,
@ -320,17 +361,23 @@ void OplogCapMaintainerThread::_run() {
MONGO_UNREACHABLE;
}
void OplogCapMaintainerThread::appendStats(BSONObjBuilder& builder) const {
builder.append("totalTimeTruncatingMicros", _totalTimeTruncating.load());
builder.append("truncateCount", _truncateCount.load());
}
void OplogCapMaintainerThread::shutdown() {
if (_thread.joinable()) {
LOGV2_INFO(7474902, "Shutting down oplog cap maintainer thread");
_thread.join();
LOGV2(7474901, "Finished shutting down oplog cap maintainer thread");
void OplogCapMaintainerThread::shutdown(const Status& reason) {
LOGV2_INFO(7474902, "Shutting down oplog cap maintainer thread");
{
stdx::lock_guard<stdx::mutex> lk(_opCtxMutex);
if (_uniqueCtx) {
stdx::lock_guard<Client> lk(*_uniqueCtx->get()->getClient());
_uniqueCtx->get()->markKilled(reason.code());
}
}
{
stdx::lock_guard<stdx::mutex> lk(_stateMutex);
_shuttingDown = true;
_shutdownReason = reason;
}
wait();
LOGV2(7474901, "Finished shutting down oplog cap maintainer thread");
}
} // namespace mongo

View File

@ -30,44 +30,51 @@
#pragma once
#include "mongo/db/service_context.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/background.h"
namespace mongo {
/**
* Responsible for deleting oplog truncate markers once their max capacity has been reached.
*/
class OplogCapMaintainerThread {
class OplogCapMaintainerThread : public BackgroundJob {
public:
static OplogCapMaintainerThread* get(ServiceContext* serviceCtx);
OplogCapMaintainerThread() : BackgroundJob(false /* deleteSelf */) {}
/**
* Create the maintainer thread. Must be called at most once.
*/
void start();
static OplogCapMaintainerThread* get(ServiceContext* serviceCtx);
static OplogCapMaintainerThread* get(OperationContext* opCtx);
static void set(ServiceContext* serviceCtx,
std::unique_ptr<OplogCapMaintainerThread> oplogCapMaintainerThread);
std::string name() const override {
return _name;
}
void run() override;
/**
* Waits until the maintainer thread finishes. Must not be called concurrently with start().
*/
void shutdown();
void appendStats(BSONObjBuilder& builder) const;
void shutdown(const Status& reason);
private:
void _run();
/**
* Returns true iff there was an oplog to delete from.
*/
bool _deleteExcessDocuments(OperationContext* opCtx);
stdx::thread _thread;
// Serializes setting/resetting _uniqueCtx and marking _uniqueCtx killed.
mutable stdx::mutex _opCtxMutex;
// Cumulative amount of time spent truncating the oplog.
AtomicWord<int64_t> _totalTimeTruncating;
// Saves a reference to the cap maintainer thread's operation context.
boost::optional<ServiceContext::UniqueOperationContext> _uniqueCtx;
// Cumulative number of truncates of the oplog.
AtomicWord<int64_t> _truncateCount;
mutable stdx::mutex _stateMutex;
bool _shuttingDown = false;
Status _shutdownReason = Status::OK();
std::string _name = std::string("OplogCapMaintainerThread-") +
toStringForLogging(NamespaceString::kRsOplogNamespace);
};
} // namespace mongo

View File

@ -139,6 +139,10 @@ private:
auto service = getServiceContext();
auto replCoord = std::make_unique<ReplicationCoordinatorMock>(service);
ReplicationCoordinator::set(service, std::move(replCoord));
// Turn off async sampling before creating the oplog so we get the right value of
// needsTruncateMarkers in setRecordStore.
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
"oplogSamplingAsyncEnabled", false);
repl::createOplog(_opCtx.get());
}
@ -157,6 +161,11 @@ private:
* Insert records into an oplog and verify the number of truncate markers that are created.
*/
TEST_F(OplogTruncationTest, OplogTruncateMarkers_CreateNewMarker) {
// Turn off async mode
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
"oplogSamplingAsyncEnabled", false);
auto opCtx = getOperationContext();
auto oplogTruncateMarkers = LocalOplogInfo::get(opCtx)->getTruncateMarkers();
@ -209,6 +218,10 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_CreateNewMarker) {
* should leave no truncate markers, including the partially filled one.
*/
TEST_F(OplogTruncationTest, OplogTruncateMarkers_Truncate) {
// Turn off async mode
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
"oplogSamplingAsyncEnabled", false);
auto opCtx = getOperationContext();
auto& storage = getStorage();
auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
@ -245,6 +258,10 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_Truncate) {
* record is changed.
*/
TEST_F(OplogTruncationTest, OplogTruncateMarkers_UpdateRecord) {
// Turn off async mode
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
"oplogSamplingAsyncEnabled", false);
auto opCtx = getOperationContext();
auto& storage = getStorage();
@ -307,6 +324,10 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_UpdateRecord) {
* partially truncated, then it should become the truncate marker currently being filled.
*/
TEST_F(OplogTruncationTest, OplogTruncateMarkers_CappedTruncateAfter) {
// Turn off async mode
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
"oplogSamplingAsyncEnabled", false);
auto opCtx = getOperationContext();
auto& storage = getStorage();
auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
@ -415,6 +436,10 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_CappedTruncateAfter) {
* Verify that oplog truncate markers are reclaimed when cappedMaxSize is exceeded.
*/
TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
// Turn off async mode
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
"oplogSamplingAsyncEnabled", false);
auto opCtx = getOperationContext();
auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
auto engine = getServiceContext()->getStorageEngine();
@ -604,6 +629,10 @@ TEST_F(OplogTruncationTest, ReclaimTruncateMarkers) {
* of the records to not be in increasing order.
*/
TEST_F(OplogTruncationTest, OplogTruncateMarkers_AscendingOrder) {
// Turn off async mode
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
"oplogSamplingAsyncEnabled", false);
auto opCtx = getOperationContext();
auto oplogTruncateMarkers = LocalOplogInfo::get(opCtx)->getTruncateMarkers();
@ -649,6 +678,10 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_AscendingOrder) {
// (2) OplogTruncateMarkers::currentBytes_forTest() reflects the actual size of the oplog instead
// of the estimated size.
TEST_F(OplogTruncationTest, OplogTruncateMarkers_NoMarkersGeneratedFromScanning) {
// Turn off async mode
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
"oplogSamplingAsyncEnabled", false);
auto opCtx = getOperationContext();
auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
auto wtRS = static_cast<WiredTigerRecordStore::Oplog*>(rs);
@ -687,6 +720,10 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_NoMarkersGeneratedFromScanning)
// sampled multiple times during startup, which can be very likely if the size storer is very
// inaccurate.
TEST_F(OplogTruncationTest, OplogTruncateMarkers_Duplicates) {
// Turn off async mode
RAIIServerParameterControllerForTest oplogSamplingAsyncEnabledController(
"oplogSamplingAsyncEnabled", false);
auto opCtx = getOperationContext();
auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
auto wtRS = static_cast<WiredTigerRecordStore::Oplog*>(rs);
@ -754,176 +791,5 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_Duplicates) {
}
}
// With async sampling turned on, oplog inserts still roll over into new truncate markers at the
// configured minimum marker size.
TEST_F(OplogTruncationTest, OplogTruncateMarkers_AsynchronousModeMarkerCreation) {
    auto opCtx = getOperationContext();
    auto* recordStore = LocalOplogInfo::get(opCtx)->getRecordStore();

    // Switch to async mode.
    RAIIServerParameterControllerForTest asyncSamplingController(
        "featureFlagOplogSamplingAsyncEnabled", true);

    auto markers = LocalOplogInfo::get(opCtx)->getTruncateMarkers();
    ASSERT(markers);

    markers->setMinBytesPerMarker(100);

    // 99 bytes stays below the 100-byte minimum: the record is only counted against the marker
    // currently being filled.
    insertOplog(1, 99);
    ASSERT_EQ(0U, markers->numMarkers_forTest());
    ASSERT_EQ(1, markers->currentRecords_forTest());
    ASSERT_EQ(99, markers->currentBytes_forTest());

    // 99 + 51 bytes crosses the minimum: a marker is created and the partial counters reset.
    insertOplog(2, 51);
    ASSERT_EQ(1U, markers->numMarkers_forTest());
    ASSERT_EQ(0, markers->currentRecords_forTest());
    ASSERT_EQ(0, markers->currentBytes_forTest());

    // Although 51 + 50 bytes would exceed the minimum, the previous record already belongs to a
    // finished marker, so this record merely starts filling the next one.
    insertOplog(3, 50);
    ASSERT_EQ(1U, markers->numMarkers_forTest());
    ASSERT_EQ(1, markers->currentRecords_forTest());
    ASSERT_EQ(50, markers->currentBytes_forTest());

    // 50 + 50 bytes lands exactly on the minimum, which is enough to create another marker.
    insertOplog(4, 50);
    ASSERT_EQ(2U, markers->numMarkers_forTest());
    ASSERT_EQ(0, markers->currentRecords_forTest());
    ASSERT_EQ(0, markers->currentBytes_forTest());

    // The record store itself holds all four records (99 + 51 + 50 + 50 bytes).
    ASSERT_EQ(4, recordStore->numRecords());
    ASSERT_EQ(250, recordStore->dataSize());
}
// In async mode, sampleAndUpdate is called separately from createOplogTruncateMarkers, and
// creates the initial set of markers.
TEST_F(OplogTruncationTest, OplogTruncateMarkers_AsynchronousModeSampleAndUpdate) {
    auto opCtx = getOperationContext();
    auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();

    // Turn on async mode.
    RAIIServerParameterControllerForTest featureFlagController(
        "featureFlagOplogSamplingAsyncEnabled", true);

    // Populate the oplog to force marker creation to occur.
    int realNumRecords = 4;
    int realSizePerRecord = 1024 * 1024;
    for (int i = 1; i <= realNumRecords; i++) {
        insertOplog(i, realSizePerRecord);
    }

    // Creation alone must not produce any markers in async mode.
    auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
    ASSERT(oplogTruncateMarkers);
    ASSERT_EQ(0U, oplogTruncateMarkers->numMarkers_forTest());

    // Continue finishing the initial scan / sample.
    oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
    // Fail the test cleanly instead of dereferencing a null result below.
    ASSERT(oplogTruncateMarkers);

    // Confirm that some truncate markers were generated.
    ASSERT_LT(0U, oplogTruncateMarkers->numMarkers_forTest());
}
// In async mode, during startup but before sampling finishes, the creation method is InProgress.
// This should then resolve to either the Scanning or Sampling method once initial marker creation
// has finished; with the single small record inserted here the test expects Scanning.
TEST_F(OplogTruncationTest, OplogTruncateMarkers_AsynchronousModeInProgressState) {
    auto opCtx = getOperationContext();
    auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();

    // Turn on async mode.
    RAIIServerParameterControllerForTest featureFlagController(
        "featureFlagOplogSamplingAsyncEnabled", true);

    // Populate the oplog so that the initial marker creation method is not EmptyCollection.
    insertOplog(1, 100);

    // Note if in async mode, at this point we have not yet sampled.
    auto oplogTruncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
    ASSERT(oplogTruncateMarkers);

    // Confirm that we are in InProgress state since sampling/scanning has not begun.
    ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::InProgress,
              oplogTruncateMarkers->getMarkersCreationMethod());

    // Continue finishing the initial scan / sample.
    oplogTruncateMarkers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
    // Fail the test cleanly instead of dereferencing a null result below.
    ASSERT(oplogTruncateMarkers);

    // Check that the InProgress state has now been resolved. ASSERT_EQ (rather than a boolean
    // ASSERT) reports the actual creation method when the expectation is not met.
    ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::Scanning,
              oplogTruncateMarkers->getMarkersCreationMethod());
}
// With async sampling turned on, the sampling path is still taken when the recorded oplog sizes
// call for it, and it yields truncate markers.
TEST_F(OplogTruncationTest, OplogTruncateMarkers_AsynchronousModeSampling) {
    auto opCtx = getOperationContext();
    auto* rs = LocalOplogInfo::get(opCtx)->getRecordStore();
    auto* oplogRs = static_cast<WiredTigerRecordStore::Oplog*>(rs);

    // Switch to async mode.
    RAIIServerParameterControllerForTest asyncSamplingController(
        "featureFlagOplogSamplingAsyncEnabled", true);

    // Seed the oplog with a handful of small records before the record store is (re)initialized.
    insertOplog(1, 100);
    insertOplog(2, 100);
    insertOplog(3, 100);
    insertOplog(4, 100);

    // Report very large, inaccurate sizes so that marker initialization chooses sampling; this
    // should make it over-sample the few records that actually exist.
    const int kInflatedBytes = 1024 * 1024 * 1024;
    const int kInflatedRecords = 1024 * 1024;
    ASSERT_OK(oplogRs->updateSize(kInflatedBytes));
    oplogRs->setNumRecords(kInflatedRecords);
    oplogRs->setDataSize(kInflatedBytes);

    LocalOplogInfo::get(opCtx)->setRecordStore(opCtx, rs);

    // In async mode nothing has been sampled yet at this point.
    auto markers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
    ASSERT(markers);

    // Run the deferred initial scan / sample.
    markers = OplogTruncateMarkers::sampleAndUpdate(opCtx, *rs);
    ASSERT(markers);

    // The markers must have been built by sampling.
    ASSERT_EQ(CollectionTruncateMarkers::MarkersCreationMethod::Sampling,
              markers->getMarkersCreationMethod());

    // A non-negative processing time was recorded, and sampling produced both markers and a
    // non-zero byte count.
    ASSERT_GTE(markers->getCreationProcessingTime().count(), 0);
    auto numMarkers = markers->numMarkers_forTest();
    ASSERT_GT(numMarkers, 0U);
    ASSERT_GT(markers->currentBytes_forTest(), 0);
}
// With async sampling turned on, createOplogTruncateMarkers does no marker creation of its own:
// the OplogTruncateMarkers object it returns is empty.
TEST_F(OplogTruncationTest, OplogTruncateMarkers_AsynchronousModeCreateOplogTruncateMarkers) {
    auto opCtx = getOperationContext();
    auto* recordStore = LocalOplogInfo::get(opCtx)->getRecordStore();

    // Switch to async mode.
    RAIIServerParameterControllerForTest asyncSamplingController(
        "featureFlagOplogSamplingAsyncEnabled", true);

    // In async mode nothing has been sampled yet at this point.
    auto markers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *recordStore);
    ASSERT(markers);

    // No finished markers, and nothing accounted to the marker being filled.
    ASSERT_EQ(0U, markers->numMarkers_forTest());
    ASSERT_EQ(0, markers->currentRecords_forTest());
    ASSERT_EQ(0, markers->currentBytes_forTest());
}
} // namespace repl
} // namespace mongo