SERVER-123222 Exclude Injector Nodes from RSM Server Selection in Standby Clusters (#52385)

GitOrigin-RevId: 8f51a7d724fe8380a4d04adeffc11ff73b0838fd
This commit is contained in:
Igor Praznik 2026-04-29 23:06:08 +02:00 committed by MongoDB Bot
parent 0e71217f58
commit be92c57445
11 changed files with 315 additions and 37 deletions

View File

@ -739,3 +739,69 @@ res = testDB.runCommand({insert: "test", documents: [{x: "retryable_write"}], tx
assert.eq(res.writeConcernError, {code: ErrorCodes.NotWritablePrimary, errmsg: "hello"});
// There should be no errorLabels field if no error labels provided in failCommand.
assert(!res.hasOwnProperty("errorLabels"), res);
// failCommandsExcept is a new option and is not understood by older binaries,
// so skip these tests when running against mixed-version clusters.
// TODO SERVER-125631: Remove the isMultiversion check when 9.0 becomes last-lts.
if (!isMultiversion) {
    // Enables the "failCommand" failpoint in alwaysOn mode for this test's thread, injecting
    // NotWritablePrimary. `selectors` carries the options that pick which commands are failed;
    // it is spliced between errorCode and threadName so the document layout stays the same.
    const enableFailCommand = (selectors) =>
        assert.commandWorked(
            adminDB.runCommand({
                configureFailPoint: "failCommand",
                mode: "alwaysOn",
                data: {
                    errorCode: ErrorCodes.NotWritablePrimary,
                    ...selectors,
                    threadName: threadName,
                },
            }),
        );
    // Runs `cmdObj` against testDB and expects it to succeed (i.e. to bypass the failpoint).
    const expectPasses = (cmdObj) => assert.commandWorked(testDB.runCommand(cmdObj));
    // Runs `cmdObj` against testDB and expects the failpoint's NotWritablePrimary error.
    const expectFailpointHit = (cmdObj) =>
        assert.commandFailedWithCode(testDB.runCommand(cmdObj), ErrorCodes.NotWritablePrimary);

    // Scenario 1: failAllCommands plus failCommandsExcept -- everything fails apart from the
    // allowlisted "ping".
    enableFailCommand({failAllCommands: true, failCommandsExcept: ["ping"]});
    expectPasses({ping: 1});
    expectFailpointHit({find: "test"});
    expectFailpointHit({insert: "test", documents: [{x: 1}]});

    // Scenario 2: aliases of an allowlisted command are exempt as well. Listing "isMaster" also
    // lets its alias "ismaster" through (both belong to the CmdIsMaster command class), whereas
    // "hello" is a separate command class (CmdHello) and is NOT an alias of "isMaster", so it
    // must be listed explicitly to be exempt.
    enableFailCommand({failAllCommands: true, failCommandsExcept: ["isMaster"]});
    expectPasses({isMaster: 1});
    expectPasses({ismaster: 1});
    expectFailpointHit({hello: 1});
    expectFailpointHit({ping: 1});

    // Scenario 3: without failAllCommands the allowlist has no effect; the failCommands
    // denylist stays authoritative, so "ping" still fails even though it is also listed in
    // failCommandsExcept.
    enableFailCommand({failCommands: ["ping"], failCommandsExcept: ["ping"]});
    expectFailpointHit({ping: 1});

    // Switch the failpoint off again so it does not affect anything that runs afterwards.
    assert.commandWorked(adminDB.runCommand({configureFailPoint: "failCommand", mode: "off"}));
}

View File

@ -1,5 +1,11 @@
/**
* Test fixture for simulating a standby cluster's config server.
* Test fixture for simulating a standby cluster's config server. Construction launches a normal
* ShardingTest; `transitionToStandby()` then converts the config server replica set into a
* non-configsvr "standby" replica set with node 0 tagged `processType: INJECTOR`.
*
* After the transition, a `failCommand` failpoint is enabled on node 0 that fails every command
* except the small wire-protocol allowlist the real injector handles (hello, isMaster, ismaster,
* ping, etc.).
*
* Example usage:
*
@ -8,10 +14,11 @@
* fixture.st.s.getDB("test").foo.insert({_id: 1});
* fixture.transitionToStandby();
* // Interact with the standby config replica set via fixture.standbyRS.
* const primary = fixture.standbyRS.getPrimary();
* const injector = fixture.standbyRS.nodes[0]; // INJECTOR-tagged, failpoint-active
* fixture.teardown();
*/
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
@ -48,6 +55,9 @@ export class StandbyClusterTestFixture {
*/
this._sentries = null;
/** @private Handle returned by configureFailPoint(); used to disable the failpoint. */
this._injectorFailpoint = null;
this._setup();
}
@ -121,13 +131,21 @@ export class StandbyClusterTestFixture {
MongoRunner.stopMongod(node0);
// Build a new config: rename the set to "standby", replace all member ports with the
// newly-allocated ports, and remove the configsvr field.
// newly-allocated ports, and remove the configsvr field. Only node 0 is electable
// (priority 1; others priority 0), so it deterministically wins the election and becomes
// primary -- this mirrors a real standby cluster where the injector is always primary.
// Node 0 is tagged with processType: INJECTOR so the topology matches production: the
// RSM keeps the INJECTOR-tagged primary visible for replication purposes but excludes it
// from server selection, forcing client traffic to the secondaries.
const newMembers = existingConfig.members.map((member, idx) => {
const newMember = Object.extend({}, member, /*deep=*/ true);
const hostParts = newMember.host.split(":");
hostParts[hostParts.length - 1] = String(newPorts[idx]);
newMember.host = hostParts.join(":");
newMember.priority = idx === 0 ? 1 : 0;
if (idx === 0) {
newMember.tags = {processType: "INJECTOR"};
}
return newMember;
});
@ -136,13 +154,11 @@ export class StandbyClusterTestFixture {
newConfig.members = newMembers;
delete newConfig.configsvr;
// We use a single electable member (priority set in newMembers above) and a very long
// cluster-wide election timeout so that after the forced stepdown no new primary is elected
// for the duration of the test.
if (!newConfig.settings) {
newConfig.settings = {};
}
newConfig.settings.electionTimeoutMillis = ReplSetTest.kForeverMillis;
// Node 0 is the only electable member (priority set in newMembers above) and remains
// primary for the duration of the test, mirroring a real standby cluster where the
// INJECTOR-tagged node is primary.
newConfig.settings = newConfig.settings || {};
newConfig.settings.electionTimeoutMillis = 10000;
// Replace the hosts in each individual replSetConfig.
for (let i = 0; i < numNodes; i++) {
@ -182,20 +198,26 @@ export class StandbyClusterTestFixture {
this.standbyRS.asCluster(nodes, () => {
this.standbyRS.stepUp(nodes[0], {awaitReplicationBeforeStepUp: false});
this.standbyRS.awaitNodesAgreeOnPrimary();
// Write a majority-committed noop before stepping down to ensure the committed snapshot
// is established. The primary's drain-completion noop may not yet be journaled, and
// without a journaled entry the commit point (and thus the committed snapshot) never
// advances. Once the node steps down to secondary, the commit point can no longer
// advance, so majority reads would be permanently blocked.
const primary = this.standbyRS.getPrimary();
assert.commandWorked(
primary.adminCommand({
appendOplogNote: 1,
data: {msg: "standby transition commit point advance"},
writeConcern: {w: "majority", wtimeout: ReplSetTest.kDefaultTimeoutMS},
}),
);
assert.commandWorked(primary.adminCommand({replSetStepDown: 1, force: true}));
// Node 0 stays primary; the INJECTOR tag keeps client traffic from being routed to it
// via server selection, while the failpoint fails any command that does slip through
// (everything outside the wire-protocol allowlist the real injector handles).
this._injectorFailpoint = configureFailPoint(nodes[0], "failCommand", {
failAllCommands: true,
failCommandsExcept: [
"hello",
"isMaster",
"ismaster",
"ping",
"find",
"getMore",
"killCursors",
"replSetHeartbeat",
"replSetUpdatePosition",
"saslStart",
"saslContinue",
],
errorCode: 12319007,
});
});
// Start a mongosentry on every retired port (old shard ports and old config server ports).
@ -233,6 +255,16 @@ export class StandbyClusterTestFixture {
}
if (this.standbyRS) {
// Disable the injector failpoint before stopSet (if it was enabled), otherwise the
// shutdown command (not on the allowlist) would be rejected.
if (this._injectorFailpoint) {
try {
this._injectorFailpoint.off();
} catch (e) {
jsTest.log.info(`Failed to disable injector failpoint during teardown: ${e}`);
}
this._injectorFailpoint = null;
}
this.standbyRS.stopSet();
} else {
this.st.stop();

View File

@ -13,7 +13,7 @@ import {StandbyClusterTestFixture} from "jstests/noPassthrough/libs/sharded_clus
const fixture = new StandbyClusterTestFixture({
name: jsTestName(),
shards: 2,
rs: {nodes: 1},
rs: {nodes: 3},
configShard: true,
});

View File

@ -18,7 +18,7 @@ describe("config-only mode startup", function () {
name: "config_only_mode_startup",
shards: 1,
rs: {nodes: 1},
config: {nodes: 1},
config: 3,
});
this.fixture.transitionToStandby();
this.rs = this.fixture.standbyRS;

View File

@ -17,7 +17,7 @@ const fixture = new StandbyClusterTestFixture({
name: "mongos_start_with_standby_config",
shards: 1,
rs: {nodes: 1},
config: {nodes: 1},
config: 3,
keyFile: KEY_FILE,
});

View File

@ -35,7 +35,9 @@
#include "mongo/bson/bsontypes.h"
#include "mongo/bson/oid.h"
#include "mongo/client/sdam/sdam_datatypes.h"
#include "mongo/db/server_options.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/compiler.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/duration.h"
@ -283,6 +285,14 @@ const std::map<std::string, std::string>& ServerDescription::getTags() const {
return _tags;
}
bool ServerDescription::isInjector() const {
if (MONGO_unlikely(serverGlobalParams.configOnly)) {
auto it = _tags.find(kProcessTypeTagKey);
return it != _tags.end() && it->second == kInjectorTagValue;
}
return false;
}
const boost::optional<std::string>& ServerDescription::getSetName() const {
return _setName;
}

View File

@ -94,6 +94,14 @@ public:
const boost::optional<HostAndPort>& getMe() const;
const boost::optional<std::string>& getSetName() const;
const std::map<std::string, std::string>& getTags() const;
/**
* Returns true if this server is tagged as an injector (i.e. has the tag
* processType: INJECTOR). Injector-tagged servers belong to a standby cluster's
* replica set: they remain visible in the SDAM topology so heartbeats and replication
* tracking continue to work, but server selection (in ServerSelector) excludes them so
* external client traffic is routed to non-injector members.
*/
bool isInjector() const;
void appendBsonTags(BSONObjBuilder& builder) const;
// network attributes
@ -144,6 +152,8 @@ private:
void saveTopologyVersion(BSONElement topologyVersionField);
static inline const std::string kIsDbGrid = "isdbgrid";
static inline const std::string kProcessTypeTagKey = "processType";
static inline const std::string kInjectorTagValue = "INJECTOR";
static inline const double kRttAlpha = 0.2;
// address: the hostname or IP, and the port number, that the client connects to. Note that this

View File

@ -77,10 +77,7 @@ void ServerSelector::_getCandidateServers(std::vector<ServerDescriptionPtr>* res
topologyDescription->findServers([excludedHosts](const ServerDescriptionPtr& s) {
auto isPrimaryOrSecondary = (s->getType() == ServerType::kRSPrimary ||
s->getType() == ServerType::kRSSecondary);
auto isNotExcluded =
(std::find(excludedHosts.begin(), excludedHosts.end(), s->getAddress()) ==
excludedHosts.end());
return (isPrimaryOrSecondary && isNotExcluded);
return isPrimaryOrSecondary && passesExclusionFilters(excludedHosts, s);
});
auto beginIt = eligibleServers.begin();

View File

@ -141,8 +141,16 @@ private:
bool recencyFilter(const ReadPreferenceSetting& readPref, const ServerDescriptionPtr& s);
static bool excludedHostsFilter(const std::vector<HostAndPort>& excludedHosts,
const ServerDescriptionPtr& s) {
// Returns true when the server is allowed to be selected. Rejects:
// - servers in the caller-provided excludedHosts list (e.g. retry blocklists), and
// - servers tagged as injectors (processType: INJECTOR), which belong to a standby
// cluster's replica set and are kept in the topology for replication/heartbeat
// tracking but must never receive client commands.
static bool passesExclusionFilters(const std::vector<HostAndPort>& excludedHosts,
const ServerDescriptionPtr& s) {
if (s->isInjector()) {
return false;
}
return std::find(excludedHosts.begin(), excludedHosts.end(), s->getAddress()) ==
excludedHosts.end();
}
@ -155,11 +163,14 @@ private:
using SelectionFilter = unique_function<std::function<bool(const ServerDescriptionPtr&)>(
const ReadPreferenceSetting&, const std::vector<HostAndPort>&)>;
// Note that each replica-set filter below delegates to passesExclusionFilters(), so it will
// always skip injector-tagged servers. See ServerDescription::isInjector for additional
// details.
const SelectionFilter secondaryFilter = [this](const ReadPreferenceSetting& readPref,
const std::vector<HostAndPort>& excludedHosts) {
return [&](const ServerDescriptionPtr& s) {
return (s->getType() == ServerType::kRSSecondary) && recencyFilter(readPref, s) &&
excludedHostsFilter(excludedHosts, s);
passesExclusionFilters(excludedHosts, s);
};
};
@ -167,7 +178,7 @@ private:
const std::vector<HostAndPort>& excludedHosts) {
return [&](const ServerDescriptionPtr& s) {
return (s->getType() == ServerType::kRSPrimary) && recencyFilter(readPref, s) &&
excludedHostsFilter(excludedHosts, s);
passesExclusionFilters(excludedHosts, s);
};
};
@ -176,7 +187,7 @@ private:
return [&](const ServerDescriptionPtr& s) {
return (s->getType() == ServerType::kRSPrimary ||
s->getType() == ServerType::kRSSecondary) &&
recencyFilter(readPref, s) && excludedHostsFilter(excludedHosts, s);
recencyFilter(readPref, s) && passesExclusionFilters(excludedHosts, s);
};
};

View File

@ -43,6 +43,7 @@
#include "mongo/client/sdam/topology_description.h"
#include "mongo/client/sdam/topology_state_machine.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/server_options.h"
#include "mongo/db/wire_version.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@ -149,6 +150,44 @@ public:
};
ServerSelector selector = ServerSelector(sdamConfiguration);
/**
 * Test helper that builds a two-member replica set topology named "set":
 *   - "s0": RSPrimary carrying the tag processType: INJECTOR
 *   - "s1": RSSecondary without any tags
 * Both descriptions are fed through a TopologyStateMachine, so `topologyDescription` is ready
 * to be handed to the ServerSelector once construction finishes.
 */
struct InjectorPrimaryTopology {
    std::shared_ptr<TopologyDescription> topologyDescription;
    TopologyStateMachine stateMachine;

    InjectorPrimaryTopology(const SdamConfiguration& config)
        : topologyDescription(std::make_shared<TopologyDescription>(config)),
          stateMachine(config) {
        // Applies one server description to the topology owned by this helper.
        const auto publish = [this](const auto& description) {
            stateMachine.onServerDescription(*topologyDescription, description);
        };

        // The injector-tagged primary; it advertises both members as hosts of the set.
        publish(ServerDescriptionBuilder()
                    .withAddress(HostAndPort("s0"))
                    .withType(ServerType::kRSPrimary)
                    .withLastUpdateTime(Date_t::now())
                    .withLastWriteDate(Date_t::now())
                    .withRtt(Milliseconds{1})
                    .withSetName("set")
                    .withHost(HostAndPort("s0"))
                    .withHost(HostAndPort("s1"))
                    .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
                    .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
                    .withTag("processType", "INJECTOR")
                    .withElectionId(kOidOne)
                    .withSetVersion(100)
                    .instance());

        // The untagged secondary.
        publish(ServerDescriptionBuilder()
                    .withAddress(HostAndPort("s1"))
                    .withType(ServerType::kRSSecondary)
                    .withRtt(Milliseconds{1})
                    .withSetName("set")
                    .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
                    .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
                    .withLastUpdateTime(Date_t::now())
                    .withLastWriteDate(Date_t::now())
                    .instance());
    }
};
};
TEST_F(ServerSelectorTestFixture, ShouldFilterCorrectlyByLatencyWindow) {
@ -972,4 +1011,94 @@ TEST_F(ServerSelectorTestFixture, ShouldIgnoreMinClusterTimeIfNotSatisfiable) {
ASSERT_EQ(result->size(), 1);
ASSERT_EQ((*result)[0]->getAddress(), s0->getAddress());
}
/**
 * Fixture for tests that must run with serverGlobalParams.configOnly enabled. The flag is
 * switched on in setUp() and put back to whatever value it had beforehand in tearDown(), so the
 * fixture does not clobber the global for code that runs afterwards.
 */
class ConfigOnlyServerSelectorTest : public ServerSelectorTestFixture {
public:
    void setUp() override {
        ServerSelectorTestFixture::setUp();
        // Remember the current value so tearDown() can restore it instead of assuming false.
        _previousConfigOnly = serverGlobalParams.configOnly;
        serverGlobalParams.configOnly = true;
    }

    void tearDown() override {
        serverGlobalParams.configOnly = _previousConfigOnly;
        ServerSelectorTestFixture::tearDown();
    }

private:
    // Value of serverGlobalParams.configOnly captured by setUp().
    bool _previousConfigOnly = false;
};
TEST_F(ConfigOnlyServerSelectorTest, ShouldExcludeInjectorFromPrimaryOnlySelection) {
    // The only primary in this topology is injector-tagged, so with configOnly enabled a
    // primary-only selection is expected to come back empty.
    InjectorPrimaryTopology topo(sdamConfiguration);
    const ReadPreferenceSetting primaryOnly(ReadPreference::PrimaryOnly);

    auto selected = selector.selectServers(topo.topologyDescription, primaryOnly, {});

    ASSERT_EQ(adaptForAssert(boost::none), adaptForAssert(selected));
}
TEST_F(ConfigOnlyServerSelectorTest, ShouldExcludeInjectorFromNearestSelection) {
    // With read preference "nearest" both members would normally be candidates; the injector
    // ("s0") must never be picked, so every selection has to land on the secondary ("s1").
    InjectorPrimaryTopology topo(sdamConfiguration);
    const HostAndPort injectorHost("s0");
    const HostAndPort secondaryHost("s1");
    std::map<HostAndPort, int> selectionCounts{{injectorHost, 0}, {secondaryHost, 0}};

    for (int attempt = 0; attempt < NUM_ITERATIONS; ++attempt) {
        auto picked = selector.selectServer(
            topo.topologyDescription, ReadPreferenceSetting(ReadPreference::Nearest), {});
        if (!picked) {
            continue;
        }
        ++selectionCounts[(*picked)->getAddress()];
    }

    ASSERT_FALSE(selectionCounts[injectorHost]);
    ASSERT_EQ(selectionCounts[secondaryHost], NUM_ITERATIONS);
}
TEST_F(ConfigOnlyServerSelectorTest, ShouldExcludeInjectorFromPrimaryPreferredSelection) {
    // The primary ("s0") is injector-tagged and therefore unavailable to selection, so
    // "primaryPreferred" is expected to pick the secondary ("s1") on every attempt.
    InjectorPrimaryTopology topo(sdamConfiguration);
    const HostAndPort injectorHost("s0");
    const HostAndPort secondaryHost("s1");
    std::map<HostAndPort, int> selectionCounts{{injectorHost, 0}, {secondaryHost, 0}};

    for (int attempt = 0; attempt < NUM_ITERATIONS; ++attempt) {
        auto picked = selector.selectServer(topo.topologyDescription,
                                            ReadPreferenceSetting(ReadPreference::PrimaryPreferred),
                                            {});
        if (!picked) {
            continue;
        }
        ++selectionCounts[(*picked)->getAddress()];
    }

    ASSERT_FALSE(selectionCounts[injectorHost]);
    ASSERT_EQ(selectionCounts[secondaryHost], NUM_ITERATIONS);
}
TEST_F(ServerSelectorTestFixture, ShouldNotExcludeInjectorWhenNotConfigOnly) {
    // This test uses the plain fixture, which does not turn serverGlobalParams.configOnly on,
    // so the INJECTOR tag is expected to be ignored and the tagged primary ("s0") returned.
    InjectorPrimaryTopology topo(sdamConfiguration);
    const ReadPreferenceSetting primaryOnly(ReadPreference::PrimaryOnly);

    auto selected = selector.selectServers(topo.topologyDescription, primaryOnly, {});

    ASSERT(selected);
    ASSERT_EQ(1, selected->size());
    ASSERT_EQ(HostAndPort("s0"), (*selected)[0]->getAddress());
}
TEST_F(ServerSelectorTestFixture, ShouldNotExcludeNonInjectorWithOtherTags) {
    // A primary whose processType tag holds a value other than "INJECTOR" (here "NORMAL") is
    // expected to remain selectable.
    auto topology = std::make_shared<TopologyDescription>(sdamConfiguration);
    TopologyStateMachine machine(sdamConfiguration);

    const auto taggedPrimary = ServerDescriptionBuilder()
                                   .withAddress(HostAndPort("s0"))
                                   .withType(ServerType::kRSPrimary)
                                   .withLastUpdateTime(Date_t::now())
                                   .withLastWriteDate(Date_t::now())
                                   .withRtt(Milliseconds{1})
                                   .withSetName("set")
                                   .withHost(HostAndPort("s0"))
                                   .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
                                   .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
                                   .withTag("processType", "NORMAL")
                                   .withElectionId(kOidOne)
                                   .withSetVersion(100)
                                   .instance();
    machine.onServerDescription(*topology, taggedPrimary);

    const ReadPreferenceSetting primaryOnly(ReadPreference::PrimaryOnly);
    auto selected = selector.selectServers(topology, primaryOnly, {});

    ASSERT(selected);
    ASSERT_EQ(1, selected->size());
    ASSERT_EQ(HostAndPort("s0"), (*selected)[0]->getAddress());
}
} // namespace mongo::sdam

View File

@ -695,7 +695,30 @@ bool CommandHelpers::shouldActivateFailCommandFailPoint(const BSONObj& data,
}
}
if (data.hasField("failAllCommands")) {
if (data.hasField("failAllCommands") && data.getBoolField("failAllCommands")) {
// "failCommandsExcept" is an allowlist that only applies when "failAllCommands" is set:
// commands matching any entry are exempt from the failpoint. Aliases registered on the
// same command class are honored (e.g. listing "isMaster" also exempts "ismaster"), but
// "hello" and "isMaster" are separate command classes and must each be listed to exempt
// both.
if (auto exemptField = data["failCommandsExcept"]; exemptField.type() == BSONType::array) {
for (auto&& exemptCmd : exemptField.Array()) {
if (exemptCmd.type() == BSONType::string &&
cmd->hasAlias(exemptCmd.valueStringData())) {
LOGV2(12322200,
"Skipping 'failCommand' failpoint for command exempt via "
"'failCommandsExcept'",
"data"_attr = data,
"threadName"_attr = threadName,
"appName"_attr = appName,
logAttrs(nss),
"isInternalClient"_attr = isInternalThreadOrClient,
"isOnPriorityPort"_attr = isOnPriorityPort,
"command"_attr = cmd->getName());
return false;
}
}
}
LOGV2(6348500,
"Activating 'failCommand' failpoint for all commands",
"data"_attr = data,