SERVER-111191 Discard profiler entries upon LockTimeout [v8.2] (#44812)
GitOrigin-RevId: 9e8dfb52e4121ea2a4825d9dd7ded4b2377fe2c3
This commit is contained in:
parent
9ef9154b3c
commit
30e5202326
371
jstests/noPassthrough/query/profile/profile_abandoned_writes.js
Normal file
371
jstests/noPassthrough/query/profile/profile_abandoned_writes.js
Normal file
@ -0,0 +1,371 @@
|
||||
/**
|
||||
* Test profiling is automatically disabled when there are too many LockTimeout errors per second.
|
||||
*
|
||||
* Tests that the system automatically stops profiling if there are too many LockTimeout errors per
|
||||
* second when acquiring the system.profile lock. To ensure reproducibility, the
|
||||
* 'forceLockTimeoutForProfiler' failpoint is used to simulate the LockTimeout condition. To keep
|
||||
* the test brief, we also lower the internalProfilingMaxAbandonedWritesPerSecondPerDb threshold.
|
||||
*
|
||||
* When the per-second threshold is exceeded, the profiler is disabled by setting the profiling
|
||||
* level to 0. A note message is logged to the system.profile collection. To re-enable profiling,
|
||||
* the level must be explicitly set back to 1 or 2.
|
||||
*
|
||||
* @tags: [
|
||||
* ]
|
||||
*/
|
||||
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
||||
import {after, before, describe, it} from "jstests/libs/mochalite.js";
|
||||
|
||||
describe("Profile Abandoned Writes", function() {
|
||||
let conn;
|
||||
let adminDB;
|
||||
const maxAbandonedWritesPerSecond = 5;
|
||||
|
||||
before(function() {
|
||||
// Start mongod with low threshold for abandoned writes per second.
|
||||
conn = MongoRunner.runMongod({
|
||||
setParameter:
|
||||
{internalProfilingMaxAbandonedWritesPerSecondPerDb: maxAbandonedWritesPerSecond},
|
||||
});
|
||||
adminDB = conn.getDB("admin");
|
||||
});
|
||||
|
||||
after(function() {
|
||||
MongoRunner.stopMongod(conn);
|
||||
});
|
||||
|
||||
/**
 * Returns the 'profiler' section of serverStatus, i.e. the counters (such as
 * 'totalAbandonedWrites' and 'dbsPastThreshold') that the tests compare before and after a
 * scenario.
 *
 * The serverStatus result is asserted to have succeeded so that a command failure is reported
 * as such, instead of surfacing later as a property access on 'undefined' in a caller.
 */
function getProfilerStats() {
    const serverStatus = assert.commandWorked(adminDB.runCommand({serverStatus: 1}));
    return serverStatus.profiler;
}
|
||||
|
||||
/**
 * Inserts 'count' documents into 'coll', one at a time, asserting that each insert succeeds.
 * Every insert is an operation the profiler will try to record when profiling is enabled.
 *
 * @param coll collection to insert into.
 * @param count number of documents to insert.
 * @param comment prefix for each document's 'comment' field; the loop index is appended.
 */
function performOperations(coll, count, comment) {
    for (let idx = 0; idx < count; ++idx) {
        const doc = {x: idx, comment: `${comment}-${idx}`};
        assert.commandWorked(coll.insert(doc));
    }
}
|
||||
|
||||
/**
 * Enables level-2 profiling on the database of 'coll' and then performs 'count' inserts while
 * the 'forceLockTimeoutForProfiler' failpoint is active, so that the profiler's attempts to
 * record those operations hit LockTimeout. Callers pass a 'count' above
 * maxAbandonedWritesPerSecond when they want profiling to be disabled automatically.
 *
 * The failpoint is server-wide, so it is always turned off again in a 'finally' block: if an
 * insert assertion throws, later tests must not keep running with forced lock timeouts.
 *
 * @param coll collection to write to.
 * @param count number of inserts to perform while the failpoint is active.
 * @param comment prefix for the 'comment' field of the inserted documents.
 */
function doSomeProblematicProfiling(coll, count, comment) {
    // Enable profiling.
    assert.commandWorked(coll.getDB().setProfilingLevel(2));

    // Enable the failpoint to force LockTimeout errors.
    const fp = configureFailPoint(adminDB, "forceLockTimeoutForProfiler");
    try {
        // Perform operations whose profile writes will hit LockTimeout errors.
        performOperations(coll, count, comment);
    } finally {
        // Disable the failpoint, even if one of the operations above failed.
        fp.off();
    }
}
|
||||
|
||||
/**
 * Asserts that db.system.profile contains a "note" document (the entry written when profiling
 * is abandoned) and that it has the expected shape: a 'ts' field, a string 'note' field, and
 * the values of the two server parameters that govern the behavior.
 *
 * @param db database whose system.profile collection is inspected.
 */
function assertNoteDocExists(db) {
    const noteDoc = db.system.profile.findOne({note: {$exists: true}});
    assert.neq(noteDoc, null, "Expected to find a note document");

    const assertHasField = (fieldName) =>
        assert(noteDoc.hasOwnProperty(fieldName),
               `Expected note document to have '${fieldName}' field`);

    // The checks run in this order so that the first failure reported is the most basic one.
    assertHasField("ts");
    assertHasField("note");
    assert.eq(typeof noteDoc.note, "string", "Expected 'note' field to be a string");
    assertHasField("internalQueryGlobalProfilingLockDeadlineMs");
    assertHasField("internalProfilingMaxAbandonedWritesPerSecondPerDb");
}
|
||||
|
||||
it("disables profiling automatically after exceeding per-second threshold", function() {
    const testDB = conn.getDB(`${jsTestName()}_autoDisableTest`);
    assert.commandWorked(testDB.dropDatabase());
    const coll = testDB.getCollection("testColl");

    // Snapshot the serverStatus counters so the checks below are relative to this test only.
    const {
        totalAbandonedWrites: abandonedWritesBefore,
        dbsPastThreshold: dbsPastThresholdBefore,
    } = getProfilerStats();

    doSomeProblematicProfiling(coll, 2 * maxAbandonedWritesPerSecond, "autoDisableTest");

    const {
        totalAbandonedWrites: abandonedWritesAfter,
        dbsPastThreshold: dbsPastThresholdAfter,
    } = getProfilerStats();

    // The number of abandoned writes must have grown by at least the threshold.
    assert.gte(
        abandonedWritesAfter - abandonedWritesBefore,
        maxAbandonedWritesPerSecond,
        `Expected totalAbandonedWrites to increase by at least the threshold. final: ${
            abandonedWritesAfter}, initial: ${abandonedWritesBefore}`,
    );

    // Exactly one database (this one) must have gone past the threshold.
    assert.eq(dbsPastThresholdAfter,
              dbsPastThresholdBefore + 1,
              "Expected dbsPastThreshold to increment by 1");

    assertNoteDocExists(testDB);

    // The profiling level must now read back as 0 (disabled).
    const {was: levelAfterDisable} = assert.commandWorked(testDB.runCommand({profile: -1}));
    assert.eq(levelAfterDisable, 0, "Expected profiling level to be 0 after exceeding threshold");

    // With profiling disabled, further operations must not add profile entries.
    const profileEntryCount = () => testDB.system.profile.count();
    const countBefore = profileEntryCount();
    performOperations(coll, 5, "autoDisableTest-after-disable");
    const countAfter = profileEntryCount();
    assert.eq(countAfter,
              countBefore,
              "Expected no new profile entries after profiling was disabled");
});
|
||||
|
||||
it("can be re-enabled after automatic disable", function() {
    const testDB = conn.getDB(jsTestName() + "_manualReenableTest");
    assert.commandWorked(testDB.dropDatabase());
    const coll = testDB.getCollection("testColl");

    doSomeProblematicProfiling(
        coll, 2 * maxAbandonedWritesPerSecond, "manualReenableTest-disable");

    // Verify profiling is disabled (level should be 0). The command result is checked so that a
    // failed 'profile' command is reported as such rather than as a level mismatch.
    let profilingStatus = assert.commandWorked(testDB.runCommand({profile: -1}));
    assert.eq(
        profilingStatus.was, 0, "Expected profiling level to be 0 after exceeding threshold");

    const noteDocs = testDB.system.profile.find({note: {$exists: true}}).toArray();
    assert.gt(noteDocs.length, 0, "Expected to find note about profiling being disabled");

    // While disabled, operations must not add profile entries.
    const countBefore = testDB.system.profile.count();
    performOperations(coll, 3, "manualReenableTest-verify-disabled");
    const countAfter = testDB.system.profile.count();
    assert.eq(countAfter, countBefore, "Expected profiling to be disabled");

    // Re-enable profiling by setting the level back to 2.
    assert.commandWorked(testDB.setProfilingLevel(2));
    jsTest.log.info("Re-enabled profiling by setting level back to 2");

    // Verify profiling level is back to 2.
    profilingStatus = assert.commandWorked(testDB.runCommand({profile: -1}));
    assert.eq(profilingStatus.was, 2, "Expected profiling level to be 2 after re-enabling");

    // Now profiling should work again for normal operations.
    const countBeforeReEnable = testDB.system.profile.count();
    performOperations(coll, 3, "manualReenableTest-after-reenable");
    const countAfterReEnable = testDB.system.profile.count();
    assert.gt(
        countAfterReEnable,
        countBeforeReEnable,
        "Expected profiling to be re-enabled after setting level back to 2",
    );
});
|
||||
|
||||
it("applies policy per database", function() {
    const firstHotDb = conn.getDB(jsTestName() + "_firstHotDb");
    const secondHotDb = conn.getDB(jsTestName() + "_secondHotDb");
    assert.commandWorked(firstHotDb.dropDatabase());
    assert.commandWorked(secondHotDb.dropDatabase());
    const firstHotColl = firstHotDb.getCollection("testColl");
    const secondHotColl = secondHotDb.getCollection("testColl");

    // Enable profiling on both databases.
    assert.commandWorked(firstHotDb.setProfilingLevel(2));
    assert.commandWorked(secondHotDb.setProfilingLevel(2));

    // Reset threshold to original value.
    assert.commandWorked(
        adminDB.runCommand({
            setParameter: 1,
            internalProfilingMaxAbandonedWritesPerSecondPerDb: maxAbandonedWritesPerSecond,
        }),
    );

    const initialDbsPastThreshold = getProfilerStats().dbsPastThreshold;

    doSomeProblematicProfiling(
        firstHotColl, 2 * maxAbandonedWritesPerSecond, "test3-db1-disable");

    // Verify profiling level is 0 for DB1. Every 'profile' result below is checked with
    // commandWorked so that a command failure is not misreported as a level mismatch.
    let profilingStatus = assert.commandWorked(firstHotDb.runCommand({profile: -1}));
    assert.eq(profilingStatus.was, 0, "Expected profiling level to be 0 for DB1");
    assertNoteDocExists(firstHotDb);

    // Verify profiling still works for DB2.
    profilingStatus = assert.commandWorked(secondHotDb.runCommand({profile: -1}));
    assert.eq(profilingStatus.was, 2, "Expected profiling level to still be 2 for DB2");

    const countBeforeDB2 = secondHotDb.system.profile.count();
    doSomeProblematicProfiling(secondHotColl, 3, "test3-db2-still-works");
    const countAfterDB2 = secondHotDb.system.profile.count();
    assert.gt(countAfterDB2, countBeforeDB2, "Expected profiling to still work in DB2");

    // Verify dbsPastThreshold incremented by 1 (only for DB1).
    assert.eq(getProfilerStats().dbsPastThreshold,
              initialDbsPastThreshold + 1,
              "Expected dbsPastThreshold to increment by 1");

    // Now disable profiling for DB2 as well.
    doSomeProblematicProfiling(
        secondHotColl, 2 * maxAbandonedWritesPerSecond, "test3-db2-disable");

    // Verify profiling level is 0 for DB2.
    profilingStatus = assert.commandWorked(secondHotDb.runCommand({profile: -1}));
    assert.eq(profilingStatus.was, 0, "Expected profiling level to be 0 for DB2");
    assertNoteDocExists(secondHotDb);

    // Verify dbsPastThreshold incremented again (now for DB2).
    assert.eq(
        getProfilerStats().dbsPastThreshold,
        initialDbsPastThreshold + 2,
        "Expected dbsPastThreshold to increment by 2 total",
    );
});
|
||||
|
||||
it("can be disabled multiple times", function() {
    const testDB = conn.getDB(jsTestName() + "_test4");
    assert.commandWorked(testDB.dropDatabase());
    const coll = testDB.getCollection("testColl");

    // Reset threshold.
    assert.commandWorked(
        adminDB.runCommand({
            setParameter: 1,
            internalProfilingMaxAbandonedWritesPerSecondPerDb: maxAbandonedWritesPerSecond,
        }),
    );

    const initialDbsPastThreshold = getProfilerStats().dbsPastThreshold;

    doSomeProblematicProfiling(coll, 2 * maxAbandonedWritesPerSecond, "test4-first-disable");

    // Verify profiling level is 0. Every 'profile' result below is checked with commandWorked so
    // that a command failure is not misreported as a level mismatch.
    let profilingStatus = assert.commandWorked(testDB.runCommand({profile: -1}));
    assert.eq(profilingStatus.was, 0, "Expected profiling level to be 0 after first disable");

    // Verify dbsPastThreshold incremented.
    assert.eq(getProfilerStats().dbsPastThreshold,
              initialDbsPastThreshold + 1,
              "Expected dbsPastThreshold to increment");

    // Re-enable profiling by setting the level back to 2.
    assert.commandWorked(testDB.setProfilingLevel(2));
    profilingStatus = assert.commandWorked(testDB.runCommand({profile: -1}));
    assert.eq(profilingStatus.was, 2, "Expected profiling level to be 2 after re-enabling");

    // Verify profiling works again. Note documents are excluded from the count so that only
    // regular profile entries are compared.
    const regularEntries = {note: {$exists: false}};
    const countBefore = testDB.system.profile.count(regularEntries);
    doSomeProblematicProfiling(coll, 3, "test4-verify-reenabled");
    const countAfter = testDB.system.profile.count(regularEntries);
    assert.gt(countAfter, countBefore, "Expected profiling to be re-enabled");

    // Now exceed the threshold again to disable profiling a second time.
    doSomeProblematicProfiling(coll, 2 * maxAbandonedWritesPerSecond, "test4-second-disable");

    // Verify profiling level is 0 again.
    profilingStatus = assert.commandWorked(testDB.runCommand({profile: -1}));
    assert.eq(profilingStatus.was, 0, "Expected profiling level to be 0 after second disable");

    // Verify we can find a note about the second disable.
    const allNoteDocs = testDB.system.profile.find({note: {$exists: true}}).toArray();
    assert.gte(allNoteDocs.length, 2, "Expected at least 2 note documents after two disables");
});
|
||||
|
||||
it("validates parameters correctly", function() {
    // Remember the current lock deadline so it can be restored afterwards. Leaving it at an
    // arbitrary small value would silently change the conditions under which every later test
    // in this file runs.
    const originalDeadlineMs =
        assert
            .commandWorked(adminDB.runCommand(
                {getParameter: 1, internalQueryGlobalProfilingLockDeadlineMs: 1}))
            .internalQueryGlobalProfilingLockDeadlineMs;

    try {
        // Test internalProfilingMaxAbandonedWritesPerSecondPerDb.
        let result = adminDB.runCommand(
            {setParameter: 1, internalProfilingMaxAbandonedWritesPerSecondPerDb: -1});
        assert.commandFailed(
            result,
            "Expected setting internalProfilingMaxAbandonedWritesPerSecondPerDb to -1 to fail",
        );

        // Test internalQueryGlobalProfilingLockDeadlineMs.
        result = adminDB.runCommand(
            {setParameter: 1, internalQueryGlobalProfilingLockDeadlineMs: -1});
        assert.commandFailed(
            result, "Expected setting internalQueryGlobalProfilingLockDeadlineMs to -1 to fail");

        // Test valid boundary values (0 is allowed).
        result = adminDB.runCommand(
            {setParameter: 1, internalProfilingMaxAbandonedWritesPerSecondPerDb: 0});
        assert.commandWorked(
            result,
            "Expected setting internalProfilingMaxAbandonedWritesPerSecondPerDb to 0 to succeed",
        );

        result = adminDB.runCommand(
            {setParameter: 1, internalQueryGlobalProfilingLockDeadlineMs: 0});
        assert.commandWorked(
            result, "Expected setting internalQueryGlobalProfilingLockDeadlineMs to 0 to succeed");
    } finally {
        // Restore both parameters even if an assertion above failed, so that later tests do not
        // run with a zero threshold or a zero lock deadline.
        assert.commandWorked(
            adminDB.runCommand({
                setParameter: 1,
                internalProfilingMaxAbandonedWritesPerSecondPerDb: maxAbandonedWritesPerSecond,
            }),
        );
        assert.commandWorked(adminDB.runCommand({
            setParameter: 1,
            internalQueryGlobalProfilingLockDeadlineMs: originalDeadlineMs,
        }));
    }
});
|
||||
|
||||
it("disables profiling immediately with zero tolerance threshold", function() {
    const testDB = conn.getDB(jsTestName() + "_zeroThreshold");
    assert.commandWorked(testDB.dropDatabase());
    const coll = testDB.getCollection("testColl");

    // Enable profiling.
    assert.commandWorked(testDB.setProfilingLevel(2));

    // Set threshold to 0 (zero tolerance for abandoned writes per second).
    assert.commandWorked(
        adminDB.runCommand(
            {setParameter: 1, internalProfilingMaxAbandonedWritesPerSecondPerDb: 0}),
    );

    try {
        const initialDbsPastThreshold = getProfilerStats().dbsPastThreshold;

        // First, perform a non-problematic write (should not disable profiling, even with a
        // zero threshold).
        assert.commandWorked(coll.insert({x: 1}));
        // Profiling level should still be 2 as no problematic writes have occurred.
        const profilingStatusBefore = assert.commandWorked(testDB.runCommand({profile: -1}));
        assert.eq(profilingStatusBefore.was, 2);

        // Do a single problematic write - even one failure should disable profiling.
        doSomeProblematicProfiling(coll, 1, "test6-threshold-zero");

        // Verify profiling level is 0.
        const profilingStatus = assert.commandWorked(testDB.runCommand({profile: -1}));
        assert.eq(
            profilingStatus.was, 0, "Expected profiling level to be 0 with threshold of 0");

        // Verify the database was counted as having gone past the threshold.
        assert.eq(
            getProfilerStats().dbsPastThreshold,
            initialDbsPastThreshold + 1,
            "Expected dbsPastThreshold to increment with threshold of 0",
        );
        assertNoteDocExists(testDB);
    } finally {
        // Reset threshold, even if an assertion above failed, so that a zero threshold never
        // leaks into other tests sharing this mongod.
        assert.commandWorked(
            adminDB.runCommand({
                setParameter: 1,
                internalProfilingMaxAbandonedWritesPerSecondPerDb: maxAbandonedWritesPerSecond,
            }),
        );
    }
});
|
||||
});
|
||||
@ -1127,6 +1127,7 @@ mongo_cc_library(
|
||||
"//src/mongo/db/catalog:collection_options",
|
||||
"//src/mongo/db/collection_crud",
|
||||
"//src/mongo/db/concurrency:exception_util",
|
||||
"//src/mongo/db/query/util:throughput_gauge",
|
||||
"//src/mongo/util/concurrency:spin_lock",
|
||||
],
|
||||
)
|
||||
|
||||
@ -49,6 +49,9 @@
|
||||
#include "mongo/db/curop.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/profile_settings.h"
|
||||
#include "mongo/db/query/util/deferred.h"
|
||||
#include "mongo/db/query/util/throughput_gauge.h"
|
||||
#include "mongo/db/repl/oplog.h"
|
||||
#include "mongo/db/repl/read_concern_args.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
@ -59,13 +62,15 @@
|
||||
#include "mongo/rpc/metadata/client_metadata.h"
|
||||
#include "mongo/s/shard_version.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
#include "mongo/util/concurrent_shared_values_map.h"
|
||||
#include "mongo/util/fail_point.h"
|
||||
#include "mongo/util/represent_as.h"
|
||||
#include "mongo/util/scopeguard.h"
|
||||
#include "mongo/util/str.h"
|
||||
#include "mongo/util/time_support.h"
|
||||
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <ostream>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
@ -79,9 +84,43 @@ namespace mongo::profile_collection {
|
||||
|
||||
namespace {
|
||||
|
||||
MONGO_FAIL_POINT_DEFINE(forceLockTimeoutForProfiler);
|
||||
|
||||
AtomicWord<int64_t> profilerWritesTotal{0};
|
||||
AtomicWord<int64_t> profilerWritesActive{0};
|
||||
|
||||
// Under heavy load we will choose to abandon and drop profile writes to preserve availability.
|
||||
// The observability tool shouldn't cause an availability problem. This metric serves to capture
|
||||
// when this is happening. This mechanism operates on a db scope. One db could be abandoning
|
||||
// writes to the point where the profiler is entirely disabled, and another could be operating
|
||||
// smoothly.
|
||||
// Per-database bookkeeping for abandoned profile writes; one instance is stored per DatabaseName
// in 'profilerAbandonmentMetrics'.
struct AbandonedWriteMetrics {
|
||||
// Receives one event (stamped with the current time) for every abandoned profile write.
ThroughputGauge throughputGauge;
|
||||
// NOTE(review): presumably the time at which profiling was automatically disabled for this db;
// no store to this member is visible in this part of the file, so confirm against its writer.
AtomicWord<Date_t> tsDisabled;
|
||||
};
|
||||
|
||||
ConcurrentSharedValuesMap<DatabaseName, AbandonedWriteMetrics> profilerAbandonmentMetrics;
|
||||
|
||||
// Track some overall counters to report in serverStatus. Reporting a map by dbName is potentially
|
||||
// too large for serverStatus.
|
||||
AtomicWord<int64_t> profilerWritesAbandondedGlobally{0};
|
||||
|
||||
// Please note that this counter never resets or decreases. Profiling for a db that was disabled
|
||||
// this way can be re-enabled by explicitly setting its profiling level again; if the threshold
|
||||
// is then exceeded again, this counter is incremented a second time for the same db.
|
||||
AtomicWord<int64_t> dbsPastThreshold{0};
|
||||
|
||||
static const auto profilerDisabledWarningString =
|
||||
"The profiler in this db has been automatically disabled due to server load. This tool is "
|
||||
"known to have a high overhead and can cause performance problems if turned up too high. On a "
|
||||
"per-db basis, the system will watch for lock acquisition timeouts while attempting to acquire "
|
||||
"a lock for profiling purposes. The threshold for this timeout is controlled by the server "
|
||||
"parameter 'internalQueryGlobalProfilingLockDeadlineMs.' If there are more timeouts than the "
|
||||
"configured threshold given by 'internalProfilingMaxAbandonedWritesPerSecondPerDb', then all "
|
||||
"future profile writes are disabled by setting the profile level to 0 for this db. It is "
|
||||
"recommended that future attempts to profile use a lower sample rate to avoid an outsized "
|
||||
"impact.";
|
||||
|
||||
class ProfilerSection : public ServerStatusSection {
|
||||
public:
|
||||
using ServerStatusSection::ServerStatusSection;
|
||||
@ -97,18 +136,19 @@ public:
|
||||
BSONObjBuilder bob;
|
||||
bob.append("totalWrites", profilerWritesTotal.loadRelaxed());
|
||||
bob.append("activeWriters", profilerWritesActive.loadRelaxed());
|
||||
bob.append("totalAbandonedWrites", profilerWritesAbandondedGlobally.loadRelaxed());
|
||||
bob.append("dbsPastThreshold", dbsPastThreshold.loadRelaxed());
|
||||
return bob.obj();
|
||||
}
|
||||
};
|
||||
|
||||
auto& profilerSection = *ServerStatusSectionBuilder<ProfilerSection>("profiler").forShard();
|
||||
} // namespace
|
||||
|
||||
void profile(OperationContext* opCtx, NetworkOp op) {
|
||||
auto buildProfileObject(auto opCtx) {
|
||||
// Initialize with 1kb at start in order to avoid realloc later
|
||||
BufBuilder profileBufBuilder(1024);
|
||||
|
||||
BSONObjBuilder b(profileBufBuilder);
|
||||
BSONObjBuilder profileObjBuilder(profileBufBuilder);
|
||||
|
||||
{
|
||||
auto curOp = CurOp::get(opCtx);
|
||||
@ -124,95 +164,226 @@ void profile(OperationContext* opCtx, NetworkOp op) {
|
||||
storageMetrics,
|
||||
curOp->getPrepareReadConflicts(),
|
||||
false /*omitCommand*/,
|
||||
b);
|
||||
profileObjBuilder);
|
||||
}
|
||||
|
||||
b.appendDate("ts", Date_t::now());
|
||||
b.append("client", opCtx->getClient()->clientAddress());
|
||||
profileObjBuilder.appendDate("ts", Date_t::now());
|
||||
profileObjBuilder.append("client", opCtx->getClient()->clientAddress());
|
||||
|
||||
if (auto clientMetadata = ClientMetadata::get(opCtx->getClient())) {
|
||||
auto appName = clientMetadata->getApplicationName();
|
||||
if (!appName.empty()) {
|
||||
b.append("appName", appName);
|
||||
profileObjBuilder.append("appName", appName);
|
||||
}
|
||||
}
|
||||
|
||||
AuthorizationSession* authSession = AuthorizationSession::get(opCtx->getClient());
|
||||
OpDebug::appendUserInfo(*CurOp::get(opCtx), b, authSession);
|
||||
OpDebug::appendUserInfo(*CurOp::get(opCtx), profileObjBuilder, authSession);
|
||||
return profileObjBuilder.done().redact(BSONObj::RedactLevel::sensitiveOnly);
|
||||
}
|
||||
|
||||
const BSONObj p = b.done().redact(BSONObj::RedactLevel::sensitiveOnly);
|
||||
BSONObj encodeProfileSettings(const ProfileSettings& dbProfileSettings) {
|
||||
BSONObjBuilder settingsBuilder;
|
||||
settingsBuilder.append("level", dbProfileSettings.level);
|
||||
if (dbProfileSettings.filter) {
|
||||
settingsBuilder.append("filter", dbProfileSettings.filter->serialize());
|
||||
} else {
|
||||
settingsBuilder.append("filter", "unset"_sd);
|
||||
}
|
||||
|
||||
const auto ns = CurOp::get(opCtx)->getNSS();
|
||||
return settingsBuilder.obj();
|
||||
}
|
||||
|
||||
try {
|
||||
// We create a new opCtx so that we aren't interrupted by having the original operation
|
||||
// killed or timed out. Those are the case we want to have profiling data.
|
||||
auto newClient = opCtx->getServiceContext()
|
||||
->getService(ClusterRole::ShardServer)
|
||||
->makeClient("profiling");
|
||||
auto newCtx = newClient->makeOperationContext();
|
||||
// Type tag to indicate at the call site that we want to opt out of a lock deadline.
|
||||
struct NoTimeoutTag {};
|
||||
|
||||
// We swap the lockers as that way we preserve locks held in transactions and any other
|
||||
// options set for the locker like maxLockTimeout.
|
||||
auto oldLocker = shard_role_details::swapLocker(
|
||||
opCtx, std::make_unique<Locker>(opCtx->getServiceContext()));
|
||||
auto emptyLocker = shard_role_details::swapLocker(newCtx.get(), std::move(oldLocker));
|
||||
ON_BLOCK_EXIT([&] {
|
||||
auto oldCtxLocker =
|
||||
shard_role_details::swapLocker(newCtx.get(), std::move(emptyLocker));
|
||||
shard_role_details::swapLocker(opCtx, std::move(oldCtxLocker));
|
||||
});
|
||||
AlternativeClientRegion acr(newClient);
|
||||
const auto dbProfilingNS = NamespaceString::makeSystemDotProfileNamespace(ns.dbName());
|
||||
void doProfile(auto opCtx,
|
||||
const auto& nss,
|
||||
const BSONObj& profileObj,
|
||||
std::variant<Milliseconds, NoTimeoutTag> lockTimeout) {
|
||||
// We create a new opCtx so that we aren't interrupted by having the original operation
|
||||
// killed or timed out. Those are the case we want to have profiling data.
|
||||
auto newClient =
|
||||
opCtx->getServiceContext()->getService(ClusterRole::ShardServer)->makeClient("profiling");
|
||||
auto newCtx = newClient->makeOperationContext();
|
||||
|
||||
profilerWritesActive.fetchAndAddRelaxed(1);
|
||||
ON_BLOCK_EXIT([&] { profilerWritesActive.fetchAndSubtractRelaxed(1); });
|
||||
// We swap the lockers as that way we preserve locks held in transactions and any other
|
||||
// options set for the locker like maxLockTimeout.
|
||||
auto oldLocker =
|
||||
shard_role_details::swapLocker(opCtx, std::make_unique<Locker>(opCtx->getServiceContext()));
|
||||
auto emptyLocker = shard_role_details::swapLocker(newCtx.get(), std::move(oldLocker));
|
||||
ON_BLOCK_EXIT([&] {
|
||||
auto oldCtxLocker = shard_role_details::swapLocker(newCtx.get(), std::move(emptyLocker));
|
||||
shard_role_details::swapLocker(opCtx, std::move(oldCtxLocker));
|
||||
});
|
||||
AlternativeClientRegion acr(newClient);
|
||||
const auto dbProfilingNS = NamespaceString::makeSystemDotProfileNamespace(nss.dbName());
|
||||
|
||||
boost::optional<CollectionAcquisition> profileCollection;
|
||||
while (true) {
|
||||
profileCollection.emplace(
|
||||
acquireCollection(newCtx.get(),
|
||||
CollectionAcquisitionRequest(
|
||||
dbProfilingNS,
|
||||
PlacementConcern{boost::none, ShardVersion::UNSHARDED()},
|
||||
repl::ReadConcernArgs::get(newCtx.get()),
|
||||
AcquisitionPrerequisites::kUnreplicatedWrite),
|
||||
MODE_IX));
|
||||
profilerWritesActive.fetchAndAddRelaxed(1);
|
||||
ON_BLOCK_EXIT([&] { profilerWritesActive.fetchAndSubtractRelaxed(1); });
|
||||
|
||||
Database* const db =
|
||||
DatabaseHolder::get(newCtx.get())->getDb(newCtx.get(), dbProfilingNS.dbName());
|
||||
if (!db) {
|
||||
// Database disappeared.
|
||||
LOGV2(
|
||||
20700, "note: not profiling because db went away for namespace", logAttrs(ns));
|
||||
return;
|
||||
}
|
||||
boost::optional<CollectionAcquisition> profileCollection;
|
||||
while (true) {
|
||||
const auto deadline =
|
||||
std::visit(OverloadedVisitor{[&](const NoTimeoutTag&) { return Date_t::max(); },
|
||||
[&](const Milliseconds& millis) {
|
||||
return Date_t::now() + millis;
|
||||
}},
|
||||
lockTimeout);
|
||||
|
||||
if (profileCollection->exists()) {
|
||||
break;
|
||||
}
|
||||
|
||||
uassertStatusOK(createProfileCollection(newCtx.get(), db));
|
||||
profileCollection.reset();
|
||||
profileCollection.emplace(acquireCollection(
|
||||
newCtx.get(),
|
||||
CollectionAcquisitionRequest(dbProfilingNS,
|
||||
PlacementConcern{boost::none, ShardVersion::UNSHARDED()},
|
||||
repl::ReadConcernArgs::get(newCtx.get()),
|
||||
AcquisitionPrerequisites::kUnreplicatedWrite,
|
||||
deadline),
|
||||
MODE_IX));
|
||||
if (MONGO_unlikely(forceLockTimeoutForProfiler.shouldFail()) &&
|
||||
!std::holds_alternative<NoTimeoutTag>(lockTimeout)) {
|
||||
uasserted(ErrorCodes::LockTimeout,
|
||||
str::stream() << "forcing LockTimeout based on 'forceLockTimeoutForProfiler' "
|
||||
"fail point. profileObj="
|
||||
<< profileObj);
|
||||
}
|
||||
|
||||
invariant(profileCollection && profileCollection->exists());
|
||||
Database* const db =
|
||||
DatabaseHolder::get(newCtx.get())->getDb(newCtx.get(), dbProfilingNS.dbName());
|
||||
if (!db) {
|
||||
// Database disappeared.
|
||||
LOGV2(20700, "note: not profiling because db went away for namespace", logAttrs(nss));
|
||||
return;
|
||||
}
|
||||
|
||||
WriteUnitOfWork wuow(newCtx.get());
|
||||
OpDebug* const nullOpDebug = nullptr;
|
||||
uassertStatusOK(collection_internal::insertDocument(newCtx.get(),
|
||||
profileCollection->getCollectionPtr(),
|
||||
InsertStatement(p),
|
||||
nullOpDebug,
|
||||
false));
|
||||
wuow.commit();
|
||||
profilerWritesTotal.fetchAndAddRelaxed(1);
|
||||
if (profileCollection->exists()) {
|
||||
break;
|
||||
}
|
||||
|
||||
uassertStatusOK(createProfileCollection(newCtx.get(), db));
|
||||
profileCollection.reset();
|
||||
}
|
||||
|
||||
invariant(profileCollection && profileCollection->exists());
|
||||
|
||||
WriteUnitOfWork wuow(newCtx.get());
|
||||
OpDebug* const nullOpDebug = nullptr;
|
||||
uassertStatusOK(collection_internal::insertDocument(newCtx.get(),
|
||||
profileCollection->getCollectionPtr(),
|
||||
InsertStatement(profileObj),
|
||||
nullOpDebug,
|
||||
false));
|
||||
wuow.commit();
|
||||
profilerWritesTotal.fetchAndAddRelaxed(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if this abandoned write should be logged. This abandonment can happen a lot under
|
||||
* load. Let's log when this happens, but not every time.
|
||||
*/
|
||||
bool noteThereWasAnAbandonedWrite(auto opCtx, const auto& abandonmentMetrics) {
|
||||
abandonmentMetrics->throughputGauge.recordEvent(Date_t::now());
|
||||
profilerWritesAbandondedGlobally.fetchAndAddRelaxed(1);
|
||||
static Rarely sampler;
|
||||
if (sampler.tick()) {
|
||||
// Every once and a while (Rarely's frequency), log the event.
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
 * Builds a BSON summary of a db's abandonment state. 'nAbandonedInLastSecond' is always
 * included; 'fullyDisabledAt' is included only when 'tsDisabled' differs from Date_t::min().
 */
BSONObj metricsToBson(auto nAbandonedInLastSecond, Date_t tsDisabled) {
    BSONObjBuilder builder;
    builder.append("nAbandonedInLastSecond", nAbandonedInLastSecond);

    const bool hasDisabledTimestamp = tsDisabled != Date_t::min();
    if (hasDisabledTimestamp) {
        builder.append("fullyDisabledAt", tsDisabled);
    }

    return builder.obj();
}
|
||||
|
||||
void disableProblematicProfiling(auto opCtx,
|
||||
const auto& nss,
|
||||
const Date_t tsDisabled,
|
||||
const auto& abandonmentMetrics,
|
||||
const auto& nAbandonedInLastSecond) {
|
||||
// Set profiling level to 0 to prevent future writes.
|
||||
auto& dbProfileSettings = DatabaseProfileSettings::get(opCtx->getServiceContext());
|
||||
ProfileSettings oldSettings{dbProfileSettings.getDatabaseProfileSettings(nss.dbName())};
|
||||
ProfileSettings newSettings{oldSettings};
|
||||
newSettings.level = 0;
|
||||
dbProfileSettings.setDatabaseProfileSettings(nss.dbName(), newSettings);
|
||||
|
||||
const auto maxAbandonedWrites = internalProfilingMaxAbandonedWritesPerSecondPerDb.loadRelaxed();
|
||||
|
||||
const auto metricsBson = metricsToBson(nAbandonedInLastSecond, tsDisabled);
|
||||
|
||||
LOGV2_WARNING(11119100,
|
||||
"Abandoned too many profile writes. In a further attempt to maintain "
|
||||
"performance, profiling is disabled for this db until settings are manually "
|
||||
"updated. Profile settings changed.",
|
||||
"db"_attr = nss.dbName(),
|
||||
"oldProfileSettings"_attr = encodeProfileSettings(oldSettings),
|
||||
"newProfileSettings"_attr = encodeProfileSettings(newSettings),
|
||||
"abandonmentMetrics"_attr = metricsBson,
|
||||
"maxAbandonedWritesPerSecond"_attr = maxAbandonedWrites);
|
||||
dbsPastThreshold.fetchAndAdd(1);
|
||||
|
||||
auto noteToStoreInProfile =
|
||||
BSON("ts" << Date_t::now() << "note" << profilerDisabledWarningString
|
||||
<< "internalQueryGlobalProfilingLockDeadlineMs"
|
||||
<< internalQueryGlobalProfilingLockDeadlineMs.loadRelaxed()
|
||||
<< "internalProfilingMaxAbandonedWritesPerSecondPerDb" << maxAbandonedWrites
|
||||
<< "slowms" << serverGlobalParams.slowMS.loadRelaxed() << "abandonmentMetrics"
|
||||
<< metricsBson << "profileSettings"
|
||||
<< BSON("was" << encodeProfileSettings(oldSettings) << "new"
|
||||
<< encodeProfileSettings(newSettings)));
|
||||
try {
|
||||
doProfile(opCtx, nss, noteToStoreInProfile, NoTimeoutTag{});
|
||||
} catch (const AssertionException& assertionEx) {
|
||||
LOGV2_WARNING(20703,
|
||||
"Caught Assertion while trying to profile operation",
|
||||
"operation"_attr = networkOpToString(op),
|
||||
logAttrs(ns),
|
||||
"assertion"_attr = redact(assertionEx));
|
||||
LOGV2_WARNING(11119104,
|
||||
"Caught Assertion while trying to write down decision to disable profiler",
|
||||
logAttrs(nss),
|
||||
"assertion"_attr = redact(assertionEx),
|
||||
"code"_attr = assertionEx.code());
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Returns true when 'nAbandonedInLastSecond' is strictly greater than the current value of
 * internalProfilingMaxAbandonedWritesPerSecondPerDb.
 *
 * 'opCtx' and 'now' are accepted but not consulted by this check.
 */
bool profilingHasBecomeProblematic(OperationContext* opCtx,
                                   Date_t now,
                                   const auto& nAbandonedInLastSecond) {
    const auto abandonedWritesThreshold =
        internalProfilingMaxAbandonedWritesPerSecondPerDb.loadRelaxed();
    return abandonedWritesThreshold < nAbandonedInLastSecond;
}
|
||||
} // namespace
|
||||
|
||||
/**
 * Attempts to write a profile entry for the current operation, using
 * internalQueryGlobalProfilingLockDeadlineMs as the deadline passed to doProfile().
 *
 * Any AssertionException from doProfile() is caught. A LockTimeout is reported to
 * noteThereWasAnAbandonedWrite(), whose return value decides whether the failure is logged;
 * every other assertion is always logged.
 *
 * After the attempt, the number of abandoned writes in the previous second is read from the
 * per-database throughput gauge. If profilingHasBecomeProblematic() says so, profiling for the
 * database is disabled through disableProblematicProfiling().
 */
void profile(OperationContext* opCtx, NetworkOp op) {
    const auto nss = CurOp::get(opCtx)->getNSS();
    auto abandonmentMetrics = profilerAbandonmentMetrics.getOrEmplace(nss.dbName());

    try {
        const Milliseconds lockDeadline(internalQueryGlobalProfilingLockDeadlineMs.loadRelaxed());
        doProfile(opCtx, nss, buildProfileObject(opCtx), lockDeadline);
    } catch (const AssertionException& assertionEx) {
        // Lock timeouts are counted as abandoned writes and are logged only when the helper asks
        // for it; all other assertions are logged unconditionally.
        const bool isLockTimeout = assertionEx.code() == ErrorCodes::LockTimeout;
        const bool shouldLog =
            !isLockTimeout || noteThereWasAnAbandonedWrite(opCtx, abandonmentMetrics);
        if (shouldLog) {
            LOGV2_WARNING(20703,
                          "Caught Assertion while trying to profile operation",
                          "operation"_attr = networkOpToString(op),
                          logAttrs(nss),
                          "assertion"_attr = redact(assertionEx),
                          "code"_attr = assertionEx.code());
        }
    }

    const auto now = opCtx->fastClockSource().now();
    const auto nAbandonedInLastSecond =
        abandonmentMetrics->throughputGauge.nEventsInPreviousSecond(now);
    if (!profilingHasBecomeProblematic(opCtx, now, nAbandonedInLastSecond)) {
        return;
    }

    disableProblematicProfiling(opCtx, nss, now, abandonmentMetrics, nAbandonedInLastSecond);
}
|
||||
|
||||
|
||||
@ -1320,6 +1320,32 @@ server_parameters:
|
||||
default: false
|
||||
redact: false
|
||||
|
||||
internalQueryGlobalProfilingLockDeadlineMs:
|
||||
description: >-
|
||||
A number of milliseconds to wait for a lock acquisition to perform writes to the
|
||||
system.profile collection.
|
||||
set_at: [startup, runtime]
|
||||
cpp_varname: internalQueryGlobalProfilingLockDeadlineMs
|
||||
cpp_vartype: AtomicWord<int>
|
||||
default: 1
|
||||
redact: false
|
||||
validator:
|
||||
gte: 0
|
||||
|
||||
internalProfilingMaxAbandonedWritesPerSecondPerDb:
|
||||
description: >-
|
||||
If more than this many lock timeouts occur within one second when trying to profile, we will skip all
|
||||
future profiling for a given database. Lasts until profiling is manually re-enabled. The
|
||||
value can be zero, in which case even a single lock timeout will disable profiling for
|
||||
the db.
|
||||
set_at: [startup, runtime]
|
||||
cpp_varname: internalProfilingMaxAbandonedWritesPerSecondPerDb
|
||||
cpp_vartype: AtomicWord<int>
|
||||
default: 1000
|
||||
redact: false
|
||||
validator:
|
||||
gte: 0
|
||||
|
||||
internalQueryAggMulticastTimeoutMS:
|
||||
description: "Timeout in MS for requests to shard servers when aggregations are sent to all shard servers"
|
||||
set_at: [startup]
|
||||
|
||||
@ -51,6 +51,18 @@ mongo_cc_library(
|
||||
],
|
||||
)
|
||||
|
||||
# Header-only target: ThroughputGauge lives entirely in throughput_gauge.h, so 'srcs' is empty.
mongo_cc_library(
    name = "throughput_gauge",
    srcs = [],
    hdrs = ["throughput_gauge.h"],
    deps = ["//src/mongo:base"],
)
|
||||
|
||||
mongo_cc_library(
|
||||
name = "rank_fusion_util",
|
||||
srcs = [
|
||||
@ -85,12 +97,15 @@ mongo_cc_unit_test(
|
||||
"jparse_util_test.cpp",
|
||||
"memory_util_test.cpp",
|
||||
"string_util_test.cpp",
|
||||
"throughput_gauge_test.cpp",
|
||||
],
|
||||
tags = [
|
||||
"mongo_unittest_third_group",
|
||||
],
|
||||
deps = [
|
||||
":jparse_util",
|
||||
":throughput_gauge",
|
||||
"//src/mongo/db:query_exec",
|
||||
"//src/mongo/util:clock_source_mock",
|
||||
],
|
||||
)
|
||||
|
||||
77
src/mongo/db/query/util/throughput_gauge.h
Normal file
77
src/mongo/db/query/util/throughput_gauge.h
Normal file
@ -0,0 +1,77 @@
|
||||
/**
|
||||
* Copyright (C) 2025-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/util/concurrency/with_lock.h"
|
||||
#include "mongo/util/represent_as.h"
|
||||
#include "mongo/util/time_support.h"
|
||||
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
|
||||
namespace mongo {
|
||||
|
||||
/**
|
||||
* This class will help record the ops/second of the last 1 second. It is thread safe.
|
||||
*/
|
||||
class ThroughputGauge {
|
||||
public:
|
||||
/**
|
||||
* Records an event at a timestamp.
|
||||
* Removes all events older than 1 second.
|
||||
*/
|
||||
void recordEvent(Date_t ts) {
|
||||
std::lock_guard lk(_mutex);
|
||||
_eventTimes.push(ts);
|
||||
expireOldEvents(lk, ts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of stored events in the previous second.
|
||||
* Removes all events older than 1 second.
|
||||
*/
|
||||
int64_t nEventsInPreviousSecond(Date_t now) {
|
||||
std::lock_guard lk(_mutex);
|
||||
expireOldEvents(lk, now);
|
||||
return representAs<int64_t>(_eventTimes.size())
|
||||
.value_or(std::numeric_limits<int64_t>::max());
|
||||
}
|
||||
|
||||
private:
|
||||
void expireOldEvents(const WithLock& lk, Date_t now) {
|
||||
while (!_eventTimes.empty() && (now - _eventTimes.top() > Seconds(1))) {
|
||||
_eventTimes.pop();
|
||||
}
|
||||
}
|
||||
|
||||
mutable std::mutex _mutex;
|
||||
std::priority_queue<Date_t, std::vector<Date_t>, std::greater<Date_t>> _eventTimes{};
|
||||
};
|
||||
} // namespace mongo
|
||||
376
src/mongo/db/query/util/throughput_gauge_test.cpp
Normal file
376
src/mongo/db/query/util/throughput_gauge_test.cpp
Normal file
@ -0,0 +1,376 @@
|
||||
/**
|
||||
* Copyright (C) 2025-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/query/util/throughput_gauge.h"
|
||||
|
||||
#include "mongo/platform/atomic_word.h"
|
||||
#include "mongo/stdx/thread.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
#include "mongo/util/clock_source_mock.h"
|
||||
|
||||
#include <vector>
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
/**
|
||||
* Test fixture that provides a mock clock source for testing timing-dependent behavior of
|
||||
* ThroughputGauge.
|
||||
*/
|
||||
class ThroughputGaugeTest : public unittest::Test {
|
||||
public:
|
||||
void setUp() override {
|
||||
_mockClock = std::make_shared<ClockSourceMock>();
|
||||
_mockClock->reset(Date_t::fromMillisSinceEpoch(1000));
|
||||
}
|
||||
|
||||
ClockSourceMock* getMockClock() {
|
||||
return _mockClock.get();
|
||||
}
|
||||
|
||||
Date_t now() {
|
||||
return getMockClock()->now();
|
||||
}
|
||||
|
||||
void advanceTime(Milliseconds ms) {
|
||||
getMockClock()->advance(ms);
|
||||
}
|
||||
|
||||
private:
|
||||
std::shared_ptr<ClockSourceMock> _mockClock;
|
||||
};
|
||||
|
||||
TEST_F(ThroughputGaugeTest, InitiallyEmpty) {
    // A gauge that never received an event must report zero.
    ThroughputGauge untouchedGauge;
    const auto nEvents = untouchedGauge.nEventsInPreviousSecond(now());
    ASSERT_EQ(nEvents, 0);
}
|
||||
|
||||
TEST_F(ThroughputGaugeTest, RecordSingleEvent) {
    // One event recorded and queried at the same instant is counted exactly once.
    ThroughputGauge gauge;
    gauge.recordEvent(now());

    const auto nEvents = gauge.nEventsInPreviousSecond(now());
    ASSERT_EQ(nEvents, 1);
}
|
||||
|
||||
TEST_F(ThroughputGaugeTest, RecordMultipleEventsWithinOneSecond) {
    ThroughputGauge gauge;

    // One event at the starting instant, then four more separated by the gaps below. The gaps
    // add up to 900ms, so all five events stay inside the one-second window.
    gauge.recordEvent(now());
    const int gapsMs[] = {100, 200, 300, 300};
    for (const int gapMs : gapsMs) {
        advanceTime(Milliseconds(gapMs));
        gauge.recordEvent(now());
    }

    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 5);
}
|
||||
|
||||
TEST_F(ThroughputGaugeTest, EventsExpireAfterOneSecond) {
    ThroughputGauge gauge;

    // Advances the clock by 'stepMs', records an event, and returns the resulting count.
    const auto stepRecordAndCount = [&](int stepMs) {
        advanceTime(Milliseconds(stepMs));
        gauge.recordEvent(now());
        return gauge.nEventsInPreviousSecond(now());
    };

    // T=0: the very first event.
    ASSERT_EQ(stepRecordAndCount(0), 1);

    // T=999ms: the first event is just under one second old and still counts.
    ASSERT_EQ(stepRecordAndCount(999), 2);

    // T=1001ms: the first event is now older than one second and is dropped, leaving the
    // events from T=999ms and T=1001ms.
    ASSERT_EQ(stepRecordAndCount(2), 2);
}
|
||||
|
||||
TEST_F(ThroughputGaugeTest, AllEventsExpire) {
    ThroughputGauge gauge;

    // Five events, 100ms apart; the clock ends 100ms past the last one.
    int remaining = 5;
    while (remaining-- > 0) {
        gauge.recordEvent(now());
        advanceTime(Milliseconds(100));
    }
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 5);

    // Two more seconds put every recorded event outside the window.
    advanceTime(Seconds(2));
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 0);

    // A new event is then the only one counted.
    gauge.recordEvent(now());
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 1);
}
|
||||
|
||||
TEST_F(ThroughputGaugeTest, SlidingWindowBehavior) {
    ThroughputGauge gauge;

    // Moves the clock forward half a second and records an event at the new time.
    const auto advanceHalfSecondAndRecord = [&] {
        advanceTime(Milliseconds(500));
        gauge.recordEvent(now());
    };

    // Events at T=0, T=500ms and T=1000ms. At T=1000ms the first event is exactly one second
    // old, which is still inside the window.
    gauge.recordEvent(now());
    advanceHalfSecondAndRecord();
    advanceHalfSecondAndRecord();
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 3);

    // T=1500ms: the T=0 event is 1500ms old and gone. Remaining: T=500, T=1000, T=1500.
    advanceHalfSecondAndRecord();
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 3);

    // T=2000ms: T=500 is 1500ms old and gone; T=1000 is exactly 1000ms old and stays.
    // Remaining: T=1000, T=1500, T=2000.
    advanceHalfSecondAndRecord();
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 3);
}
|
||||
|
||||
TEST_F(ThroughputGaugeTest, ConcurrentRecordEvents) {
    ThroughputGauge gauge;
    constexpr int numThreads = 10;
    constexpr int eventsPerThread = 100;

    AtomicWord<unsigned> ready{0};

    // Body run by every worker: announce arrival, spin until all workers have arrived so they
    // start recording together, then record the events. The mock clock is never advanced, so
    // all events share one timestamp.
    const auto worker = [&] {
        ready.fetchAndAdd(1);
        while (ready.load() < numThreads) {
        }

        int recorded = 0;
        while (recorded < eventsPerThread) {
            gauge.recordEvent(now());
            ++recorded;
        }
    };

    std::vector<stdx::thread> workers;
    for (int i = 0; i < numThreads; i++) {
        workers.emplace_back(worker);
    }
    for (auto& t : workers) {
        t.join();
    }

    // No event may be lost to a race.
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), numThreads * eventsPerThread);
}
|
||||
|
||||
TEST_F(ThroughputGaugeTest, ConcurrentRecordAndRead) {
    ThroughputGauge gauge;
    constexpr int numWriteThreads = 5;
    constexpr int numReadThreads = 5;
    constexpr int eventsPerThread = 50;

    AtomicWord<unsigned> ready{0};
    AtomicWord<bool> stopReading{false};

    // Start barrier shared by writers and readers: announce arrival, then spin until every
    // thread of both kinds has arrived.
    const auto arriveAndSpin = [&] {
        ready.fetchAndAdd(1);
        while (ready.load() < (numWriteThreads + numReadThreads)) {
        }
    };

    // Writers record a fixed number of events each.
    std::vector<stdx::thread> writers;
    for (int i = 0; i < numWriteThreads; i++) {
        writers.emplace_back([&]() {
            arriveAndSpin();
            for (int j = 0; j < eventsPerThread; j++) {
                gauge.recordEvent(now());
            }
        });
    }

    // Readers poll the gauge until told to stop; the count can never exceed the total number of
    // events the writers will ever record.
    std::vector<stdx::thread> readers;
    for (int i = 0; i < numReadThreads; i++) {
        readers.emplace_back([&]() {
            arriveAndSpin();
            while (!stopReading.load()) {
                auto count = gauge.nEventsInPreviousSecond(now());
                ASSERT_LESS_THAN_OR_EQUALS(count, numWriteThreads * eventsPerThread);
            }
        });
    }

    // Writers finish first; only then are the readers released and joined.
    for (auto& writer : writers) {
        writer.join();
    }
    stopReading.store(true);
    for (auto& reader : readers) {
        reader.join();
    }

    // The clock never moved, so every recorded event is still inside the window.
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), numWriteThreads * eventsPerThread);
}
|
||||
|
||||
TEST_F(ThroughputGaugeTest, ConcurrentRecordWithExpiration) {
    ThroughputGauge gauge;
    constexpr int numThreads = 8;
    constexpr int eventsPerThread = 25;

    // Spawns 'numThreads' threads that start together and each record 'eventsPerThread' events
    // stamped 'ts'; returns once all of them have been joined.
    const auto recordFromAllThreads = [&](Date_t ts) {
        AtomicWord<unsigned> ready{0};
        std::vector<stdx::thread> workers;
        for (int i = 0; i < numThreads; i++) {
            workers.emplace_back([&]() {
                ready.fetchAndAdd(1);
                while (ready.load() < numThreads) {
                }

                for (int j = 0; j < eventsPerThread; j++) {
                    gauge.recordEvent(ts);
                }
            });
        }
        for (auto& w : workers) {
            w.join();
        }
    };

    // Phase 1: every thread records at the starting time.
    recordFromAllThreads(now());
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), numThreads * eventsPerThread);

    // Phase 2: 1100ms later the phase-1 events are outside the window, so after recording the
    // same number of new events only the new ones are counted.
    advanceTime(Milliseconds(1100));
    recordFromAllThreads(now());
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), numThreads * eventsPerThread);
}
|
||||
|
||||
|
||||
TEST_F(ThroughputGaugeTest, EventsAtExactOneSecondBoundary) {
    ThroughputGauge gauge;

    // First event at the starting time.
    gauge.recordEvent(now());

    // Second event exactly one second later. The window includes its boundary, so the first
    // event is still counted.
    advanceTime(Seconds(1));
    gauge.recordEvent(now());
    const auto countAtBoundary = gauge.nEventsInPreviousSecond(now());
    ASSERT_EQ(countAtBoundary, 2);

    // Third event one millisecond past the boundary. The first event has now expired, leaving
    // the second and third.
    advanceTime(Milliseconds(1));
    gauge.recordEvent(now());
    const auto countPastBoundary = gauge.nEventsInPreviousSecond(now());
    ASSERT_EQ(countPastBoundary, 2);
}
|
||||
|
||||
TEST_F(ThroughputGaugeTest, InterleaveRecordAndExpire) {
    ThroughputGauge gauge;

    // Moves the clock forward by 'stepMs' and records an event at the new time.
    const auto advanceAndRecord = [&](int stepMs) {
        advanceTime(Milliseconds(stepMs));
        gauge.recordEvent(now());
    };

    // Events at T=0, T=100 and T=200.
    gauge.recordEvent(now());
    advanceAndRecord(100);
    advanceAndRecord(100);
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 3);

    // T=1100: the T=0 event expires. Remaining: T=100, T=200, T=1100.
    advanceAndRecord(900);
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 3);

    // T=1200: the T=100 event expires. Remaining: T=200, T=1100, T=1200.
    advanceAndRecord(100);
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 3);

    // T=1300: the T=200 event expires. Remaining: T=1100, T=1200, T=1300.
    advanceAndRecord(100);
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 3);
}
|
||||
|
||||
TEST_F(ThroughputGaugeTest, EventsExpireWhenThroughputIsExamined) {
    ThroughputGauge gauge;

    // Events at T=0, T=100 and T=200.
    gauge.recordEvent(now());
    for (int i = 0; i < 2; i++) {
        advanceTime(Milliseconds(100));
        gauge.recordEvent(now());
    }
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 3);

    // From here on nothing is recorded: expiry must happen purely as a side effect of reading.

    // T=1100: the T=0 event has expired. Remaining: T=100, T=200.
    advanceTime(Milliseconds(900));
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 2);

    // T=1200: the T=100 event has expired. Remaining: T=200.
    advanceTime(Milliseconds(100));
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 1);

    // T=1300: the T=200 event has expired. Nothing remains.
    advanceTime(Milliseconds(100));
    ASSERT_EQ(gauge.nEventsInPreviousSecond(now()), 0);
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
Loading…
Reference in New Issue
Block a user