// mongo/jstests/change_streams/server_status_change_streams.js
// romanskas a32fc4ba6e SERVER-126106 Robustify server_status_change_streams.js (#53395)
// GitOrigin-RevId: b0f805ebf411e39c2c21ce7b26d88aa9c480cb1e
// 2026-05-11 07:16:25 +00:00
//
// 206 lines
// 9.0 KiB
// JavaScript

/**
* Tests that the change stream server status metrics are correctly reflected in the serverStatus
* command output.
*
* @tags: [
* # The test assumes that serverStatus metrics are retrieved from the same instance that the
* # change streams are opened on, which is not guaranteed when change streams are opened on secondaries.
* assumes_read_preference_unchanged,
* change_stream_does_not_expect_txns,
* # Can be removed once last-lts and last-continuous are >= 9.0.
* requires_fcv_90,
* uses_change_streams,
* uses_parallel_shell,
* ]
*/
import {after, afterEach, before, beforeEach, describe, it} from "jstests/libs/mochalite.js";
import {assertDropAndRecreateCollection, assertDropCollection} from "jstests/libs/collection_drop_recreate.js";
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {funWithArgs} from "jstests/libs/parallel_shell_helpers.js";
import {CursorList} from "jstests/libs/query/change_stream_util.js";
import {ServerStatusMetrics, TestDataModifyGuard} from "jstests/change_streams/change_stream_metrics_util.js";
// Database named after this test, plus the single collection that every change stream opened in
// this file watches (directly, or by name from the parallel shell).
const testDB = db.getSiblingDB(jsTestName());
const testColl = testDB.getCollection("test");
/**
 * Runs a gauge-metric test body, retrying the whole body if it fails.
 *
 * The cursor gauges checked with exact equality are per-process values. Cursors that the test body
 * did not open can therefore make a before/after comparison fail spuriously, and a retry gives the
 * body another chance to observe a quiet window.
 *
 * Each attempt gets a brand-new context object, bound as 'this' inside 'fn', that carries a fresh
 * 'cursorList'. That list is closed when the attempt ends, whether it passed or failed. Neither open
 * cursors nor properties that 'fn' sets on 'this' carry over from one attempt to the next.
 *
 * @param {Function} fn - test body, invoked with 'this.cursorList' available; fails by throwing.
 * Fails via assert.retryNoExcept once all attempts are exhausted.
 */
