diff --git a/jstests/noPassthrough/traffic_recording/traffic_replaying.js b/jstests/noPassthrough/traffic_recording/traffic_replaying.js index 70d818bee41..e003921ca61 100644 --- a/jstests/noPassthrough/traffic_recording/traffic_replaying.js +++ b/jstests/noPassthrough/traffic_recording/traffic_replaying.js @@ -8,7 +8,7 @@ import { } from "jstests/noPassthrough/traffic_recording/traffic_replaying_lib.js"; function parseRecordedTraffic(recordingFilePath) { - const recordedTraffic = convertTrafficRecordingToBSON(recordingFilePath); + const recordedTraffic = convertTrafficRecordingToBSON(recordingFilePath, true); const opTypes = {}; recordedTraffic.forEach((obj) => { const opType = obj.opType; @@ -18,12 +18,11 @@ function parseRecordedTraffic(recordingFilePath) { } function recordAndParseOperations(recordingDirGlobal, customRecordingDir, workflowCallback) { - const {mongodInstance, coll, recordingFilePath} = recordOperations( + const {mongodInstance, coll, recordingFilePath, serverURI} = recordOperations( recordingDirGlobal, customRecordingDir, workflowCallback, ); - const serverURI = `mongodb://${mongodInstance.host}`; MongoRunner.stopMongod(mongodInstance, null, {user: "admin", pwd: "pass"}); @@ -42,12 +41,14 @@ function runInstances(baseDir, customSubDir, workflowCallback) { const defaultOperationsLambda = (dbContext) => { const {testDB, coll} = dbContext; - assert.commandWorked(coll.insert({name: "foo biz bar"})); + for (let i = 0; i < 200; ++i) { + assert.commandWorked(coll.insert({name: "foo biz bar", i})); + } assert.eq("foo biz bar", coll.findOne().name); assert.commandWorked(coll.insert({name: "foo bar"})); assert.eq("foo bar", coll.findOne({name: "foo bar"}).name); assert.commandWorked(coll.deleteOne({})); - assert.eq(1, coll.aggregate().toArray().length); + assert.eq(200, coll.aggregate().toArray().length); assert.commandWorked(coll.update({}, {})); }; @@ -95,10 +96,11 @@ removeFile(realDirectory); // Recording const initialResults = runInstances("traffic_recording_" + UUID().hex(), "recordings", defaultOperationsLambda); assert.eq(initialResults.opTypes["serverStatus"], 1); -assert.eq(initialResults.opTypes["insert"], 2); +assert.eq(initialResults.opTypes["insert"], 201); assert.eq(initialResults.opTypes["find"], 2); assert.eq(initialResults.opTypes["delete"], 1); assert.eq(initialResults.opTypes["aggregate"], 1); +assert.eq(initialResults.opTypes["getMore"], 1); assert.eq(initialResults.opTypes["update"], 1); assert.eq(initialResults.opTypes["sessionStart"], 1); assert.eq(initialResults.opTypes["sessionEnd"], 1); @@ -118,14 +120,29 @@ const replayResults = runInstances("replayed_recording_" + UUID().hex(), "replay // in order to compute the filepath, we issue a server status inside runInstances, this plus the // one recorded will bring total count to 2. assert.eq(replayResults.opTypes["serverStatus"], 2); -assert.eq(replayResults.opTypes["insert"], 2); +assert.eq(replayResults.opTypes["insert"], 201); assert.eq(replayResults.opTypes["find"], 2); assert.eq(replayResults.opTypes["delete"], 1); assert.eq(replayResults.opTypes["aggregate"], 1); +assert.eq(initialResults.opTypes["getMore"], 1); assert.eq(replayResults.opTypes["update"], 1); -assert.gte(initialResults.opTypes["sessionStart"], 1); -assert.eq(initialResults.opTypes["sessionEnd"], 1); +assert.gte(replayResults.opTypes["sessionStart"], 1); +assert.gte(replayResults.opTypes["sessionEnd"], 1); // ======================================================================================== // +function getRecordedGetMore(recordedTraffic) { + for (let req of recordedTraffic) { + if (req.opType == "getMore") { + jsTest.log.debug(tojson(req.rawop.body)); + return req.rawop.body["getMore"]; + } + } +} + +let origCursor = getRecordedGetMore(initialResults.recordedTraffic); +let replayCursor = getRecordedGetMore(replayResults.recordedTraffic); + +assert.neq(origCursor, replayCursor); + cleanUpDirectory(initialResults.recordingDirGlobal); cleanUpDirectory(replayResults.recordingDirGlobal); diff --git a/jstests/noPassthrough/traffic_recording/traffic_replaying_lib.js b/jstests/noPassthrough/traffic_recording/traffic_replaying_lib.js index c6f7facd32b..765e745411a 100644 --- a/jstests/noPassthrough/traffic_recording/traffic_replaying_lib.js +++ b/jstests/noPassthrough/traffic_recording/traffic_replaying_lib.js @@ -40,7 +40,9 @@ export function recordOperations(recordingDirGlobal, customRecordingDir, opsToRe assert.commandWorked(adminDB.runCommand({startTrafficRecording: 1, destination: customRecordingDir})); - const dbContext = {adminDB, testDB, coll, serverURI: `mongodb://${mongodInstance.host}`}; + const serverURI = `mongodb://admin:pass@${mongodInstance.host}/admin`; + + const dbContext = {adminDB, testDB, coll, serverURI}; opsToRecord(dbContext); @@ -49,5 +51,5 @@ export function recordOperations(recordingDirGlobal, customRecordingDir, opsToRe assert.commandWorked(adminDB.runCommand({stopTrafficRecording: 1})); - return {mongodInstance, coll, recordingFilePath}; + return {mongodInstance, coll, recordingFilePath, serverURI}; } diff --git a/src/mongo/replay/BUILD.bazel b/src/mongo/replay/BUILD.bazel index 31318072033..2af98b9c185 100644 --- a/src/mongo/replay/BUILD.bazel +++ b/src/mongo/replay/BUILD.bazel @@ -13,6 +13,7 @@ mongo_cc_library( name = "mongor_lib", srcs = [ "config_handler.cpp", + "cursor_rewrite_hook.cpp", "performance_reporter.cpp", "rawop_document.cpp", "recording_reader.cpp", diff --git a/src/mongo/replay/cursor_rewrite_hook.cpp b/src/mongo/replay/cursor_rewrite_hook.cpp new file mode 100644 index 00000000000..9c5be0629ce --- /dev/null +++ b/src/mongo/replay/cursor_rewrite_hook.cpp @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/replay/cursor_rewrite_hook.h" + +#include "mongo/bson/bsonelement.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/logv2/log.h" +#include "mongo/replay/replay_command.h" +#include "mongo/replay/replay_hook_manager.h" +#include "mongo/util/assert_util.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault + +namespace mongo { + + +void CursorRewriteHook::onRequest(ReplayCommand& command) { + if (command.parseOpType() != "getMore") { + return; + } + + auto body = command.fetchMsgRequest().body; + auto recCursor = body["getMore"].numberLong(); + tassert(10823800, "getMore request has invalid cursor", recCursor); + auto liveCursor = _cursorMap.find(recCursor); + if (liveCursor == _cursorMap.end()) { + return; + } + auto newBody = body.addField(BSON("getMore" << liveCursor->second).firstElement()); + + Message message; + message.setData(dbMsg, newBody.objdata(), newBody.objsize()); + OpMsg::removeChecksum(&message); + OpMsg::appendChecksum(&message); + + LOGV2_DEBUG(10893014, 5, "getMore rewritten", "old"_attr = body, "new"_attr = newBody); + command.replaceBody(newBody); +} +void CursorRewriteHook::onLiveResponse(const ReplayCommand& recordedResponse, + const BSONObj& liveResponse) { + const auto& rec = recordedResponse.fetchMsgRequest().body; + auto recCursor = rec.getField("cursor"); + auto liveCursor = liveResponse.getField("cursor"); + if (recCursor && liveCursor) { + _cursorMap[recCursor["id"].numberLong()] = liveCursor["id"].numberLong(); + } +} + + +static const bool _CursorRewriteHookRegistered = + (ReplayObserverManager::get().registerPerSessionObserver(), true); + +} // namespace mongo diff --git a/src/mongo/replay/cursor_rewrite_hook.h b/src/mongo/replay/cursor_rewrite_hook.h new file mode 100644 index 00000000000..64958814e3d --- /dev/null +++ b/src/mongo/replay/cursor_rewrite_hook.h @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2025-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/bson/bsonelement.h" +#include "mongo/replay/replay_command.h" +#include "mongo/replay/replay_hook_manager.h" +#include "mongo/stdx/unordered_map.h" + +namespace mongo { + +/** + * Hook informed of every request and response for a single session + * during a replay of a traffic recording, used to re-write + * cursor responses. + */ +class CursorRewriteHook : public ReplayObserver { +public: + void onRequest(ReplayCommand& command) override; + void onLiveResponse(const ReplayCommand& recordedResponse, + const BSONObj& liveResponse) override; + +private: + stdx::unordered_map _cursorMap; +}; + +} // namespace mongo diff --git a/src/mongo/replay/replay_command.cpp b/src/mongo/replay/replay_command.cpp index df10492261a..580c77f7507 100644 --- a/src/mongo/replay/replay_command.cpp +++ b/src/mongo/replay/replay_command.cpp @@ -85,13 +85,16 @@ std::string ReplayCommand::toString() const { } OpMsgRequest ReplayCommand::parseBody() const { - Message message; - // TODO: SERVER-107809 setData here copies the message unnecessarily. - // OpMsg should be changed to allow parsing from a "view" type. - message.setData(dbMsg, _packet.message.data(), _packet.message.dataLen()); - OpMsg::removeChecksum(&message); - // TODO: SERVER-109756 remove unused fields such as lsid. - return rpc::opMsgRequestFromAnyProtocol(message); + if (!_parsedRequest) { + Message message; + // TODO: SERVER-107809 setData here copies the message unnecessarily. + // OpMsg should be changed to allow parsing from a "view" type. + message.setData(dbMsg, _packet.message.data(), _packet.message.dataLen()); + OpMsg::removeChecksum(&message); + // TODO: SERVER-109756 remove unused fields such as lsid. + _parsedRequest = rpc::opMsgRequestFromAnyProtocol(message); + } + return *_parsedRequest; } Microseconds ReplayCommand::parseOffset() const { return _packet.offset; @@ -119,6 +122,11 @@ EventType ReplayCommand::getEventType() const { return _packet.eventType; } +void ReplayCommand::replaceBody(BSONObj body) { + parseBody(); + _parsedRequest->body = body; +} + bool ReplayCommand::isSessionStart() const { return _packet.eventType == EventType::kSessionStart; } diff --git a/src/mongo/replay/replay_command.h b/src/mongo/replay/replay_command.h index c407d295dd3..3b0d25fe642 100644 --- a/src/mongo/replay/replay_command.h +++ b/src/mongo/replay/replay_command.h @@ -76,6 +76,8 @@ public: EventType getEventType() const; + void replaceBody(BSONObj buf); + private: /** Extract the actual message body containing the actual bson command containing the query */ OpMsgRequest parseBody() const; @@ -93,6 +95,13 @@ private: TrafficReaderPacket _packet; + // During replay, it may be necessary to edit a message before executing it. + // In that case, a new message may be stored in this shared buffer, and _packet + // made to reference it. + SharedBuffer _ownedBody; + + // Lazily initialized on first use, to avoid repeated parsing from packet data. + mutable boost::optional _parsedRequest; }; std::pair extractOffsetAndSessionFromCommand(const ReplayCommand& command); diff --git a/src/mongo/replay/replay_hook_manager.cpp b/src/mongo/replay/replay_hook_manager.cpp index b9366fdc5fa..24476c14a07 100644 --- a/src/mongo/replay/replay_hook_manager.cpp +++ b/src/mongo/replay/replay_hook_manager.cpp @@ -41,6 +41,11 @@ ReplayObserverManager& ReplayObserverManager::get() { return mgr; } +void ReplayObserverManager::reset() { + _observers.clear(); + _sessionObserverFactories.clear(); +} + PerSessionObserverState ReplayObserverManager::makeSessionObservers() { std::vector> hooks; hooks.reserve(_sessionObserverFactories.size()); diff --git a/src/mongo/replay/replay_hook_manager.h b/src/mongo/replay/replay_hook_manager.h index a3d1c425d52..1664fdec762 100644 --- a/src/mongo/replay/replay_hook_manager.h +++ b/src/mongo/replay/replay_hook_manager.h @@ -86,6 +86,14 @@ public: void observeLiveResponse(const ReplayCommand& recordedResponse, const BSONObj& liveResponse); + /** + * Test-only method to clear registered observers. + * + * Not safe to use outside of unit tests which can guarantee no concurrent access + * from a recording. + */ + void reset(); + private: // Observers will be invoked for all events across all sessions, potentially // concurrently. diff --git a/src/mongo/replay/session_simulator_test.cpp b/src/mongo/replay/session_simulator_test.cpp index ad018fee787..c4108061cb9 100644 --- a/src/mongo/replay/session_simulator_test.cpp +++ b/src/mongo/replay/session_simulator_test.cpp @@ -336,6 +336,8 @@ TEST(SessionSimulatorTest, HooksInvoked) { sessionSimulator.run(); } + + ReplayObserverManager::get().reset(); } } // namespace mongo