mongo/buildscripts/resmokelib/testing/hooks/fuzz_runtime_parameters.py
Gregory Wlodarek b1c4183b26 SERVER-123704 Add Interrupted to retryable errors in fuzz_runtime_parameters hook (#51462)
GitOrigin-RevId: ef9ac813eb3e070d69a729e466bc15194c634d17
2026-04-13 13:34:58 +00:00

449 lines
19 KiB
Python

"""Test hook that periodically changes runtime-settable server parameters and cluster parameters."""
import copy
import random
import re
import sys
import threading
import time
from pymongo.errors import OperationFailure
from buildscripts.resmokelib import config, errors
from buildscripts.resmokelib.generate_fuzz_config.mongo_fuzzer_configs import (
generate_normal_mongo_parameters,
generate_special_runtime_parameters,
is_enterprise_param_available,
)
from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
from buildscripts.resmokelib.testing.fixtures import replicaset, shardedcluster, standalone
from buildscripts.resmokelib.testing.hooks import interface
from buildscripts.resmokelib.testing.hooks import lifecycle as lifecycle_interface
from buildscripts.resmokelib.testing.retry import with_naive_retry
def validate_runtime_parameter_spec(spec):
    """Check that every entry of a runtime-fuzzing spec is well formed.

    Each value in ``spec`` must be a dict whose ``period`` entry is at least 1.
    A missing ``period`` counts as 0 and is rejected.

    Raises:
        ValueError: naming the first offending key and its value.
    """
    for param_name, entry in spec.items():
        well_formed = isinstance(entry, dict) and entry.get("period", 0) >= 1
        if well_formed:
            continue
        raise ValueError(
            f"Invalid runtime parameter fuzz config entry for key '{param_name}' : {entry}"
        )
def build_client(node, auth_options):
    """Return a client for ``node`` built by fixture_interface.build_client with ``auth_options``.

    On ASAN builds the connection timeout is raised to 120000 ms (presumably because
    sanitizer builds run slower). Otherwise fixture_interface's default timeout is used.
    """
    extra_kwargs = {"timeout_millis": 120000} if config.IS_ASAN else {}
    return fixture_interface.build_client(node, auth_options, **extra_kwargs)
class RuntimeParametersState:
    """Runtime state for a set of parameters being fuzzed.

    Remembers when each parameter was last assigned and produces new values for
    the parameters whose configured ``period`` (in seconds) has elapsed.
    """

    def __init__(self, spec, seed):
        # Each parameter starts with lastSet equal to the construction time. The first
        # new value for a parameter is therefore generated one full period after the
        # suite starts. Values are deep-copied so the caller's spec is never mutated.
        start = time.time()
        self._params = {}
        for name, settings in spec.items():
            self._params[name] = dict(copy.deepcopy(settings), lastSet=start)
        self._rng = random.Random(seed)

    def generate_parameters(self):
        """Return ``{name: new_value}`` for every parameter whose period has elapsed.

        Each parameter included in the result has its lastSet time reset to now.
        """
        now = time.time()
        due = {}
        for name, state in self._params.items():
            if now - state["lastSet"] < state["period"]:
                continue
            if state.get("custom_fuzz_value_assignment", False):
                due[name] = generate_special_runtime_parameters(self._rng, name, state)
            else:
                due[name] = generate_normal_mongo_parameters(self._rng, state)
            state["lastSet"] = now
        return due

    def get_spec(self):
        """Return ``{name: 1}`` for every fuzzed parameter, in the form getParameter expects."""
        return dict.fromkeys(self._params, 1)

    def get_keys(self):
        """Return the names of all fuzzed parameters as a list."""
        return [*self._params]
class FuzzRuntimeParameters(interface.Hook):
    """Regularly connect to nodes and send them setParameter / setClusterParameter commands."""

    DESCRIPTION = "Changes the value of runtime-settable parameters at regular intervals"

    IS_BACKGROUND = True

    def __init__(
        self,
        hook_logger,
        fixture,
        seed=None,
        auth_options=None,
    ):
        """Initialize the FuzzRuntimeParameters.

        Args:
            hook_logger: the logger instance for this hook.
            fixture: the target fixture (standalone, replica set, sharded cluster, or multi-cluster fixture).
            seed: seed for the random generators that produce parameter values. When omitted,
                a fresh random seed is chosen for this instance.
            auth_options: dictionary of auth options.
        """
        interface.Hook.__init__(self, hook_logger, fixture, FuzzRuntimeParameters.DESCRIPTION)
        self._mongod_param_state = None
        self._mongos_param_state = None
        self._cluster_param_state = None
        # Choose the default seed per instance. A signature default of
        # random.randrange(sys.maxsize) is evaluated only once, at import time, so every
        # instance in the same process would replay the identical sequence of values.
        self._seed = random.randrange(sys.maxsize) if seed is None else seed
        self._fixture = fixture
        self._standalone_fixtures = []
        self._rs_fixtures = []
        self._mongos_fixtures = []
        self._setParameter_thread = None
        self._auth_options = auth_options

    def before_suite(self, test_report):
        """Before suite. Build the fuzzing state for each parameter and start the background thread."""
        for cluster in self._fixture.get_testable_clusters():
            self._add_fixture(cluster)

        # Deferred import of the fuzzer parameter limits table.
        from buildscripts.resmokelib.generate_fuzz_config.config_fuzzer_limits import (
            config_fuzzer_params,
        )

        # Get only the mongod and mongos parameters that have "runtime" in the "fuzz_at" param value.
        runtime_mongod_params = {
            param: val
            for param, val in config_fuzzer_params["mongod"].items()
            if "runtime" in val.get("fuzz_at", []) and is_enterprise_param_available(val)
        }
        runtime_mongos_params = {
            param: val
            for param, val in config_fuzzer_params["mongos"].items()
            if "runtime" in val.get("fuzz_at", []) and is_enterprise_param_available(val)
        }
        # Get cluster parameters
        cluster_params = {
            param: val
            for param, val in config_fuzzer_params["cluster"].items()
            if is_enterprise_param_available(val)
        }

        # Flow control-related parameters should only be fuzzed when enableFlowControl is set to True at startup.
        flow_control_match = re.search(r"enableFlowControl:\s+(\w+)", config.MONGOD_SET_PARAMETERS)
        if not flow_control_match or flow_control_match.group(1) == "false":
            runtime_mongod_params = {
                k: v for k, v in runtime_mongod_params.items() if "flowControl" not in k
            }

        auditing_enabled = config.MONGOD_EXTRA_CONFIG.get("auditRuntimeConfiguration", "off") == "on"
        if "auditConfig" in cluster_params and not auditing_enabled:
            # auditConfig requires auditing to be enabled, so we should not fuzz it if auditing is disabled.
            del cluster_params["auditConfig"]

        validate_runtime_parameter_spec(runtime_mongod_params)
        validate_runtime_parameter_spec(runtime_mongos_params)
        validate_runtime_parameter_spec(cluster_params)

        # Log the seed so that a failing run's sequence of parameter values can be reproduced.
        self.logger.info("Fuzzing runtime parameters with seed %s.", self._seed)

        # Construct the runtime state before the suite begins.
        # The initial lastSet time of each parameter is the start time of the suite.
        self._mongod_param_state = RuntimeParametersState(runtime_mongod_params, self._seed)
        self._mongos_param_state = RuntimeParametersState(runtime_mongos_params, self._seed)
        self._cluster_param_state = RuntimeParametersState(cluster_params, self._seed)

        self._setParameter_thread = _SetParameterThread(
            self.logger,
            self._mongos_fixtures,
            self._rs_fixtures,
            self._standalone_fixtures,
            self._fixture,
            self._mongod_param_state,
            self._mongos_param_state,
            self._cluster_param_state,
            lifecycle_interface.FlagBasedThreadLifecycle(),
            self._auth_options,
        )
        self.logger.info("Starting the runtime parameter fuzzing thread.")
        self._setParameter_thread.start()

    def after_suite(self, test_report, teardown_flag=None):
        """After suite. Stop the background thread, if before_suite got far enough to create it."""
        if self._setParameter_thread is None:
            return
        self.logger.info("Stopping the runtime parameter fuzzing thread.")
        self._setParameter_thread.stop()
        self.logger.info("Runtime parameter fuzzing thread stopped.")

    def before_test(self, test, test_report):
        """Before test. Log current config of all runtime-fuzzable params."""
        self._log_current_parameters()

        self.logger.info("Resuming the runtime parameter fuzzing thread.")
        # pause() waits for the thread to be idle and checks fixture health before resuming.
        self._setParameter_thread.pause()
        self._setParameter_thread.resume()

    def after_test(self, test, test_report):
        """After test. Log current config of all runtime-fuzzable params."""
        self.logger.info("Pausing the runtime parameter fuzzing thread.")
        self._setParameter_thread.pause()
        self.logger.info("Paused the runtime parameter fuzzing thread.")

        self._log_current_parameters()

    def _log_current_parameters(self):
        """Log the current values of the fuzzed parameters on every tracked node.

        Cluster parameters are read through the first mongos, if there is one.
        """
        for repl_set in self._rs_fixtures:
            for node in repl_set.nodes:
                self._invoke_get_parameter_and_log(node)
        for standalone_fixture in self._standalone_fixtures:
            self._invoke_get_parameter_and_log(standalone_fixture)
        for mongos in self._mongos_fixtures:
            self._invoke_get_parameter_and_log(mongos)
        if self._mongos_fixtures:
            self._invoke_get_cluster_parameter_and_log(self._mongos_fixtures[0])

    def _add_fixture(self, fixture):
        """Sort ``fixture`` into the standalone, replica set, or mongos lists.

        Sharded clusters are walked recursively.

        Raises:
            ValueError: if ``fixture`` is not a standalone, replica set, or sharded cluster.
        """
        if isinstance(fixture, standalone.MongoDFixture):
            self._standalone_fixtures.append(fixture)
        elif isinstance(fixture, replicaset.ReplicaSetFixture):
            self._rs_fixtures.append(fixture)
        elif isinstance(fixture, shardedcluster.ShardedClusterFixture):
            for shard_fixture in fixture.shards:
                self._add_fixture(shard_fixture)
            # A dedicated config server is only added when no shard acts as the config shard.
            if fixture.config_shard is None:
                self._add_fixture(fixture.configsvr)
            for mongos_fixture in fixture.mongos:
                self._mongos_fixtures.append(mongos_fixture)
        else:
            raise ValueError("No fixture to run setParameter on.")

    def _invoke_get_parameter_and_log(self, node):
        """Log the current state of a node's runtime-fuzzable parameters.

        Only usable once before_suite has initialized the runtime state of the parameters.
        The client used for the query is closed afterwards.
        """
        with build_client(node, self._auth_options) as client:
            params_to_get = (
                self._mongos_param_state.get_spec()
                if client.is_mongos
                else self._mongod_param_state.get_spec()
            )
            get_result = client.admin.command("getParameter", 1, **params_to_get)
        self.logger.info(
            "Current state of runtime-fuzzable parameters on node on port %d. Parameters: %s",
            node.port,
            get_result,
        )

    def _invoke_get_cluster_parameter_and_log(self, node):
        """Log the current state of the cluster's fuzzable cluster parameters.

        Only usable once before_suite has initialized the runtime state of the parameters.
        The client used for the query is closed afterwards.
        """
        with build_client(node, self._auth_options) as client:
            params_to_get = self._cluster_param_state.get_keys()
            get_result = client.admin.command("getClusterParameter", params_to_get)
        self.logger.info(
            "Current state of fuzzable cluster parameters on cluster. Parameters: %s",
            get_result,
        )
class _SetParameterThread(threading.Thread):
    """Background thread that periodically sends freshly fuzzed parameter values to the fixtures."""

    def __init__(
        self,
        logger,
        mongos_fixtures,
        rs_fixtures,
        standalone_fixtures,
        fixture,
        mongod_param_state: RuntimeParametersState,
        mongos_param_state: RuntimeParametersState,
        cluster_param_state: RuntimeParametersState,
        lifecycle,
        auth_options=None,
    ):
        """Initialize _SetParameterThread."""
        threading.Thread.__init__(self, name="SetParameterThread")
        self.daemon = True
        self.logger = logger
        self._mongos_fixtures = mongos_fixtures
        self._rs_fixtures = rs_fixtures
        self._standalone_fixtures = standalone_fixtures
        self._fixture = fixture
        self._mongod_param_state = mongod_param_state
        self._mongos_param_state = mongos_param_state
        self._cluster_param_state = cluster_param_state
        self.__lifecycle = lifecycle
        self._auth_options = auth_options
        # Minimum number of seconds between two rounds of setParameter commands.
        self._setparameter_interval_secs = 1
        self._last_exec = time.time()
        self._pause_timeout_secs = fixture_interface.ReplFixture.AWAIT_REPL_TIMEOUT_MINS * 60
        self._stop_timeout_secs = fixture_interface.ReplFixture.AWAIT_REPL_TIMEOUT_MINS * 60
        self._thread_state = lifecycle_interface.HookThreadState()

    def run(self):
        """Execute the thread."""
        try:
            while True:
                self._thread_state.mark_idle("waiting_for_action_permitted")
                permitted = self.__lifecycle.wait_for_action_permitted()
                if not permitted:
                    self._thread_state.mark_stopped("lifecycle_stop_requested")
                    break

                self._thread_state.mark_running("set_parameter_cycle")
                now = time.time()
                if now - self._last_exec > self._setparameter_interval_secs:
                    self._thread_state.set_phase("do_set_parameter")
                    self._do_set_parameter()
                    self._last_exec = time.time()

                found_idle_request = self.__lifecycle.poll_for_idle_request()
                if found_idle_request:
                    self._thread_state.set_phase("sending_idle_acknowledgement")
                    self.__lifecycle.send_idle_acknowledgement()
                    continue

                # The 'wait_secs' is used to wait 'self._setparameter_interval_secs' from the moment
                # the last setParameter command was sent.
                now = time.time()
                wait_secs = max(0, self._setparameter_interval_secs - (now - self._last_exec))
                self._thread_state.mark_idle("waiting_for_action_interval")
                self.__lifecycle.wait_for_action_interval(wait_secs)
        except Exception as err:
            # Proactively log the exception when it happens so it will be
            # flushed immediately.
            self.logger.exception("SetParameter thread threw exception")
            self._thread_state.mark_failed(err, "run_loop")
        finally:
            state, _phase = self._thread_state.describe()
            if state not in ("failed", "stopped"):
                self._thread_state.mark_stopped("run_loop_exited")

    def stop(self):
        """Stop the thread.

        Raises:
            errors.ServerFailure: if the thread does not exit within the stop timeout.
        """
        self._thread_state.mark_stopping("stop_requested")
        self.__lifecycle.stop()
        # Unpause to allow the thread to finish.
        self.resume()
        self.join(self._stop_timeout_secs)
        if self.is_alive():
            state, phase = self._thread_state.describe()
            raise errors.ServerFailure(
                "Timed out waiting for setParameter thread to stop; "
                f"state={state}, phase={phase}, timeout={self._stop_timeout_secs}s."
            )
        self._thread_state.assert_healthy(self.is_alive(), "setParameter", allow_stopped=True)

    def pause(self):
        """Pause the thread, then verify that every targeted fixture is still running.

        Raises:
            errors.ServerFailure: if a replica set, standalone, or mongos fixture is not running.
        """
        self.__lifecycle.mark_test_finished()

        # Wait until we are no longer sending setParameter commands.
        self._thread_state.wait_until_idle(self._pause_timeout_secs, "setParameter")
        self._thread_state.assert_healthy(self.is_alive(), "setParameter")

        # Check that fixtures are still running
        for rs_fixture in self._rs_fixtures:
            if not rs_fixture.is_running():
                raise errors.ServerFailure(
                    "ReplicaSetFixture with pids {} expected to be running in"
                    " SetParameter, but wasn't.".format(rs_fixture.pids())
                )
        for standalone_fixture in self._standalone_fixtures:
            if not standalone_fixture.is_running():
                raise errors.ServerFailure(
                    "MongoDFixture with pids {} expected to be running in"
                    " SetParameter, but wasn't.".format(standalone_fixture.pids())
                )
        for mongos_fixture in self._mongos_fixtures:
            if not mongos_fixture.is_running():
                raise errors.ServerFailure(
                    "MongoSFixture with pids {} expected to be running in"
                    " SetParameter, but wasn't.".format(mongos_fixture.pids())
                )

    def resume(self):
        """Resume the thread."""
        self.__lifecycle.mark_test_started()

    def _do_set_parameter(self):
        """Send one round of fuzzed values to every tracked node.

        mongod values go to every replica set member and standalone. mongos values go to
        every mongos. Cluster parameters go through the first mongos. A category with
        nothing due this round is skipped entirely, so no client is created for it.
        """
        # Always advance all three states, in a fixed order, so the RNG sequences are stable.
        mongod_params_to_set = self._mongod_param_state.generate_parameters()
        mongos_params_to_set = self._mongos_param_state.generate_parameters()
        cluster_params_to_set = self._cluster_param_state.generate_parameters()

        if mongod_params_to_set:
            for repl_set in self._rs_fixtures:
                self.logger.info(
                    "Setting parameters on all nodes of replica set %s. Parameters: %s",
                    repl_set.replset_name,
                    mongod_params_to_set,
                )
                for node in repl_set.nodes:
                    self._set_parameters_on_node(node, mongod_params_to_set)

            for standalone_fixture in self._standalone_fixtures:
                self.logger.info(
                    "Setting parameters on standalone on port %d. Parameters: %s",
                    standalone_fixture.port,
                    mongod_params_to_set,
                )
                self._set_parameters_on_node(standalone_fixture, mongod_params_to_set)

        if mongos_params_to_set:
            for mongos in self._mongos_fixtures:
                self.logger.info(
                    "Setting parameters on mongos port %d. Parameters: %s",
                    mongos.port,
                    mongos_params_to_set,
                )
                self._set_parameters_on_node(mongos, mongos_params_to_set)

        if cluster_params_to_set and self._mongos_fixtures:
            self.logger.info(
                "Setting parameters cluster. Parameters: %s",
                cluster_params_to_set,
            )
            with build_client(self._mongos_fixtures[0], self._auth_options) as client:
                self._set_cluster_parameters(client, cluster_params_to_set)

    def _set_parameters_on_node(self, node, params):
        """Run a single setParameter with ``params`` on ``node``, closing the client afterwards."""
        with build_client(node, self._auth_options) as client:
            client.admin.command("setParameter", 1, **params)

    @staticmethod
    def _set_cluster_parameters(client, params):
        """Run setClusterParameter once per entry of ``params``, retrying transient conflicts.

        Raises:
            OperationFailure: for any failure other than BadValue (code 2).
        """
        for key, value in params.items():
            try:
                with_naive_retry(
                    lambda: client.admin.command("setClusterParameter", {key: value}),
                    # Retry on
                    #
                    # ConflictingOperationInProgress
                    #   as setClusterParameter is a DDL operation, it might clash with other
                    #   operations
                    #
                    # AddOrRemoveShardInProgress
                    #   setClusterParameter explicitly clashes with that one too
                    #
                    # Interrupted
                    #   kill_sessions workloads can interrupt the setClusterParameter operation
                    #   by killing its session
                    extra_retryable_error_codes=[117, 414, 11601],
                )
            except OperationFailure as exc:
                # BadValue (code 2) might happen when we do a downgrade and try to set a cluster
                # variable that is not supported in the given FCV
                if exc.code != 2:
                    raise