diff --git a/jstests/noPassthrough/wt_integration/oplog_manager_thread_survives_rollback.js b/jstests/noPassthrough/wt_integration/oplog_manager_thread_survives_rollback.js new file mode 100644 index 00000000000..32d338f8a58 --- /dev/null +++ b/jstests/noPassthrough/wt_integration/oplog_manager_thread_survives_rollback.js @@ -0,0 +1,137 @@ +/** + * Reproduces a scenario where: + * - A long-running agg holds a CollectionCatalog snapshot on a node. + * - That node goes through rollback (closeCatalog/openCatalog, OplogManager stop/start). + * - The node is later stepped up to primary. + * - The read resumes, gets InterruptedDueToReplStateChange, and releases the old RS. + * + * Prior to SERVER-119964, this resulted in the oplog manager thread exiting, + * and oplog visibility would never advance. This made majority writes hang + * indefinitely. + * + * The intent registry prevents proceeding through a rollback while a + * pre-rollback aggregation is hung, which breaks this test but also prevents + * the bug from occurring. + * + * @tags: [ + * requires_replication, + * requires_mongobridge, + * requires_persistence, + * featureFlagIntentRegistration_incompatible + * ] + */ + +import {RollbackTest} from "jstests/replsets/libs/rollback_test.js"; +import {configureFailPoint} from "jstests/libs/fail_point_util.js"; +import {funWithArgs} from "jstests/libs/parallel_shell_helpers.js"; + +const dbName = jsTestName(); +const collName = "testColl"; + +const rollbackTest = new RollbackTest(); + +jsTestLog.info("Populate database and wait for steady state replication"); +const primary = rollbackTest.getPrimary(); +const primaryDB = primary.getDB(dbName); +const coll = primaryDB.getCollection(collName); + +assert.commandWorked( + coll.insert( + Array.from({length: 10}, (_, i) => ({_id: i, x: i})), + {writeConcern: {w: "majority"}}, + ), +); + +rollbackTest.awaitReplication(); + +// Enable debug logging for storage to check for oplog manager thread not stopping +assert.commandWorked(primary.setLogLevel(1, "storage")); + +const rollbackNode = rollbackTest.transitionToRollbackOperations(); + +/** + * Start an aggregation on the rollback node and hang it during yield via + * setYieldAllLocksHang. This will release any locks but _not_ refresh the + * CollectionCatalog shared_ptr. + */ +jsTestLog.info("Enabling setYieldAllLocksHang on rollback node and starting aggregation"); + +// Force very frequent yields on the rollback node. +assert.commandWorked(rollbackNode.adminCommand({setParameter: 1, internalQueryExecYieldIterations: 1})); + +const hangDuringYield = configureFailPoint(rollbackNode, "setYieldAllLocksHang", { + namespace: `${dbName}.${collName}`, + // Important: don’t call checkForInterrupt at the hang site; we want to + // ignore repl state changes until the agg resumes. + checkForInterruptAfterHang: false, +}); + +const awaitAgg = startParallelShell( + funWithArgs( + (dbName, collName) => { + // This aggregation runs on the rollback node. It will: + // - Start a lock-free read / consistent catalog + storage snapshot. + // - Yield frequently; on each yield, setYieldAllLocksHang may fire and + // block the op while locks are dropped but the snapshot is pinned. + // When it finally resumes after rollback + step-up, it should see that the node's + // replication state has changed and fail with InterruptedDueToReplStateChange + // (or a network error, depending on timing). + try { + const res = db.getSiblingDB(dbName).runCommand({ + aggregate: collName, + pipeline: [ + { + $match: { + $expr: { + $in: ["$_id", [...Array(10).keys()]], + }, + }, + }, + ], + cursor: {}, + }); + jsTestLog.info("Aggregation on rollback node completed with: " + tojson(res)); + } catch (e) { + jsTestLog.info("Aggregation on rollback node failed with exception: " + tojson(e)); + } + }, + dbName, + collName, + ), + rollbackNode.port, +); + +jsTestLog.info("Wait for aggregation to reach the setYieldAllLocksHang failpoint"); +hangDuringYield.wait(); + +rollbackTest.transitionToSyncSourceOperationsBeforeRollback(); +rollbackTest.transitionToSyncSourceOperationsDuringRollback(); +rollbackTest.transitionToSteadyStateOperations(); +rollbackTest.stepUpNode(primary); + +/** + * Unblock the paused aggregation. + * + * Turning off the failpoint lets the aggregation continue. Because the node's + * replication state changed (went through rollback and then was stepped up), + * the command should be interrupted due to repl state change at some later interrupt + * point. When the OperationContext unwinds, it releases its CollectionCatalog snapshot, + * which should drop the last reference to RS_old, run ~Oplog() on RS_old, and + * call WiredTigerOplogManager::stop() a second time in the "RS_new" state. + */ +jsTestLog.info("Turning off setYieldAllLocksHang so hung aggregation can resume"); +hangDuringYield.off(); +awaitAgg(); + +jsTestLog.info("Wait for oplog manager thread to not exit"); +checkLog.containsJson(rollbackNode, 11996400, {}); + +jsTestLog.info("Perform a majority write, which would hang forever if the thread wasn't running"); +assert.commandWorked( + coll.insert( + Array.from({length: 10}, (_, i) => ({_id: i + 10, x: i})), + {writeConcern: {w: "majority"}}, + ), +); + +rollbackTest.stop(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index d213d54a740..1699fc3238a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -943,7 +943,7 @@ void WiredTigerKVEngine::cleanShutdown(bool memLeakAllowed) { return; } - getOplogManager()->stop(); + getOplogManager()->stop(nullptr); if (_sessionSweeper) { LOGV2(22318, "Shutting down session sweeper thread"); @@ -1795,7 +1795,7 @@ std::unique_ptr WiredTigerKVEngine::getRecordStore(OperationContext .isLogged = isLogged, .forceUpdateWithFullDocument = options.forceUpdateWithFullDocument}); - getOplogManager()->stop(); + getOplogManager()->stop(nullptr); getOplogManager()->start(opCtx, *this, *ret, _isReplSet); } else { bool isLogged = [&] { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp index d146020f9da..fe452b7253d 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp @@ -29,7 +29,6 @@ #include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h" -// IWYU pragma: no_include "cxxabi.h" #include "mongo/db/client.h" #include "mongo/db/record_id.h" #include "mongo/db/shard_role/transaction_resources.h" @@ -89,16 +88,16 @@ void WiredTigerOplogManager::start(OperationContext* opCtx, _oplogIdent = std::string{oplog.getIdent()}; stdx::lock_guard lk(_oplogVisibilityStateMutex); - invariant(!_running); - _running = true; + invariant(!_oplog); + _oplog = &oplog; LOGV2(12035900, "Oplog visibility thread is starting"); _oplogVisibilityThread = - stdx::thread([this, service = opCtx->getServiceContext()->getService(), &engine, &oplog] { + stdx::thread([this, service = opCtx->getServiceContext()->getService(), &engine] { Client::initThread("OplogVisibilityThread", service); stdx::unique_lock lk(_oplogVisibilityStateMutex); - while (true) { - switch (_updateVisibility(lk, engine, *oplog.capped())) { + while (_oplog) { + switch (_updateVisibility(lk, engine, *_oplog->capped())) { case VisibilityUpdateResult::NotUpdated: continue; case VisibilityUpdateResult::Updated: @@ -107,22 +106,35 @@ void WiredTigerOplogManager::start(OperationContext* opCtx, // oplog entries will not become visible immediately upon insert, so we // notify waiters here as well, when new oplog entries actually become // visible to cursors. - oplog.capped()->notifyWaitersIfNeeded(); + _oplog->capped()->notifyWaitersIfNeeded(); continue; case VisibilityUpdateResult::Stopped: return; } } + LOGV2(22372, "Oplog visibility thread is stopping"); }); } -void WiredTigerOplogManager::stop() { +void WiredTigerOplogManager::stop(const RecordStore* oplog) { { - stdx::lock_guard lk(_oplogVisibilityStateMutex); - if (!_running) { + std::lock_guard lk(_oplogVisibilityStateMutex); + if (!_oplog) { return; } - _running = false; + + // There are three things which stop the manager: clean shutdown, destroying a + // WiredTigerRecordStore::Oplog instance, and *creating* a WiredTigerRecordStore::Oplog + // instance. Creating a new Oplog instance stops the existing thread (if any) and starts a + // new one, and this may happen before the old Oplog instance is destroyed. If this happens, + // the destruction of the old instance needs to not stop the new thread. + if (oplog && oplog != _oplog) { + LOGV2_DEBUG(11996400, + 1, + "Not stopping oplog visiblity thread because oplog pointer did not match"); + return; + } + _oplog = nullptr; } if (_oplogVisibilityThread.joinable()) { @@ -207,8 +219,8 @@ WiredTigerOplogManager::VisibilityUpdateResult WiredTigerOplogManager::_updateVi stdx::unique_lock& lk, const KVEngine& engine, const RecordStore::Capped& oplog) { { MONGO_IDLE_THREAD_BLOCK; - _oplogVisibilityThreadCV.wait( - lk, [this] { return !_running || _triggerOplogVisibilityUpdate; }); + _oplogVisibilityThreadCV.wait(lk, + [this] { return !_oplog || _triggerOplogVisibilityUpdate; }); // If we are not shutting down and nobody is actively waiting for the oplog to become // visible, delay a bit to batch more requests into one update and reduce system load. @@ -222,15 +234,13 @@ WiredTigerOplogManager::VisibilityUpdateResult WiredTigerOplogManager::_updateVi lk, now.toSystemTimePoint(), [this, &oplog] { - return !_running || _opsWaitingForOplogVisibilityUpdate || - oplog.hasWaiters(); + return !_oplog || _opsWaitingForOplogVisibilityUpdate || oplog.hasWaiters(); })) { now += Milliseconds{1}; } } - if (!_running) { - LOGV2(22372, "Oplog visibility thread is stopping"); + if (!_oplog) { return VisibilityUpdateResult::Stopped; } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h index 32a913e8c1d..433554c50b2 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h @@ -67,14 +67,19 @@ public: /** * Starts the oplog manager, initializing the oplog read timestamp with the highest oplog - * timestamp. + * timestamp. Every call to start() must be followed by at least one call to stop() before + * start() can be called again. */ void start(OperationContext*, const KVEngine&, RecordStore& oplog, bool isReplSet); /** - * Stops the oplog manager. + * Stops the oplog manager if it is running for the given oplog. + * + * If `oplog` is non-null, the manager will only stop if `oplog` is the same record store as was + * passed to the most recent call to `start()`. If it is null, the manager will stop + * unconditionally. */ - void stop(); + void stop(const RecordStore* oplog); /** * Updates the oplog read timestamp if the visibility timestamp is behind the provided @@ -112,6 +117,11 @@ public: */ StringData getIdent() const; + bool isRunning_forTest() const { + std::lock_guard lk(_oplogVisibilityStateMutex); + return _oplog != nullptr; + } + private: enum class VisibilityUpdateResult { NotUpdated, @@ -139,8 +149,9 @@ private: // Protects the state below. mutable stdx::mutex _oplogVisibilityStateMutex; - // Whether this oplog manager is currently running. - bool _running = false; + // The record store which the thread is currently running for. May be null while the thread is + // running if the thread is in the process of shutting down. + RecordStore* _oplog = nullptr; // Whether an oplog to oplog visibility is being triggered. bool _triggerOplogVisibilityUpdate = false; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 8c37ed48dd9..d7bce17c885 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1463,7 +1463,7 @@ std::unique_ptr WiredTigerRecordStore::Oplog::getRawCursor } WiredTigerRecordStore::Oplog::~Oplog() { - _kvEngine->getOplogManager()->stop(); + _kvEngine->getOplogManager()->stop(this); } std::unique_ptr WiredTigerRecordStore::Oplog::getCursor( diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp index b717f291df2..21d5eff3f8a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp @@ -31,10 +31,8 @@ #include "mongo/base/checked_cast.h" #include "mongo/base/error_codes.h" -#include "mongo/base/init.h" // IWYU pragma: keep #include "mongo/base/string_data.h" #include "mongo/bson/bsonelement.h" -#include "mongo/bson/bsonmisc.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/bsontypes.h" #include "mongo/bson/json.h" @@ -65,8 +63,6 @@ #include -#include -#include #include namespace mongo { @@ -272,7 +268,7 @@ TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityInOrder) { std::unique_ptr harnessHelper(newRecordStoreHarnessHelper()); std::unique_ptr rs(harnessHelper->newOplogRecordStore()); auto engine = static_cast(harnessHelper->getEngine()); - engine->getOplogManager()->stop(); + engine->getOplogManager()->stop(nullptr); auto isOpHidden = [&engine](const RecordId& id) { return engine->getOplogManager()->getOplogReadTimestamp() < @@ -308,7 +304,7 @@ TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityOutOfOrder) { std::unique_ptr harnessHelper(newRecordStoreHarnessHelper()); std::unique_ptr rs(harnessHelper->newOplogRecordStore()); auto engine = static_cast(harnessHelper->getEngine()); - engine->getOplogManager()->stop(); + engine->getOplogManager()->stop(nullptr); auto isOpHidden = [&engine](const RecordId& id) { return engine->getOplogManager()->getOplogReadTimestamp() < @@ -560,6 +556,46 @@ TEST(WiredTigerRecordStoreTest, GetLatestOplogTest) { ASSERT_EQ(tsThree, wtRS->getLatestTimestamp(ru1)); } +TEST(WiredTigerRecordStoreTest, OplogDestructorAutomaticallyStopsOplogManager) { + auto harnessHelper = newRecordStoreHarnessHelper(); + auto oplogManager = + static_cast(harnessHelper->getEngine())->getOplogManager(); + + ASSERT_FALSE(oplogManager->isRunning_forTest()); + auto rs(harnessHelper->newOplogRecordStore()); + ASSERT_TRUE(oplogManager->isRunning_forTest()); + rs.reset(); + ASSERT_FALSE(oplogManager->isRunning_forTest()); +} + +TEST(WiredTigerRecordStoreTest, OplogDestructorOnlyStopsCorrectOplogManager) { + auto harnessHelper = newRecordStoreHarnessHelper(); + auto oplogManager = + static_cast(harnessHelper->getEngine())->getOplogManager(); + + ASSERT_FALSE(oplogManager->isRunning_forTest()); + + // Creating rs2 stops the thread for rs1 and starts it for itself, so destroying rs1 should not + // stop the thread. + auto rs1 = harnessHelper->newOplogRecordStore(); + ASSERT_TRUE(oplogManager->isRunning_forTest()); + auto rs2 = harnessHelper->newOplogRecordStore(); + ASSERT_TRUE(oplogManager->isRunning_forTest()); + rs1.reset(); + ASSERT_TRUE(oplogManager->isRunning_forTest()); + rs2.reset(); + ASSERT_FALSE(oplogManager->isRunning_forTest()); + + // Destroying rs2 first should stop the thread even though rs1 still exists + rs1 = harnessHelper->newOplogRecordStore(); + rs2 = harnessHelper->newOplogRecordStore(); + ASSERT_TRUE(oplogManager->isRunning_forTest()); + rs2.reset(); + ASSERT_FALSE(oplogManager->isRunning_forTest()); + rs1.reset(); + ASSERT_FALSE(oplogManager->isRunning_forTest()); +} + TEST(WiredTigerRecordStoreTest, CursorInActiveTxnAfterNext) { std::unique_ptr harnessHelper(newRecordStoreHarnessHelper()); std::unique_ptr rs(harnessHelper->newRecordStore()); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test_harness.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test_harness.cpp index 19945010c3f..4602e5c89c5 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test_harness.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test_harness.cpp @@ -95,27 +95,36 @@ std::unique_ptr WiredTigerHarnessHelper::newRecordStore( return _engine->getRecordStore(opCtx.get(), nss, ident, recordStoreOptions, uuid); } -std::unique_ptr WiredTigerHarnessHelper::newOplogRecordStore() { - auto ret = newOplogRecordStoreNoInit(); - ServiceContext::UniqueOperationContext opCtx(newOperationContext()); - auto oplog = static_cast(ret.get()); - _engine->getOplogManager()->start(opCtx.get(), *_engine, *oplog, _isReplSet); - return ret; -} - -std::unique_ptr WiredTigerHarnessHelper::newOplogRecordStoreNoInit() { - std::string ident = _identForNs(redactTenant(NamespaceString::kRsOplogNamespace)); +static const auto kOplogOptions = [] { RecordStore::Options oplogRecordStoreOptions; oplogRecordStoreOptions.isOplog = true; oplogRecordStoreOptions.isCapped = true; // Large enough not to exceed capped limits. oplogRecordStoreOptions.oplogMaxSize = 1024 * 1024 * 1024; + return oplogRecordStoreOptions; +}(); + +std::unique_ptr WiredTigerHarnessHelper::newOplogRecordStore() { + std::string ident = _identForNs(redactTenant(NamespaceString::kRsOplogNamespace)); ServiceContext::UniqueOperationContext opCtx(newOperationContext()); auto& provider = rss::ReplicatedStorageService::get(opCtx.get()).getPersistenceProvider(); auto& ru = *shard_role_details::getRecoveryUnit(opCtx.get()); WriteUnitOfWork wuow(opCtx.get()); const auto res = _engine->createRecordStore( - provider, ru, NamespaceString::kRsOplogNamespace, ident, oplogRecordStoreOptions); + provider, ru, NamespaceString::kRsOplogNamespace, ident, kOplogOptions); + wuow.commit(); + return _engine->getRecordStore( + opCtx.get(), NamespaceString::kRsOplogNamespace, ident, kOplogOptions, UUID::gen()); +} + +std::unique_ptr WiredTigerHarnessHelper::newOplogRecordStoreNoInit() { + std::string ident = _identForNs(redactTenant(NamespaceString::kRsOplogNamespace)); + ServiceContext::UniqueOperationContext opCtx(newOperationContext()); + auto& provider = rss::ReplicatedStorageService::get(opCtx.get()).getPersistenceProvider(); + auto& ru = *shard_role_details::getRecoveryUnit(opCtx.get()); + WriteUnitOfWork wuow(opCtx.get()); + const auto res = _engine->createRecordStore( + provider, ru, NamespaceString::kRsOplogNamespace, ident, kOplogOptions); wuow.commit(); // Cannot use 'getRecordStore', which automatically starts the the oplog manager. @@ -127,7 +136,7 @@ std::unique_ptr WiredTigerHarnessHelper::newOplogRecordStoreNoInit( .engineName = std::string{kWiredTigerEngineName}, .inMemory = false, // Large enough not to exceed capped limits. - .oplogMaxSize = oplogRecordStoreOptions.oplogMaxSize, + .oplogMaxSize = kOplogOptions.oplogMaxSize, .sizeStorer = nullptr, .tracksSizeAdjustments = true, .isLogged = true,