Revert "SERVER-113982 Fix DataWithLockFreeReads synchronization" (#45990)

GitOrigin-RevId: 666680178270a61811279b6ae0657d54490b42e9
This commit is contained in:
Vishnu K 2026-01-08 10:25:50 -05:00 committed by MongoDB Bot
parent 7d5698f1f3
commit ecdc668dfd
13 changed files with 22 additions and 637 deletions

View File

@ -42,9 +42,6 @@ IndexBuildTest.waitForIndexBuildToStart(secondaryDB, collName2);
IndexBuildTest.assertIndexesSoon(secondaryDB.getCollection(collName1), 2, ["_id_"], ["a_1"], {includeBuildUUIDs: true});
IndexBuildTest.assertIndexesSoon(secondaryDB.getCollection(collName2), 2, ["_id_"], ["a_1"], {includeBuildUUIDs: true});
// Make sure all writes are part of a committed snapshot on the secondary.
rst.awaitLastOpCommitted();
jsTestLog("Shutting down secondary");
rst.stop(secondary);

View File

@ -1121,7 +1121,6 @@ replication.replication_coordinator:
- src/mongo/db/command_can_run_here*
- src/mongo/db/repl/*always_allow_non_local_writes*
- src/mongo/db/repl/*isself*
- src/mongo/db/repl/data_with_lock_free_reads*
- src/mongo/db/repl/replication_waiter_list_bm.cpp
- src/mongo/db/modules/atlas/src/disagg_storage/replication_coordinator*

View File

@ -1739,7 +1739,6 @@ mongo_cc_unit_test(
"isself_test.cpp",
"member_config_test.cpp",
"optime_extract_test.cpp",
"optime_test.cpp",
"primary_only_service_test.cpp",
"primary_only_service_util_test.cpp",
"read_concern_args_test.cpp",
@ -1859,14 +1858,6 @@ mongo_cc_unit_test(
],
)
mongo_cc_unit_test(
name = "data_with_lock_free_reads_test",
srcs = [
"data_with_lock_free_reads_test.cpp",
],
tags = ["mongo_unittest_seventh_group"],
)
mongo_cc_unit_test(
name = "oplog_application_test",
srcs = [
@ -2201,14 +2192,6 @@ mongo_cc_benchmark(
],
)
mongo_cc_benchmark(
name = "data_with_lock_free_reads_bm",
srcs = [
"data_with_lock_free_reads_bm.cpp",
],
tags = ["repl_bm"],
)
# Auto-generated exports for moved files
exports_files([
"local_oplog_info.cpp",

View File

@ -1,112 +0,0 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/platform/compiler.h"
#include "mongo/platform/rwmutex.h"
#include "mongo/stdx/new.h"
#include "mongo/util/concurrency/with_lock.h"
#include <cstdint>
#include <type_traits>
namespace mongo {
namespace repl {
template <size_t N>
using DataWithLockFreeReadsBuffer MONGO_MOD_PUB = std::array<uint64_t, N>;
template <typename T>
concept SerializableForLockFreeReads = requires(
T obj, DataWithLockFreeReadsBuffer<T::serializationForLockFreeReadsU64Count> serialized) {
{ T::serializationForLockFreeReadsU64Count } -> std::same_as<const size_t&>;
{ obj.serializeForLockFreeReads() };
{ T::parseForLockFreeReads(serialized) } -> std::same_as<T>;
};
/**
* Provides lock-free reads via a SeqLock for arbitrarily sized values, at
* the expense of possibly having to retry reads when the underlying atomics
* are updated while reading. Writes are not lock-free, and writers need to
* hold a mutex while updating the values.
*
* This is modeled in CppMem in data_with_lock_free_reads_X.cppmem. For more
* about CppMem, see http://svr-pes20-cppmem.cl.cam.ac.uk/cppmem/index.html.
*
* Prefer using other synchronization primitives over this one when possible.
*/
template <SerializableForLockFreeReads T>
class MONGO_MOD_PUB DataWithLockFreeReads {
public:
using value_type = T;
static constexpr size_t N = value_type::serializationForLockFreeReadsU64Count;
using Buffer = DataWithLockFreeReadsBuffer<N>;
void store(WithLock lk, const value_type& datum) noexcept {
auto buf = datum.serializeForLockFreeReads();
// Since writers must hold the mutex, we can avoid
// what would otherwise be an acquire load here.
auto curGen = _generation.loadRelaxed();
invariant(!(curGen & 1));
_generation.storeRelaxed(curGen + 1);
for (size_t i = 0; i < N; ++i) {
_buffer[i].store(buf[i]); // release
}
_generation.store(curGen + 2); // release
_generation.notifyAll();
}
value_type load() const noexcept {
uint64_t initialGen;
Buffer serialized;
do {
initialGen = _generation.load(); // acquire
while (MONGO_unlikely(initialGen & 1)) {
initialGen = _generation.wait(initialGen); // acquire
}
for (size_t i = 0; i < N; ++i) {
serialized[i] = _buffer[i].load(); // acquire
}
} while (initialGen != _generation.loadRelaxed());
return value_type::parseForLockFreeReads(serialized);
}
private:
static_assert(value_type::serializationForLockFreeReadsU64Count > 1,
"Anything that can fit in a single Atomic should use that instead");
alignas(stdx::hardware_destructive_interference_size) Atomic<uint64_t> _buffer[N];
// Low bit is 1 while there is a write in progress. Remaining bits are a
// count of completed modifications to _buffer.
WaitableAtomic<uint64_t> _generation;
};
} // namespace repl
} // namespace mongo

