SERVER-125888 Gossip lastStableRecoveryTimestamp between repl members. (#53168)
GitOrigin-RevId: 0179ce4b54c4d0a55bf3f05af578d0106f659111
This commit is contained in:
parent
6833d54ea3
commit
ca6c3c8352
@ -71,6 +71,17 @@ for (let oldVersion of ["last-lts", "last-continuous"]) {
|
||||
// Allow more valid writes to go through
|
||||
sleep(10 * 1000);
|
||||
|
||||
// Newer server binaries should always have gossiped their lastStableRecoveryTimestamp to each
|
||||
// other via heartbeats. The presence of this information won't negatively affect heartbeats to
|
||||
// older server versions.
|
||||
assert.soon(() => {
    // Poll replSetGetStatus on the primary until the first two member entries both report a
    // lastStableRecoveryTimestamp.
    const status = assert.commandWorked(primary.adminCommand({replSetGetStatus: 1}));
    if (status.members[0].lastStableRecoveryTimestamp === undefined) {
        return false;
    }
    return status.members[1].lastStableRecoveryTimestamp !== undefined;
});
|
||||
|
||||
jsTest.log("Downgrading replica set from latest to " + oldVersion);
|
||||
rst.upgradeSet({binVersion: oldVersion});
|
||||
jsTest.log("Replica set downgraded.");
|
||||
|
||||
@ -100,6 +100,10 @@ MemberData::HeartbeatChanges MemberData::setUpValues(Date_t now,
|
||||
_configVersion = hbResponse.getConfigVersion();
|
||||
}
|
||||
|
||||
if (hbResponse.hasLastStableRecoveryTimestamp()) {
|
||||
_lastStableRecoveryTimestamp = hbResponse.getLastStableRecoveryTimestamp();
|
||||
}
|
||||
|
||||
_lastResponse = std::move(hbResponse);
|
||||
|
||||
return {opTimeAdvanced, configChanged, memberStateChanged};
|
||||
|
||||
@ -42,6 +42,8 @@
|
||||
|
||||
#include <string>
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
namespace MONGO_MOD_PUB mongo {
|
||||
namespace repl {
|
||||
|
||||
@ -202,6 +204,10 @@ public:
|
||||
return _priorityPort;
|
||||
}
|
||||
|
||||
/**
 * Returns the lastStableRecoveryTimestamp stored for this member, or boost::none when no value
 * has been stored.
 */
boost::optional<Timestamp> getLastStableRecoveryTimestamp() const {
    return _lastStableRecoveryTimestamp;
}
|
||||
|
||||
/*
|
||||
* Returns true if the last heartbeat data explicitly stated that the node is not electable.
|
||||
*/
|
||||
@ -394,6 +400,9 @@ private:
|
||||
|
||||
// Optional priority port for this member.
|
||||
boost::optional<int> _priorityPort;
|
||||
|
||||
// Last known lastStableRecoveryTimestamp gossiped from this member via heartbeat.
|
||||
boost::optional<Timestamp> _lastStableRecoveryTimestamp;
|
||||
};
|
||||
|
||||
} // namespace repl
|
||||
|
||||
@ -67,6 +67,7 @@ const std::string kSyncSourceFieldName = "syncingTo";
|
||||
const std::string kTermFieldName = "term";
|
||||
const std::string kTimestampFieldName = "ts";
|
||||
const std::string kIsElectableFieldName = "electable";
|
||||
const std::string kLastStableRecoveryTimestampFieldName = "lastStableRecoveryTimestamp";
|
||||
|
||||
} // namespace
|
||||
|
||||
@ -121,6 +122,9 @@ void ReplSetHeartbeatResponse::addToBSON(BSONObjBuilder* builder) const {
|
||||
if (_electableSet) {
|
||||
*builder << kIsElectableFieldName << _electable;
|
||||
}
|
||||
if (_lastStableRecoveryTimestamp) {
|
||||
builder->append(kLastStableRecoveryTimestampFieldName, *_lastStableRecoveryTimestamp);
|
||||
}
|
||||
}
|
||||
|
||||
BSONObj ReplSetHeartbeatResponse::toBSON() const {
|
||||
@ -295,6 +299,20 @@ Status ReplSetHeartbeatResponse::initialize(const BSONObj& doc, long long term)
|
||||
_syncingTo = HostAndPort(syncingToElement.String());
|
||||
}
|
||||
|
||||
const BSONElement lastStableRecoveryTimestampElement =
|
||||
doc[kLastStableRecoveryTimestampFieldName];
|
||||
if (!lastStableRecoveryTimestampElement.eoo()) {
|
||||
if (lastStableRecoveryTimestampElement.type() != BSONType::timestamp) {
|
||||
return Status(ErrorCodes::TypeMismatch,
|
||||
str::stream()
|
||||
<< "Expected \"" << kLastStableRecoveryTimestampFieldName
|
||||
<< "\" field in response to replSetHeartbeat to have type Timestamp, "
|
||||
"but found "
|
||||
<< typeName(lastStableRecoveryTimestampElement.type()));
|
||||
}
|
||||
_lastStableRecoveryTimestamp = lastStableRecoveryTimestampElement.timestamp();
|
||||
}
|
||||
|
||||
const BSONElement rsConfigElement = doc[kConfigFieldName];
|
||||
if (rsConfigElement.eoo()) {
|
||||
_configSet = false;
|
||||
|
||||
@ -43,6 +43,8 @@
|
||||
|
||||
#include <string>
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class BSONObj;
|
||||
@ -131,6 +133,12 @@ public:
|
||||
return _electableSet;
|
||||
}
|
||||
bool isElectable() const;
|
||||
bool hasLastStableRecoveryTimestamp() const {
|
||||
return _lastStableRecoveryTimestamp.has_value();
|
||||
}
|
||||
/**
 * Returns the lastStableRecoveryTimestamp held by this response. The stored optional is
 * accessed without a presence check, so callers should consult
 * hasLastStableRecoveryTimestamp() first.
 */
