SERVER-120318 Add metric for bytes of uncompressed oplog entries applied to track progress of PIT restore (#52091)

Co-authored-by: Evelyn Wu <evelyn.wu@mongodb.com>
GitOrigin-RevId: 26e7a2fe95a5906f8e22a4dfba6ae2e4448c02d3
This commit is contained in:
Evelyn Wu 2026-04-22 12:09:32 -04:00 committed by MongoDB Bot
parent 8fc72f1a28
commit 0f6c4edb04
6 changed files with 48 additions and 3 deletions

View File

@ -35,6 +35,8 @@ last-continuous:
ticket: SERVER-121533
- test_file: jstests/core/timeseries/query/timeseries_lookup_index_use.js
ticket: SERVER-95352
- test_file: jstests/replsets/server_status_metrics.js
ticket: SERVER-120318
- test_file: jstests/core/index/geo/geo_circle_spilling.js
ticket: SERVER-124376
- test_file: jstests/noPassthrough/sharded_cluster_topology/shard_registry_rsm_removal_race.js
@ -686,6 +688,8 @@ last-lts:
ticket: SERVER-106165
- test_file: jstests/replsets/oplog_fetch_lag_metric.js
ticket: SERVER-116300
- test_file: jstests/replsets/server_status_metrics.js
ticket: SERVER-120318
- test_file: jstests/sharding/resharding_collection_cloner.js
ticket: SERVER-120210
- test_file: jstests/noPassthrough/replication/killOp_against_repl_threads.js

View File

@ -55,6 +55,7 @@ const expectedReplSection = {
"attemptsToBecomeSecondary": 1,
"batchSize": 0,
"batches": {"num": 0, "totalMillis": 0},
"bytes": 0,
"ops": 0,
},
"heartBeat": {"handleQueueSize": 0, "maxSeenHandleQueueSize": 0},

View File

@ -22,7 +22,14 @@ import {
/**
* Test replication metrics
*/
function _testSecondaryMetricsHelper(secondary, opCount, baseOpsApplied, baseOpsReceived, baseOpsWritten) {
function _testSecondaryMetricsHelper(
secondary,
opCount,
baseOpsApplied,
baseOpsReceived,
baseOpsWritten,
baseOpsBytes,
) {
let ss = secondary.getDB("test").serverStatus();
jsTestLog(`Secondary ${secondary.host} metrics: ${tojson(ss.metrics)}`);
@ -67,6 +74,7 @@ function _testSecondaryMetricsHelper(secondary, opCount, baseOpsApplied, baseOps
assert(ss.metrics.repl.apply.batches.num > 0, "no applied batches");
assert(ss.metrics.repl.apply.batches.totalMillis >= 0, "missing applied batch time");
assert.eq(ss.metrics.repl.apply.ops, opCount + baseOpsApplied, "wrong number of applied ops");
assert.gt(ss.metrics.repl.apply.bytes, baseOpsBytes, "apply bytes did not increase");
if (FeatureFlagUtil.isPresentAndEnabled(secondary, "ReduceMajorityWriteLatency")) {
assert.eq(ss.metrics.repl.write.batchSize, opCount + baseOpsWritten, "write ops batch size mismatch");
@ -79,10 +87,17 @@ function _testSecondaryMetricsHelper(secondary, opCount, baseOpsApplied, baseOps
// Metrics are racy, e.g. repl.buffer.apply.count could over- or under-reported briefly. Retry on
// error.
function testSecondaryMetrics(secondary, opCount, baseOpsApplied, baseOpsReceived, baseOpsWritten) {
function testSecondaryMetrics(secondary, opCount, baseOpsApplied, baseOpsReceived, baseOpsWritten, baseOpsBytes) {
assert.soon(() => {
try {
_testSecondaryMetricsHelper(secondary, opCount, baseOpsApplied, baseOpsReceived, baseOpsWritten);
_testSecondaryMetricsHelper(
secondary,
opCount,
baseOpsApplied,
baseOpsReceived,
baseOpsWritten,
baseOpsBytes,
);
return true;
} catch (exc) {
jsTestLog(`Caught ${exc}, retrying`);
@ -136,6 +151,7 @@ let secondaryBaseOplogOpsReceived = ss.metrics.repl.apply.batchSize;
let secondaryBaseOplogOpsWritten = FeatureFlagUtil.isPresentAndEnabled(secondary, "ReduceMajorityWriteLatency")
? ss.metrics.repl.write.batchSize
: undefined;
let secondaryBaseOplogBytes = ss.metrics.repl.apply.bytes;
// Disable batching of inserts so each one creates an oplog entry.
assert.commandWorked(testDB.adminCommand({setParameter: 1, internalInsertMaxBatchSize: 1}));
@ -153,6 +169,7 @@ testSecondaryMetrics(
secondaryBaseOplogOpsApplied,
secondaryBaseOplogOpsReceived,
secondaryBaseOplogOpsWritten,
secondaryBaseOplogBytes,
);
let options = {writeConcern: {w: 2}, multi: true, upsert: true};
@ -164,6 +181,7 @@ testSecondaryMetrics(
secondaryBaseOplogOpsApplied,
secondaryBaseOplogOpsReceived,
secondaryBaseOplogOpsWritten,
secondaryBaseOplogBytes,
);
// Test that the number of oplog getMore requested by the secondary and processed by the primary has

View File

@ -1174,6 +1174,7 @@ mongo_cc_library(
"//src/mongo/db/stats:timer_stats",
"//src/mongo/db/storage:storage_control",
"//src/mongo/db/storage:storage_options",
"//src/mongo/otel/metrics:otel_metrics_service",
"//src/mongo/util/concurrency:thread_pool",
"//src/mongo/util/net:network",
],

View File

@ -72,6 +72,11 @@
#include "mongo/logv2/attribute_storage.h"
#include "mongo/logv2/log.h"
#include "mongo/logv2/log_util.h"
#include "mongo/otel/metrics/metric_names.h"
#include "mongo/otel/metrics/metric_unit.h"
#include "mongo/otel/metrics/metrics_counter.h"
#include "mongo/otel/metrics/metrics_service.h"
#include "mongo/otel/metrics/server_status_options.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/compiler.h"
#include "mongo/platform/random.h"
@ -124,6 +129,16 @@ auto& oplogApplicationBatchSize = *MetricBuilder<Counter64>{"repl.apply.batchSiz
// Number and time of each ApplyOps worker pool round
auto& applyBatchStats = *MetricBuilder<TimerStats>("repl.apply.batches");
// Total bytes of uncompressed oplog entries applied.
auto& oplogBytesApplied = otel::metrics::MetricsService::instance().createInt64Counter(
otel::metrics::MetricNames::kOplogApplyBytes,
"Total bytes of uncompressed oplog entries applied.",
otel::metrics::MetricUnit::kBytes,
{.serverStatusOptions = otel::metrics::ServerStatusOptions{
.dottedPath = "repl.apply.bytes",
.role = ClusterRole{ClusterRole::None},
}});
/**
* Used for logging a report of ops that take longer than "slowMS" to apply. This is called
* right before returning from applyOplogEntryOrGroupedInserts, and it returns the same status.
@ -571,6 +586,7 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
const auto lastOpTimeInBatch = lastOpInBatch.getOpTime();
const auto lastWallTimeInBatch = lastOpInBatch.getWallClockTime();
const auto lastAppliedOpTimeAtStartOfBatch = _replCoord->getMyLastAppliedOpTime();
const auto batchByteSize = ops.byteSize();
// Make sure the oplog doesn't go back in time or repeat an entry.
if (firstOpTimeInBatch <= lastAppliedOpTimeAtStartOfBatch) {
@ -619,6 +635,8 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
fassertNoTrace(34437, swLastOpTimeAppliedInBatch);
invariant(swLastOpTimeAppliedInBatch.getValue() == lastOpTimeInBatch);
oplogBytesApplied.add(batchByteSize);
// Update various things that care about our last applied optime.
// 1. Ensure that the last applied op time hasn't changed since the start of this batch.

View File

@ -157,6 +157,9 @@ public:
static constexpr MetricName kIndexBuildKeysGeneratedFromScan = {
"index_builds.keys_generated_from_scan"};
// Replication Team Metrics
static constexpr MetricName kOplogApplyBytes = {"oplog.apply.bytes"};
// Query Integration Team Metrics
// Op Counters