SERVER-108238: [MongoR] Implement cursor rewrite (#48226)

GitOrigin-RevId: c1755bb00b8afcd21201ab41260f31eb07c172f8
This commit is contained in:
James H 2026-03-23 12:15:34 +00:00 committed by MongoDB Bot
parent d41ff17d44
commit d781b39b81
10 changed files with 203 additions and 18 deletions

View File

@ -8,7 +8,7 @@ import {
} from "jstests/noPassthrough/traffic_recording/traffic_replaying_lib.js";
function parseRecordedTraffic(recordingFilePath) {
const recordedTraffic = convertTrafficRecordingToBSON(recordingFilePath);
const recordedTraffic = convertTrafficRecordingToBSON(recordingFilePath, true);
const opTypes = {};
recordedTraffic.forEach((obj) => {
const opType = obj.opType;
@ -18,12 +18,11 @@ function parseRecordedTraffic(recordingFilePath) {
}
function recordAndParseOperations(recordingDirGlobal, customRecordingDir, workflowCallback) {
const {mongodInstance, coll, recordingFilePath} = recordOperations(
const {mongodInstance, coll, recordingFilePath, serverURI} = recordOperations(
recordingDirGlobal,
customRecordingDir,
workflowCallback,
);
const serverURI = `mongodb://${mongodInstance.host}`;
MongoRunner.stopMongod(mongodInstance, null, {user: "admin", pwd: "pass"});
@ -42,12 +41,14 @@ function runInstances(baseDir, customSubDir, workflowCallback) {
const defaultOperationsLambda = (dbContext) => {
const {testDB, coll} = dbContext;
assert.commandWorked(coll.insert({name: "foo biz bar"}));
for (let i = 0; i < 200; ++i) {
assert.commandWorked(coll.insert({name: "foo biz bar", i}));
}
assert.eq("foo biz bar", coll.findOne().name);
assert.commandWorked(coll.insert({name: "foo bar"}));
assert.eq("foo bar", coll.findOne({name: "foo bar"}).name);
assert.commandWorked(coll.deleteOne({}));
assert.eq(1, coll.aggregate().toArray().length);
assert.eq(200, coll.aggregate().toArray().length);
assert.commandWorked(coll.update({}, {}));
};
@ -95,10 +96,11 @@ removeFile(realDirectory);
// Recording
const initialResults = runInstances("traffic_recording_" + UUID().hex(), "recordings", defaultOperationsLambda);
assert.eq(initialResults.opTypes["serverStatus"], 1);
assert.eq(initialResults.opTypes["insert"], 2);
assert.eq(initialResults.opTypes["insert"], 201);
assert.eq(initialResults.opTypes["find"], 2);
assert.eq(initialResults.opTypes["delete"], 1);
assert.eq(initialResults.opTypes["aggregate"], 1);
assert.eq(initialResults.opTypes["getMore"], 1);
assert.eq(initialResults.opTypes["update"], 1);
assert.eq(initialResults.opTypes["sessionStart"], 1);
assert.eq(initialResults.opTypes["sessionEnd"], 1);
@ -118,14 +120,29 @@ const replayResults = runInstances("replayed_recording_" + UUID().hex(), "replay
// in order to compute the filepath, we issue a server status inside runInstances, this plus the
// one recorded will bring total count to 2.
assert.eq(replayResults.opTypes["serverStatus"], 2);
assert.eq(replayResults.opTypes["insert"], 2);
assert.eq(replayResults.opTypes["insert"], 201);
assert.eq(replayResults.opTypes["find"], 2);
assert.eq(replayResults.opTypes["delete"], 1);
assert.eq(replayResults.opTypes["aggregate"], 1);
assert.eq(initialResults.opTypes["getMore"], 1);
assert.eq(replayResults.opTypes["update"], 1);
assert.gte(initialResults.opTypes["sessionStart"], 1);
assert.eq(initialResults.opTypes["sessionEnd"], 1);
assert.gte(replayResults.opTypes["sessionStart"], 1);
assert.gte(replayResults.opTypes["sessionEnd"], 1);
// ======================================================================================== //
function getRecordedGetMore(recordedTraffic) {
for (let req of recordedTraffic) {
if (req.opType == "getMore") {
jsTest.log.debug(tojson(req.rawop.body));
return req.rawop.body["getMore"];
}
}
}
let origCursor = getRecordedGetMore(initialResults.recordedTraffic);
let replayCursor = getRecordedGetMore(replayResults.recordedTraffic);
assert.neq(origCursor, replayCursor);
cleanUpDirectory(initialResults.recordingDirGlobal);
cleanUpDirectory(replayResults.recordingDirGlobal);

View File

@ -40,7 +40,9 @@ export function recordOperations(recordingDirGlobal, customRecordingDir, opsToRe
assert.commandWorked(adminDB.runCommand({startTrafficRecording: 1, destination: customRecordingDir}));
const dbContext = {adminDB, testDB, coll, serverURI: `mongodb://${mongodInstance.host}`};
const serverURI = `mongodb://admin:pass@${mongodInstance.host}/admin`;
const dbContext = {adminDB, testDB, coll, serverURI};
opsToRecord(dbContext);
@ -49,5 +51,5 @@ export function recordOperations(recordingDirGlobal, customRecordingDir, opsToRe
assert.commandWorked(adminDB.runCommand({stopTrafficRecording: 1}));
return {mongodInstance, coll, recordingFilePath};
return {mongodInstance, coll, recordingFilePath, serverURI};
}

View File

@ -13,6 +13,7 @@ mongo_cc_library(
name = "mongor_lib",
srcs = [
"config_handler.cpp",
"cursor_rewrite_hook.cpp",
"performance_reporter.cpp",
"rawop_document.cpp",
"recording_reader.cpp",

View File

@ -0,0 +1,80 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/replay/cursor_rewrite_hook.h"
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/logv2/log.h"
#include "mongo/replay/replay_command.h"
#include "mongo/replay/replay_hook_manager.h"
#include "mongo/util/assert_util.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
namespace mongo {
void CursorRewriteHook::onRequest(ReplayCommand& command) {
if (command.parseOpType() != "getMore") {
return;
}
auto body = command.fetchMsgRequest().body;
auto recCursor = body["getMore"].numberLong();
tassert(10823800, "getMore request has invalid cursor", recCursor);
auto liveCursor = _cursorMap.find(recCursor);
if (liveCursor == _cursorMap.end()) {
return;
}
auto newBody = body.addField(BSON("getMore" << liveCursor->second).firstElement());
Message message;
message.setData(dbMsg, newBody.objdata(), newBody.objsize());
OpMsg::removeChecksum(&message);
OpMsg::appendChecksum(&message);
LOGV2_DEBUG(10893014, 5, "getMore rewritten", "old"_attr = body, "new"_attr = newBody);
command.replaceBody(newBody);
}
void CursorRewriteHook::onLiveResponse(const ReplayCommand& recordedResponse,
const BSONObj& liveResponse) {
const auto& rec = recordedResponse.fetchMsgRequest().body;
auto recCursor = rec.getField("cursor");
auto liveCursor = liveResponse.getField("cursor");
if (recCursor && liveCursor) {
_cursorMap[recCursor["id"].numberLong()] = liveCursor["id"].numberLong();
}
}
static const bool _CursorRewriteHookRegistered =
(ReplayObserverManager::get().registerPerSessionObserver<CursorRewriteHook>(), true);
} // namespace mongo

View File

@ -0,0 +1,53 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/bson/bsonelement.h"
#include "mongo/replay/replay_command.h"
#include "mongo/replay/replay_hook_manager.h"
#include "mongo/stdx/unordered_map.h"
namespace mongo {
/**
* Hook informed of every request and response for a single session
* during a replay of a traffic recording, used to re-write
* cursor responses.
*/
class CursorRewriteHook : public ReplayObserver {
public:
void onRequest(ReplayCommand& command) override;
void onLiveResponse(const ReplayCommand& recordedResponse,
const BSONObj& liveResponse) override;
private:
stdx::unordered_map<int64_t, int64_t> _cursorMap;
};
} // namespace mongo

View File

@ -85,13 +85,16 @@ std::string ReplayCommand::toString() const {
}
OpMsgRequest ReplayCommand::parseBody() const {
Message message;
// TODO: SERVER-107809 setData here copies the message unnecessarily.
// OpMsg should be changed to allow parsing from a "view" type.
message.setData(dbMsg, _packet.message.data(), _packet.message.dataLen());
OpMsg::removeChecksum(&message);
// TODO: SERVER-109756 remove unused fields such as lsid.
return rpc::opMsgRequestFromAnyProtocol(message);
if (!_parsedRequest) {
Message message;
// TODO: SERVER-107809 setData here copies the message unnecessarily.
// OpMsg should be changed to allow parsing from a "view" type.
message.setData(dbMsg, _packet.message.data(), _packet.message.dataLen());
OpMsg::removeChecksum(&message);
// TODO: SERVER-109756 remove unused fields such as lsid.
_parsedRequest = rpc::opMsgRequestFromAnyProtocol(message);
}
return *_parsedRequest;
}
Microseconds ReplayCommand::parseOffset() const {
return _packet.offset;
@ -119,6 +122,11 @@ EventType ReplayCommand::getEventType() const {
return _packet.eventType;
}
void ReplayCommand::replaceBody(BSONObj body) {
parseBody();
_parsedRequest->body = body;
}
bool ReplayCommand::isSessionStart() const {
return _packet.eventType == EventType::kSessionStart;
}

View File

@ -76,6 +76,8 @@ public:
EventType getEventType() const;
void replaceBody(BSONObj buf);
private:
/** Extract the actual message body containing the actual bson command containing the query */
OpMsgRequest parseBody() const;
@ -93,6 +95,13 @@ private:
TrafficReaderPacket _packet;
// During replay, it may be necessary to edit a message before executing it.
// In that case, a new message may be stored in this shared buffer, and _packet
// made to reference it.
SharedBuffer _ownedBody;
// Lazily initialized on first use, to avoid repeated parsing from packet data.
mutable boost::optional<OpMsgRequest> _parsedRequest;
};
std::pair<Microseconds, uint64_t> extractOffsetAndSessionFromCommand(const ReplayCommand& command);

View File

@ -41,6 +41,11 @@ ReplayObserverManager& ReplayObserverManager::get() {
return mgr;
}
void ReplayObserverManager::reset() {
_observers.clear();
_sessionObserverFactories.clear();
}
PerSessionObserverState ReplayObserverManager::makeSessionObservers() {
std::vector<std::shared_ptr<ReplayObserver>> hooks;
hooks.reserve(_sessionObserverFactories.size());

View File

@ -86,6 +86,14 @@ public:
void observeLiveResponse(const ReplayCommand& recordedResponse, const BSONObj& liveResponse);
/**
* Test-only method to clear registered observers.
*
* Not safe to use outside of unit tests which can guarantee no concurrent access
* from a recording.
*/
void reset();
private:
// Observers will be invoked for all events across all sessions, potentially
// concurrently.

View File

@ -336,6 +336,8 @@ TEST(SessionSimulatorTest, HooksInvoked) {
sessionSimulator.run();
}
ReplayObserverManager::get().reset();
}
} // namespace mongo