View File

@ -1,30 +0,0 @@
int main() {
atomic_int gen = 0;
atomic_int c0 = 0;
atomic_int c1 = 0;
{{{
{
// Write the value (33, 35)
// After this write, gen is 2.
gen.load(mo_relaxed).readsvalue(0);
gen.store(1, mo_relaxed);
c0.store(33, mo_release);
c1.store(35, mo_release);
gen.store(2, mo_release);
// Write the value (37, 39)
// After this write, gen is 4.
gen.load(mo_relaxed).readsvalue(2);
gen.store(3, mo_relaxed);
c0.store(37, mo_release);
c1.store(39, mo_release);
gen.store(4, mo_release);
} ||| {
// It would be incorrect to see (0, 35) before the first write.
gen.load(mo_acquire).readsvalue(0);
c0.load(mo_acquire).readsvalue(0);
c1.load(mo_acquire).readsvalue(35);
gen.load(mo_relaxed).readsvalue(0);
}
}}};
return 0;
}

View File

@ -1,30 +0,0 @@
int main() {
atomic_int gen = 0;
atomic_int c0 = 0;
atomic_int c1 = 0;
{{{
{
// Write the value (33, 35)
// After this write, gen is 2.
gen.load(mo_relaxed).readsvalue(0);
gen.store(1, mo_relaxed);
c0.store(33, mo_release);
c1.store(35, mo_release);
gen.store(2, mo_release);
// Write the value (37, 39)
// After this write, gen is 4.
gen.load(mo_relaxed).readsvalue(2);
gen.store(3, mo_relaxed);
c0.store(37, mo_release);
c1.store(39, mo_release);
gen.store(4, mo_release);
} ||| {
// It would be incorrect to see (33, 39) after the second write.
gen.load(mo_acquire).readsvalue(4);
c0.load(mo_acquire).readsvalue(33);
c1.load(mo_acquire).readsvalue(39);
gen.load(mo_relaxed).readsvalue(4);
}
}}};
return 0;
}

View File

