SERVER-108002: Correct replayWorkloadRecordingFile validation (8.2) (#39376)

GitOrigin-RevId: 278ad65cd078f0f68ce16d0cdebf0196fb542a08
This commit is contained in:
James H 2025-10-15 15:43:46 +01:00 committed by MongoDB Bot
parent a5eeb8939e
commit f913620192
6 changed files with 272 additions and 37 deletions

View File

@ -82,6 +82,29 @@ const replayWorkloadLambda = (recordingFilePath, serverURI) => {
replayWorkloadRecordingFile(recordingFilePath, serverURI);
};
// First, test the native method with _invalid_ input, and check appropriate errors are returned.
// Too few arguments
assert.throwsWithCode(() => replayWorkloadRecordingFile(), ErrorCodes.FailedToParse);
assert.throwsWithCode(() => replayWorkloadRecordingFile("asdf"), ErrorCodes.FailedToParse);
// Too many arguments
assert.throwsWithCode(() => replayWorkloadRecordingFile("foo", "bar", "baz"),
ErrorCodes.FailedToParse);
// Invalid directory
assert.throwsWithCode(() => replayWorkloadRecordingFile("asdf", "asdf"), ErrorCodes.FileNotOpen);
// Empty directory
// Burn-in runs multiple instances; ensure directory is unique.
let realDirectory = "real_directory_" + UUID().hex();
mkdir(realDirectory);
// The cluster spec is _not_ valid here, but is currently not validated until
// first required to connect.
// TODO: SERVER-108026 validate the cluster string earlier.
assert.doesNotThrow(() => replayWorkloadRecordingFile(realDirectory, "asdf"));
removeFile(realDirectory);
// ======================================================================================== //
// Recording
const initialResults = runInstances("traffic_recording", "recordings", defaultOperationsLambda);

View File

@ -66,6 +66,15 @@ mongo_cc_library(
],
)
mongo_cc_library(
name = "stop_token",
srcs = [
],
hdrs = [
"stop_token.h",
],
)
mongo_cc_unit_test(
name = "query_util_test",
srcs = [

View File

@ -0,0 +1,94 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
/**
* Shim utilities for std::stop_source, which is not yet available in all
* compiler versions in use.
*
* Approximates the interface of std::stop_source, std::stop_token.
*/
#include "mongo/platform/atomic.h"
#include <memory>
namespace mongo {
// Shared state held by one stop_source, and N stop_tokens.
struct stop_state {
mongo::Atomic<bool> stop = false;
};
class stop_token;
/**
* Used to inform N tasks/threads (via stop_tokens) that they are requested to stop.
*/
class stop_source {
public:
stop_token get_token() const;
void request_stop() {
_state->stop.store(true);
}
~stop_source() {
request_stop();
}
private:
std::shared_ptr<stop_state> _state = std::make_shared<stop_state>();
};
/**
* Used by a task/thread to periodically check if it has been requested to stop.
*
* Created from a stop_source with get_token().
*/
class stop_token {
public:
stop_token() = default;
stop_token(std::shared_ptr<stop_state> state) : _state(std::move(state)) {}
bool stop_requested() const {
return _state && _state->stop.load();
}
bool stop_possible() const {
return bool(_state);
}
private:
std::shared_ptr<stop_state> _state = nullptr;
};
inline stop_token stop_source::get_token() const {
return {_state};
}
} // namespace mongo

View File

@ -34,6 +34,7 @@ mongo_cc_library(
"//src/mongo/client:clientdriver_network",
"//src/mongo/db:traffic_reader",
"//src/mongo/db/concurrency:lock_manager",
"//src/mongo/db/query/util:stop_token",
"//src/mongo/rpc:message",
"//src/mongo/util:periodic_runner",
"//src/mongo/util:periodic_runner_factory",

View File

@ -29,37 +29,160 @@
#include "mongo/replay/replay_client.h"
#include "mongo/db/traffic_reader.h"
#include "mongo/db/query/util/stop_token.h"
#include "mongo/replay/recording_reader.h"
#include "mongo/replay/replay_command.h"
#include "mongo/replay/replay_config.h"
#include "mongo/replay/session_handler.h"
#include "mongo/stdx/future.h"
#include "mongo/util/assert_util.h"
#include <condition_variable>
#include <exception>
#include <mutex>
#include <string>
namespace mongo {
void replayThread(const ReplayConfig& replayConfig) {
/**
* Helper class for for applying a callable to a container of N elements,
* spawning a new thread for each element.
*
* e.g.,
* std::vector<Task> vec{...};
* Async::apply(vec, [](token stop_token, auto value) {
* // Do something on a separate thread per task.
* });
*
* Tasks receive a stop token, and will be requested to stop after any
* task throws an exception. Once all tasks have finished, either the
* apply call will return, or an exception from one of the tasks
* will be re-thrown.
*
*/
class Async {
public:
Async() = default;
Async(const Async&) = delete;
Async(Async&&) = delete;
Async& operator=(const Async&) = delete;
Async& operator=(Async&&) = delete;
/**
* Apply a callable to each element of a container in a separate thread, passing the provided
* args to each.
*
* If any invocation results in an exception, signal all threads to stop, wait for them to exit,
* then rethrow the exception.
*/
static void apply(auto container, auto callable) {
const auto taskCount = std::size(container);
std::vector<stdx::thread> instances;
instances.reserve(taskCount);
Async state;
for (auto&& task : container) {
state.started();
instances.push_back(stdx::thread([&]() {
try {
callable(state.stop.get_token(), task);
state.success();
} catch (...) {
state.fail(std::current_exception());
}
}));
}
state.wait();
for (auto& instance : instances) {
if (instance.joinable()) {
instance.join();
}
}
state.maybeRethrow();
}
private:
void started() {
auto lh = std::unique_lock(lock);
++running;
}
void success() {
auto lh = std::unique_lock(lock);
--running;
cv.notify_all();
}
void fail(std::exception_ptr ptr) {
auto lh = std::unique_lock(lock);
--running;
if (!exception) {
exception = ptr;
}
cv.notify_all();
}
/**
* Wait until all threads exit.
*
* If any thread ends with an exception, signal all to stop, wait for all to exit.
*/
void wait() {
auto lh = std::unique_lock(lock);
// Wait until an exception is reported, or all threads finish.
cv.wait(lh, [&]() { return exception || running == 0; });
if (exception) {
stop.request_stop();
}
// Wait for all threads to finish
cv.wait(lh, [&]() { return running == 0; });
}
void maybeRethrow() {
if (exception) {
std::rethrow_exception(exception);
}
}
mongo::stop_source stop;
std::mutex lock;
std::condition_variable cv; // NOLINT
size_t running = 0;
std::exception_ptr exception = nullptr;
};
/**
* Consumes a collection of recording files from a _single_ node.
*
* Handles creation of threads to replay individual sessions contained within.
*/
void recordingDispatcher(mongo::stop_token stop, const ReplayConfig& replayConfig) {
RecordingReader reader{replayConfig.recordingPath};
const auto bsonRecordedCommands = reader.processRecording();
uassert(ErrorCodes::ReplayClientInternalError,
"The list of recorded commands cannot be empty",
!bsonRecordedCommands.empty());
// create a new session handler for mananging the recording.
SessionHandler sessionHandler;
// setup recording and replaying starting time
auto firstCommand = bsonRecordedCommands[0];
sessionHandler.setStartTime(ReplayCommand{firstCommand}.fetchRequestTimestamp());
try {
RecordingReader reader{replayConfig.recordingPath};
const auto bsonRecordedCommands = reader.processRecording();
uassert(ErrorCodes::ReplayClientInternalError,
"The list of recorded commands cannot be empty",
!bsonRecordedCommands.empty());
// create a new session handler for mananging the recording.
SessionHandler sessionHandler;
// setup recording and replaying starting time
auto firstCommand = bsonRecordedCommands[0];
sessionHandler.setStartTime(ReplayCommand{firstCommand}.fetchRequestTimestamp());
for (const auto& bsonCommand : bsonRecordedCommands) {
if (stop.stop_requested()) {
return;
}
ReplayCommand command{bsonCommand};
if (command.isStartRecording()) {
// will associated the URI to a session task and run all the commands associated
@ -73,30 +196,13 @@ void replayThread(const ReplayConfig& replayConfig) {
sessionHandler.onBsonCommand(replayConfig.mongoURI, command);
}
}
} catch (const DBException& ex) {
// If we have reached this point we have encountered a problem in the recording. Either a
// ill recording file or some connectivity issue.
// TODO SERVER-106495: report and record these errors.
tasserted(ErrorCodes::ReplayClientSessionSimulationError,
"DBException in handleAsyncResponse, terminating due to:" + ex.toString());
} catch (const std::exception& e) {
tasserted(ErrorCodes::ReplayClientInternalError, e.what());
} catch (...) {
tasserted(ErrorCodes::ReplayClientInternalError, "Unknown error.");
}
}
void ReplayClient::replayRecording(const ReplayConfigs& configs) {
std::vector<stdx::thread> instances;
for (const auto& config : configs) {
instances.push_back(stdx::thread(replayThread, std::ref(config)));
}
for (auto& instance : instances) {
if (instance.joinable()) {
instance.join();
}
}
Async::apply(configs, recordingDispatcher);
}
void ReplayClient::replayRecording(const std::string& recordingFileName, const std::string& uri) {

View File

@ -987,7 +987,9 @@ MongoProgramScope::~MongoProgramScope() {
BSONObj ReplayWorkloadRecordingFile(const BSONObj& a, void*) {
int nFields = a.nFields();
uassert(ErrorCodes::FailedToParse, "wrong number of arguments", nFields <= 2);
uassert(ErrorCodes::FailedToParse,
"Exactly two arguments are required (data path, connection string)",
nFields == 2);
std::vector<BSONElement> elems;
a.elems(elems);