SERVER-123334 Integrate latest size/count into Collection API (#51746)
GitOrigin-RevId: f9f42356600fe2c7c34a30878ecd4fda33f7ba99
This commit is contained in:
parent
575b682bda
commit
7442fa78cd
@ -16,11 +16,13 @@
|
||||
* # it to fail due to not finishing quickly enough.
|
||||
* incompatible_with_concurrency_simultaneous,
|
||||
* requires_collstats,
|
||||
* requires_capped
|
||||
* requires_capped,
|
||||
* # TODO(SERVER-124037): Remove.
|
||||
* featureFlagReplicatedFastCount_incompatible,
|
||||
* ]
|
||||
*/
|
||||
export const $config = (function () {
|
||||
// TODO: This workload may fail if an iteration multiplier is specified.
|
||||
// TODO(SERVER-124041): This workload may fail if an iteration multiplier is specified.
|
||||
let data = {prefix: "convert_to_capped_collection"};
|
||||
|
||||
let states = (function () {
|
||||
|
||||
@ -20,7 +20,9 @@
|
||||
* # it to fail due to not finishing quickly enough.
|
||||
* incompatible_with_concurrency_simultaneous,
|
||||
* requires_collstats,
|
||||
* requires_capped
|
||||
* requires_capped,
|
||||
* # TODO(SERVER-124037): Remove.
|
||||
* featureFlagReplicatedFastCount_incompatible,
|
||||
* ]
|
||||
*/
|
||||
import {extendWorkload} from "jstests/concurrency/fsm_libs/extend_workload.js";
|
||||
|
||||
@ -10,6 +10,8 @@
|
||||
* # convertToCapped requires a global lock and any background operations on the database causes
|
||||
* # it to fail due to not finishing quickly enough.
|
||||
* incompatible_with_concurrency_simultaneous,
|
||||
* # TODO(SERVER-124037): Remove.
|
||||
* featureFlagReplicatedFastCount_incompatible,
|
||||
* ]
|
||||
*/
|
||||
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
/**
|
||||
* Test TTL docs are not deleted from secondaries directly
|
||||
* @tags: [requires_replication]
|
||||
* @tags: [
|
||||
* requires_replication,
|
||||
* featureFlagReplicatedFastCount_incompatible
|
||||
* ]
|
||||
*/
|
||||
|
||||
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||
|
||||
@ -33,6 +33,7 @@
|
||||
#include "mongo/bson/timestamp.h"
|
||||
#include "mongo/db/curop.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/op_observer/batched_write_context.h"
|
||||
#include "mongo/db/op_observer/op_observer.h"
|
||||
#include "mongo/db/op_observer/op_observer_util.h"
|
||||
#include "mongo/db/replicated_fast_count/replicated_fast_count_enabled.h"
|
||||
@ -119,8 +120,21 @@ void cappedDeleteUntilBelowConfiguredMaximum(OperationContext* opCtx,
|
||||
invariant(shard_role_details::getLocker(opCtx)->getLockMode(
|
||||
ResourceId(RESOURCE_METADATA, nss)) == MODE_X);
|
||||
|
||||
const long long currentDataSize = collection->dataSize(opCtx);
|
||||
const long long currentNumRecords = collection->numRecords(opCtx);
|
||||
// When writes are batched, the capped collection insert is not written to the oplog until the
|
||||
// top-level WriteUnitOfWork commits. When writes are not batched, the capped collection insert
|
||||
// is written to the oplog immediately. latestSizeCount() scans the oplog to compute the latest
|
||||
// collection size/count, so it misses the latest insert when writes are batched. To correctly
|
||||
// compute currentDataSize and currentNumRecords, we include the uncommitted size/count changes
|
||||
// if and only if writes are batched.
|
||||
const bool batched = BatchedWriteContext::get(opCtx).writesAreBatched();
|
||||
const CollectionSizeCount uncommittedChanges = (batched)
|
||||
? UncommittedFastCountChange::getForRead(opCtx).find(collection->uuid())
|
||||
: CollectionSizeCount{.size = 0, .count = 0};
|
||||
|
||||
const auto [latestSize, latestCount] = collection->latestSizeCount(opCtx);
|
||||
|
||||
const long long currentDataSize = latestSize + uncommittedChanges.size;
|
||||
const long long currentNumRecords = latestCount + uncommittedChanges.count;
|
||||
|
||||
const auto cappedMaxSize = collection->getCollectionOptions().cappedSize;
|
||||
const long long sizeOverCap =
|
||||
|
||||
@ -201,7 +201,8 @@ public:
|
||||
max = shardKeyPattern.normalizeShardKey(max);
|
||||
}
|
||||
|
||||
const long long numRecords = collection.getCollectionPtr()->numRecords(opCtx);
|
||||
const auto [dataSize, numRecords] =
|
||||
collection.getCollectionPtr()->latestSizeCount(opCtx);
|
||||
reply.setNumObjects(numRecords);
|
||||
|
||||
if (numRecords == 0) {
|
||||
@ -220,8 +221,7 @@ public:
|
||||
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
|
||||
if (min.isEmpty() && max.isEmpty()) {
|
||||
if (estimate) {
|
||||
reply.setSize(
|
||||
static_cast<long long>(collection.getCollectionPtr()->dataSize(opCtx)));
|
||||
reply.setSize(dataSize);
|
||||
reply.setMillis(timer.millis());
|
||||
return reply;
|
||||
}
|
||||
@ -268,8 +268,7 @@ public:
|
||||
const auto maxObjects = cmd.getMaxObjects();
|
||||
|
||||
std::remove_const_t<decltype(maxSize)> size = 0;
|
||||
std::remove_const_t<decltype(size)> avgObjSize =
|
||||
collection.getCollectionPtr()->dataSize(opCtx) / numRecords;
|
||||
std::remove_const_t<decltype(size)> avgObjSize = dataSize / numRecords;
|
||||
std::remove_const_t<decltype(maxObjects)> numObjects = 0;
|
||||
|
||||
try {
|
||||
|
||||
@ -56,7 +56,7 @@ PlanStage::StageState RecordStoreFastCountStage::doWork(WorkingSetID* out) {
|
||||
// This stage never returns a working set member.
|
||||
*out = WorkingSet::INVALID_ID;
|
||||
|
||||
long long nCounted = collectionPtr()->numRecords(opCtx());
|
||||
long long nCounted = collectionPtr()->latestSizeCount(opCtx()).count;
|
||||
|
||||
if (_skip) {
|
||||
nCounted -= _skip;
|
||||
|
||||
@ -213,6 +213,26 @@ mongo_cc_unit_test(
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_unit_test(
|
||||
name = "replicated_fast_count_manager_test",
|
||||
srcs = [
|
||||
"replicated_fast_count_manager_test.cpp",
|
||||
],
|
||||
tags = [
|
||||
"mongo_unittest_third_group",
|
||||
"server-collection-write-path",
|
||||
],
|
||||
deps = [
|
||||
"replicated_fast_count_init",
|
||||
"replicated_fast_count_manager",
|
||||
"replicated_fast_count_test_helpers",
|
||||
"size_count_store",
|
||||
"size_count_timestamp_store",
|
||||
"//src/mongo/db/shard_role/shard_catalog:catalog_test_fixture",
|
||||
"//src/mongo/db/storage:recovery_unit_base",
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_unit_test(
|
||||
name = "persisted_size_count_test",
|
||||
srcs = [
|
||||
|
||||
@ -33,6 +33,7 @@
|
||||
#include "mongo/db/replicated_fast_count/replicated_fast_count_advance_checkpoint.h"
|
||||
#include "mongo/db/replicated_fast_count/replicated_fast_count_delta_utils.h"
|
||||
#include "mongo/db/replicated_fast_count/replicated_fast_count_read.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/catalog_raii.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/clustered_collection_util.h"
|
||||
#include "mongo/db/update/document_diff_calculator.h"
|
||||
#include "mongo/db/update/update_oplog_entry_serialization.h"
|
||||
@ -197,8 +198,37 @@ CollectionSizeCount ReplicatedFastCountManager::find(const UUID& uuid) const {
|
||||
return {};
|
||||
}
|
||||
|
||||
CollectionSizeCount ReplicatedFastCountManager::findLatest(OperationContext* opCtx,
|
||||
UUID uuid) const {
|
||||
// Callers sometimes acquire a collection lock before reading findLatest(). When we try to
|
||||
// acquire an additional oplog collection lock here, an assertion can be triggered when multiple
|
||||
// lock acquisitions are disallowed, for example, during capped collection deletes. These
|
||||
// operations should be thread safe, though, since we only acquire IS locks here, but this RAII
|
||||
// wrapper suppresses the assertion for now.
|
||||
AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(
|
||||
shard_role_details::getLocker(opCtx));
|
||||
|
||||
// The oplog visible timestamp managed by the WiredTigerOplogManager is at most the WiredTiger
|
||||
// all_durable timestamp which never includes oplog holes. However, some callers of findLatest()
|
||||
// expect to see all committed changes, including those beyond one or more oplog holes. We
|
||||
// override the RecoveryUnit's oplog visible timestamp here to allow reading all oplog entries
|
||||
// for the duration of this function.
|
||||
ScopedOplogVisibleTimestamp scopedOplogVisibleTimestamp(
|
||||
shard_role_details::getRecoveryUnit(opCtx), boost::none);
|
||||
|
||||
const AutoGetOplogFastPath oplogRead(opCtx, OplogAccessMode::kRead);
|
||||
const auto& oplogColl = oplogRead.getCollection();
|
||||
massert(123334, "oplog collection not found", oplogColl);
|
||||
|
||||
auto oplogCursor = oplogColl->getRecordStore()->getCursor(
|
||||
opCtx, *shard_role_details::getRecoveryUnit(opCtx), /*forward=*/true);
|
||||
|
||||
return replicated_fast_count::readLatest(
|
||||
opCtx, _sizeCountStore, _timestampStore, *oplogCursor, uuid);
|
||||
}
|
||||
|
||||
CollectionSizeCount ReplicatedFastCountManager::findPersisted(OperationContext* opCtx,
|
||||
const UUID& uuid) const {
|
||||
UUID uuid) const {
|
||||
return replicated_fast_count::readPersisted(opCtx, _sizeCountStore, uuid);
|
||||
}
|
||||
|
||||
|
||||
@ -94,6 +94,13 @@ class MONGO_MOD_PUBLIC ReplicatedFastCountManager {
|
||||
using FastSizeCountMap = absl::flat_hash_map<UUID, StoredSizeCount>;
|
||||
|
||||
public:
|
||||
MONGO_MOD_PRIVATE ReplicatedFastCountManager(
|
||||
replicated_fast_count::SizeCountStore sizeCountStore,
|
||||
replicated_fast_count::SizeCountTimestampStore timestampStore)
|
||||
: _sizeCountStore(std::move(sizeCountStore)), _timestampStore(std::move(timestampStore)) {
|
||||
initializeFastCountCommitFn();
|
||||
}
|
||||
|
||||
static ReplicatedFastCountManager& get(ServiceContext* svcCtx);
|
||||
|
||||
ReplicatedFastCountManager() {
|
||||
@ -141,10 +148,23 @@ public:
|
||||
*/
|
||||
CollectionSizeCount find(const UUID& uuid) const;
|
||||
|
||||
/**
|
||||
* Returns the number of records (count) and data size for the collection with `uuid` as of the
|
||||
* last committed change.
|
||||
*
|
||||
* This function traverses the oplog to compute latest size/count of the collection with `uuid`.
|
||||
* This traversal ignores oplog visibility rules and thus accumulates oplog entries beyond oplog
|
||||
* holes.
|
||||
*
|
||||
* WARNING: This function is much less performant than `findPersisted()`. Only use
|
||||
* `findLatest()` when precise size/count information is required for correctness.
|
||||
*/
|
||||
CollectionSizeCount findLatest(OperationContext* opCtx, UUID uuid) const;
|
||||
|
||||
/**
|
||||
* Returns the persisted number of records (count) and data size for the collection with `uuid`.
|
||||
*/
|
||||
CollectionSizeCount findPersisted(OperationContext* opCtx, const UUID& uuid) const;
|
||||
CollectionSizeCount findPersisted(OperationContext* opCtx, UUID uuid) const;
|
||||
|
||||
/**
|
||||
* Signals the background thread to perform a flush.
|
||||
|
||||
@ -0,0 +1,150 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/replicated_fast_count/replicated_fast_count_manager.h"
|
||||
|
||||
#include "mongo/db/replicated_fast_count/replicated_fast_count_init.h"
|
||||
#include "mongo/db/replicated_fast_count/replicated_fast_count_test_helpers.h"
|
||||
#include "mongo/db/replicated_fast_count/size_count_store.h"
|
||||
#include "mongo/db/replicated_fast_count/size_count_timestamp_store.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/catalog_test_fixture.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
|
||||
namespace mongo::replicated_fast_count {
|
||||
namespace {
|
||||
|
||||
class ReplicatedFastCountManagerTest : public CatalogTestFixture {
|
||||
public:
|
||||
ReplicatedFastCountManagerTest()
|
||||
: CatalogTestFixture(Options().setPersistenceProvider(
|
||||
std::make_unique<replicated_fast_count_test_helpers::
|
||||
ReplicatedFastCountTestPersistenceProvider>())) {}
|
||||
|
||||
protected:
|
||||
void setUp() override {
|
||||
CatalogTestFixture::setUp();
|
||||
|
||||
ASSERT_OK(createReplicatedFastCountCollection(storageInterface(), operationContext()));
|
||||
ASSERT_OK(
|
||||
createReplicatedFastCountTimestampCollection(storageInterface(), operationContext()));
|
||||
|
||||
manager =
|
||||
std::make_unique<ReplicatedFastCountManager>(sizeCountStore, sizeCountTimestampStore);
|
||||
}
|
||||
|
||||
test_helpers::NsAndUUID collA = {
|
||||
.nss = NamespaceString::createNamespaceString_forTest("find_test", "collA"),
|
||||
.uuid = UUID::gen()};
|
||||
test_helpers::NsAndUUID collB = {
|
||||
.nss = NamespaceString::createNamespaceString_forTest("find_test", "collB"),
|
||||
.uuid = UUID::gen()};
|
||||
|
||||
SizeCountStore sizeCountStore;
|
||||
SizeCountTimestampStore sizeCountTimestampStore;
|
||||
std::unique_ptr<ReplicatedFastCountManager> manager;
|
||||
};
|
||||
|
||||
using ReplicatedFastCountManagerFindLatestTest = ReplicatedFastCountManagerTest;
|
||||
|
||||
TEST_F(ReplicatedFastCountManagerFindLatestTest, FindLatestCombinesStoredValuesWithOplogDeltas) {
|
||||
test_helpers::insertSizeCountEntry(operationContext(),
|
||||
sizeCountStore,
|
||||
collA.uuid,
|
||||
{.timestamp = Timestamp::min(), .size = 5, .count = 1});
|
||||
test_helpers::insertSizeCountTimestamp(
|
||||
operationContext(), sizeCountTimestampStore, Timestamp::min());
|
||||
|
||||
test_helpers::writeToOplog(
|
||||
operationContext(),
|
||||
test_helpers::makeOplogEntry(
|
||||
Timestamp(1, 1), collA, repl::OpTypeEnum::kInsert, /*sizeDelta=*/10));
|
||||
test_helpers::writeToOplog(
|
||||
operationContext(),
|
||||
test_helpers::makeOplogEntry(
|
||||
Timestamp(2, 2), collA, repl::OpTypeEnum::kUpdate, /*sizeDelta=*/100));
|
||||
test_helpers::writeToOplog(
|
||||
operationContext(),
|
||||
test_helpers::makeOplogEntry(
|
||||
Timestamp(3, 3), collA, repl::OpTypeEnum::kDelete, /*sizeDelta=*/-50));
|
||||
|
||||
const CollectionSizeCount result = manager->findLatest(operationContext(), collA.uuid);
|
||||
EXPECT_EQ(result.size, 5 + 10 + 100 - 50);
|
||||
EXPECT_EQ(result.count, 1 + 1 - 1);
|
||||
}
|
||||
|
||||
TEST_F(ReplicatedFastCountManagerFindLatestTest, FindLatestReturnsStoredValuesWhenNoOplogDeltas) {
|
||||
test_helpers::insertSizeCountEntry(operationContext(),
|
||||
sizeCountStore,
|
||||
collA.uuid,
|
||||
{.timestamp = Timestamp::min(), .size = 42, .count = 7});
|
||||
test_helpers::insertSizeCountTimestamp(
|
||||
operationContext(), sizeCountTimestampStore, Timestamp::min());
|
||||
|
||||
const CollectionSizeCount result = manager->findLatest(operationContext(), collA.uuid);
|
||||
EXPECT_EQ(result.size, 42);
|
||||
EXPECT_EQ(result.count, 7);
|
||||
}
|
||||
|
||||
TEST_F(ReplicatedFastCountManagerFindLatestTest, FindLatestFiltersToRequestedUuid) {
|
||||
test_helpers::insertSizeCountEntry(operationContext(),
|
||||
sizeCountStore,
|
||||
collA.uuid,
|
||||
{.timestamp = Timestamp::min(), .size = 5, .count = 1});
|
||||
test_helpers::insertSizeCountEntry(operationContext(),
|
||||
sizeCountStore,
|
||||
collB.uuid,
|
||||
{.timestamp = Timestamp::min(), .size = 100, .count = 10});
|
||||
test_helpers::insertSizeCountTimestamp(
|
||||
operationContext(), sizeCountTimestampStore, Timestamp::min());
|
||||
|
||||
// Write interleaved oplog entries for both collections.
|
||||
test_helpers::writeToOplog(
|
||||
operationContext(),
|
||||
test_helpers::makeOplogEntry(
|
||||
Timestamp(1, 1), collA, repl::OpTypeEnum::kInsert, /*sizeDelta=*/10));
|
||||
test_helpers::writeToOplog(
|
||||
operationContext(),
|
||||
test_helpers::makeOplogEntry(
|
||||
Timestamp(2, 2), collB, repl::OpTypeEnum::kInsert, /*sizeDelta=*/200));
|
||||
test_helpers::writeToOplog(
|
||||
operationContext(),
|
||||
test_helpers::makeOplogEntry(
|
||||
Timestamp(3, 3), collA, repl::OpTypeEnum::kDelete, /*sizeDelta=*/-3));
|
||||
|
||||
const CollectionSizeCount resultA = manager->findLatest(operationContext(), collA.uuid);
|
||||
EXPECT_EQ(resultA.size, 5 + 10 - 3);
|
||||
EXPECT_EQ(resultA.count, 1 + 1 - 1);
|
||||
|
||||
const CollectionSizeCount resultB = manager->findLatest(operationContext(), collB.uuid);
|
||||
EXPECT_EQ(resultB.size, 100 + 200);
|
||||
EXPECT_EQ(resultB.count, 10 + 1);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo::replicated_fast_count
|
||||
@ -38,11 +38,9 @@ CollectionSizeCount readLatest(OperationContext* opCtx,
|
||||
const SizeCountTimestampStore& timestampStore,
|
||||
SeekableRecordCursor& cursor,
|
||||
UUID uuid) {
|
||||
const auto entry = sizeCountStore.read(opCtx, uuid);
|
||||
massert(12092100,
|
||||
fmt::format("Expected the size/count store to contain an entry for UUID={}",
|
||||
uuid.toString()),
|
||||
entry.has_value());
|
||||
const auto entry =
|
||||
sizeCountStore.read(opCtx, uuid)
|
||||
.value_or(SizeCountStore::Entry{.timestamp = Timestamp::min(), .size = 0, .count = 0});
|
||||
// We default to Timestamp::min() when the timestamp store does not yet contain a timestamp.
|
||||
const Timestamp timestamp = timestampStore.read(opCtx).value_or(Timestamp::min());
|
||||
|
||||
@ -50,17 +48,17 @@ CollectionSizeCount readLatest(OperationContext* opCtx,
|
||||
// timestamp store to determine where to begin reading the oplog. The size and count entry is
|
||||
// valid as of the maximum timestamp, so we only need to aggregate deltas after that point in
|
||||
// time.
|
||||
const Timestamp seekAfterTs = std::max(entry->timestamp, timestamp);
|
||||
const Timestamp seekAfterTs = std::max(entry.timestamp, timestamp);
|
||||
const auto deltas = aggregateSizeCountDeltasInOplog(cursor, seekAfterTs, uuid).deltas;
|
||||
|
||||
// If there are no oplog entries for this UUID after seekAfterTs, the stored values are already
|
||||
// accurate and no delta adjustment is needed.
|
||||
if (!deltas.contains(uuid)) {
|
||||
return CollectionSizeCount{.size = entry->size, .count = entry->count};
|
||||
return CollectionSizeCount{.size = entry.size, .count = entry.count};
|
||||
}
|
||||
|
||||
return CollectionSizeCount{.size = entry->size + deltas.at(uuid).sizeCount.size,
|
||||
.count = entry->count + deltas.at(uuid).sizeCount.count};
|
||||
return CollectionSizeCount{.size = entry.size + deltas.at(uuid).sizeCount.size,
|
||||
.count = entry.count + deltas.at(uuid).sizeCount.count};
|
||||
}
|
||||
|
||||
[[nodiscard]] CollectionSizeCount readPersisted(OperationContext* opCtx,
|
||||
|
||||
@ -45,8 +45,6 @@ namespace mongo::replicated_fast_count {
|
||||
* `timestampStore` are used to determine where to begin traversing `cursor`.
|
||||
*
|
||||
* `cursor` must be positioned on an oplog collection.
|
||||
*
|
||||
* If `uuid` is not contained in `sizeCountStore`, readLatest() throws an assertion error.
|
||||
*/
|
||||
[[nodiscard]] CollectionSizeCount readLatest(OperationContext* opCtx,
|
||||
const SizeCountStore& sizeCountStore,
|
||||
|
||||
@ -134,9 +134,8 @@ TEST_F(ReadLatestTest, UuidNotFoundInSizeCountStore) {
|
||||
OplogCursorMock cursor(std::list<repl::OplogEntry>{});
|
||||
const UUID uuid = UUID::gen();
|
||||
|
||||
EXPECT_THROW(std::ignore =
|
||||
readLatest(operationContext(), sizeCountStore, timestampStore, cursor, uuid),
|
||||
DBException);
|
||||
EXPECT_EQ(readLatest(operationContext(), sizeCountStore, timestampStore, cursor, uuid),
|
||||
CollectionSizeCount({.size = 0, .count = 0}));
|
||||
}
|
||||
|
||||
TEST_F(ReadLatestTest, UuidNotFoundInSizeCountTimestampStore) {
|
||||
|
||||
@ -748,117 +748,60 @@ TEST_F(ReplicatedFastCountTest, ApplyOpsDeletesAreCorrectlyAccountedFor) {
|
||||
replicated_fast_count_test_helpers::checkUncommittedFastCountChanges(_opCtx, _uuid1, 0, 0);
|
||||
}
|
||||
|
||||
TEST_F(ReplicatedFastCountTest, CappedDeletesUpdateFastCountWhenHittingCapCount) {
|
||||
enum class CapType { kCount, kSize };
|
||||
|
||||
class ReplicatedFastCountCappedCollectionTest : public ReplicatedFastCountTest,
|
||||
public ::testing::WithParamInterface<CapType> {};
|
||||
|
||||
TEST_P(ReplicatedFastCountCappedCollectionTest, CorrectSizeCountAfterCapReached) {
|
||||
RAIIServerParameterControllerForTest featureFlag("featureFlagReplicatedFastCount", true);
|
||||
|
||||
// Make this an unreplicated block so that the capped insert and delete combo doesn't violate
|
||||
// the multi‑timestamp constraint.
|
||||
repl::UnreplicatedWritesBlock uwb(_opCtx);
|
||||
|
||||
const int cappedCollMaxCount = 5;
|
||||
|
||||
NamespaceString nssCapped = NamespaceString::createNamespaceString_forTest(
|
||||
const int maxDocs = 5;
|
||||
const NamespaceString nssCapped = NamespaceString::createNamespaceString_forTest(
|
||||
"replicated_fast_count_test", "cappedWithMaxCount");
|
||||
|
||||
auto uuidCapped = UUID::gen();
|
||||
|
||||
ASSERT_OK(
|
||||
createCollection(_opCtx,
|
||||
nssCapped.dbName(),
|
||||
BSON("create" << nssCapped.coll() << "capped" << true << "size"
|
||||
<< cappedCollMaxCount * sampleDocForInsert.objsize() *
|
||||
5 // Make the size larger so that count is the limiting bound
|
||||
<< "max" << cappedCollMaxCount)));
|
||||
|
||||
// Insert up until the max capped doc count.
|
||||
AutoGetCollection cappedColl(_opCtx, nssCapped, LockMode::MODE_IX);
|
||||
|
||||
uuidCapped = cappedColl->uuid();
|
||||
|
||||
for (int i = 0; i < cappedCollMaxCount; ++i) {
|
||||
WriteUnitOfWork wuow(_opCtx);
|
||||
ASSERT_OK(Helpers::insert(_opCtx, *cappedColl, docGeneratorForInsert(i)));
|
||||
replicated_fast_count_test_helpers::checkUncommittedFastCountChanges(
|
||||
_opCtx, uuidCapped, 1, sampleDocForInsert.objsize());
|
||||
wuow.commit();
|
||||
const auto expectedCommittedCount = i + 1;
|
||||
replicated_fast_count_test_helpers::checkCommittedFastCountChanges(
|
||||
uuidCapped,
|
||||
_fastCountManager,
|
||||
expectedCommittedCount,
|
||||
expectedCommittedCount * sampleDocForInsert.objsize());
|
||||
if (GetParam() == CapType::kCount) {
|
||||
// The size parameter is not optional when specifying the max count. We intentionally make
|
||||
// the size larger here so the count is the limiting bound.
|
||||
ASSERT_OK(createCollection(_opCtx,
|
||||
nssCapped.dbName(),
|
||||
BSON("create" << nssCapped.coll() << "capped" << true << "size"
|
||||
<< maxDocs * sampleDocForInsert.objsize() * 5
|
||||
<< "max" << maxDocs)));
|
||||
} else {
|
||||
ASSERT_OK(createCollection(_opCtx,
|
||||
nssCapped.dbName(),
|
||||
BSON("create" << nssCapped.coll() << "capped" << true << "size"
|
||||
<< maxDocs * sampleDocForInsert.objsize())));
|
||||
}
|
||||
|
||||
// Insert more docs. Our committed count and size should still be at the cap.
|
||||
for (int i = cappedCollMaxCount + 1; i < 3 * cappedCollMaxCount; ++i) {
|
||||
WriteUnitOfWork wuow(_opCtx);
|
||||
ASSERT_OK(Helpers::insert(_opCtx, *cappedColl, docGeneratorForInsert(i)));
|
||||
// Insert + delete of same size cancel each other out.
|
||||
replicated_fast_count_test_helpers::checkUncommittedFastCountChanges(
|
||||
_opCtx, uuidCapped, 0, 0);
|
||||
wuow.commit();
|
||||
replicated_fast_count_test_helpers::checkCommittedFastCountChanges(
|
||||
uuidCapped,
|
||||
_fastCountManager,
|
||||
cappedCollMaxCount,
|
||||
cappedCollMaxCount * sampleDocForInsert.objsize());
|
||||
AutoGetCollection cappedColl(_opCtx, nssCapped, LockMode::MODE_IX);
|
||||
|
||||
for (int i = 0; i < maxDocs + 5; ++i) {
|
||||
// Using the query-level InsertCommandRequest path here lets us avoid handling capped
|
||||
// collection multi-timestamp constraints manually.
|
||||
write_ops::InsertCommandRequest insertCmd(nssCapped);
|
||||
insertCmd.setDocuments({docGeneratorForInsert(i)});
|
||||
const auto result = write_ops_exec::performInserts(_opCtx, insertCmd);
|
||||
ASSERT_EQ(result.results.size(), 1);
|
||||
ASSERT_OK(result.results[0].getStatus());
|
||||
|
||||
if (i < maxDocs - 1) {
|
||||
const auto [actualSize, actualCount] = cappedColl->latestSizeCount(_opCtx);
|
||||
const long long expectedCount = i + 1;
|
||||
EXPECT_EQ(actualSize, expectedCount * sampleDocForInsert.objsize());
|
||||
EXPECT_EQ(actualCount, expectedCount);
|
||||
} else {
|
||||
// After the collection cap has been reached, the size and count should stay the same.
|
||||
const auto [actualSize, actualCount] = cappedColl->latestSizeCount(_opCtx);
|
||||
EXPECT_EQ(actualSize, maxDocs * sampleDocForInsert.objsize());
|
||||
EXPECT_EQ(actualCount, maxDocs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ReplicatedFastCountTest, CappedDeletesUpdateFastCountWhenHittingCapSize) {
|
||||
RAIIServerParameterControllerForTest featureFlag("featureFlagReplicatedFastCount", true);
|
||||
|
||||
// Make this an unreplicated block so that the capped insert and delete combo doesn't violate
|
||||
// the multi‑timestamp constraint.
|
||||
repl::UnreplicatedWritesBlock uwb(_opCtx);
|
||||
|
||||
const int cappedCollMaxCount = 5;
|
||||
|
||||
NamespaceString nssCapped = NamespaceString::createNamespaceString_forTest(
|
||||
"replicated_fast_count_test", "cappedWithMaxSize");
|
||||
|
||||
auto uuidCapped = UUID::gen();
|
||||
|
||||
ASSERT_OK(
|
||||
createCollection(_opCtx,
|
||||
nssCapped.dbName(),
|
||||
BSON("create" << nssCapped.coll() << "capped" << true << "size"
|
||||
<< cappedCollMaxCount * sampleDocForInsert.objsize())));
|
||||
|
||||
// Insert up until the max capped doc size.
|
||||
AutoGetCollection cappedColl(_opCtx, nssCapped, LockMode::MODE_IX);
|
||||
|
||||
uuidCapped = cappedColl->uuid();
|
||||
|
||||
for (int i = 0; i < cappedCollMaxCount; ++i) {
|
||||
WriteUnitOfWork wuow(_opCtx);
|
||||
ASSERT_OK(Helpers::insert(_opCtx, *cappedColl, docGeneratorForInsert(i)));
|
||||
replicated_fast_count_test_helpers::checkUncommittedFastCountChanges(
|
||||
_opCtx, uuidCapped, 1, sampleDocForInsert.objsize());
|
||||
wuow.commit();
|
||||
const auto expectedCommittedCount = i + 1;
|
||||
replicated_fast_count_test_helpers::checkCommittedFastCountChanges(
|
||||
uuidCapped,
|
||||
_fastCountManager,
|
||||
expectedCommittedCount,
|
||||
expectedCommittedCount * sampleDocForInsert.objsize());
|
||||
}
|
||||
|
||||
// Insert more docs. Our committed count and size should still be at the cap.
|
||||
for (int i = cappedCollMaxCount + 1; i < 3 * cappedCollMaxCount; ++i) {
|
||||
WriteUnitOfWork wuow(_opCtx);
|
||||
ASSERT_OK(Helpers::insert(_opCtx, *cappedColl, docGeneratorForInsert(i)));
|
||||
// Insert + delete of same size cancel each other out.
|
||||
replicated_fast_count_test_helpers::checkUncommittedFastCountChanges(
|
||||
_opCtx, uuidCapped, 0, 0);
|
||||
wuow.commit();
|
||||
replicated_fast_count_test_helpers::checkCommittedFastCountChanges(
|
||||
uuidCapped,
|
||||
_fastCountManager,
|
||||
cappedCollMaxCount,
|
||||
cappedCollMaxCount * sampleDocForInsert.objsize());
|
||||
}
|
||||
}
|
||||
INSTANTIATE_TEST_SUITE_P(,
|
||||
ReplicatedFastCountCappedCollectionTest,
|
||||
::testing::Values(CapType::kCount, CapType::kSize));
|
||||
|
||||
TEST_F(ReplicatedFastCountTest, ReplicatedFastCountDoesNotTrackLocalCollections) {
|
||||
const NamespaceString internalNss =
|
||||
|
||||
@ -42,8 +42,13 @@ namespace mongo::replicated_fast_count {
|
||||
|
||||
boost::optional<SizeCountStore::Entry> SizeCountStore::read(OperationContext* opCtx,
|
||||
UUID uuid) const {
|
||||
const auto acquisition = acquireFastCountCollectionForRead(opCtx).value();
|
||||
const CollectionPtr& coll = acquisition.getCollectionPtr();
|
||||
const auto acquisition = acquireFastCountCollectionForRead(opCtx);
|
||||
if (!acquisition.has_value()) {
|
||||
// TODO(SERVER-123051): Revisit this.
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
const CollectionPtr& coll = acquisition->getCollectionPtr();
|
||||
const RecordId rid =
|
||||
record_id_helpers::keyForDoc(BSON("_id" << uuid),
|
||||
clustered_util::makeDefaultClusteredIdIndex().getIndexSpec(),
|
||||
|
||||
@ -44,6 +44,12 @@ namespace {
|
||||
|
||||
class SizeCountStoreTest : public CatalogTestFixture {};
|
||||
|
||||
TEST_F(SizeCountStoreTest, ReadReturnsNoneWhenCollectionDoesNotExist) {
|
||||
const SizeCountStore store;
|
||||
|
||||
EXPECT_FALSE(store.read(operationContext(), UUID::gen()).has_value());
|
||||
}
|
||||
|
||||
TEST_F(SizeCountStoreTest, ReadReturnsNoneWhenDocumentDoesNotExist) {
|
||||
ASSERT_OK(createReplicatedFastCountCollection(storageInterface(), operationContext()));
|
||||
const SizeCountStore store;
|
||||
|
||||
@ -80,8 +80,12 @@ boost::optional<CollectionOrViewAcquisition> acquireTimestampCollectionForWrite(
|
||||
} // namespace
|
||||
|
||||
boost::optional<Timestamp> SizeCountTimestampStore::read(OperationContext* opCtx) const {
|
||||
const auto acquisition = acquireTimestampCollectionForRead(opCtx).value();
|
||||
const CollectionPtr& coll = acquisition.getCollectionPtr();
|
||||
const auto acquisition = acquireTimestampCollectionForRead(opCtx);
|
||||
if (!acquisition.has_value()) {
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
const CollectionPtr& coll = acquisition->getCollectionPtr();
|
||||
const RecordId rid =
|
||||
record_id_helpers::keyForDoc(BSON("_id" << kTimestampDocId),
|
||||
clustered_util::makeDefaultClusteredIdIndex().getIndexSpec(),
|
||||
|
||||
@ -51,6 +51,12 @@ TEST_F(SizeCountTimestampStoreTest, WriteMassertsWithoutWriteUnitOfWork) {
|
||||
ASSERT_THROWS_CODE(store.write(operationContext(), Timestamp(10, 1)), DBException, 12280400);
|
||||
}
|
||||
|
||||
TEST_F(SizeCountTimestampStoreTest, ReadReturnsNoneWhenCollectionDoesNotExist) {
|
||||
const SizeCountTimestampStore store;
|
||||
|
||||
EXPECT_FALSE(store.read(operationContext()).has_value());
|
||||
}
|
||||
|
||||
TEST_F(SizeCountTimestampStoreTest, ReadReturnsNoneWhenDocumentDoesNotExist) {
|
||||
ASSERT_OK(createReplicatedFastCountTimestampCollection(storageInterface(), operationContext()));
|
||||
const SizeCountTimestampStore store;
|
||||
|
||||
@ -507,6 +507,10 @@ public:
|
||||
return _coll->dataSize(opCtx);
|
||||
}
|
||||
|
||||
CollectionSizeCount latestSizeCount(OperationContext* opCtx) const override {
|
||||
return _coll->latestSizeCount(opCtx);
|
||||
}
|
||||
|
||||
CollectionSizeCount persistedSizeCount(OperationContext* opCtx) const override {
|
||||
return _coll->persistedSizeCount(opCtx);
|
||||
}
|
||||
|
||||
@ -680,6 +680,20 @@ public:
|
||||
|
||||
virtual long long numRecords(OperationContext* opCtx) const = 0;
|
||||
|
||||
/**
|
||||
* Returns uncompressed collection data size in bytes.
|
||||
*/
|
||||
virtual long long dataSize(OperationContext* opCtx) const = 0;
|
||||
|
||||
/**
|
||||
* Returns the collection's uncompressed data size and number of records as of the last
|
||||
* committed change.
|
||||
*
|
||||
* WARNING: This function may be much less performant than `persistedSizeCount()`. Only use
|
||||
* `latestSizeCount()` when precise size/count information is required for correctness.
|
||||
*/
|
||||
virtual CollectionSizeCount latestSizeCount(OperationContext* opCtx) const = 0;
|
||||
|
||||
/**
|
||||
* Returns the last persisted uncompressed collection data size and number of records.
|
||||
*
|
||||
@ -688,11 +702,6 @@ public:
|
||||
*/
|
||||
virtual CollectionSizeCount persistedSizeCount(OperationContext* opCtx) const = 0;
|
||||
|
||||
/**
|
||||
* Return uncompressed collection data size in bytes
|
||||
*/
|
||||
virtual long long dataSize(OperationContext* opCtx) const = 0;
|
||||
|
||||
/**
|
||||
* Return the size on disk in bytes
|
||||
*/
|
||||
|
||||
@ -65,6 +65,7 @@
|
||||
#include "mongo/db/matcher/expression.h"
|
||||
#include "mongo/db/matcher/expression_tree.h"
|
||||
#include "mongo/db/matcher/extensions_callback_noop.h"
|
||||
#include "mongo/db/op_observer/batched_write_context.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/pipeline/expression_context.h"
|
||||
#include "mongo/db/pipeline/expression_context_builder.h"
|
||||
@ -798,12 +799,28 @@ bool CollectionImpl::isCappedAndNeedsDelete(OperationContext* opCtx) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (dataSize(opCtx) > getCollectionOptions().cappedSize) {
|
||||
// When writes are batched, the capped collection insert is not written to the oplog until the
|
||||
// top-level WriteUnitOfWork commits. When writes are not batched, the capped collection insert
|
||||
// is written to the oplog immediately. latestSizeCount() scans the oplog to compute the latest
|
||||
// collection size/count, so it misses the latest insert when writes are batched. To correctly
|
||||
// compute currentDataSize and currentNumRecords, we include the uncommitted size/count changes
|
||||
// if and only if writes are batched.
|
||||
const bool batched = BatchedWriteContext::get(opCtx).writesAreBatched();
|
||||
const CollectionSizeCount uncommittedChanges = (batched)
|
||||
? UncommittedFastCountChange::getForRead(opCtx).find(uuid())
|
||||
: CollectionSizeCount{.size = 0, .count = 0};
|
||||
|
||||
const auto [latestSize, latestCount] = latestSizeCount(opCtx);
|
||||
|
||||
const long long currentDataSize = latestSize + uncommittedChanges.size;
|
||||
const long long currentNumRecords = latestCount + uncommittedChanges.count;
|
||||
|
||||
if (currentDataSize > getCollectionOptions().cappedSize) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const auto cappedMaxDocs = getCollectionOptions().cappedMaxDocs;
|
||||
if ((cappedMaxDocs != 0) && (numRecords(opCtx) > cappedMaxDocs)) {
|
||||
if ((cappedMaxDocs != 0) && (currentNumRecords > cappedMaxDocs)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1118,6 +1135,13 @@ long long CollectionImpl::dataSize(OperationContext* opCtx) const {
|
||||
: _shared->_recordStore->dataSize();
|
||||
}
|
||||
|
||||
CollectionSizeCount CollectionImpl::latestSizeCount(OperationContext* opCtx) const {
|
||||
return (isReplicatedFastCountEnabled(opCtx) && isReplicatedFastCountEligible(_ns))
|
||||
? ReplicatedFastCountManager::get(opCtx->getServiceContext()).findLatest(opCtx, uuid())
|
||||
: CollectionSizeCount{_shared->_recordStore->dataSize(),
|
||||
_shared->_recordStore->numRecords()};
|
||||
}
|
||||
|
||||
CollectionSizeCount CollectionImpl::persistedSizeCount(OperationContext* opCtx) const {
|
||||
return (isReplicatedFastCountEnabled(opCtx) && isReplicatedFastCountEligible(_ns))
|
||||
? ReplicatedFastCountManager::get(opCtx->getServiceContext()).findPersisted(opCtx, uuid())
|
||||
|
||||
@ -279,6 +279,8 @@ public:
|
||||
|
||||
long long dataSize(OperationContext* opCtx) const final;
|
||||
|
||||
CollectionSizeCount latestSizeCount(OperationContext* opCtx) const final;
|
||||
|
||||
CollectionSizeCount persistedSizeCount(OperationContext* opCtx) const final;
|
||||
|
||||
int64_t sizeOnDisk(OperationContext* opCtx, const StorageEngine& storageEngine) const final;
|
||||
|
||||
@ -296,6 +296,10 @@ public:
|
||||
MONGO_UNREACHABLE;
|
||||
}
|
||||
|
||||
CollectionSizeCount latestSizeCount(OperationContext* opCtx) const override {
|
||||
MONGO_UNREACHABLE;
|
||||
}
|
||||
|
||||
CollectionSizeCount persistedSizeCount(OperationContext* opCtx) const override {
|
||||
MONGO_UNREACHABLE;
|
||||
}
|
||||
|
||||
@ -491,6 +491,11 @@ public:
|
||||
return _shared->_recordStore->dataSize();
|
||||
}
|
||||
|
||||
CollectionSizeCount latestSizeCount(OperationContext* opCtx) const final {
|
||||
return CollectionSizeCount{_shared->_recordStore->dataSize(),
|
||||
_shared->_recordStore->numRecords()};
|
||||
}
|
||||
|
||||
CollectionSizeCount persistedSizeCount(OperationContext* opCtx) const final {
|
||||
return CollectionSizeCount{_shared->_recordStore->dataSize(),
|
||||
_shared->_recordStore->numRecords()};
|
||||
|
||||
@ -1407,7 +1407,8 @@ void ValidateAdaptor::traverseRecordStore(OperationContext* opCtx,
|
||||
switch (fastCountType) {
|
||||
case CollectionValidation::FastCountType::legacySizeStorer:
|
||||
if (enforceCount) {
|
||||
if (const auto fastCount = coll->numRecords(opCtx); fastCount != _numRecords) {
|
||||
if (const auto fastCount = coll->latestSizeCount(opCtx).count;
|
||||
fastCount != _numRecords) {
|
||||
results->addError(
|
||||
fmt::format("fast count ({}) does not match number of "
|
||||
"records ({}) for collection '{}'",
|
||||
@ -1417,7 +1418,8 @@ void ValidateAdaptor::traverseRecordStore(OperationContext* opCtx,
|
||||
}
|
||||
}
|
||||
if (enforceSize) {
|
||||
if (const auto fastSize = coll->dataSize(opCtx); fastSize != dataSizeTotal) {
|
||||
if (const auto fastSize = coll->latestSizeCount(opCtx).size;
|
||||
fastSize != dataSizeTotal) {
|
||||
results->addError(
|
||||
fmt::format("fast size ({}) does not match data size ({}) for "
|
||||
"collection '{}'",
|
||||
@ -1429,7 +1431,8 @@ void ValidateAdaptor::traverseRecordStore(OperationContext* opCtx,
|
||||
break;
|
||||
case CollectionValidation::FastCountType::replicated:
|
||||
if (enforceCount) {
|
||||
if (const auto fastCount = coll->numRecords(opCtx); fastCount != _numRecords) {
|
||||
if (const auto fastCount = coll->latestSizeCount(opCtx).count;
|
||||
fastCount != _numRecords) {
|
||||
results->addError(
|
||||
fmt::format("replicated fast count ({}) does not match number of "
|
||||
"records ({}) for collection '{}'",
|
||||
@ -1439,7 +1442,8 @@ void ValidateAdaptor::traverseRecordStore(OperationContext* opCtx,
|
||||
}
|
||||
}
|
||||
if (enforceSize) {
|
||||
if (const auto fastSize = coll->dataSize(opCtx); fastSize != dataSizeTotal) {
|
||||
if (const auto fastSize = coll->latestSizeCount(opCtx).size;
|
||||
fastSize != dataSizeTotal) {
|
||||
results->addError(
|
||||
fmt::format("replicated fast size ({}) does not match data size ({}) "
|
||||
"for collection '{}'",
|
||||
|
||||
Loading…
Reference in New Issue
Block a user