@ -1,138 +0,0 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/repl/data_with_lock_free_reads.h"
#include <random>
#include <benchmark/benchmark.h>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
namespace mongo {
namespace repl {
AtomicWord<long> mismatches;
template <size_t N>
struct Data {
static constexpr size_t serializationForLockFreeReadsU64Count = N;
using Buffer = DataWithLockFreeReadsBuffer<serializationForLockFreeReadsU64Count>;
Buffer data;
Data() {
data.fill(0);
}
explicit Data(uint64_t value) {
data.fill(value);
}
static Data parseForLockFreeReads(Buffer& data) {
auto d = Data();
d.data = data;
return d;
}
Buffer serializeForLockFreeReads() const {
return data;
}
};
static std::random_device rd;
static std::mt19937_64 gen(rd());
static std::uniform_int_distribution<uint64_t> dist(0, 99);
template <typename sync>
void BM_LockFreeReads(benchmark::State& state, sync& test, uint64_t conflictChance) {
if (state.thread_index == 0) {
mismatches.store(0);
test.store(WithLock::withoutLock(), Data<sync::N>(0));
}
long i = 1;
for (auto keepRunning : state) {
auto res = dist(gen);
if (state.thread_index == 0 && res < conflictChance) {
test.store(WithLock::withoutLock(), Data<sync::N>(i));
} else {
auto data = test.load();
if (data.data[0] != data.data[data.data.size() - 1]) {
mismatches.addAndFetch(1);
}
benchmark::DoNotOptimize(data);
}
i++;
}
if (state.thread_index == 0) {
auto m = mismatches.load();
// Remove the following line to see how common or rare mismatches are.
invariant(m == 0);
if (m != 0) {
benchmark::Counter counter;
counter.value = mismatches.load();
state.counters["mismatches"] = counter;
}
}
}
DataWithLockFreeReads<Data<2>> seqlock_2;
DataWithLockFreeReads<Data<4>> seqlock_4;
DataWithLockFreeReads<Data<8>> seqlock_8;
void BM_LockFreeReadsSeqLock(benchmark::State& state) {
auto dataWidth = state.range(0);
auto conflictChance = state.range(1);
switch (dataWidth) {
case 2:
BM_LockFreeReads(state, seqlock_2, conflictChance);
break;
case 4:
BM_LockFreeReads(state, seqlock_4, conflictChance);
break;
case 8:
BM_LockFreeReads(state, seqlock_8, conflictChance);
break;
}
}
BENCHMARK(BM_LockFreeReadsSeqLock)
->ArgsProduct({
{2, 4, 8},
{1, 5, 10, 20, 50, 80, 100},
})
->ArgNames({"dataWidth", "conflictChance"})
->Threads(4)
->Threads(8)
->Threads(16)
->Threads(32)
->Threads(64);
} // namespace repl
} // namespace mongo

View File

@ -1,138 +0,0 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/repl/data_with_lock_free_reads.h"
#include "mongo/stdx/new.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/concurrency/with_lock.h"
#include <cstdint>
#include <random>
#include <boost/functional/hash.hpp>
namespace mongo {
namespace repl {
template <size_t N>
struct Data {
static constexpr size_t serializationForLockFreeReadsU64Count = N;
using Buffer = DataWithLockFreeReadsBuffer<serializationForLockFreeReadsU64Count>;
Data() {
_data.fill(0);
}
explicit Data(uint64_t value) {
_data.fill(value);
}
static Data parseForLockFreeReads(Buffer& data) {
auto d = Data();
d._data = data;
return d;
}
Buffer serializeForLockFreeReads() const {
return _data;
}
const Buffer& getData() const {
return _data;
}
private:
Buffer _data;
};
TEST(DataWithLockFreeReadsTest, DataSerDeser) {
const size_t dataSize = 4;
auto d0 = Data<dataSize>(7);
auto buf = d0.serializeForLockFreeReads();
auto d1 = Data<dataSize>::parseForLockFreeReads(buf);
ASSERT_EQ(d0.getData(), d1.getData());
}
TEST(DataWithLockFreeReadsTest, ShouldStoreAndLoad) {
const size_t dataSize = 4;
DataWithLockFreeReads<Data<dataSize>> dc;
auto last = Data<dataSize>(1);
dc.store(WithLock::withoutLock(), last);
ASSERT_EQ(last.getData(), dc.load().getData());
last = Data<dataSize>(2);
dc.store(WithLock::withoutLock(), last);
ASSERT_EQ(last.getData(), dc.load().getData());
last = Data<dataSize>(3);
dc.store(WithLock::withoutLock(), last);
last = Data<dataSize>(4);
dc.store(WithLock::withoutLock(), last);
last = Data<dataSize>(5);
dc.store(WithLock::withoutLock(), last);
ASSERT_EQ(last.getData(), dc.load().getData());
}
TEST(DataWithLockFreeReadsTest, ShouldNotSeeTornReads) {
const int numReaders = 8;
const long long totalReads = 1LL << 10;
AtomicWord<bool> stopWrites{false};
DataWithLockFreeReads<Data<4>> dc;
dc.store(WithLock::withoutLock(), Data<4>(0));
stdx::thread writer([&]() {
uint64_t j = 0;
while (!stopWrites.load()) {
// Simulate writers that are doing something else sometimes, not
// just hot-spinning.
sleepmillis(1);
dc.store(WithLock::withoutLock(), Data<4>(j++));
}
});
std::vector<stdx::thread> readers;
for (int i = 0; i < numReaders; ++i) {
readers.emplace_back([&]() {
auto readsRemaining = totalReads;
auto lastValue = dc.load();
while (readsRemaining > 0) {
auto d = dc.load();
ASSERT_EQ(d.getData()[0], d.getData()[d.getData().size() - 1]);
if (d.getData() != lastValue.getData()) {
--readsRemaining;
}
lastValue = d;
}
});
}
for (auto& thread : readers) {
thread.join();
}
stopWrites.store(true);
writer.join();
}
} // namespace repl
} // namespace mongo

