diff --git a/jstests/multiVersion/genericSetFCVUsage/fcv_core/3_upgrade_replset.js b/jstests/multiVersion/genericSetFCVUsage/fcv_core/3_upgrade_replset.js index 4a38b57b7de..589c2df1a2e 100644 --- a/jstests/multiVersion/genericSetFCVUsage/fcv_core/3_upgrade_replset.js +++ b/jstests/multiVersion/genericSetFCVUsage/fcv_core/3_upgrade_replset.js @@ -71,6 +71,17 @@ for (let oldVersion of ["last-lts", "last-continuous"]) { // Allow more valid writes to go through sleep(10 * 1000); + // Newer server binaries should always have gossiped their lastStableRecoveryTimestamp to each + // other via heartbeats. The presence of this information won't negatively affect heartbeats to + // older server versions. + assert.soon(() => { + const rsStatus = assert.commandWorked(primary.adminCommand({replSetGetStatus: 1})); + return ( + rsStatus.members[0].lastStableRecoveryTimestamp !== undefined && + rsStatus.members[1].lastStableRecoveryTimestamp !== undefined + ); + }); + jsTest.log("Downgrading replica set from latest to " + oldVersion); rst.upgradeSet({binVersion: oldVersion}); jsTest.log("Replica set downgraded."); diff --git a/src/mongo/db/repl/member_data.cpp b/src/mongo/db/repl/member_data.cpp index a1f5b3dd24f..d5edd3608df 100644 --- a/src/mongo/db/repl/member_data.cpp +++ b/src/mongo/db/repl/member_data.cpp @@ -100,6 +100,10 @@ MemberData::HeartbeatChanges MemberData::setUpValues(Date_t now, _configVersion = hbResponse.getConfigVersion(); } + if (hbResponse.hasLastStableRecoveryTimestamp()) { + _lastStableRecoveryTimestamp = hbResponse.getLastStableRecoveryTimestamp(); + } + _lastResponse = std::move(hbResponse); return {opTimeAdvanced, configChanged, memberStateChanged}; diff --git a/src/mongo/db/repl/member_data.h b/src/mongo/db/repl/member_data.h index f87fbbc2777..f881b975078 100644 --- a/src/mongo/db/repl/member_data.h +++ b/src/mongo/db/repl/member_data.h @@ -42,6 +42,8 @@ #include +#include + namespace MONGO_MOD_PUB mongo { namespace repl { @@ -202,6 +204,10 @@ public: return _priorityPort; } + boost::optional getLastStableRecoveryTimestamp() const { + return _lastStableRecoveryTimestamp; + } + /* * Returns true if the last heartbeat data explicilty stated that the node is not electable. */ @@ -394,6 +400,9 @@ private: // Optional priority port for this member. boost::optional _priorityPort; + + // Last known lastStableRecoveryTimestamp gossiped from this member via heartbeat. + boost::optional _lastStableRecoveryTimestamp; }; } // namespace repl diff --git a/src/mongo/db/repl/repl_set_heartbeat_response.cpp b/src/mongo/db/repl/repl_set_heartbeat_response.cpp index 84f7ddd22e3..16642ced5ae 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_response.cpp +++ b/src/mongo/db/repl/repl_set_heartbeat_response.cpp @@ -67,6 +67,7 @@ const std::string kSyncSourceFieldName = "syncingTo"; const std::string kTermFieldName = "term"; const std::string kTimestampFieldName = "ts"; const std::string kIsElectableFieldName = "electable"; +const std::string kLastStableRecoveryTimestampFieldName = "lastStableRecoveryTimestamp"; } // namespace @@ -121,6 +122,9 @@ void ReplSetHeartbeatResponse::addToBSON(BSONObjBuilder* builder) const { if (_electableSet) { *builder << kIsElectableFieldName << _electable; } + if (_lastStableRecoveryTimestamp) { + builder->append(kLastStableRecoveryTimestampFieldName, *_lastStableRecoveryTimestamp); + } } BSONObj ReplSetHeartbeatResponse::toBSON() const { @@ -295,6 +299,20 @@ Status ReplSetHeartbeatResponse::initialize(const BSONObj& doc, long long term) _syncingTo = HostAndPort(syncingToElement.String()); } + const BSONElement lastStableRecoveryTimestampElement = + doc[kLastStableRecoveryTimestampFieldName]; + if (!lastStableRecoveryTimestampElement.eoo()) { + if (lastStableRecoveryTimestampElement.type() != BSONType::timestamp) { + return Status(ErrorCodes::TypeMismatch, + str::stream() + << "Expected \"" << kLastStableRecoveryTimestampFieldName + << "\" field in response to replSetHeartbeat to have type Timestamp, " + "but found " + << typeName(lastStableRecoveryTimestampElement.type())); + } + _lastStableRecoveryTimestamp = lastStableRecoveryTimestampElement.timestamp(); + } + const BSONElement rsConfigElement = doc[kConfigFieldName]; if (rsConfigElement.eoo()) { _configSet = false; diff --git a/src/mongo/db/repl/repl_set_heartbeat_response.h b/src/mongo/db/repl/repl_set_heartbeat_response.h index 871ff2007df..f594555be52 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_response.h +++ b/src/mongo/db/repl/repl_set_heartbeat_response.h @@ -43,6 +43,8 @@ #include +#include + namespace mongo { class BSONObj; @@ -131,6 +133,12 @@ public: return _electableSet; } bool isElectable() const; + bool hasLastStableRecoveryTimestamp() const { + return _lastStableRecoveryTimestamp.has_value(); + } + Timestamp getLastStableRecoveryTimestamp() const { + return *_lastStableRecoveryTimestamp; + } /** * Sets _setName to "name". @@ -215,6 +223,9 @@ public: _electableSet = true; _electable = electable; } + void setLastStableRecoveryTimestamp(Timestamp ts) { + _lastStableRecoveryTimestamp = ts; + } private: bool _electionTimeSet = false; @@ -249,6 +260,8 @@ private: bool _electableSet = false; bool _electable = false; + + boost::optional _lastStableRecoveryTimestamp; }; } // namespace repl diff --git a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp index ed52899b648..4c2277ef6e5 100644 --- a/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp +++ b/src/mongo/db/repl/repl_set_heartbeat_response_test.cpp @@ -57,6 +57,7 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) { Date_t appliedWallTime = Date_t() + Seconds(appliedOpTime.getSecs()); OpTime writtenOpTime = OpTime(Timestamp(50), 0); Date_t writtenWallTime = Date_t() + Seconds(writtenOpTime.getSecs()); + Timestamp lastStableRecoveryTimestamp = Timestamp(9); ASSERT_EQUALS(false, hbResponse.hasState()); ASSERT_EQUALS(false, hbResponse.hasElectionTime()); ASSERT_EQUALS(false, hbResponse.hasDurableOpTime()); @@ -94,6 +95,8 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) { // set writtenOpTime hbResponse.setWrittenOpTimeAndWallTime({writtenOpTime, writtenWallTime}); fieldsSet += 2; // OpTime and WallTime are separate fields + hbResponse.setLastStableRecoveryTimestamp(lastStableRecoveryTimestamp); + ++fieldsSet; // set config ReplSetConfig config; hbResponse.setConfig(config); @@ -122,6 +125,7 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) { ASSERT_EQUALS(appliedWallTime, hbResponse.getAppliedOpTimeAndWallTime().wallTime); ASSERT_EQUALS(writtenOpTime, hbResponse.getWrittenOpTime()); ASSERT_EQUALS(writtenWallTime, hbResponse.getWrittenOpTimeAndWallTime().wallTime); + ASSERT_EQUALS(lastStableRecoveryTimestamp, hbResponse.getLastStableRecoveryTimestamp()); ASSERT_EQUALS(config.toBSON().toString(), hbResponse.getConfig().toBSON().toString()); hbResponseObj = hbResponse.toBSON(); @@ -260,6 +264,31 @@ TEST(ReplSetHeartbeatResponse, InitializeNoWrittenWallTime) { ASSERT_EQUALS("", result.reason()); } +TEST(ReplSetHeartbeatResponse, InitializeWrongLastStableRecoveryTimestampType) { + ReplSetHeartbeatResponse hbResponse; + BSONObj initializerObj = + BSON("ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() + << "durableWallTime" << Date_t() + Seconds(100) << "opTime" + << OpTime(Timestamp(100, 0), 0).toBSON() << "wallTime" << Date_t() + Seconds(100) + << "v" << 2 << "lastStableRecoveryTimestamp" << 12345LL); + Status result = hbResponse.initialize(initializerObj, 0); + ASSERT_EQUALS(ErrorCodes::TypeMismatch, result); + ASSERT_STRING_CONTAINS(result.reason(), "Expected \"lastStableRecoveryTimestamp\" field"); +} + +TEST(ReplSetHeartbeatResponse, InitializeNoLastStableRecoveryTimestamp) { + // Verifies backward compatibility: responses which don't report lastStableRecoveryTimestamp + // (older versions) still initialize successfully and hasLastStableRecoveryTimestamp() returns + // false. + ReplSetHeartbeatResponse hbResponse; + BSONObj initializerObj = BSON( + "ok" << 1.0 << "durableOpTime" << OpTime(Timestamp(100, 0), 0).toBSON() << "durableWallTime" + << Date_t() + Seconds(100) << "opTime" << OpTime(Timestamp(100, 0), 0).toBSON() + << "wallTime" << Date_t() + Seconds(100) << "v" << 2); + ASSERT_OK(hbResponse.initialize(initializerObj, 0)); + ASSERT_FALSE(hbResponse.hasLastStableRecoveryTimestamp()); +} + TEST(ReplSetHeartbeatResponse, InitializeMemberStateWrongType) { ReplSetHeartbeatResponse hbResponse; BSONObj initializerObj = diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a5e6f6a1c31..57ddfe3f861 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -3179,6 +3179,8 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus( return Status(ErrorCodes::ShutdownInProgress, "shutdown in progress"); } + _topCoord->setCachedLastStableRecoveryTimestamp(lastStableRecoveryTimestamp); + Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse"); _topCoord->prepareStatusResponse( TopologyCoordinator::ReplSetStatusArgs{ @@ -5181,6 +5183,31 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) { // Update our understanding of the oldest available snapshot timestamp. setOldestTimestampMetric(_storage->getOldestTimestamp(getServiceContext())); } + + _maybeUpdateCachedLastStableRecoveryTimestamp(lk, _replExecutor->now()); +} + +void ReplicationCoordinatorImpl::_maybeUpdateCachedLastStableRecoveryTimestamp(WithLock lk, + Date_t now) { + // Periodically refresh the cached lastStableRecoveryTimestamp so it can be gossiped to other + // replica set members via heartbeats. Once a value has been established, throttle to once per + // two checkpoint intervals to avoid querying the storage engine too frequently. + const bool needsRefresh = [&] { + if (!_topCoord->hasCachedLastStableRecoveryTimestamp()) { + return true; + } + const double syncDelaySeconds = storageGlobalParams.syncdelay.load(); + const Seconds effectiveSyncDelay{ + syncDelaySeconds > 0 ? static_cast(syncDelaySeconds) : 60LL}; + return now - _lastStableRecoveryTimestampRefreshTime >= 2 * effectiveSyncDelay; + }(); + if (needsRefresh) { + const auto ts = _storage->getLastStableRecoveryTimestamp(getServiceContext()); + if (ts && *ts != Timestamp::min()) { + _topCoord->setCachedLastStableRecoveryTimestamp(ts); + } + _lastStableRecoveryTimestampRefreshTime = now; + } } void ReplicationCoordinatorImpl::finishRecoveryIfEligible(OperationContext* opCtx) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index c0437cbff96..a4e589ba81f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1604,9 +1604,18 @@ private: /** * Calculates and sets the value of the 'stable' replication optime for the storage engine. + * + * This function may also update the topology coordinator's cached value of the storage engine's + * recovery timestamp. */ void _setStableTimestampForStorage(WithLock lk); + /** + * Updates the topology coordinator's cached value of the storage engine's recovery timestamp if + * enough wall-clock time has elapsed that a checkpoint is certain to have occurred. + */ + void _maybeUpdateCachedLastStableRecoveryTimestamp(WithLock lk, Date_t now); + /** * Clears the current committed snapshot. */ @@ -2018,6 +2027,10 @@ private: // The cached value of the topology from the most recent SplitHorizonChange. int64_t _lastHorizonTopologyChange{-1}; // (M) + // The last time we queried the storage engine for lastStableRecoveryTimestamp and cached it in + // _topCoord. Used to throttle calls to at most once per two checkpoint intervals. + Date_t _lastStableRecoveryTimestampRefreshTime; // (M) + // This should be set during sharding initialization except on config shard. boost::optional _wasCWWCSetOnConfigServerOnStartup; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 09809470e4f..3600ed47e76 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -6219,6 +6219,104 @@ TEST_F(StableOpTimeTest, AdvanceCommitPointSetsStableOpTimeForStorage) { ASSERT_EQUALS(Timestamp(3, 2), stableTimestamp); } +class LastStableRecoveryTimestampTest : public ReplCoordTest { +public: + void setUp() override { + ReplCoordTest::setUp(); + init("mySet/test1:1234,test2:1234,test3:1234"); + assertStartSuccess(BSON("_id" << "mySet" + << "protocolVersion" << 1 << "version" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "test1:1234") + << BSON("_id" << 1 << "host" + << "test2:1234") + << BSON("_id" << 2 << "host" + << "test3:1234"))), + HostAndPort("test2", 1234)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + replCoordSetMyLastWrittenAndAppliedAndDurableOpTime(OpTimeWithTermOne(1, 1), + Date_t() + Seconds(100)); + simulateSuccessfulV1Election(); + getStorageInterface()->allDurableTimestamp = Timestamp(10, 1); + replCoordSetMyLastWrittenOpTime(OpTimeWithTermOne(2, 1), Date_t() + Seconds(100)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(2, 1), Date_t() + Seconds(100)); + } +}; + +TEST_F(LastStableRecoveryTimestampTest, CachesRealTimestamp) { + getStorageInterface()->lastStableRecoveryTimestamp = Timestamp(100, 1); + + replCoordAdvanceCommitPoint(OpTimeWithTermOne(2, 1), Date_t() + Seconds(100), false); + + ASSERT_TRUE(getTopoCoord().hasCachedLastStableRecoveryTimestamp()); + ASSERT_EQUALS(Timestamp(100, 1), *getTopoCoord().getCachedLastStableRecoveryTimestamp()); +} + +TEST_F(LastStableRecoveryTimestampTest, DoesNotCacheMinTimestamp) { + getStorageInterface()->lastStableRecoveryTimestamp = Timestamp::min(); + + replCoordAdvanceCommitPoint(OpTimeWithTermOne(2, 1), Date_t() + Seconds(100), false); + + ASSERT_FALSE(getTopoCoord().hasCachedLastStableRecoveryTimestamp()); +} + +TEST_F(LastStableRecoveryTimestampTest, DoesNotCacheAbsentTimestamp) { + getStorageInterface()->lastStableRecoveryTimestamp = boost::none; + + replCoordAdvanceCommitPoint(OpTimeWithTermOne(2, 1), Date_t() + Seconds(100), false); + + ASSERT_FALSE(getTopoCoord().hasCachedLastStableRecoveryTimestamp()); +} + +TEST_F(LastStableRecoveryTimestampTest, ThrottlesCacheRefreshAfterValueEstablished) { + const double origSyncDelay = storageGlobalParams.syncdelay.load(); + // --syncdelay=1 second means the throttle window is 2 seconds. + storageGlobalParams.syncdelay.store(1.0); + ON_BLOCK_EXIT([origSyncDelay] { storageGlobalParams.syncdelay.store(origSyncDelay); }); + + getStorageInterface()->lastStableRecoveryTimestamp = Timestamp(100, 1); + replCoordAdvanceCommitPoint(OpTimeWithTermOne(2, 1), Date_t() + Seconds(100), false); + ASSERT_EQUALS(Timestamp(100, 1), *getTopoCoord().getCachedLastStableRecoveryTimestamp()); + + // A second call within the throttle window should not update the cached value. + getStorageInterface()->lastStableRecoveryTimestamp = Timestamp(100, 2); + replCoordSetMyLastWrittenOpTime(OpTimeWithTermOne(3, 1), Date_t() + Seconds(100)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(3, 1), Date_t() + Seconds(100)); + replCoordAdvanceCommitPoint(OpTimeWithTermOne(3, 1), Date_t() + Seconds(100), false); + ASSERT_EQUALS(Timestamp(100, 1), *getTopoCoord().getCachedLastStableRecoveryTimestamp()); + + // After the throttle window expires, the next call should update the cached value. + enterNetwork(); + getNet()->runUntil(getNet()->now() + Seconds(3)); + exitNetwork(); + replCoordSetMyLastWrittenOpTime(OpTimeWithTermOne(4, 1), Date_t() + Seconds(100)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(4, 1), Date_t() + Seconds(100)); + replCoordAdvanceCommitPoint(OpTimeWithTermOne(4, 1), Date_t() + Seconds(100), false); + ASSERT_EQUALS(Timestamp(100, 2), *getTopoCoord().getCachedLastStableRecoveryTimestamp()); +} + +TEST_F(LastStableRecoveryTimestampTest, AggressivelyRefreshesUntilValueEstablished) { + // Use a throttle window long enough that it cannot expire during the test. Any refreshing of + // the cached value will be driven by the cached value being unset, not by the timer. + const double origSyncDelay = storageGlobalParams.syncdelay.load(); + storageGlobalParams.syncdelay.store(3600.0); + ON_BLOCK_EXIT([origSyncDelay] { storageGlobalParams.syncdelay.store(origSyncDelay); }); + + // The first call returns Timestamp::min() and means no value should be cached. + getStorageInterface()->lastStableRecoveryTimestamp = Timestamp::min(); + replCoordAdvanceCommitPoint(OpTimeWithTermOne(2, 1), Date_t() + Seconds(100), false); + ASSERT_FALSE(getTopoCoord().hasCachedLastStableRecoveryTimestamp()); + + // Even within the throttle window, the next call should still update the cached value because + // it was unset. + getStorageInterface()->lastStableRecoveryTimestamp = Timestamp(100, 1); + replCoordSetMyLastWrittenOpTime(OpTimeWithTermOne(3, 1), Date_t() + Seconds(100)); + replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(3, 1), Date_t() + Seconds(100)); + replCoordAdvanceCommitPoint(OpTimeWithTermOne(3, 1), Date_t() + Seconds(100), false); + ASSERT_TRUE(getTopoCoord().hasCachedLastStableRecoveryTimestamp()); + ASSERT_EQUALS(Timestamp(100, 1), *getTopoCoord().getCachedLastStableRecoveryTimestamp()); +} + TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDuringShutdown) { assertStartSuccess(BSON("_id" << "mySet" << "version" << 2 << "members" diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 9dfe43f7966..00bd6dc3fd8 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -399,7 +399,7 @@ public: boost::optional getLastStableRecoveryTimestamp( ServiceContext* serviceCtx) const override { - return boost::none; + return lastStableRecoveryTimestamp; } Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const override { @@ -484,6 +484,7 @@ public: Timestamp allDurableTimestamp = Timestamp::min(); Timestamp oldestOpenReadTimestamp = Timestamp::min(); Timestamp earliestOplogTimestamp = Timestamp::min(); + boost::optional lastStableRecoveryTimestamp = boost::none; private: mutable std::mutex _mutex; diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index cd6d5cb7f5e..8f34cbd2361 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -1015,6 +1015,9 @@ StatusWith TopologyCoordinator::prepareHeartbeatResponseV1( response->setAppliedOpTimeAndWallTime(lastOpApplied); response->setWrittenOpTimeAndWallTime(lastOpWritten); response->setDurableOpTimeAndWallTime(lastOpDurable); + if (_cachedLastStableRecoveryTimestamp) { + response->setLastStableRecoveryTimestamp(*_cachedLastStableRecoveryTimestamp); + } if (_currentPrimaryIndex != -1) { response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId().getData()); @@ -2166,6 +2169,9 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu if (featureFlagMajorityWriteLatency) { bb.appendDate("lastWrittenWallTime", it->getLastWrittenWallTime()); } + if (lastStableRecoveryTimestamp) { + bb.append("lastStableRecoveryTimestamp", *lastStableRecoveryTimestamp); + } } if (!_syncSource.empty() && !_iAmPrimary()) { @@ -2240,6 +2246,9 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu if (featureFlagMajorityWriteLatency) { bb.appendDate("lastWrittenWallTime", it->getLastWrittenWallTime()); } + if (const auto ts = it->getLastStableRecoveryTimestamp()) { + bb.append("lastStableRecoveryTimestamp", *ts); + } } bb.appendDate("lastHeartbeat", it->getLastHeartbeat()); bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv()); diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 8ecd5426482..718b90ec0aa 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -249,6 +249,14 @@ public: // Returns _electionIdTerm. MONGO_MOD_PRIVATE long long getElectionIdTerm() const; + MONGO_MOD_PRIVATE bool hasCachedLastStableRecoveryTimestamp() const { + return _cachedLastStableRecoveryTimestamp.has_value(); + } + + MONGO_MOD_PRIVATE boost::optional getCachedLastStableRecoveryTimestamp() const { + return _cachedLastStableRecoveryTimestamp; + } + //////////////////////////////////////////////////////////// // // Basic state manipulation methods. @@ -397,6 +405,13 @@ public: */ MONGO_MOD_PRIVATE void resetMaintenanceCount(); + /** + * Update the cached last stable recovery timestamp used to gossip to other nodes. + */ + MONGO_MOD_PRIVATE void setCachedLastStableRecoveryTimestamp(boost::optional ts) { + _cachedLastStableRecoveryTimestamp = ts; + } + //////////////////////////////////////////////////////////// // // Methods that prepare responses to command requests. @@ -1267,6 +1282,12 @@ private: // point. This allows us to skip more costly checks of if this parameter was recently turned // on/off in the normal case. bool _priorityPortUsageEverDisabled = false; + + // Last stable recovery timestamp queried from the storage engine. The value is cached here so + // it can be included in heartbeat responses and gossiped to other replica set members. The + // cached value is updated periodically during _setStableTimestampForStorage(), but is throttled + // to update at most once per two checkpoint intervals. + boost::optional _cachedLastStableRecoveryTimestamp; }; /** diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index b312f07d3d5..242fc04a1f8 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -413,6 +413,10 @@ protected: int selfIndex, bool expectPriority); void testReplSetGetStatus(BSONObj config, bool expectPriority); + void testReplSetGetStatusLastStableRecoveryTimestamp(BSONObj config, + int selfIndex, + boost::optional selfTs, + boost::optional remoteTs); void testMemberReplicationProgressAfterReconfig(BSONObj initialConfig, BSONObj changedConfig, int selfIndex, @@ -2278,6 +2282,67 @@ TEST_F(TopoCoordTest, ASSERT_EQUALS(HostAndPort("h5").toString(), response2Obj["prevSyncTarget"].String()); } +void TopoCoordTest::testReplSetGetStatusLastStableRecoveryTimestamp( + BSONObj config, + int selfIndex, + boost::optional selfTs, + boost::optional remoteTs) { + updateConfig(config, selfIndex); + setSelfMemberState(MemberState::RS_SECONDARY); + + OpTime remoteOpTime(Timestamp(3, 1), 20); + Date_t remoteWallTime = Date_t() + Seconds(remoteOpTime.getSecs()); + ReplSetHeartbeatResponse hb; + hb.setConfigVersion(1); + hb.setState(MemberState::RS_SECONDARY); + hb.setElectionTime(Timestamp()); + hb.setAppliedOpTimeAndWallTime({remoteOpTime, remoteWallTime}); + hb.setWrittenOpTimeAndWallTime({remoteOpTime, remoteWallTime}); + hb.setDurableOpTimeAndWallTime({remoteOpTime, remoteWallTime}); + hb.setTerm(getTopoCoord().getTerm()); + if (remoteTs) { + hb.setLastStableRecoveryTimestamp(*remoteTs); + } + + getTopoCoord().prepareHeartbeatRequestV1(now()++, "rs0", HostAndPort("h2")); + getTopoCoord().processHeartbeatResponse( + now()++, Milliseconds(1), HostAndPort("h2"), StatusWith(hb)); + + getTopoCoord().setCachedLastStableRecoveryTimestamp(selfTs); + + BSONObjBuilder statusBuilder; + Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result"); + getTopoCoord().prepareStatusResponse( + TopologyCoordinator::ReplSetStatusArgs{ + now()++, 10, OpTime(), BSONObj(), BSONObj(), BSONObj(), selfTs}, + &statusBuilder, + &resultStatus); + ASSERT_OK(resultStatus); + BSONObj rsStatus = statusBuilder.obj(); + + std::vector members = rsStatus["members"].Array(); + ASSERT_EQUALS(2U, members.size()); + + BSONObj selfStatus = members[selfIndex].Obj(); + BSONObj h2Status = members[1 - selfIndex].Obj(); + ASSERT_EQUALS("hself:27017", selfStatus["name"].str()); + ASSERT_EQUALS("h2:27017", h2Status["name"].str()); + + if (selfTs) { + ASSERT_TRUE(selfStatus.hasField("lastStableRecoveryTimestamp")); + ASSERT_EQUALS(*selfTs, selfStatus["lastStableRecoveryTimestamp"].timestamp()); + } else { + ASSERT_FALSE(selfStatus.hasField("lastStableRecoveryTimestamp")); + } + + if (remoteTs) { + ASSERT_TRUE(h2Status.hasField("lastStableRecoveryTimestamp")); + ASSERT_EQUALS(*remoteTs, h2Status["lastStableRecoveryTimestamp"].timestamp()); + } else { + ASSERT_FALSE(h2Status.hasField("lastStableRecoveryTimestamp")); + } +} + void TopoCoordTest::testReplSetGetStatus(BSONObj config, bool expectPriority) { // This test starts by configuring a TopologyCoordinator as a member of a 4 node replica // set, with each node in a different state. @@ -2481,7 +2546,8 @@ void TopoCoordTest::testReplSetGetStatus(BSONObj config, bool expectPriority) { ASSERT_TRUE(member2Status.hasField("optimeDurableDate")); ASSERT_TRUE(member2Status.hasField("optimeWritten")); ASSERT_TRUE(selfStatus.hasField("optimeWrittenDate")); - ASSERT_FALSE(selfStatus.hasField("lastStableRecoveryTimestamp")); + ASSERT_EQUALS(lastStableRecoveryTimestamp, + selfStatus["lastStableRecoveryTimestamp"].timestamp()); ASSERT_EQUALS(electionTime, selfStatus["electionTime"].timestamp()); ASSERT_FALSE(selfStatus.hasField("pingMs")); @@ -2536,6 +2602,26 @@ TEST_F(TopoCoordTest, ReplSetGetStatusPriority) { true); } +TEST_F(TopoCoordTest, ReplSetGetStatusShowsLastStableRecoveryTimestamp) { + testReplSetGetStatusLastStableRecoveryTimestamp( + BSON("_id" << "rs0" << "version" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" << "hself") + << BSON("_id" << 1 << "host" << "h2"))), + 0, + Timestamp(15, 1), + Timestamp(20, 3)); +} + +TEST_F(TopoCoordTest, ReplSetGetStatusOmitsLastStableRecoveryTimestampWhenAbsentFromHeartbeat) { + testReplSetGetStatusLastStableRecoveryTimestamp( + BSON("_id" << "rs0" << "version" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" << "hself") + << BSON("_id" << 1 << "host" << "h2"))), + 0, + boost::none, + boost::none); +} + TEST_F(TopoCoordTest, ReplSetGetStatusWriteMajorityDifferentFromMajorityVoteCount) { // This tests that writeMajorityCount differs from majorityVoteCount in replSetGetStatus when // the number of non-arbiter voters is less than majorityVoteCount. @@ -3173,6 +3259,35 @@ TEST_F(TopoCoordTest, RespondToHeartbeatsWithNullLastAppliedAndLastDurableWhileI ASSERT_EQUALS(OpTime(), response.getDurableOpTime()); } +TEST_F(PrepareHeartbeatResponseV1Test, + HeartbeatResponseIncludesLastStableRecoveryTimestampWhenCacheIsSet) { + getTopoCoord().setCachedLastStableRecoveryTimestamp(Timestamp(10, 5)); + + ReplSetHeartbeatArgsV1 args; + args.setConfigVersion(1); + args.setSetName("rs0"); + args.setSenderId(20); + ReplSetHeartbeatResponse response; + Status result(ErrorCodes::InternalError, "prepareHeartbeatResponse didn't set result"); + prepareHeartbeatResponseV1(args, &response, &result); + ASSERT_OK(result); + ASSERT_TRUE(response.hasLastStableRecoveryTimestamp()); + ASSERT_EQUALS(Timestamp(10, 5), response.getLastStableRecoveryTimestamp()); +} + +TEST_F(PrepareHeartbeatResponseV1Test, + HeartbeatResponseOmitsLastStableRecoveryTimestampWhenCacheIsNotSet) { + ReplSetHeartbeatArgsV1 args; + args.setConfigVersion(1); + args.setSetName("rs0"); + args.setSenderId(20); + ReplSetHeartbeatResponse response; + Status result(ErrorCodes::InternalError, "prepareHeartbeatResponse didn't set result"); + prepareHeartbeatResponseV1(args, &response, &result); + ASSERT_OK(result); + ASSERT_FALSE(response.hasLastStableRecoveryTimestamp()); +} + void TopoCoordTest::testMemberReplicationProgressAfterReconfig(BSONObj initialConfig, BSONObj changedConfig, int selfIndex,