function retryOnUnaccountedCursor(fn) {
    assert.retryNoExcept(
        () => {
            // Build the context inside the attempt so that state from a failed attempt cannot leak.
            const ctx = {cursorList: new CursorList()};
            try {
                fn.call(ctx);
            } finally {
                ctx.cursorList.closeAll();
            }
            return true;
        },
        "change stream server status gauge test failed after multiple retries",
        10 /*attempts*/,
        100 /*interval*/,
        {runHangAnalyzer: false},
    );
}
before(function () {
    // Start the suite from a freshly created collection that holds exactly one document.
    const collName = testColl.getName();
    assertDropAndRecreateCollection(testDB, collName);
    const insertResult = testColl.insert({_id: 1});
    assert.commandWorked(insertResult);

    // The metrics checked here are per-process, which does not work with a multi-router setup, so
    // pin requests to a single mongos. The guard is stored on the shared hook context so that
    // teardown can call restore() on it.
    this.testDataChange = new TestDataModifyGuard("pinToSingleMongos", true);
});
after(function () {
    // Restore TestData even if dropping the collection throws. Otherwise the pinToSingleMongos
    // override installed by the setup hook would outlive this file's teardown. The guard may be
    // missing if setup failed before creating it. In that case, skip the restore rather than mask
    // the original failure with a TypeError.
    try {
        assertDropCollection(testDB, testColl.getName());
    } finally {
        if (this.testDataChange) {
            this.testDataChange.restore();
        }
    }
});
describe("change stream cursor counter metrics in serverStatus", function () {
    // Snapshot the counters before every test. The tests compare against this snapshot with
    // assert.gte rather than assert.eq. That tolerates the counters also being advanced by cursors
    // the test itself did not open.
    beforeEach(function () {
        const csMetrics = ServerStatusMetrics.getCsMetrics(db);
        this.totalOpenedBefore = csMetrics.cursor.totalOpened;
        this.lifespanBefore = csMetrics.cursor.lifespan;
        this.cursorList = new CursorList();
    });
    afterEach(function () {
        // Close any cursors left open by a test (e.g. on failure).
        this.cursorList.closeAll();
    });
    it("changeStreams.cursor.totalOpened increases as change stream cursors are opened", function () {
        this.cursorList.push(testColl.watch(), testColl.watch(), testColl.watch());
        assert.gte(
            ServerStatusMetrics.getCsMetrics(db).cursor.totalOpened,
            this.totalOpenedBefore + this.cursorList.length(),
            "totalOpened should increase by the number of opened change stream cursors",
        );
    });
    it("changeStreams.cursor.totalOpened does not decrease when change stream cursors are closed", function () {
        this.cursorList.push(testColl.watch(), testColl.watch());
        const afterOpen = ServerStatusMetrics.getCsMetrics(db).cursor.totalOpened;
        this.cursorList.closeAll();
        // totalOpened is a counter - it must not decrease on close.
        assert.gte(
            ServerStatusMetrics.getCsMetrics(db).cursor.totalOpened,
            afterOpen,
            "totalOpened should not decrease when change stream cursors are closed",
        );
    });
    it("changeStreams.cursor.lifespan histogram is populated after a change stream cursor is closed", function () {
        // Closing the cursor should add one more sample to the lifespan histogram.
        this.cursorList.push(testColl.watch());
        this.cursorList.closeAll();
        assert.gte(
            ServerStatusMetrics.getCsMetrics(db).cursor.lifespan.totalCount,
            this.lifespanBefore.totalCount + 1,
            "lifespan totalCount should increase by 1 after closing a change stream cursor",
        );
    });
});
describe("change stream cursor open gauge metrics in serverStatus", function () {
    // These gauges are compared with exact equality, so every test runs inside
    // retryOnUnaccountedCursor(). Each attempt gets a fresh 'this.cursorList' that is closed when
    // the attempt ends, and the whole body is retried if it fails.
    it("changeStreams.cursor.open.total increases as change stream cursors are opened", function () {
        retryOnUnaccountedCursor(function () {
            const csMetricsBefore = ServerStatusMetrics.getCsMetrics(db);
            this.cursorList.push(testColl.watch(), testColl.watch(), testColl.watch());
            // 'changeStreams.cursor.open.total' should increase by the number of opened cursors, while
            // 'changeStreams.cursor.open.pinned' remains the same as before (none of these cursors has a
            // getMore in progress).
            const csMetricsAfter = ServerStatusMetrics.getCsMetrics(db);
            assert.eq(csMetricsBefore.cursor.open.pinned, csMetricsAfter.cursor.open.pinned);
            assert.eq(csMetricsBefore.cursor.open.total + this.cursorList.length(), csMetricsAfter.cursor.open.total);
        });
    });
    it("changeStreams.cursor.open.total decreases as change stream cursors are closed", function () {
        retryOnUnaccountedCursor(function () {
            const csMetricsBefore = ServerStatusMetrics.getCsMetrics(db);
            this.cursorList.push(testColl.watch(), testColl.watch(), testColl.watch());
            // First confirm the opened cursors were counted, so the return to baseline below is meaningful.
            assert.eq(
                ServerStatusMetrics.getCsMetrics(db).cursor.open.total,
                csMetricsBefore.cursor.open.total + this.cursorList.length(),
            );
            this.cursorList.closeAll();
            // After closing, both gauges should be back at their starting values.
            const csMetricsAfter = ServerStatusMetrics.getCsMetrics(db);
            assert.eq(csMetricsBefore.cursor.open.total, csMetricsAfter.cursor.open.total);
            assert.eq(csMetricsBefore.cursor.open.pinned, csMetricsAfter.cursor.open.pinned);
        });
    });
    it("changeStreams.cursor.open.pinned increases while a getMore is in progress", function () {
        retryOnUnaccountedCursor(function () {
            // Baseline is taken before the failpoint is enabled.
            const csMetricsBefore = ServerStatusMetrics.getCsMetrics(db);
            // Enable the failpoint that pauses getMore after the cursor is pinned but before the pin
            // is released.
            // NOTE(review): the failpoint is configured without a filter, so presumably any getMore on
            // this node can hit it. If a foreign getMore hits it first, the pinned assertion below may
            // fail, and the retry wrapper will rerun the body.
            const fp = configureFailPoint(db, "waitAfterPinningCursorBeforeGetMoreBatch");
            let joinGetMore;
            try {
                // In a parallel shell, open a change stream and call hasNext() to trigger a getMore, which
                // pins the cursor and hits the failpoint.
                joinGetMore = startParallelShell(
                    funWithArgs(
                        function (dbName, collName) {
                            const csCursor = db.getSiblingDB(dbName).getCollection(collName).watch();
                            csCursor.hasNext();
                            csCursor.close();
                        },
                        testDB.getName(),
                        testColl.getName(),
                    ),
                    db.getMongo().port,
                );
                // Wait until the parallel shell's getMore hits the failpoint (cursor is now pinned).
                fp.wait();
                // Assert the pinned metric has increased by 1 while the cursor is pinned.
                assert.eq(
                    csMetricsBefore.cursor.open.pinned + 1,
                    ServerStatusMetrics.getCsMetrics(db).cursor.open.pinned,
                );
            } finally {
                // Release the failpoint to let the parallel shell's getMore complete. This happens even
                // on failure, so that later attempts and tests are not left blocked.
                fp.off();
                // joinGetMore stays undefined if startParallelShell() itself threw.
                if (joinGetMore) {
                    // Wait for the parallel shell to finish.
                    joinGetMore();
                }
            }
            // Assert the pinned metric has returned to the original value after the cursor is unpinned.
            assert.eq(csMetricsBefore.cursor.open.pinned, ServerStatusMetrics.getCsMetrics(db).cursor.open.pinned);
        });
    });
    it("a regular (non-change-stream) cursor does not affect change stream metrics", function () {
        retryOnUnaccountedCursor(function () {
            // Snapshot both the change stream metrics and the general cursor gauge, so the test can show
            // that the regular cursor is counted by the latter and not by the former.
            const csMetricsBefore = ServerStatusMetrics.getCsMetrics(db);
            const regularCursorOpenBefore = ServerStatusMetrics.getSsMetrics(db).cursor.open.total;
            // Open a regular find cursor with batchSize:0. This returns an empty first batch, but
            // leaves the cursor open on the server, making it a live multi-batch cursor without it
            // being a change stream.
            // NOTE(review): relies on CursorList.push() returning the pushed cursor when given a single
            // argument; confirm in change_stream_util.js.
            const cursor = this.cursorList.push(
                new DBCommandCursor(
                    testDB,
                    assert.commandWorked(testDB.runCommand({find: testColl.getName(), filter: {}, batchSize: 0})),
                    0,
                ),
            );
            assert.neq(cursor.getId(), 0, "unexpected empty cursorId");
            // The generic open-cursor gauge must see exactly the one new cursor.
            const regularCursorOpenAfter = ServerStatusMetrics.getSsMetrics(db).cursor.open.total;
            assert.eq(regularCursorOpenBefore + 1, regularCursorOpenAfter);
            // None of the change stream metrics may move.
            const csMetricsAfter = ServerStatusMetrics.getCsMetrics(db);
            assert.eq(csMetricsBefore.cursor.open.pinned, csMetricsAfter.cursor.open.pinned);
            assert.eq(csMetricsBefore.cursor.open.total, csMetricsAfter.cursor.open.total);
            assert.eq(csMetricsBefore.cursor.totalOpened, csMetricsAfter.cursor.totalOpened);
        });
    });
});