View File

@ -34,7 +34,6 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/repl/data_with_lock_free_reads.h"
#include "mongo/util/modules.h"
#include "mongo/util/time_support.h"
@ -56,13 +55,6 @@ namespace MONGO_MOD_PUB repl {
*/
class MONGO_MOD_PUB OpTime {
public:
static constexpr size_t serializationForLockFreeReadsU64Count = 2;
private:
using DataWithLockFreeReadsBufferT =
DataWithLockFreeReadsBuffer<serializationForLockFreeReadsU64Count>;
public:
static constexpr auto kTermFieldName = "t"_sd;
static constexpr auto kTimestampFieldName = "ts"_sd;
@ -115,10 +107,6 @@ public:
static OpTime parse(const BSONObj& obj);
static OpTime parse(const BSONElement& elem);
static OpTime parseForLockFreeReads(DataWithLockFreeReadsBufferT& serialized) {
return OpTime(Timestamp(serialized[0]), static_cast<long long>(serialized[1]));
}
std::string toString() const;
// Returns true when this OpTime is not yet initialized.
@ -187,23 +175,12 @@ public:
void appendAsQuery(BSONObjBuilder* builder) const;
BSONObj asQuery() const;
DataWithLockFreeReadsBufferT serializeForLockFreeReads() const {
return {_timestamp.asULL(), static_cast<uint64_t>(_term)};
}
private:
Timestamp _timestamp;
long long _term = kInitialTerm;
};
class MONGO_MOD_PUB OpTimeAndWallTime {
public:
static constexpr size_t serializationForLockFreeReadsU64Count = 3;
private:
using DataWithLockFreeReadsBufferT =
DataWithLockFreeReadsBuffer<serializationForLockFreeReadsU64Count>;
public:
static constexpr auto kWallClockTimeFieldName = "wall"_sd;
@ -225,12 +202,6 @@ public:
OpTimeAndWallTime(OpTime optime, Date_t wall) : opTime(optime), wallTime(wall) {}
static OpTimeAndWallTime parseForLockFreeReads(DataWithLockFreeReadsBufferT& serialized) {
return OpTimeAndWallTime(
OpTime(Timestamp(serialized[0]), static_cast<long long>(serialized[1])),
Date_t::fromMillisSinceEpoch(serialized[2]));
}
inline bool operator==(const OpTimeAndWallTime& rhs) const {
return opTime == rhs.opTime && wallTime == rhs.wallTime;
}
@ -241,13 +212,8 @@ public:
std::string toString() const {
return opTime.toString() + ", " + wallTime.toString();
}
DataWithLockFreeReadsBufferT serializeForLockFreeReads() const {
return {opTime.getTimestamp().asULL(),
static_cast<uint64_t>(opTime.getTerm()),
static_cast<uint64_t>(wallTime.toMillisSinceEpoch())};
}
};
std::ostream& operator<<(std::ostream& out, const OpTimeAndWallTime& opTime);
// A convenience class for holding both a Timestamp and a Date_t.

View File

@ -1,53 +0,0 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/repl/optime.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
namespace repl {
TEST(OpTimeTest, OpTimeSerDeser) {
auto o0 = OpTime(Timestamp(1357913579), 12);
auto buf = o0.serializeForLockFreeReads();
auto o1 = OpTime::parseForLockFreeReads(buf);
ASSERT_EQ(o0, o1);
}
TEST(OpTimeTest, OpTimeAndWallTimeSerDeser) {
auto o = OpTime(Timestamp(123456789), 17);
auto w = Date_t::fromMillisSinceEpoch(987654321);
auto owt0 = OpTimeAndWallTime(o, w);
auto buf = owt0.serializeForLockFreeReads();
auto owt1 = OpTimeAndWallTime::parseForLockFreeReads(buf);
ASSERT_EQ(owt0, owt1);
}
} // namespace repl
} // namespace mongo