Timestamp getLastStableRecoveryTimestamp() const {
    return _lastStableRecoveryTimestamp.get();
}
|
||||
|
||||
/**
|
||||
* Sets _setName to "name".
|
||||
@ -215,6 +223,9 @@ public:
|
||||
_electableSet = true;
|
||||
_electable = electable;
|
||||
}
|
||||
/**
 * Stores "ts" as this response's lastStableRecoveryTimestamp.
 */
void setLastStableRecoveryTimestamp(Timestamp ts) {
    _lastStableRecoveryTimestamp.emplace(ts);
}
|
||||
|
||||
private:
|
||||
bool _electionTimeSet = false;
|
||||
@ -249,6 +260,8 @@ private:
|
||||
|
||||
bool _electableSet = false;
|
||||
bool _electable = false;
|
||||
|
||||
boost::optional<Timestamp> _lastStableRecoveryTimestamp;
|
||||
};
|
||||
|
||||
} // namespace repl
|
||||
|
||||
@ -57,6 +57,7 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) {
|
||||
Date_t appliedWallTime = Date_t() + Seconds(appliedOpTime.getSecs());
|
||||
OpTime writtenOpTime = OpTime(Timestamp(50), 0);
|
||||
Date_t writtenWallTime = Date_t() + Seconds(writtenOpTime.getSecs());
|
||||
Timestamp lastStableRecoveryTimestamp = Timestamp(9);
|
||||
ASSERT_EQUALS(false, hbResponse.hasState());
|
||||
ASSERT_EQUALS(false, hbResponse.hasElectionTime());
|
||||
ASSERT_EQUALS(false, hbResponse.hasDurableOpTime());
|
||||
@ -94,6 +95,8 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) {
|
||||
// set writtenOpTime
|
||||
hbResponse.setWrittenOpTimeAndWallTime({writtenOpTime, writtenWallTime});
|
||||
fieldsSet += 2; // OpTime and WallTime are separate fields
|
||||
hbResponse.setLastStableRecoveryTimestamp(lastStableRecoveryTimestamp);
|
||||
++fieldsSet;
|
||||
// set config
|
||||
ReplSetConfig config;
|
||||
hbResponse.setConfig(config);
|
||||
@ -122,6 +125,7 @@ TEST(ReplSetHeartbeatResponse, DefaultConstructThenSlowlyBuildToFullObj) {
|
||||
ASSERT_EQUALS(appliedWallTime, hbResponse.getAppliedOpTimeAndWallTime().wallTime);
|
||||
ASSERT_EQUALS(writtenOpTime, hbResponse.getWrittenOpTime());
|
||||
ASSERT_EQUALS(writtenWallTime, hbResponse.getWrittenOpTimeAndWallTime().wallTime);
|
||||
ASSERT_EQUALS(lastStableRecoveryTimestamp, hbResponse.getLastStableRecoveryTimestamp());
|
||||
ASSERT_EQUALS(config.toBSON().toString(), hbResponse.getConfig().toBSON().toString());
|
||||
|
||||
hbResponseObj = hbResponse.toBSON();
|
||||
@ -260,6 +264,31 @@ TEST(ReplSetHeartbeatResponse, InitializeNoWrittenWallTime) {
|
||||
ASSERT_EQUALS("", result.reason());
|
||||
}
|
||||
|
||||
TEST(ReplSetHeartbeatResponse, InitializeWrongLastStableRecoveryTimestampType) {
    // The document supplies a 64-bit integer for "lastStableRecoveryTimestamp", so initialize()
    // must reject it with TypeMismatch.
    const OpTime opTime(Timestamp(100, 0), 0);
    const Date_t wallTime = Date_t() + Seconds(100);
    const BSONObj badDoc =
        BSON("ok" << 1.0 << "durableOpTime" << opTime.toBSON() << "durableWallTime" << wallTime
                  << "opTime" << opTime.toBSON() << "wallTime" << wallTime << "v" << 2
                  << "lastStableRecoveryTimestamp" << 12345LL);

    ReplSetHeartbeatResponse response;
    const Status status = response.initialize(badDoc, 0);
    ASSERT_EQUALS(ErrorCodes::TypeMismatch, status);
    ASSERT_STRING_CONTAINS(status.reason(), "Expected \"lastStableRecoveryTimestamp\" field");
}
|
||||
|
||||
TEST(ReplSetHeartbeatResponse, InitializeNoLastStableRecoveryTimestamp) {
    // Backward compatibility: a response without the lastStableRecoveryTimestamp field (as sent
    // by older versions) must still initialize cleanly and report the value as absent.
    const OpTime opTime(Timestamp(100, 0), 0);
    const Date_t wallTime = Date_t() + Seconds(100);
    const BSONObj docWithoutField =
        BSON("ok" << 1.0 << "durableOpTime" << opTime.toBSON() << "durableWallTime" << wallTime
                  << "opTime" << opTime.toBSON() << "wallTime" << wallTime << "v" << 2);

    ReplSetHeartbeatResponse response;
    ASSERT_OK(response.initialize(docWithoutField, 0));
    ASSERT_FALSE(response.hasLastStableRecoveryTimestamp());
}
|
||||
|
||||
TEST(ReplSetHeartbeatResponse, InitializeMemberStateWrongType) {
|
||||
ReplSetHeartbeatResponse hbResponse;
|
||||
BSONObj initializerObj =
|
||||
|
||||
@ -3179,6 +3179,8 @@ Status ReplicationCoordinatorImpl::processReplSetGetStatus(
|
||||
return Status(ErrorCodes::ShutdownInProgress, "shutdown in progress");
|
||||
}
|
||||
|
||||
_topCoord->setCachedLastStableRecoveryTimestamp(lastStableRecoveryTimestamp);
|
||||
|
||||
Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse");
|
||||
_topCoord->prepareStatusResponse(
|
||||
TopologyCoordinator::ReplSetStatusArgs{
|
||||
@ -5181,6 +5183,31 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) {
|
||||
// Update our understanding of the oldest available snapshot timestamp.
|
||||
setOldestTimestampMetric(_storage->getOldestTimestamp(getServiceContext()));
|
||||
}
|
||||
|
||||
_maybeUpdateCachedLastStableRecoveryTimestamp(lk, _replExecutor->now());
|
||||
}
|
||||
|
||||
/**
 * Refreshes the topology coordinator's cached lastStableRecoveryTimestamp from the storage
 * interface so it can be gossiped to other replica set members via heartbeats.
 *
 * A refresh happens when no value is cached yet, or when at least two sync-delay intervals
 * (derived from storageGlobalParams.syncdelay) have elapsed since the previous query. 'now' is
 * the time compared against, and then recorded as, the time of the last query.
 */
void ReplicationCoordinatorImpl::_maybeUpdateCachedLastStableRecoveryTimestamp(WithLock lk,
                                                                               Date_t now) {
    const bool needsRefresh = [&] {
        // Until a value has been established, query on every call.
        if (!_topCoord->hasCachedLastStableRecoveryTimestamp()) {
            return true;
        }

        const double syncDelaySeconds = storageGlobalParams.syncdelay.load();

        // A non-positive syncdelay falls back to 60 seconds.
        std::int64_t wholeSeconds = 60;
        if (syncDelaySeconds > 0) {
            wholeSeconds = static_cast<std::int64_t>(syncDelaySeconds);
            // A fractional syncdelay below one second truncates to zero, which would collapse
            // the throttle window and query the storage engine on every call. Clamp to one
            // second so the throttle always has a non-empty window.
            if (wholeSeconds < 1) {
                wholeSeconds = 1;
            }
        }
        const Seconds effectiveSyncDelay{wholeSeconds};

        return now - _lastStableRecoveryTimestampRefreshTime >= 2 * effectiveSyncDelay;
    }();

    if (!needsRefresh) {
        return;
    }

    const auto ts = _storage->getLastStableRecoveryTimestamp(getServiceContext());
    // Only cache a value that is present and is not Timestamp::min().
    if (ts && *ts != Timestamp::min()) {
        _topCoord->setCachedLastStableRecoveryTimestamp(ts);
    }
    // The query time is recorded even when nothing was cached; while the cache stays empty the
    // check above still forces a refresh on the next call.
    _lastStableRecoveryTimestampRefreshTime = now;
}
|
||||
|
||||
void ReplicationCoordinatorImpl::finishRecoveryIfEligible(OperationContext* opCtx) {
|
||||
|
||||
@ -1604,9 +1604,18 @@ private:
|
||||
|
||||
/**
|
||||
* Calculates and sets the value of the 'stable' replication optime for the storage engine.
|
||||
*
|
||||
* This function may also update the topology coordinator's cached value of the storage engine's
|
||||
* recovery timestamp.
|
||||
*/
|
||||
void _setStableTimestampForStorage(WithLock lk);
|
||||
|
||||
/**
|
||||
* Updates the topology coordinator's cached value of the storage engine's recovery timestamp if
|
||||
* enough wall-clock time has elapsed that a checkpoint is certain to have occurred.
|
||||
*/
|
||||
void _maybeUpdateCachedLastStableRecoveryTimestamp(WithLock lk, Date_t now);
|
||||
|
||||
/**
|
||||
* Clears the current committed snapshot.
|
||||
*/
|
||||
@ -2018,6 +2027,10 @@ private:
|
||||
// The cached value of the topology from the most recent SplitHorizonChange.
|
||||
int64_t _lastHorizonTopologyChange{-1}; // (M)
|
||||
|
||||
// The last time we queried the storage engine for lastStableRecoveryTimestamp and cached it in
|
||||
// _topCoord. Used to throttle calls to at most once per two checkpoint intervals.
|
||||
Date_t _lastStableRecoveryTimestampRefreshTime; // (M)
|
||||
|
||||
// This should be set during sharding initialization except on config shard.
|
||||
boost::optional<bool> _wasCWWCSetOnConfigServerOnStartup;
|
||||
|
||||
|
||||
@ -6219,6 +6219,104 @@ TEST_F(StableOpTimeTest, AdvanceCommitPointSetsStableOpTimeForStorage) {
|
||||
ASSERT_EQUALS(Timestamp(3, 2), stableTimestamp);
|
||||
}
|
||||
|
||||
class LastStableRecoveryTimestampTest : public ReplCoordTest {
|
||||
public:
|
||||
void setUp() override {
|
||||
ReplCoordTest::setUp();
|
||||
init("mySet/test1:1234,test2:1234,test3:1234");
|
||||
assertStartSuccess(BSON("_id" << "mySet"
|
||||
<< "protocolVersion" << 1 << "version" << 1 << "members"
|
||||
<< BSON_ARRAY(BSON("_id" << 0 << "host"
|
||||
<< "test1:1234")
|
||||
<< BSON("_id" << 1 << "host"
|
||||
<< "test2:1234")
|
||||
<< BSON("_id" << 2 << "host"
|
||||
<< "test3:1234"))),
|
||||
HostAndPort("test2", 1234));
|
||||
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
replCoordSetMyLastWrittenAndAppliedAndDurableOpTime(OpTimeWithTermOne(1, 1),
|
||||
Date_t() + Seconds(100));
|
||||
simulateSuccessfulV1Election();
|
||||
getStorageInterface()->allDurableTimestamp = Timestamp(10, 1);
|
||||
replCoordSetMyLastWrittenOpTime(OpTimeWithTermOne(2, 1), Date_t() + Seconds(100));
|
||||
replCoordSetMyLastAppliedOpTime(OpTimeWithTermOne(2, 1), Date_t() + Seconds(100));
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(LastStableRecoveryTimestampTest, CachesRealTimestamp) {
    // A real timestamp reported by the storage interface is cached once the commit point
    // advances.
    const Timestamp recoveryTs(100, 1);
    getStorageInterface()->lastStableRecoveryTimestamp = recoveryTs;

    replCoordAdvanceCommitPoint(OpTimeWithTermOne(2, 1), Date_t() + Seconds(100), false);

    ASSERT_TRUE(getTopoCoord().hasCachedLastStableRecoveryTimestamp());
    const auto cached = getTopoCoord().getCachedLastStableRecoveryTimestamp();
    ASSERT_EQUALS(recoveryTs, *cached);
}
|
||||
|
||||
TEST_F(LastStableRecoveryTimestampTest, DoesNotCacheMinTimestamp) {
    // Timestamp::min() from the storage interface must leave the cache unset.
    const Date_t wallTime = Date_t() + Seconds(100);
    getStorageInterface()->lastStableRecoveryTimestamp = Timestamp::min();

    replCoordAdvanceCommitPoint(OpTimeWithTermOne(2, 1), wallTime, false);

    ASSERT_FALSE(getTopoCoord().hasCachedLastStableRecoveryTimestamp());
}
|
||||
|
||||
TEST_F(LastStableRecoveryTimestampTest, DoesNotCacheAbsentTimestamp) {
    // boost::none from the storage interface must leave the cache unset.
    const Date_t wallTime = Date_t() + Seconds(100);
    getStorageInterface()->lastStableRecoveryTimestamp = boost::none;

    replCoordAdvanceCommitPoint(OpTimeWithTermOne(2, 1), wallTime, false);

    ASSERT_FALSE(getTopoCoord().hasCachedLastStableRecoveryTimestamp());
}
|
||||
|
||||
TEST_F(LastStableRecoveryTimestampTest, ThrottlesCacheRefreshAfterValueEstablished) {
    // With --syncdelay=1 second the throttle window is 2 seconds. The original setting is
    // restored when the test exits.
    const double origSyncDelay = storageGlobalParams.syncdelay.load();
    storageGlobalParams.syncdelay.store(1.0);
    ON_BLOCK_EXIT([origSyncDelay] { storageGlobalParams.syncdelay.store(origSyncDelay); });

    const Date_t wallTime = Date_t() + Seconds(100);
    const Timestamp firstTs(100, 1);
    const Timestamp secondTs(100, 2);

    // The first call establishes the cached value.
    getStorageInterface()->lastStableRecoveryTimestamp = firstTs;
    replCoordAdvanceCommitPoint(OpTimeWithTermOne(2, 1), wallTime, false);
    ASSERT_EQUALS(firstTs, *getTopoCoord().getCachedLastStableRecoveryTimestamp());

    // A call inside the throttle window must leave the cached value untouched.
    getStorageInterface()->lastStableRecoveryTimestamp = secondTs;
    const auto opTimeInsideWindow = OpTimeWithTermOne(3, 1);
    replCoordSetMyLastWrittenOpTime(opTimeInsideWindow, wallTime);
    replCoordSetMyLastAppliedOpTime(opTimeInsideWindow, wallTime);
    replCoordAdvanceCommitPoint(opTimeInsideWindow, wallTime, false);
    ASSERT_EQUALS(firstTs, *getTopoCoord().getCachedLastStableRecoveryTimestamp());

    // Advance the mock clock past the window; the next call must pick up the new value.
    enterNetwork();
    getNet()->runUntil(getNet()->now() + Seconds(3));
    exitNetwork();
    const auto opTimeAfterWindow = OpTimeWithTermOne(4, 1);
    replCoordSetMyLastWrittenOpTime(opTimeAfterWindow, wallTime);
    replCoordSetMyLastAppliedOpTime(opTimeAfterWindow, wallTime);
    replCoordAdvanceCommitPoint(opTimeAfterWindow, wallTime, false);
    ASSERT_EQUALS(secondTs, *getTopoCoord().getCachedLastStableRecoveryTimestamp());
}
|
||||
|
||||
TEST_F(LastStableRecoveryTimestampTest, AggressivelyRefreshesUntilValueEstablished) {
    // Pick a throttle window that cannot expire during the test, so any refresh observed here is
    // caused by the cache being unset rather than by the timer.
    const double origSyncDelay = storageGlobalParams.syncdelay.load();
    storageGlobalParams.syncdelay.store(3600.0);
    ON_BLOCK_EXIT([origSyncDelay] { storageGlobalParams.syncdelay.store(origSyncDelay); });

    const Date_t wallTime = Date_t() + Seconds(100);

    // Timestamp::min() on the first call means nothing gets cached.
    getStorageInterface()->lastStableRecoveryTimestamp = Timestamp::min();
    replCoordAdvanceCommitPoint(OpTimeWithTermOne(2, 1), wallTime, false);
    ASSERT_FALSE(getTopoCoord().hasCachedLastStableRecoveryTimestamp());

    // Still inside the throttle window, the next call must refresh because no value was cached.
    const Timestamp recoveryTs(100, 1);
    getStorageInterface()->lastStableRecoveryTimestamp = recoveryTs;
    const auto nextOpTime = OpTimeWithTermOne(3, 1);
    replCoordSetMyLastWrittenOpTime(nextOpTime, wallTime);
    replCoordSetMyLastAppliedOpTime(nextOpTime, wallTime);
    replCoordAdvanceCommitPoint(nextOpTime, wallTime, false);
    ASSERT_TRUE(getTopoCoord().hasCachedLastStableRecoveryTimestamp());
    ASSERT_EQUALS(recoveryTs, *getTopoCoord().getCachedLastStableRecoveryTimestamp());
}
|
||||
|
||||
TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDuringShutdown) {
|
||||
assertStartSuccess(BSON("_id" << "mySet"
|
||||
<< "version" << 2 << "members"
|
||||
|
||||
@ -399,7 +399,7 @@ public:
|
||||
|
||||
/**
 * Returns the public 'lastStableRecoveryTimestamp' member, which tests assign directly to
 * control what this mock reports.
 */
boost::optional<Timestamp> getLastStableRecoveryTimestamp(
    ServiceContext* serviceCtx) const override {
    // The stale unconditional `return boost::none;` is dropped: it made the configured value
    // unreachable.
    return lastStableRecoveryTimestamp;
}
|
||||
|
||||
Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const override {
|
||||
@ -484,6 +484,7 @@ public:
|
||||
Timestamp allDurableTimestamp = Timestamp::min();
|
||||
Timestamp oldestOpenReadTimestamp = Timestamp::min();
|
||||
Timestamp earliestOplogTimestamp = Timestamp::min();
|
||||
boost::optional<Timestamp> lastStableRecoveryTimestamp = boost::none;
|
||||
|
||||
private:
|
||||
mutable std::mutex _mutex;
|
||||
|
||||
@ -1015,6 +1015,9 @@ StatusWith<bool> TopologyCoordinator::prepareHeartbeatResponseV1(
|
||||
response->setAppliedOpTimeAndWallTime(lastOpApplied);
|
||||
response->setWrittenOpTimeAndWallTime(lastOpWritten);
|
||||
response->setDurableOpTimeAndWallTime(lastOpDurable);
|
||||
if (_cachedLastStableRecoveryTimestamp) {
|
||||
response->setLastStableRecoveryTimestamp(*_cachedLastStableRecoveryTimestamp);
|
||||
}
|
||||
|
||||
if (_currentPrimaryIndex != -1) {
|
||||
response->setPrimaryId(_rsConfig.getMemberAt(_currentPrimaryIndex).getId().getData());
|
||||
@ -2166,6 +2169,9 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu
|
||||
if (featureFlagMajorityWriteLatency) {
|
||||
bb.appendDate("lastWrittenWallTime", it->getLastWrittenWallTime());
|
||||
}
|
||||
if (lastStableRecoveryTimestamp) {
|
||||
bb.append("lastStableRecoveryTimestamp", *lastStableRecoveryTimestamp);
|
||||
}
|
||||
}
|
||||
|
||||
if (!_syncSource.empty() && !_iAmPrimary()) {
|
||||
@ -2240,6 +2246,9 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu
|
||||
if (featureFlagMajorityWriteLatency) {
|
||||
bb.appendDate("lastWrittenWallTime", it->getLastWrittenWallTime());
|
||||
}
|
||||
if (const auto ts = it->getLastStableRecoveryTimestamp()) {
|
||||
bb.append("lastStableRecoveryTimestamp", *ts);
|
||||
}
|
||||
}
|
||||
bb.appendDate("lastHeartbeat", it->getLastHeartbeat());
|
||||
bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv());
|
||||
|
||||
@ -249,6 +249,14 @@ public:
|
||||
// Returns _electionIdTerm.
|
||||
MONGO_MOD_PRIVATE long long getElectionIdTerm() const;
|
||||
|
||||
/**
 * Returns true when a lastStableRecoveryTimestamp value is currently cached.
 */
MONGO_MOD_PRIVATE bool hasCachedLastStableRecoveryTimestamp() const {
    return static_cast<bool>(_cachedLastStableRecoveryTimestamp);
}
|
||||
|
||||
/**
 * Returns the cached lastStableRecoveryTimestamp, or boost::none when nothing is cached.
 */
MONGO_MOD_PRIVATE boost::optional<Timestamp> getCachedLastStableRecoveryTimestamp() const {
    return _cachedLastStableRecoveryTimestamp;
}
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Basic state manipulation methods.
|
||||
@ -397,6 +405,13 @@ public:
|
||||
*/
|
||||
MONGO_MOD_PRIVATE void resetMaintenanceCount();
|
||||
|
||||
/**
|
||||
* Update the cached last stable recovery timestamp used to gossip to other nodes.
|
||||
*/
|
||||
MONGO_MOD_PRIVATE void setCachedLastStableRecoveryTimestamp(boost::optional<Timestamp> ts) {
|
||||
_cachedLastStableRecoveryTimestamp = ts;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Methods that prepare responses to command requests.
|
||||
@ -1267,6 +1282,12 @@ private:
|
||||
// point. This allows us to skip more costly checks of if this parameter was recently turned
|
||||
// on/off in the normal case.
|
||||
bool _priorityPortUsageEverDisabled = false;
|
||||
|
||||
// Last stable recovery timestamp queried from the storage engine. The value is cached here so
|
||||
// it can be included in heartbeat responses and gossiped to other replica set members. The
|
||||
// cached value is updated periodically during _setStableTimestampForStorage(), but is throttled
|
||||
// to update at most once per two checkpoint intervals.
|
||||
boost::optional<Timestamp> _cachedLastStableRecoveryTimestamp;
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@ -413,6 +413,10 @@ protected:
|
||||
int selfIndex,
|
||||
bool expectPriority);
|
||||
void testReplSetGetStatus(BSONObj config, bool expectPriority);
|
||||
void testReplSetGetStatusLastStableRecoveryTimestamp(BSONObj config,
|
||||
int selfIndex,
|
||||
boost::optional<Timestamp> selfTs,
|
||||
boost::optional<Timestamp> remoteTs);
|
||||
void testMemberReplicationProgressAfterReconfig(BSONObj initialConfig,
|
||||
BSONObj changedConfig,
|
||||
int selfIndex,
|
||||
@ -2278,6 +2282,67 @@ TEST_F(TopoCoordTest,
|
||||
ASSERT_EQUALS(HostAndPort("h5").toString(), response2Obj["prevSyncTarget"].String());
|
||||
}
|
||||
|
||||
/**
 * Installs 'config' with this node at 'selfIndex', feeds in one heartbeat response from h2 that
 * carries 'remoteTs' (when set), caches 'selfTs' locally, and then checks that replSetGetStatus
 * reports "lastStableRecoveryTimestamp" for each member exactly when the matching optional is
 * set. Expects a two-member configuration made of "hself" and "h2".
 */
void TopoCoordTest::testReplSetGetStatusLastStableRecoveryTimestamp(
    BSONObj config,
    int selfIndex,
    boost::optional<Timestamp> selfTs,
    boost::optional<Timestamp> remoteTs) {
    updateConfig(config, selfIndex);
    setSelfMemberState(MemberState::RS_SECONDARY);

    // Build the heartbeat response the remote member (h2) will appear to have sent.
    const OpTime remoteOpTime(Timestamp(3, 1), 20);
    const Date_t remoteWallTime = Date_t() + Seconds(remoteOpTime.getSecs());
    ReplSetHeartbeatResponse remoteResponse;
    remoteResponse.setConfigVersion(1);
    remoteResponse.setState(MemberState::RS_SECONDARY);
    remoteResponse.setElectionTime(Timestamp());
    remoteResponse.setAppliedOpTimeAndWallTime({remoteOpTime, remoteWallTime});
    remoteResponse.setWrittenOpTimeAndWallTime({remoteOpTime, remoteWallTime});
    remoteResponse.setDurableOpTimeAndWallTime({remoteOpTime, remoteWallTime});
    remoteResponse.setTerm(getTopoCoord().getTerm());
    if (remoteTs) {
        remoteResponse.setLastStableRecoveryTimestamp(*remoteTs);
    }

    getTopoCoord().prepareHeartbeatRequestV1(now()++, "rs0", HostAndPort("h2"));
    getTopoCoord().processHeartbeatResponse(now()++,
                                            Milliseconds(1),
                                            HostAndPort("h2"),
                                            StatusWith<ReplSetHeartbeatResponse>(remoteResponse));

    getTopoCoord().setCachedLastStableRecoveryTimestamp(selfTs);

    BSONObjBuilder statusBuilder;
    Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
    getTopoCoord().prepareStatusResponse(
        TopologyCoordinator::ReplSetStatusArgs{
            now()++, 10, OpTime(), BSONObj(), BSONObj(), BSONObj(), selfTs},
        &statusBuilder,
        &resultStatus);
    ASSERT_OK(resultStatus);
    const BSONObj rsStatus = statusBuilder.obj();

    const std::vector<BSONElement> members = rsStatus["members"].Array();
    ASSERT_EQUALS(2U, members.size());

    const BSONObj selfStatus = members[selfIndex].Obj();
    const BSONObj h2Status = members[1 - selfIndex].Obj();
    ASSERT_EQUALS("hself:27017", selfStatus["name"].str());
    ASSERT_EQUALS("h2:27017", h2Status["name"].str());

    // The field must be present with the expected value, or absent, per member.
    const auto expectTimestamp = [](const BSONObj& memberStatus,
                                    const boost::optional<Timestamp>& expected) {
        if (!expected) {
            ASSERT_FALSE(memberStatus.hasField("lastStableRecoveryTimestamp"));
            return;
        }
        ASSERT_TRUE(memberStatus.hasField("lastStableRecoveryTimestamp"));
        ASSERT_EQUALS(*expected, memberStatus["lastStableRecoveryTimestamp"].timestamp());
    };
    expectTimestamp(selfStatus, selfTs);
    expectTimestamp(h2Status, remoteTs);
}
|
||||
|
||||
void TopoCoordTest::testReplSetGetStatus(BSONObj config, bool expectPriority) {
|
||||
// This test starts by configuring a TopologyCoordinator as a member of a 4 node replica
|
||||
// set, with each node in a different state.
|
||||
@ -2481,7 +2546,8 @@ void TopoCoordTest::testReplSetGetStatus(BSONObj config, bool expectPriority) {
|
||||
ASSERT_TRUE(member2Status.hasField("optimeDurableDate"));
|
||||
ASSERT_TRUE(member2Status.hasField("optimeWritten"));
|
||||
ASSERT_TRUE(selfStatus.hasField("optimeWrittenDate"));
|
||||
ASSERT_FALSE(selfStatus.hasField("lastStableRecoveryTimestamp"));
|
||||
ASSERT_EQUALS(lastStableRecoveryTimestamp,
|
||||
selfStatus["lastStableRecoveryTimestamp"].timestamp());
|
||||
ASSERT_EQUALS(electionTime, selfStatus["electionTime"].timestamp());
|
||||
ASSERT_FALSE(selfStatus.hasField("pingMs"));
|
||||
|
||||
@ -2536,6 +2602,26 @@ TEST_F(TopoCoordTest, ReplSetGetStatusPriority) {
|
||||
true);
|
||||
}
|
||||
|
||||
TEST_F(TopoCoordTest, ReplSetGetStatusShowsLastStableRecoveryTimestamp) {
    // Both the local and the remote timestamp are set, so both members must report the field.
    const BSONObj twoNodeConfig = BSON("_id" << "rs0" << "version" << 1 << "members"
                                             << BSON_ARRAY(BSON("_id" << 0 << "host" << "hself")
                                                           << BSON("_id" << 1 << "host" << "h2")));
    const int selfIndex = 0;
    testReplSetGetStatusLastStableRecoveryTimestamp(
        twoNodeConfig, selfIndex, Timestamp(15, 1), Timestamp(20, 3));
}
|
||||
|
||||
TEST_F(TopoCoordTest, ReplSetGetStatusOmitsLastStableRecoveryTimestampWhenAbsentFromHeartbeat) {
    // Neither the local nor the remote timestamp is set, so neither member may report the field.
    const BSONObj twoNodeConfig = BSON("_id" << "rs0" << "version" << 1 << "members"
                                             << BSON_ARRAY(BSON("_id" << 0 << "host" << "hself")
                                                           << BSON("_id" << 1 << "host" << "h2")));
    const int selfIndex = 0;
    testReplSetGetStatusLastStableRecoveryTimestamp(
        twoNodeConfig, selfIndex, boost::none, boost::none);
}
|
||||
|
||||
TEST_F(TopoCoordTest, ReplSetGetStatusWriteMajorityDifferentFromMajorityVoteCount) {
|
||||
// This tests that writeMajorityCount differs from majorityVoteCount in replSetGetStatus when
|
||||
// the number of non-arbiter voters is less than majorityVoteCount.
|
||||
@ -3173,6 +3259,35 @@ TEST_F(TopoCoordTest, RespondToHeartbeatsWithNullLastAppliedAndLastDurableWhileI
|
||||
ASSERT_EQUALS(OpTime(), response.getDurableOpTime());
|
||||
}
|
||||
|
||||
TEST_F(PrepareHeartbeatResponseV1Test,
       HeartbeatResponseIncludesLastStableRecoveryTimestampWhenCacheIsSet) {
    // With a cached value in place, the prepared heartbeat response must carry it.
    const Timestamp cachedTs(10, 5);
    getTopoCoord().setCachedLastStableRecoveryTimestamp(cachedTs);

    ReplSetHeartbeatArgsV1 hbArgs;
    hbArgs.setConfigVersion(1);
    hbArgs.setSetName("rs0");
    hbArgs.setSenderId(20);

    ReplSetHeartbeatResponse hbResponse;
    Status status(ErrorCodes::InternalError, "prepareHeartbeatResponse didn't set result");
    prepareHeartbeatResponseV1(hbArgs, &hbResponse, &status);

    ASSERT_OK(status);
    ASSERT_TRUE(hbResponse.hasLastStableRecoveryTimestamp());
    ASSERT_EQUALS(cachedTs, hbResponse.getLastStableRecoveryTimestamp());
}
|
||||
|
||||
TEST_F(PrepareHeartbeatResponseV1Test,
       HeartbeatResponseOmitsLastStableRecoveryTimestampWhenCacheIsNotSet) {
    // Nothing is cached in this test, so the prepared heartbeat response must omit the field.
    ReplSetHeartbeatArgsV1 hbArgs;
    hbArgs.setConfigVersion(1);
    hbArgs.setSetName("rs0");
    hbArgs.setSenderId(20);

    ReplSetHeartbeatResponse hbResponse;
    Status status(ErrorCodes::InternalError, "prepareHeartbeatResponse didn't set result");
    prepareHeartbeatResponseV1(hbArgs, &hbResponse, &status);

    ASSERT_OK(status);
    ASSERT_FALSE(hbResponse.hasLastStableRecoveryTimestamp());
}
|
||||
|
||||
void TopoCoordTest::testMemberReplicationProgressAfterReconfig(BSONObj initialConfig,
|
||||
BSONObj changedConfig,
|
||||
int selfIndex,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user