SERVER-119964 Only stop oplog manager thread when the current Oplog is destroyed (#48857)

GitOrigin-RevId: dd2435edde76bca7823be683a6be995ebb88a1bb
This commit is contained in:
Thomas Goyne 2026-03-05 15:12:56 -08:00 committed by MongoDB Bot
parent 5cfe947381
commit f125434bc0
7 changed files with 246 additions and 43 deletions

View File

@ -0,0 +1,137 @@
/**
* Reproduces a scenario where:
* - A long-running agg holds a CollectionCatalog snapshot on a node.
* - That node goes through rollback (closeCatalog/openCatalog, OplogManager stop/start).
* - The node is later stepped up to primary.
* - The read resumes, gets InterruptedDueToReplStateChange, and releases the old RS.
*
* Prior to SERVER-119964, this resulted in the oplog manager thread exiting,
* and oplog visibility would never advance. This made majority writes hang
* indefinitely.
*
* The intent registry prevents proceeding through a rollback while a
* pre-rollback aggregation is hung, which breaks this test but also prevents
* the bug from occurring.
*
* @tags: [
* requires_replication,
* requires_mongobridge,
* requires_persistence,
* featureFlagIntentRegistration_incompatible
* ]
*/
import {RollbackTest} from "jstests/replsets/libs/rollback_test.js";
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {funWithArgs} from "jstests/libs/parallel_shell_helpers.js";
const dbName = jsTestName();
const collName = "testColl";
const rollbackTest = new RollbackTest();
jsTestLog.info("Populate database and wait for steady state replication");
const primary = rollbackTest.getPrimary();
const primaryDB = primary.getDB(dbName);
const coll = primaryDB.getCollection(collName);
assert.commandWorked(
coll.insert(
Array.from({length: 10}, (_, i) => ({_id: i, x: i})),
{writeConcern: {w: "majority"}},
),
);
rollbackTest.awaitReplication();
// Enable debug logging for storage to check for oplog manager thread not stopping
assert.commandWorked(primary.setLogLevel(1, "storage"));
const rollbackNode = rollbackTest.transitionToRollbackOperations();
/**
* Start an aggregation on the rollback node and hang it during yield via
* setYieldAllLocksHang. This will release any locks but _not_ refresh the
* CollectionCatalog shared_ptr.
*/
jsTestLog.info("Enabling setYieldAllLocksHang on rollback node and starting aggregation");
// Force very frequent yields on the rollback node.
assert.commandWorked(rollbackNode.adminCommand({setParameter: 1, internalQueryExecYieldIterations: 1}));
const hangDuringYield = configureFailPoint(rollbackNode, "setYieldAllLocksHang", {
namespace: `${dbName}.${collName}`,
// Important: dont call checkForInterrupt at the hang site; we want to
// ignore repl state changes until the agg resumes.
checkForInterruptAfterHang: false,
});
const awaitAgg = startParallelShell(
funWithArgs(
(dbName, collName) => {
// This aggregation runs on the rollback node. It will:
// - Start a lock-free read / consistent catalog + storage snapshot.
// - Yield frequently; on each yield, setYieldAllLocksHang may fire and
// block the op while locks are dropped but the snapshot is pinned.
// When it finally resumes after rollback + step-up, it should see that the node's
// replication state has changed and fail with InterruptedDueToReplStateChange
// (or a network error, depending on timing).
try {
const res = db.getSiblingDB(dbName).runCommand({
aggregate: collName,
pipeline: [
{
$match: {
$expr: {
$in: ["$_id", [...Array(10).keys()]],
},
},
},
],
cursor: {},
});
jsTestLog.info("Aggregation on rollback node completed with: " + tojson(res));
} catch (e) {
jsTestLog.info("Aggregation on rollback node failed with exception: " + tojson(e));
}
},
dbName,
collName,
),
rollbackNode.port,
);
jsTestLog.info("Wait for aggregation to reach the setYieldAllLocksHang failpoint");
hangDuringYield.wait();
rollbackTest.transitionToSyncSourceOperationsBeforeRollback();
rollbackTest.transitionToSyncSourceOperationsDuringRollback();
rollbackTest.transitionToSteadyStateOperations();
rollbackTest.stepUpNode(primary);
/**
* Unblock the paused aggregation.
*
* Turning off the failpoint lets the aggregation continue. Because the node's
* replication state changed (went through rollback and then was stepped up),
* the command should be interrupted due to repl state change at some later interrupt
* point. When the OperationContext unwinds, it releases its CollectionCatalog snapshot,
* which should drop the last reference to RS_old, run ~Oplog() on RS_old, and
* call WiredTigerOplogManager::stop() a second time in the "RS_new" state.
*/
jsTestLog.info("Turning off setYieldAllLocksHang so hung aggregation can resume");
hangDuringYield.off();
awaitAgg();
jsTestLog.info("Wait for oplog manager thread to not exit");
checkLog.containsJson(rollbackNode, 11996400, {});
jsTestLog.info("Perform a majority write, which would hang forever if the thread wasn't running");
assert.commandWorked(
coll.insert(
Array.from({length: 10}, (_, i) => ({_id: i + 10, x: i})),
{writeConcern: {w: "majority"}},
),
);
rollbackTest.stop();

View File

@ -943,7 +943,7 @@ void WiredTigerKVEngine::cleanShutdown(bool memLeakAllowed) {
return;
}
getOplogManager()->stop();
getOplogManager()->stop(nullptr);
if (_sessionSweeper) {
LOGV2(22318, "Shutting down session sweeper thread");
@ -1795,7 +1795,7 @@ std::unique_ptr<RecordStore> WiredTigerKVEngine::getRecordStore(OperationContext
.isLogged = isLogged,
.forceUpdateWithFullDocument =
options.forceUpdateWithFullDocument});
getOplogManager()->stop();
getOplogManager()->stop(nullptr);
getOplogManager()->start(opCtx, *this, *ret, _isReplSet);
} else {
bool isLogged = [&] {

View File

@ -29,7 +29,6 @@
#include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
// IWYU pragma: no_include "cxxabi.h"
#include "mongo/db/client.h"
#include "mongo/db/record_id.h"
#include "mongo/db/shard_role/transaction_resources.h"
@ -89,16 +88,16 @@ void WiredTigerOplogManager::start(OperationContext* opCtx,
_oplogIdent = std::string{oplog.getIdent()};
stdx::lock_guard<stdx::mutex> lk(_oplogVisibilityStateMutex);
invariant(!_running);
_running = true;
invariant(!_oplog);
_oplog = &oplog;
LOGV2(12035900, "Oplog visibility thread is starting");
_oplogVisibilityThread =
stdx::thread([this, service = opCtx->getServiceContext()->getService(), &engine, &oplog] {
stdx::thread([this, service = opCtx->getServiceContext()->getService(), &engine] {
Client::initThread("OplogVisibilityThread", service);
stdx::unique_lock<stdx::mutex> lk(_oplogVisibilityStateMutex);
while (true) {
switch (_updateVisibility(lk, engine, *oplog.capped())) {
while (_oplog) {
switch (_updateVisibility(lk, engine, *_oplog->capped())) {
case VisibilityUpdateResult::NotUpdated:
continue;
case VisibilityUpdateResult::Updated:
@ -107,22 +106,35 @@ void WiredTigerOplogManager::start(OperationContext* opCtx,
// oplog entries will not become visible immediately upon insert, so we
// notify waiters here as well, when new oplog entries actually become
// visible to cursors.
oplog.capped()->notifyWaitersIfNeeded();
_oplog->capped()->notifyWaitersIfNeeded();
continue;
case VisibilityUpdateResult::Stopped:
return;
}
}
LOGV2(22372, "Oplog visibility thread is stopping");
});
}
void WiredTigerOplogManager::stop() {
void WiredTigerOplogManager::stop(const RecordStore* oplog) {
{
stdx::lock_guard<stdx::mutex> lk(_oplogVisibilityStateMutex);
if (!_running) {
std::lock_guard lk(_oplogVisibilityStateMutex);
if (!_oplog) {
return;
}
_running = false;
// There are three things which stop the manager: clean shutdown, destroying a
// WiredTigerRecordStore::Oplog instance, and *creating* a WiredTigerRecordStore::Oplog
// instance. Creating a new Oplog instance stops the existing thread (if any) and starts a
// new one, and this may happen before the old Oplog instance is destroyed. If this happens,
// the destruction of the old instance needs to not stop the new thread.
if (oplog && oplog != _oplog) {
LOGV2_DEBUG(11996400,
1,
"Not stopping oplog visiblity thread because oplog pointer did not match");
return;
}
_oplog = nullptr;
}
if (_oplogVisibilityThread.joinable()) {
@ -207,8 +219,8 @@ WiredTigerOplogManager::VisibilityUpdateResult WiredTigerOplogManager::_updateVi
stdx::unique_lock<stdx::mutex>& lk, const KVEngine& engine, const RecordStore::Capped& oplog) {
{
MONGO_IDLE_THREAD_BLOCK;
_oplogVisibilityThreadCV.wait(
lk, [this] { return !_running || _triggerOplogVisibilityUpdate; });
_oplogVisibilityThreadCV.wait(lk,
[this] { return !_oplog || _triggerOplogVisibilityUpdate; });
// If we are not shutting down and nobody is actively waiting for the oplog to become
// visible, delay a bit to batch more requests into one update and reduce system load.
@ -222,15 +234,13 @@ WiredTigerOplogManager::VisibilityUpdateResult WiredTigerOplogManager::_updateVi
lk,
now.toSystemTimePoint(),
[this, &oplog] {
return !_running || _opsWaitingForOplogVisibilityUpdate ||
oplog.hasWaiters();
return !_oplog || _opsWaitingForOplogVisibilityUpdate || oplog.hasWaiters();
})) {
now += Milliseconds{1};
}
}
if (!_running) {
LOGV2(22372, "Oplog visibility thread is stopping");
if (!_oplog) {
return VisibilityUpdateResult::Stopped;
}

View File

@ -67,14 +67,19 @@ public:
/**
* Starts the oplog manager, initializing the oplog read timestamp with the highest oplog
* timestamp.
* timestamp. Every call to start() must be followed by at least one call to stop() before
* start() can be called again.
*/
void start(OperationContext*, const KVEngine&, RecordStore& oplog, bool isReplSet);
/**
* Stops the oplog manager.
* Stops the oplog manager if it is running for the given oplog.
*
* If `oplog` is non-null, the manager will only stop if `oplog` is the same record store as was
* passed to the most recent call to `start()`. If it is null, the manager will stop
* unconditionally.
*/
void stop();
void stop(const RecordStore* oplog);
/**
* Updates the oplog read timestamp if the visibility timestamp is behind the provided
@ -112,6 +117,11 @@ public:
*/
StringData getIdent() const;
bool isRunning_forTest() const {
std::lock_guard lk(_oplogVisibilityStateMutex);
return _oplog != nullptr;
}
private:
enum class VisibilityUpdateResult {
NotUpdated,
@ -139,8 +149,9 @@ private:
// Protects the state below.
mutable stdx::mutex _oplogVisibilityStateMutex;
// Whether this oplog manager is currently running.
bool _running = false;
// The record store which the thread is currently running for. May be null while the thread is
// running if the thread is in the process of shutting down.
RecordStore* _oplog = nullptr;
// Whether an oplog to oplog visibility is being triggered.
bool _triggerOplogVisibilityUpdate = false;

View File

@ -1463,7 +1463,7 @@ std::unique_ptr<SeekableRecordCursor> WiredTigerRecordStore::Oplog::getRawCursor
}
WiredTigerRecordStore::Oplog::~Oplog() {
_kvEngine->getOplogManager()->stop();
_kvEngine->getOplogManager()->stop(this);
}
std::unique_ptr<SeekableRecordCursor> WiredTigerRecordStore::Oplog::getCursor(

View File

@ -31,10 +31,8 @@
#include "mongo/base/checked_cast.h"
#include "mongo/base/error_codes.h"
#include "mongo/base/init.h" // IWYU pragma: keep
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/bsontypes.h"
#include "mongo/bson/json.h"
@ -65,8 +63,6 @@
#include <wiredtiger.h>
#include <boost/move/utility_core.hpp>
#include <boost/none.hpp>
#include <boost/optional/optional.hpp>
namespace mongo {
@ -272,7 +268,7 @@ TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityInOrder) {
std::unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
std::unique_ptr<RecordStore> rs(harnessHelper->newOplogRecordStore());
auto engine = static_cast<WiredTigerKVEngine*>(harnessHelper->getEngine());
engine->getOplogManager()->stop();
engine->getOplogManager()->stop(nullptr);
auto isOpHidden = [&engine](const RecordId& id) {
return engine->getOplogManager()->getOplogReadTimestamp() <
@ -308,7 +304,7 @@ TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityOutOfOrder) {
std::unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
std::unique_ptr<RecordStore> rs(harnessHelper->newOplogRecordStore());
auto engine = static_cast<WiredTigerKVEngine*>(harnessHelper->getEngine());
engine->getOplogManager()->stop();
engine->getOplogManager()->stop(nullptr);
auto isOpHidden = [&engine](const RecordId& id) {
return engine->getOplogManager()->getOplogReadTimestamp() <
@ -560,6 +556,46 @@ TEST(WiredTigerRecordStoreTest, GetLatestOplogTest) {
ASSERT_EQ(tsThree, wtRS->getLatestTimestamp(ru1));
}
TEST(WiredTigerRecordStoreTest, OplogDestructorAutomaticallyStopsOplogManager) {
auto harnessHelper = newRecordStoreHarnessHelper();
auto oplogManager =
static_cast<WiredTigerKVEngine*>(harnessHelper->getEngine())->getOplogManager();
ASSERT_FALSE(oplogManager->isRunning_forTest());
auto rs(harnessHelper->newOplogRecordStore());
ASSERT_TRUE(oplogManager->isRunning_forTest());
rs.reset();
ASSERT_FALSE(oplogManager->isRunning_forTest());
}
TEST(WiredTigerRecordStoreTest, OplogDestructorOnlyStopsCorrectOplogManager) {
auto harnessHelper = newRecordStoreHarnessHelper();
auto oplogManager =
static_cast<WiredTigerKVEngine*>(harnessHelper->getEngine())->getOplogManager();
ASSERT_FALSE(oplogManager->isRunning_forTest());
// Creating rs2 stops the thread for rs1 and starts it for itself, so destroying rs1 should not
// stop the thread.
auto rs1 = harnessHelper->newOplogRecordStore();
ASSERT_TRUE(oplogManager->isRunning_forTest());
auto rs2 = harnessHelper->newOplogRecordStore();
ASSERT_TRUE(oplogManager->isRunning_forTest());
rs1.reset();
ASSERT_TRUE(oplogManager->isRunning_forTest());
rs2.reset();
ASSERT_FALSE(oplogManager->isRunning_forTest());
// Destroying rs2 first should stop the thread even though rs1 still exists
rs1 = harnessHelper->newOplogRecordStore();
rs2 = harnessHelper->newOplogRecordStore();
ASSERT_TRUE(oplogManager->isRunning_forTest());
rs2.reset();
ASSERT_FALSE(oplogManager->isRunning_forTest());
rs1.reset();
ASSERT_FALSE(oplogManager->isRunning_forTest());
}
TEST(WiredTigerRecordStoreTest, CursorInActiveTxnAfterNext) {
std::unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
std::unique_ptr<RecordStore> rs(harnessHelper->newRecordStore());

View File

@ -95,27 +95,36 @@ std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newRecordStore(
return _engine->getRecordStore(opCtx.get(), nss, ident, recordStoreOptions, uuid);
}
std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newOplogRecordStore() {
auto ret = newOplogRecordStoreNoInit();
ServiceContext::UniqueOperationContext opCtx(newOperationContext());
auto oplog = static_cast<WiredTigerRecordStore::Oplog*>(ret.get());
_engine->getOplogManager()->start(opCtx.get(), *_engine, *oplog, _isReplSet);
return ret;
}
std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newOplogRecordStoreNoInit() {
std::string ident = _identForNs(redactTenant(NamespaceString::kRsOplogNamespace));
static const auto kOplogOptions = [] {
RecordStore::Options oplogRecordStoreOptions;
oplogRecordStoreOptions.isOplog = true;
oplogRecordStoreOptions.isCapped = true;
// Large enough not to exceed capped limits.
oplogRecordStoreOptions.oplogMaxSize = 1024 * 1024 * 1024;
return oplogRecordStoreOptions;
}();
std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newOplogRecordStore() {
std::string ident = _identForNs(redactTenant(NamespaceString::kRsOplogNamespace));
ServiceContext::UniqueOperationContext opCtx(newOperationContext());
auto& provider = rss::ReplicatedStorageService::get(opCtx.get()).getPersistenceProvider();
auto& ru = *shard_role_details::getRecoveryUnit(opCtx.get());
WriteUnitOfWork wuow(opCtx.get());
const auto res = _engine->createRecordStore(
provider, ru, NamespaceString::kRsOplogNamespace, ident, oplogRecordStoreOptions);
provider, ru, NamespaceString::kRsOplogNamespace, ident, kOplogOptions);
wuow.commit();
return _engine->getRecordStore(
opCtx.get(), NamespaceString::kRsOplogNamespace, ident, kOplogOptions, UUID::gen());
}
std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newOplogRecordStoreNoInit() {
std::string ident = _identForNs(redactTenant(NamespaceString::kRsOplogNamespace));
ServiceContext::UniqueOperationContext opCtx(newOperationContext());
auto& provider = rss::ReplicatedStorageService::get(opCtx.get()).getPersistenceProvider();
auto& ru = *shard_role_details::getRecoveryUnit(opCtx.get());
WriteUnitOfWork wuow(opCtx.get());
const auto res = _engine->createRecordStore(
provider, ru, NamespaceString::kRsOplogNamespace, ident, kOplogOptions);
wuow.commit();
// Cannot use 'getRecordStore', which automatically starts the the oplog manager.
@ -127,7 +136,7 @@ std::unique_ptr<RecordStore> WiredTigerHarnessHelper::newOplogRecordStoreNoInit(
.engineName = std::string{kWiredTigerEngineName},
.inMemory = false,
// Large enough not to exceed capped limits.
.oplogMaxSize = oplogRecordStoreOptions.oplogMaxSize,
.oplogMaxSize = kOplogOptions.oplogMaxSize,
.sizeStorer = nullptr,
.tracksSizeAdjustments = true,
.isLogged = true,