View File

@ -61,7 +61,6 @@
#include "mongo/db/read_write_concern_defaults.h"
#include "mongo/db/repl/always_allow_non_local_writes.h"
#include "mongo/db/repl/check_quorum_for_config_change.h"
#include "mongo/db/repl/clang_checked/mutex.h"
#include "mongo/db/repl/collection_utils.h"
#include "mongo/db/repl/data_replicator_external_state.h"
#include "mongo/db/repl/data_replicator_external_state_initial_sync.h"
@ -453,6 +452,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
_rsConfigState(kConfigPreStart),
_rsConfig(std::make_shared<ReplSetConfig>()), // Initialize with empty configuration.
_selfIndex(-1),
_sleptLastElection(false),
_readWriteAbility(std::make_unique<ReadWriteAbility>(!settings.isReplSet())),
_replicationProcess(replicationProcess),
_storage(storage),
@ -477,21 +477,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
invariant(_service);
{
clang_checked::lock_guard lk(_mutex);
auto o0 = OpTimeAndWallTime(OpTime(), Date_t::min());
_myLastAppliedOpTimeAndWallTimeCached.store(lk, o0);
auto o1 = OpTimeAndWallTime(OpTime(), Date_t::min());
_myLastCommittedOpTimeAndWallTimeCached.store(lk, o1);
auto o2 = OpTimeAndWallTime(OpTime(), Date_t::min());
_myLastDurableOpTimeAndWallTimeCached.store(lk, o2);
auto o3 = OpTimeAndWallTime(OpTime(), Date_t::min());
_myLastWrittenOpTimeAndWallTimeCached.store(lk, o3);
_currentCommittedSnapshotCached.store(lk, OpTime());
}
if (!_settings.isReplSet()) {
return;
}
@ -562,7 +547,8 @@ int64_t ReplicationCoordinatorImpl::getLastHorizonChange_forTest() const {
}
OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const {
return _currentCommittedSnapshotCached.load();
stdx::lock_guard lk(_mutex);
return _getCurrentCommittedSnapshotOpTime(lk);
}
OpTime ReplicationCoordinatorImpl::_getCurrentCommittedSnapshotOpTime(WithLock) const {
@ -1678,7 +1664,6 @@ void ReplicationCoordinatorImpl::_setMyLastWrittenOpTimeAndWallTime(
_topCoord->setMyLastWrittenOpTimeAndWallTime(
opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed);
_myLastWrittenOpTimeAndWallTimeCached.store(lk, opTimeAndWallTime);
// Signal anyone waiting on optime changes.
_lastWrittenOpTimeWaiterList.setValueIf(
@ -1702,7 +1687,6 @@ void ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTime(
// transaction, which may be delayed, but this should be fine.
_topCoord->setMyLastDurableOpTimeAndWallTime(
opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed);
_myLastDurableOpTimeAndWallTimeCached.store(lk, opTimeAndWallTime);
// If we are using durable times to calculate the commit level, update it now.
if (_rsConfig.unsafePeek().getWriteConcernMajorityShouldJournal()) {
_updateLastCommittedOpTimeAndWallTime(lk);
@ -1752,18 +1736,12 @@ bool ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTimeForward(
}
OpTime ReplicationCoordinatorImpl::getMyLastWrittenOpTime() const {
return _myLastWrittenOpTimeAndWallTimeCached.load().opTime;
stdx::lock_guard lock(_mutex);
return _getMyLastWrittenOpTime(lock);
}
OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastWrittenOpTimeAndWallTime(
bool rollbackSafe) const {
// If !rollbackSafe, then we access only 1 member, so we don't need the
// lock.
if (!rollbackSafe) {
return _myLastWrittenOpTimeAndWallTimeCached.load();
}
// Otherwise, we must take the lock since we might touch both _memberState
// and _lastWritten.
stdx::lock_guard lock(_mutex);
if (rollbackSafe && _getMemberState(lock).rollback()) {
return {};
@ -1772,19 +1750,23 @@ OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastWrittenOpTimeAndWallTime(
}
OpTime ReplicationCoordinatorImpl::getMyLastAppliedOpTime() const {
return getMyLastAppliedOpTimeAndWallTime().opTime;
stdx::lock_guard lock(_mutex);
return _getMyLastAppliedOpTime(lock);
}
OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastAppliedOpTimeAndWallTime() const {
return _myLastAppliedOpTimeAndWallTimeCached.load();
stdx::lock_guard lock(_mutex);
return _getMyLastAppliedOpTimeAndWallTime(lock);
}
OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastDurableOpTimeAndWallTime() const {
return _myLastDurableOpTimeAndWallTimeCached.load();
stdx::lock_guard lock(_mutex);
return _getMyLastDurableOpTimeAndWallTime(lock);
}
OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const {
return getMyLastDurableOpTimeAndWallTime().opTime;
stdx::lock_guard lock(_mutex);
return _getMyLastDurableOpTime(lock);
}
Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx,
@ -4989,8 +4971,6 @@ ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSourceOnError
void ReplicationCoordinatorImpl::_updateLastCommittedOpTimeAndWallTime(WithLock lk) {
if (_topCoord->updateLastCommittedOpTimeAndWallTime()) {
_setStableTimestampForStorage(lk);
auto lastCommittedOpTimeAndWallTime = _topCoord->getLastCommittedOpTimeAndWallTime();
_myLastCommittedOpTimeAndWallTimeCached.store(lk, lastCommittedOpTimeAndWallTime);
}
}
@ -5206,8 +5186,6 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint(
bool forInitiate) {
if (_topCoord->advanceLastCommittedOpTimeAndWallTime(
committedOpTimeAndWallTime, fromSyncSource, forInitiate)) {
auto lastCommittedOpTimeAndWallTime = _topCoord->getLastCommittedOpTimeAndWallTime();
_myLastCommittedOpTimeAndWallTimeCached.store(lk, lastCommittedOpTimeAndWallTime);
if (_getMemberState(lk).arbiter()) {
// Arbiters do not store replicated data, so we consider their data trivially
// consistent.
@ -5222,11 +5200,13 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint(
}
OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const {
return getLastCommittedOpTimeAndWallTime().opTime;
stdx::unique_lock lk(_mutex);
return _topCoord->getLastCommittedOpTime();
}
OpTimeAndWallTime ReplicationCoordinatorImpl::getLastCommittedOpTimeAndWallTime() const {
return _myLastCommittedOpTimeAndWallTimeCached.load();
stdx::unique_lock lk(_mutex);
return _topCoord->getLastCommittedOpTimeAndWallTime();
}
Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
@ -5639,7 +5619,6 @@ bool ReplicationCoordinatorImpl::_updateCommittedSnapshot(WithLock lk,
if (MONGO_unlikely(disableSnapshotting.shouldFail()))
return false;
_currentCommittedSnapshot = newCommittedSnapshot;
_currentCommittedSnapshotCached.store(lk, newCommittedSnapshot);
_currentCommittedSnapshotCond.notify_all();
_externalState->updateCommittedSnapshot(newCommittedSnapshot);
@ -5656,9 +5635,8 @@ void ReplicationCoordinatorImpl::clearCommittedSnapshot() {
_clearCommittedSnapshot(lock);
}
void ReplicationCoordinatorImpl::_clearCommittedSnapshot(WithLock lk) {
void ReplicationCoordinatorImpl::_clearCommittedSnapshot(WithLock) {
_currentCommittedSnapshot = boost::none;
_currentCommittedSnapshotCached.store(lk, OpTime());
_externalState->clearCommittedSnapshot();
}

View File

@ -43,7 +43,6 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/auto_get_rstl_for_stepup_stepdown.h"
#include "mongo/db/repl/data_with_lock_free_reads.h"
#include "mongo/db/repl/delayable_timeout_callback.h"
#include "mongo/db/repl/hello/hello_response.h"
#include "mongo/db/repl/initial_sync/initial_syncer.h"
@ -214,9 +213,6 @@ public:
const ReplSettings& getSettings() const override;
// This method makes no threading guarantees since it fetches a single
// value. If you are an internal caller working with multiple protected
// members, use _getMemberState(WithLock).
MemberState getMemberState() const override;
std::vector<MemberData> getMemberData() const override;
@ -292,34 +288,13 @@ public:
void setMyHeartbeatMessage(const std::string& msg) override;
// This method makes no threading guarantees since it fetches a single
// value. If you are an internal caller working with multiple protected
// members, use _getMyLastWrittenOpTime(WithLock).
OpTime getMyLastWrittenOpTime() const override;
// This method makes no threading guarantees since it fetches a single
// value. If you are an internal caller working with multiple protected
// members, use _getMyLastWrittenOpTimeAndWallTime(WithLock).
OpTimeAndWallTime getMyLastWrittenOpTimeAndWallTime(bool rollbackSafe = false) const override;
// This method makes no threading guarantees since it fetches a single
// value. If you are an internal caller working with multiple protected
// members, use _getMyLastAppliedOpTime(WithLock).
OpTime getMyLastAppliedOpTime() const override;
// This method makes no threading guarantees since it fetches a single
// value. If you are an internal caller working with multiple protected
// members, use _getMyLastAppliedOpTimeAndWallTime(WithLock).
OpTimeAndWallTime getMyLastAppliedOpTimeAndWallTime() const override;
// This method makes no threading guarantees since it fetches a single
// value. If you are an internal caller working with multiple protected
// members, use _getMyLastDurableOpTime(WithLock).
OpTime getMyLastDurableOpTime() const override;
// This method makes no threading guarantees since it fetches a single
// value. If you are an internal caller working with multiple protected
// members, use _getMyLastDurableOpTimeAndWallTime(WithLock).
OpTimeAndWallTime getMyLastDurableOpTimeAndWallTime() const override;
Status waitUntilMajorityOpTime(OperationContext* opCtx,
@ -448,14 +423,7 @@ public:
ChangeSyncSourceAction shouldChangeSyncSourceOnError(
const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const override;
// This method makes no threading guarantees since it fetches a single
// value. If you are an internal caller working with multiple protected
// members, use _topCoord->getLastCommittedOpTime().
OpTime getLastCommittedOpTime() const override;
// This method makes no threading guarantees since it fetches a single
// value. If you are an internal caller working with multiple protected
// members, use _topCoord->getLastCommittedOpTimeAndWallTime().
OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const override;
Status processReplSetRequestVotes(OperationContext* opCtx,
@ -1857,11 +1825,6 @@ private:
// Pointer to the TopologyCoordinator owned by this ReplicationCoordinator.
std::unique_ptr<TopologyCoordinator> _topCoord; // (M)
DataWithLockFreeReads<OpTimeAndWallTime> _myLastAppliedOpTimeAndWallTimeCached; // (S)
DataWithLockFreeReads<OpTimeAndWallTime> _myLastCommittedOpTimeAndWallTimeCached; // (S)
DataWithLockFreeReads<OpTimeAndWallTime> _myLastDurableOpTimeAndWallTimeCached; // (S)
DataWithLockFreeReads<OpTimeAndWallTime> _myLastWrittenOpTimeAndWallTimeCached; // (S)
// Executor that drives the topology coordinator.
std::shared_ptr<executor::TaskExecutor> _replExecutor; // (S)
@ -1923,6 +1886,9 @@ private:
// This member's index position in the current config.
int _selfIndex; // (M)
// Whether we slept last time we attempted an election but possibly tied with other nodes.
bool _sleptLastElection; // (M)
// Used to manage the concurrency around _canAcceptNonLocalWrites and _canServeNonLocalReads.
std::unique_ptr<ReadWriteAbility> _readWriteAbility; // (S)
@ -1940,8 +1906,6 @@ private:
// When engaged, this must be <= _lastCommittedOpTime.
boost::optional<OpTime> _currentCommittedSnapshot; // (M)
DataWithLockFreeReads<OpTime> _currentCommittedSnapshotCached; // (S)
// Used to signal threads that are waiting for a new value of _currentCommittedSnapshot.
stdx::condition_variable _currentCommittedSnapshotCond; // (M)

View File

@ -411,7 +411,6 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTimeAndWallTime(
_topCoord->setMyLastAppliedOpTimeAndWallTime(
opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed);
_myLastAppliedOpTimeAndWallTimeCached.store(lk, opTimeAndWallTime);
// No need to wake up replication waiters because there should not be any replication waiters
// waiting on our own lastApplied.