diff --git a/jstests/noPassthrough/index_builds/drop_unfinished_replicated_index_build_in_standalone.js b/jstests/noPassthrough/index_builds/drop_unfinished_replicated_index_build_in_standalone.js index 45a63c86ad5..436072c297f 100644 --- a/jstests/noPassthrough/index_builds/drop_unfinished_replicated_index_build_in_standalone.js +++ b/jstests/noPassthrough/index_builds/drop_unfinished_replicated_index_build_in_standalone.js @@ -42,9 +42,6 @@ IndexBuildTest.waitForIndexBuildToStart(secondaryDB, collName2); IndexBuildTest.assertIndexesSoon(secondaryDB.getCollection(collName1), 2, ["_id_"], ["a_1"], {includeBuildUUIDs: true}); IndexBuildTest.assertIndexesSoon(secondaryDB.getCollection(collName2), 2, ["_id_"], ["a_1"], {includeBuildUUIDs: true}); -// Make sure all writes are part of a committed snapshot on the secondary. -rst.awaitLastOpCommitted(); - jsTestLog("Shutting down secondary"); rst.stop(secondary); diff --git a/modules_poc/modules.yaml b/modules_poc/modules.yaml index 9afe0fd9f44..ab0eeb8cd93 100644 --- a/modules_poc/modules.yaml +++ b/modules_poc/modules.yaml @@ -1121,7 +1121,6 @@ replication.replication_coordinator: - src/mongo/db/command_can_run_here* - src/mongo/db/repl/*always_allow_non_local_writes* - src/mongo/db/repl/*isself* - - src/mongo/db/repl/data_with_lock_free_reads* - src/mongo/db/repl/replication_waiter_list_bm.cpp - src/mongo/db/modules/atlas/src/disagg_storage/replication_coordinator* diff --git a/src/mongo/db/repl/BUILD.bazel b/src/mongo/db/repl/BUILD.bazel index 9bec65f9e5c..d2dc5573cd3 100644 --- a/src/mongo/db/repl/BUILD.bazel +++ b/src/mongo/db/repl/BUILD.bazel @@ -1739,7 +1739,6 @@ mongo_cc_unit_test( "isself_test.cpp", "member_config_test.cpp", "optime_extract_test.cpp", - "optime_test.cpp", "primary_only_service_test.cpp", "primary_only_service_util_test.cpp", "read_concern_args_test.cpp", @@ -1859,14 +1858,6 @@ mongo_cc_unit_test( ], ) -mongo_cc_unit_test( - name = "data_with_lock_free_reads_test", - srcs = [ - "data_with_lock_free_reads_test.cpp", - ], - tags = ["mongo_unittest_seventh_group"], -) - mongo_cc_unit_test( name = "oplog_application_test", srcs = [ @@ -2201,14 +2192,6 @@ mongo_cc_benchmark( ], ) -mongo_cc_benchmark( - name = "data_with_lock_free_reads_bm", - srcs = [ - "data_with_lock_free_reads_bm.cpp", - ], - tags = ["repl_bm"], -) - # Auto-generated exports for moved files exports_files([ "local_oplog_info.cpp", diff --git a/src/mongo/db/repl/data_with_lock_free_reads.h b/src/mongo/db/repl/data_with_lock_free_reads.h deleted file mode 100644 index c32b4f55707..00000000000 --- a/src/mongo/db/repl/data_with_lock_free_reads.h +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Copyright (C) 2025-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/platform/compiler.h" -#include "mongo/platform/rwmutex.h" -#include "mongo/stdx/new.h" -#include "mongo/util/concurrency/with_lock.h" - -#include -#include - -namespace mongo { -namespace repl { -template -using DataWithLockFreeReadsBuffer MONGO_MOD_PUB = std::array; - -template -concept SerializableForLockFreeReads = requires( - T obj, DataWithLockFreeReadsBuffer serialized) { - { T::serializationForLockFreeReadsU64Count } -> std::same_as; - { obj.serializeForLockFreeReads() }; - { T::parseForLockFreeReads(serialized) } -> std::same_as; -}; - -/** - * Provides lock-free reads via a SeqLock for arbitrarily sized values, at - * the expense of possibly having to retry reads when the underlying atomics - * are updated while reading. Writes are not lock-free, and writers need to - * hold a mutex while updating the values. - * - * This is modeled in CppMem in data_with_lock_free_reads_X.cppmem. For more - * about CppMem, see http://svr-pes20-cppmem.cl.cam.ac.uk/cppmem/index.html. - * - * Prefer using other synchronization primitives over this one when possible. - */ -template -class MONGO_MOD_PUB DataWithLockFreeReads { -public: - using value_type = T; - static constexpr size_t N = value_type::serializationForLockFreeReadsU64Count; - using Buffer = DataWithLockFreeReadsBuffer; - - void store(WithLock lk, const value_type& datum) noexcept { - auto buf = datum.serializeForLockFreeReads(); - // Since writers must hold the mutex, we can avoid - // what would otherwise be an acquire load here. - auto curGen = _generation.loadRelaxed(); - invariant(!(curGen & 1)); - _generation.storeRelaxed(curGen + 1); - for (size_t i = 0; i < N; ++i) { - _buffer[i].store(buf[i]); // release - } - _generation.store(curGen + 2); // release - _generation.notifyAll(); - } - - value_type load() const noexcept { - uint64_t initialGen; - Buffer serialized; - do { - initialGen = _generation.load(); // acquire - while (MONGO_unlikely(initialGen & 1)) { - initialGen = _generation.wait(initialGen); // acquire - } - for (size_t i = 0; i < N; ++i) { - serialized[i] = _buffer[i].load(); // acquire - } - } while (initialGen != _generation.loadRelaxed()); - return value_type::parseForLockFreeReads(serialized); - } - -private: - static_assert(value_type::serializationForLockFreeReadsU64Count > 1, - "Anything that can fit in a single Atomic should use that instead"); - - alignas(stdx::hardware_destructive_interference_size) Atomic _buffer[N]; - - // Low bit is 1 while there is a write in progress. Remaining bits are a - // count of completed modifications to _buffer. - WaitableAtomic _generation; -}; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/data_with_lock_free_reads_0.cppmem b/src/mongo/db/repl/data_with_lock_free_reads_0.cppmem deleted file mode 100644 index a55adcfbcf8..00000000000 --- a/src/mongo/db/repl/data_with_lock_free_reads_0.cppmem +++ /dev/null @@ -1,30 +0,0 @@ -int main() { - atomic_int gen = 0; - atomic_int c0 = 0; - atomic_int c1 = 0; - {{{ - { - // Write the value (33, 35) - // After this write, gen is 2. - gen.load(mo_relaxed).readsvalue(0); - gen.store(1, mo_relaxed); - c0.store(33, mo_release); - c1.store(35, mo_release); - gen.store(2, mo_release); - // Write the value (37, 39) - // After this write, gen is 4. - gen.load(mo_relaxed).readsvalue(2); - gen.store(3, mo_relaxed); - c0.store(37, mo_release); - c1.store(39, mo_release); - gen.store(4, mo_release); - } ||| { - // It would be incorrect to see (0, 35) before the first write. - gen.load(mo_acquire).readsvalue(0); - c0.load(mo_acquire).readsvalue(0); - c1.load(mo_acquire).readsvalue(35); - gen.load(mo_relaxed).readsvalue(0); - } - }}}; - return 0; -} diff --git a/src/mongo/db/repl/data_with_lock_free_reads_1.cppmem b/src/mongo/db/repl/data_with_lock_free_reads_1.cppmem deleted file mode 100644 index bd8bd6c511a..00000000000 --- a/src/mongo/db/repl/data_with_lock_free_reads_1.cppmem +++ /dev/null @@ -1,30 +0,0 @@ -int main() { - atomic_int gen = 0; - atomic_int c0 = 0; - atomic_int c1 = 0; - {{{ - { - // Write the value (33, 35) - // After this write, gen is 2. - gen.load(mo_relaxed).readsvalue(0); - gen.store(1, mo_relaxed); - c0.store(33, mo_release); - c1.store(35, mo_release); - gen.store(2, mo_release); - // Write the value (37, 39) - // After this write, gen is 4. - gen.load(mo_relaxed).readsvalue(2); - gen.store(3, mo_relaxed); - c0.store(37, mo_release); - c1.store(39, mo_release); - gen.store(4, mo_release); - } ||| { - // It would be incorrect to see (33, 39) after the second write. - gen.load(mo_acquire).readsvalue(4); - c0.load(mo_acquire).readsvalue(33); - c1.load(mo_acquire).readsvalue(39); - gen.load(mo_relaxed).readsvalue(4); - } - }}}; - return 0; -} diff --git a/src/mongo/db/repl/data_with_lock_free_reads_bm.cpp b/src/mongo/db/repl/data_with_lock_free_reads_bm.cpp deleted file mode 100644 index 7f8f179db8e..00000000000 --- a/src/mongo/db/repl/data_with_lock_free_reads_bm.cpp +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Copyright (C) 2025-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/repl/data_with_lock_free_reads.h" - -#include - -#include - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - -namespace mongo { -namespace repl { - -AtomicWord mismatches; -template -struct Data { - static constexpr size_t serializationForLockFreeReadsU64Count = N; - using Buffer = DataWithLockFreeReadsBuffer; - Buffer data; - - Data() { - data.fill(0); - } - - explicit Data(uint64_t value) { - data.fill(value); - } - - static Data parseForLockFreeReads(Buffer& data) { - auto d = Data(); - d.data = data; - return d; - } - - Buffer serializeForLockFreeReads() const { - return data; - } -}; - -static std::random_device rd; -static std::mt19937_64 gen(rd()); -static std::uniform_int_distribution dist(0, 99); - -template -void BM_LockFreeReads(benchmark::State& state, sync& test, uint64_t conflictChance) { - if (state.thread_index == 0) { - mismatches.store(0); - test.store(WithLock::withoutLock(), Data(0)); - } - - long i = 1; - for (auto keepRunning : state) { - auto res = dist(gen); - if (state.thread_index == 0 && res < conflictChance) { - test.store(WithLock::withoutLock(), Data(i)); - } else { - auto data = test.load(); - if (data.data[0] != data.data[data.data.size() - 1]) { - mismatches.addAndFetch(1); - } - benchmark::DoNotOptimize(data); - } - i++; - } - - if (state.thread_index == 0) { - auto m = mismatches.load(); - // Remove the following line to see how common or rare mismatches are. - invariant(m == 0); - if (m != 0) { - benchmark::Counter counter; - counter.value = mismatches.load(); - state.counters["mismatches"] = counter; - } - } -} - -DataWithLockFreeReads> seqlock_2; -DataWithLockFreeReads> seqlock_4; -DataWithLockFreeReads> seqlock_8; - -void BM_LockFreeReadsSeqLock(benchmark::State& state) { - auto dataWidth = state.range(0); - auto conflictChance = state.range(1); - switch (dataWidth) { - case 2: - BM_LockFreeReads(state, seqlock_2, conflictChance); - break; - case 4: - BM_LockFreeReads(state, seqlock_4, conflictChance); - break; - case 8: - BM_LockFreeReads(state, seqlock_8, conflictChance); - break; - } -} - -BENCHMARK(BM_LockFreeReadsSeqLock) - ->ArgsProduct({ - {2, 4, 8}, - {1, 5, 10, 20, 50, 80, 100}, - }) - ->ArgNames({"dataWidth", "conflictChance"}) - ->Threads(4) - ->Threads(8) - ->Threads(16) - ->Threads(32) - ->Threads(64); - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/data_with_lock_free_reads_test.cpp b/src/mongo/db/repl/data_with_lock_free_reads_test.cpp deleted file mode 100644 index 052acc92844..00000000000 --- a/src/mongo/db/repl/data_with_lock_free_reads_test.cpp +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Copyright (C) 2025-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/repl/data_with_lock_free_reads.h" - -#include "mongo/stdx/new.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/concurrency/with_lock.h" - -#include -#include - -#include - -namespace mongo { -namespace repl { - -template -struct Data { - static constexpr size_t serializationForLockFreeReadsU64Count = N; - using Buffer = DataWithLockFreeReadsBuffer; - - Data() { - _data.fill(0); - } - - explicit Data(uint64_t value) { - _data.fill(value); - } - - static Data parseForLockFreeReads(Buffer& data) { - auto d = Data(); - d._data = data; - return d; - } - - Buffer serializeForLockFreeReads() const { - return _data; - } - - const Buffer& getData() const { - return _data; - } - -private: - Buffer _data; -}; - -TEST(DataWithLockFreeReadsTest, DataSerDeser) { - const size_t dataSize = 4; - auto d0 = Data(7); - auto buf = d0.serializeForLockFreeReads(); - auto d1 = Data::parseForLockFreeReads(buf); - ASSERT_EQ(d0.getData(), d1.getData()); -} - -TEST(DataWithLockFreeReadsTest, ShouldStoreAndLoad) { - const size_t dataSize = 4; - DataWithLockFreeReads> dc; - auto last = Data(1); - dc.store(WithLock::withoutLock(), last); - ASSERT_EQ(last.getData(), dc.load().getData()); - last = Data(2); - dc.store(WithLock::withoutLock(), last); - ASSERT_EQ(last.getData(), dc.load().getData()); - last = Data(3); - dc.store(WithLock::withoutLock(), last); - last = Data(4); - dc.store(WithLock::withoutLock(), last); - last = Data(5); - dc.store(WithLock::withoutLock(), last); - ASSERT_EQ(last.getData(), dc.load().getData()); -} - -TEST(DataWithLockFreeReadsTest, ShouldNotSeeTornReads) { - const int numReaders = 8; - const long long totalReads = 1LL << 10; - AtomicWord stopWrites{false}; - DataWithLockFreeReads> dc; - dc.store(WithLock::withoutLock(), Data<4>(0)); - stdx::thread writer([&]() { - uint64_t j = 0; - while (!stopWrites.load()) { - // Simulate writers that are doing something else sometimes, not - // just hot-spinning. - sleepmillis(1); - dc.store(WithLock::withoutLock(), Data<4>(j++)); - } - }); - std::vector readers; - for (int i = 0; i < numReaders; ++i) { - readers.emplace_back([&]() { - auto readsRemaining = totalReads; - auto lastValue = dc.load(); - while (readsRemaining > 0) { - auto d = dc.load(); - ASSERT_EQ(d.getData()[0], d.getData()[d.getData().size() - 1]); - if (d.getData() != lastValue.getData()) { - --readsRemaining; - } - lastValue = d; - } - }); - } - for (auto& thread : readers) { - thread.join(); - } - stopWrites.store(true); - writer.join(); -} -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h index e2377dd3504..20937228720 100644 --- a/src/mongo/db/repl/optime.h +++ b/src/mongo/db/repl/optime.h @@ -34,7 +34,6 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/timestamp.h" -#include "mongo/db/repl/data_with_lock_free_reads.h" #include "mongo/util/modules.h" #include "mongo/util/time_support.h" @@ -56,13 +55,6 @@ namespace MONGO_MOD_PUB repl { */ class MONGO_MOD_PUB OpTime { -public: - static constexpr size_t serializationForLockFreeReadsU64Count = 2; - -private: - using DataWithLockFreeReadsBufferT = - DataWithLockFreeReadsBuffer; - public: static constexpr auto kTermFieldName = "t"_sd; static constexpr auto kTimestampFieldName = "ts"_sd; @@ -115,10 +107,6 @@ public: static OpTime parse(const BSONObj& obj); static OpTime parse(const BSONElement& elem); - static OpTime parseForLockFreeReads(DataWithLockFreeReadsBufferT& serialized) { - return OpTime(Timestamp(serialized[0]), static_cast(serialized[1])); - } - std::string toString() const; // Returns true when this OpTime is not yet initialized. @@ -187,23 +175,12 @@ public: void appendAsQuery(BSONObjBuilder* builder) const; BSONObj asQuery() const; - DataWithLockFreeReadsBufferT serializeForLockFreeReads() const { - return {_timestamp.asULL(), static_cast(_term)}; - } - private: Timestamp _timestamp; long long _term = kInitialTerm; }; class MONGO_MOD_PUB OpTimeAndWallTime { -public: - static constexpr size_t serializationForLockFreeReadsU64Count = 3; - -private: - using DataWithLockFreeReadsBufferT = - DataWithLockFreeReadsBuffer; - public: static constexpr auto kWallClockTimeFieldName = "wall"_sd; @@ -225,12 +202,6 @@ public: OpTimeAndWallTime(OpTime optime, Date_t wall) : opTime(optime), wallTime(wall) {} - static OpTimeAndWallTime parseForLockFreeReads(DataWithLockFreeReadsBufferT& serialized) { - return OpTimeAndWallTime( - OpTime(Timestamp(serialized[0]), static_cast(serialized[1])), - Date_t::fromMillisSinceEpoch(serialized[2])); - } - inline bool operator==(const OpTimeAndWallTime& rhs) const { return opTime == rhs.opTime && wallTime == rhs.wallTime; } @@ -241,13 +212,8 @@ public: std::string toString() const { return opTime.toString() + ", " + wallTime.toString(); } - - DataWithLockFreeReadsBufferT serializeForLockFreeReads() const { - return {opTime.getTimestamp().asULL(), - static_cast(opTime.getTerm()), - static_cast(wallTime.toMillisSinceEpoch())}; - } }; + std::ostream& operator<<(std::ostream& out, const OpTimeAndWallTime& opTime); // A convenience class for holding both a Timestamp and a Date_t. diff --git a/src/mongo/db/repl/optime_test.cpp b/src/mongo/db/repl/optime_test.cpp deleted file mode 100644 index 1cdb2c8e896..00000000000 --- a/src/mongo/db/repl/optime_test.cpp +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Copyright (C) 2025-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/repl/optime.h" - -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace repl { - -TEST(OpTimeTest, OpTimeSerDeser) { - auto o0 = OpTime(Timestamp(1357913579), 12); - auto buf = o0.serializeForLockFreeReads(); - auto o1 = OpTime::parseForLockFreeReads(buf); - ASSERT_EQ(o0, o1); -} - -TEST(OpTimeTest, OpTimeAndWallTimeSerDeser) { - auto o = OpTime(Timestamp(123456789), 17); - auto w = Date_t::fromMillisSinceEpoch(987654321); - auto owt0 = OpTimeAndWallTime(o, w); - auto buf = owt0.serializeForLockFreeReads(); - auto owt1 = OpTimeAndWallTime::parseForLockFreeReads(buf); - ASSERT_EQ(owt0, owt1); -} -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index eb792766931..ba3d928f7a4 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -61,7 +61,6 @@ #include "mongo/db/read_write_concern_defaults.h" #include "mongo/db/repl/always_allow_non_local_writes.h" #include "mongo/db/repl/check_quorum_for_config_change.h" -#include "mongo/db/repl/clang_checked/mutex.h" #include "mongo/db/repl/collection_utils.h" #include "mongo/db/repl/data_replicator_external_state.h" #include "mongo/db/repl/data_replicator_external_state_initial_sync.h" @@ -453,6 +452,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( _rsConfigState(kConfigPreStart), _rsConfig(std::make_shared()), // Initialize with empty configuration. _selfIndex(-1), + _sleptLastElection(false), _readWriteAbility(std::make_unique(!settings.isReplSet())), _replicationProcess(replicationProcess), _storage(storage), @@ -477,21 +477,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( invariant(_service); - { - clang_checked::lock_guard lk(_mutex); - - auto o0 = OpTimeAndWallTime(OpTime(), Date_t::min()); - _myLastAppliedOpTimeAndWallTimeCached.store(lk, o0); - auto o1 = OpTimeAndWallTime(OpTime(), Date_t::min()); - _myLastCommittedOpTimeAndWallTimeCached.store(lk, o1); - auto o2 = OpTimeAndWallTime(OpTime(), Date_t::min()); - _myLastDurableOpTimeAndWallTimeCached.store(lk, o2); - auto o3 = OpTimeAndWallTime(OpTime(), Date_t::min()); - _myLastWrittenOpTimeAndWallTimeCached.store(lk, o3); - - _currentCommittedSnapshotCached.store(lk, OpTime()); - } - if (!_settings.isReplSet()) { return; } @@ -562,7 +547,8 @@ int64_t ReplicationCoordinatorImpl::getLastHorizonChange_forTest() const { } OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const { - return _currentCommittedSnapshotCached.load(); + stdx::lock_guard lk(_mutex); + return _getCurrentCommittedSnapshotOpTime(lk); } OpTime ReplicationCoordinatorImpl::_getCurrentCommittedSnapshotOpTime(WithLock) const { @@ -1678,7 +1664,6 @@ void ReplicationCoordinatorImpl::_setMyLastWrittenOpTimeAndWallTime( _topCoord->setMyLastWrittenOpTimeAndWallTime( opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed); - _myLastWrittenOpTimeAndWallTimeCached.store(lk, opTimeAndWallTime); // Signal anyone waiting on optime changes. _lastWrittenOpTimeWaiterList.setValueIf( @@ -1702,7 +1687,6 @@ void ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTime( // transaction, which may be delayed, but this should be fine. _topCoord->setMyLastDurableOpTimeAndWallTime( opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed); - _myLastDurableOpTimeAndWallTimeCached.store(lk, opTimeAndWallTime); // If we are using durable times to calculate the commit level, update it now. if (_rsConfig.unsafePeek().getWriteConcernMajorityShouldJournal()) { _updateLastCommittedOpTimeAndWallTime(lk); @@ -1752,18 +1736,12 @@ bool ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTimeForward( } OpTime ReplicationCoordinatorImpl::getMyLastWrittenOpTime() const { - return _myLastWrittenOpTimeAndWallTimeCached.load().opTime; + stdx::lock_guard lock(_mutex); + return _getMyLastWrittenOpTime(lock); } OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastWrittenOpTimeAndWallTime( bool rollbackSafe) const { - // If !rollbackSafe, then we access only 1 member, so we don't need the - // lock. - if (!rollbackSafe) { - return _myLastWrittenOpTimeAndWallTimeCached.load(); - } - // Otherwise, we must take the lock since we might touch both _memberState - // and _lastWritten. stdx::lock_guard lock(_mutex); if (rollbackSafe && _getMemberState(lock).rollback()) { return {}; @@ -1772,19 +1750,23 @@ OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastWrittenOpTimeAndWallTime( } OpTime ReplicationCoordinatorImpl::getMyLastAppliedOpTime() const { - return getMyLastAppliedOpTimeAndWallTime().opTime; + stdx::lock_guard lock(_mutex); + return _getMyLastAppliedOpTime(lock); } OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastAppliedOpTimeAndWallTime() const { - return _myLastAppliedOpTimeAndWallTimeCached.load(); + stdx::lock_guard lock(_mutex); + return _getMyLastAppliedOpTimeAndWallTime(lock); } OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastDurableOpTimeAndWallTime() const { - return _myLastDurableOpTimeAndWallTimeCached.load(); + stdx::lock_guard lock(_mutex); + return _getMyLastDurableOpTimeAndWallTime(lock); } OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const { - return getMyLastDurableOpTimeAndWallTime().opTime; + stdx::lock_guard lock(_mutex); + return _getMyLastDurableOpTime(lock); } Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx, @@ -4989,8 +4971,6 @@ ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSourceOnError void ReplicationCoordinatorImpl::_updateLastCommittedOpTimeAndWallTime(WithLock lk) { if (_topCoord->updateLastCommittedOpTimeAndWallTime()) { _setStableTimestampForStorage(lk); - auto lastCommittedOpTimeAndWallTime = _topCoord->getLastCommittedOpTimeAndWallTime(); - _myLastCommittedOpTimeAndWallTimeCached.store(lk, lastCommittedOpTimeAndWallTime); } } @@ -5206,8 +5186,6 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint( bool forInitiate) { if (_topCoord->advanceLastCommittedOpTimeAndWallTime( committedOpTimeAndWallTime, fromSyncSource, forInitiate)) { - auto lastCommittedOpTimeAndWallTime = _topCoord->getLastCommittedOpTimeAndWallTime(); - _myLastCommittedOpTimeAndWallTimeCached.store(lk, lastCommittedOpTimeAndWallTime); if (_getMemberState(lk).arbiter()) { // Arbiters do not store replicated data, so we consider their data trivially // consistent. @@ -5222,11 +5200,13 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint( } OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const { - return getLastCommittedOpTimeAndWallTime().opTime; + stdx::unique_lock lk(_mutex); + return _topCoord->getLastCommittedOpTime(); } OpTimeAndWallTime ReplicationCoordinatorImpl::getLastCommittedOpTimeAndWallTime() const { - return _myLastCommittedOpTimeAndWallTimeCached.load(); + stdx::unique_lock lk(_mutex); + return _topCoord->getLastCommittedOpTimeAndWallTime(); } Status ReplicationCoordinatorImpl::processReplSetRequestVotes( @@ -5639,7 +5619,6 @@ bool ReplicationCoordinatorImpl::_updateCommittedSnapshot(WithLock lk, if (MONGO_unlikely(disableSnapshotting.shouldFail())) return false; _currentCommittedSnapshot = newCommittedSnapshot; - _currentCommittedSnapshotCached.store(lk, newCommittedSnapshot); _currentCommittedSnapshotCond.notify_all(); _externalState->updateCommittedSnapshot(newCommittedSnapshot); @@ -5656,9 +5635,8 @@ void ReplicationCoordinatorImpl::clearCommittedSnapshot() { _clearCommittedSnapshot(lock); } -void ReplicationCoordinatorImpl::_clearCommittedSnapshot(WithLock lk) { +void ReplicationCoordinatorImpl::_clearCommittedSnapshot(WithLock) { _currentCommittedSnapshot = boost::none; - _currentCommittedSnapshotCached.store(lk, OpTime()); _externalState->clearCommittedSnapshot(); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 995a7d5b5aa..dde18a7df04 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -43,7 +43,6 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/auto_get_rstl_for_stepup_stepdown.h" -#include "mongo/db/repl/data_with_lock_free_reads.h" #include "mongo/db/repl/delayable_timeout_callback.h" #include "mongo/db/repl/hello/hello_response.h" #include "mongo/db/repl/initial_sync/initial_syncer.h" @@ -214,9 +213,6 @@ public: const ReplSettings& getSettings() const override; - // This method makes no threading guarantees since it fetches a single - // value. If you are an internal caller working with multiple protected - // members, use _getMemberState(WithLock). MemberState getMemberState() const override; std::vector getMemberData() const override; @@ -292,34 +288,13 @@ public: void setMyHeartbeatMessage(const std::string& msg) override; - // This method makes no threading guarantees since it fetches a single - // value. If you are an internal caller working with multiple protected - // members, use _getMyLastWrittenOpTime(WithLock). OpTime getMyLastWrittenOpTime() const override; - - // This method makes no threading guarantees since it fetches a single - // value. If you are an internal caller working with multiple protected - // members, use _getMyLastWrittenOpTimeAndWallTime(WithLock). OpTimeAndWallTime getMyLastWrittenOpTimeAndWallTime(bool rollbackSafe = false) const override; - // This method makes no threading guarantees since it fetches a single - // value. If you are an internal caller working with multiple protected - // members, use _getMyLastAppliedOpTime(WithLock). OpTime getMyLastAppliedOpTime() const override; - - // This method makes no threading guarantees since it fetches a single - // value. If you are an internal caller working with multiple protected - // members, use _getMyLastAppliedOpTimeAndWallTime(WithLock). OpTimeAndWallTime getMyLastAppliedOpTimeAndWallTime() const override; - // This method makes no threading guarantees since it fetches a single - // value. If you are an internal caller working with multiple protected - // members, use _getMyLastDurableOpTime(WithLock). OpTime getMyLastDurableOpTime() const override; - - // This method makes no threading guarantees since it fetches a single - // value. If you are an internal caller working with multiple protected - // members, use _getMyLastDurableOpTimeAndWallTime(WithLock). OpTimeAndWallTime getMyLastDurableOpTimeAndWallTime() const override; Status waitUntilMajorityOpTime(OperationContext* opCtx, @@ -448,14 +423,7 @@ public: ChangeSyncSourceAction shouldChangeSyncSourceOnError( const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const override; - // This method makes no threading guarantees since it fetches a single - // value. If you are an internal caller working with multiple protected - // members, use _topCoord->getLastCommittedOpTime(). OpTime getLastCommittedOpTime() const override; - - // This method makes no threading guarantees since it fetches a single - // value. If you are an internal caller working with multiple protected - // members, use _topCoord->getLastCommittedOpTimeAndWallTime(). OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const override; Status processReplSetRequestVotes(OperationContext* opCtx, @@ -1857,11 +1825,6 @@ private: // Pointer to the TopologyCoordinator owned by this ReplicationCoordinator. std::unique_ptr _topCoord; // (M) - DataWithLockFreeReads _myLastAppliedOpTimeAndWallTimeCached; // (S) - DataWithLockFreeReads _myLastCommittedOpTimeAndWallTimeCached; // (S) - DataWithLockFreeReads _myLastDurableOpTimeAndWallTimeCached; // (S) - DataWithLockFreeReads _myLastWrittenOpTimeAndWallTimeCached; // (S) - // Executor that drives the topology coordinator. std::shared_ptr _replExecutor; // (S) @@ -1923,6 +1886,9 @@ private: // This member's index position in the current config. int _selfIndex; // (M) + // Whether we slept last time we attempted an election but possibly tied with other nodes. + bool _sleptLastElection; // (M) + // Used to manage the concurrency around _canAcceptNonLocalWrites and _canServeNonLocalReads. std::unique_ptr _readWriteAbility; // (S) @@ -1940,8 +1906,6 @@ private: // When engaged, this must be <= _lastCommittedOpTime. boost::optional _currentCommittedSnapshot; // (M) - DataWithLockFreeReads _currentCommittedSnapshotCached; // (S) - // Used to signal threads that are waiting for a new value of _currentCommittedSnapshot. stdx::condition_variable _currentCommittedSnapshotCond; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_catchup.cpp b/src/mongo/db/repl/replication_coordinator_impl_catchup.cpp index 7c50e1cfdc5..8589e597988 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_catchup.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_catchup.cpp @@ -411,7 +411,6 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTimeAndWallTime( _topCoord->setMyLastAppliedOpTimeAndWallTime( opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed); - _myLastAppliedOpTimeAndWallTimeCached.store(lk, opTimeAndWallTime); // No need to wake up replication waiters because there should not be any replication waiters // waiting on our own lastApplied.