SERVER-121789 Add update support to containers (#50007)

Co-authored-by: Jan <jsteemann@users.noreply.github.com>
GitOrigin-RevId: d9520a6ed82ff2a41e04f4e0be52fff682ab99ef
This commit is contained in:
Parker Felix 2026-03-31 16:37:08 -04:00 committed by MongoDB Bot
parent eda6c21c82
commit 8c7a3f282a
31 changed files with 630 additions and 9 deletions

View File

@ -86,6 +86,48 @@ Status insert(OperationContext* opCtx,
return Status::OK();
}
/**
 * Replaces the value stored under the integer 'key' in 'container' and, on success, notifies the
 * OpObserver through onContainerUpdate().
 *
 * uasserts with NotWritablePrimary when this node cannot accept writes for the container
 * namespace. A non-OK status from the container is returned unchanged and the observer is not
 * notified.
 */
Status update(OperationContext* opCtx,
              RecoveryUnit& ru,
              IntegerKeyedContainer& container,
              int64_t key,
              std::span<const char> value) {
    const bool canWrite = repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(
        opCtx, NamespaceString::kContainerNamespace);
    uassert(ErrorCodes::NotWritablePrimary, "Not primary while updating container", canWrite);

    if (auto updateStatus = container.update(ru, key, value); !updateStatus.isOK()) {
        return updateStatus;
    }

    // Only a successful storage-level update is reported to the observers.
    auto* observer = opCtx->getServiceContext()->getOpObserver();
    observer->onContainerUpdate(opCtx, container.ident()->getIdent(), key, value);
    return Status::OK();
}
/**
 * Replaces the value stored under the byte-string 'key' in 'container' and, on success, notifies
 * the OpObserver through onContainerUpdate().
 *
 * uasserts with NotWritablePrimary when this node cannot accept writes for the container
 * namespace. A non-OK status from the container is returned unchanged and the observer is not
 * notified.
 */
Status update(OperationContext* opCtx,
              RecoveryUnit& ru,
              StringKeyedContainer& container,
              std::span<const char> key,
              std::span<const char> value) {
    const bool canWrite = repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(
        opCtx, NamespaceString::kContainerNamespace);
    uassert(ErrorCodes::NotWritablePrimary, "Not primary while updating container", canWrite);

    if (auto updateStatus = container.update(ru, key, value); !updateStatus.isOK()) {
        return updateStatus;
    }

    // Only a successful storage-level update is reported to the observers.
    auto* observer = opCtx->getServiceContext()->getOpObserver();
    observer->onContainerUpdate(opCtx, container.ident()->getIdent(), key, value);
    return Status::OK();
}
Status remove(OperationContext* opCtx,
RecoveryUnit& ru,
IntegerKeyedContainer& container,

View File

@ -61,6 +61,26 @@ Status insert(OperationContext* opCtx,
std::span<const char> value,
container::ExistingKeyPolicy policy);
/**
 * Updates the value at the given integer key in the container and logs the operation in the
 * oplog by notifying the OpObserver. The key must already exist.
 *
 * Asserts (NotWritablePrimary) if this node cannot accept writes for the container namespace.
 * If the container reports a non-OK status, that status is returned and nothing is logged.
 */
Status update(OperationContext* opCtx,
RecoveryUnit& ru,
IntegerKeyedContainer& container,
int64_t key,
std::span<const char> value);
/**
 * Updates the value at the given byte-string key in the container and logs the operation in the
 * oplog by notifying the OpObserver. The key must already exist.
 *
 * Asserts (NotWritablePrimary) if this node cannot accept writes for the container namespace.
 * If the container reports a non-OK status, that status is returned and nothing is logged.
 */
Status update(OperationContext* opCtx,
RecoveryUnit& ru,
StringKeyedContainer& container,
std::span<const char> key,
std::span<const char> value);
/**
* Removes from the given container and logs the operation in the oplog.
*/

View File

@ -63,12 +63,13 @@ void BatchedWriteContext::addBatchedOperation(OperationContext* opCtx,
invariant(_batchWrites);
assertNoMixedBatchedOps(/*isDDL=*/false);
// Current support is limited to only insert, update, delete, container insert, and container
// delete operations. No multi-doc transactions.
// Current support is limited to only insert, update, delete, and container operations. No
// multi-doc transactions.
invariant(operation.getOpType() == repl::OpTypeEnum::kDelete ||
operation.getOpType() == repl::OpTypeEnum::kInsert ||
operation.getOpType() == repl::OpTypeEnum::kUpdate ||
operation.getOpType() == repl::OpTypeEnum::kContainerInsert ||
operation.getOpType() == repl::OpTypeEnum::kContainerUpdate ||
operation.getOpType() == repl::OpTypeEnum::kContainerDelete);
invariant(!opCtx->inMultiDocumentTransaction());
invariant(shard_role_details::getLocker(opCtx)->inAWriteUnitOfWork());

View File

@ -309,6 +309,16 @@ public:
std::span<const char> key,
std::span<const char> value) = 0;
// Observer hook for an update of the value stored under an integer 'key' in the container
// identified by 'ident'. 'value' is the new value.
virtual void onContainerUpdate(OperationContext* opCtx,
StringData ident,
int64_t key,
std::span<const char> value) = 0;
// Observer hook for an update of the value stored under a byte-string 'key' in the container
// identified by 'ident'. 'value' is the new value.
virtual void onContainerUpdate(OperationContext* opCtx,
StringData ident,
std::span<const char> key,
std::span<const char> value) = 0;
virtual void onContainerDelete(OperationContext* opCtx, StringData ident, int64_t key) = 0;
virtual void onContainerDelete(OperationContext* opCtx,

View File

@ -78,6 +78,7 @@
#include "mongo/db/shard_role/shard_catalog/scoped_collection_metadata.h"
#include "mongo/db/shard_role/transaction_resources.h"
#include "mongo/db/sharding_environment/shard_id.h"
#include "mongo/db/storage/container.h"
#include "mongo/db/storage/record_data.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/recovery_unit.h"
@ -1303,8 +1304,10 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
namespace {
BSONObj buildContainerOpObject(std::variant<int64_t, std::span<const char>> key,
boost::optional<std::span<const char>> value = boost::none) {
BSONObj buildContainerOpObject(
std::variant<int64_t, std::span<const char>> key,
boost::optional<std::span<const char>> value = boost::none,
boost::optional<container::UpdateOplogEntryVersion> version = boost::none) {
BSONObjBuilder builder;
std::visit(OverloadedVisitor{[&builder](int64_t key) { builder.append("k", key); },
[&builder](std::span<const char> key) {
@ -1315,6 +1318,9 @@ BSONObj buildContainerOpObject(std::variant<int64_t, std::span<const char>> key,
if (value) {
builder.appendBinData("v", value->size(), BinDataType::BinDataGeneral, value->data());
}
if (version) {
builder.append("$v", static_cast<int64_t>(*version));
}
return builder.obj();
}
@ -1384,7 +1390,7 @@ void _onContainerInsert(OperationContext* opCtx,
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime);
sessionTxnRecord.setLastWriteDate(opTime.wallClockTime);
// We don't have any StmtId's so we don't need to call onWriteOpCompleted().
onWriteOpCompleted(opCtx, {kUninitializedStmtId}, sessionTxnRecord, ns);
}
OpTimeBundle logContainerDelete(OperationContext* opCtx,
@ -1451,7 +1457,78 @@ void _onContainerDelete(OperationContext* opCtx,
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime);
sessionTxnRecord.setLastWriteDate(opTime.wallClockTime);
// We don't have any StmtId's so we don't need to call onWriteOpCompleted().
onWriteOpCompleted(opCtx, {kUninitializedStmtId}, sessionTxnRecord, ns);
}
/**
 * Builds a container update oplog entry for 'key'/'value' in the container ident 'container' and
 * writes it via 'logger'. The entry's object carries the full-replacement format version in
 * "$v". Returns the optime produced by logOperation() together with the entry's wall clock time.
 */
OpTimeBundle logContainerUpdate(OperationContext* opCtx,
                                StringData container,
                                std::variant<int64_t, std::span<const char>> key,
                                std::span<const char> value,
                                OperationLogger& logger) {
    const auto& containerNss = NamespaceString::kContainerNamespace;

    MutableOplogEntry oplogEntry;
    oplogEntry.setTid(containerNss.tenantId());
    oplogEntry.setNss(containerNss);
    oplogEntry.setContainer(container);
    oplogEntry.setOpType(repl::OpTypeEnum::kContainerUpdate);
    oplogEntry.setObject(
        buildContainerOpObject(key, value, container::UpdateOplogEntryVersion::kFullReplacementV1));

    // The wall clock time is read only after logOperation() has run on the entry.
    OpTimeBundle result;
    result.writeOpTime = logOperation(opCtx, &oplogEntry, true /*assignCommonFields*/, &logger);
    result.wallClockTime = oplogEntry.getWallClockTime();
    return result;
}
/**
 * Shared implementation behind both OpObserverImpl::onContainerUpdate() overloads.
 *
 * Routing, in order:
 *   1. inside a batched write: the operation is appended to the BatchedWriteContext;
 *   2. inside an open multi-document transaction (with the oplog enabled): the operation is
 *      appended to the transaction participant;
 *   3. otherwise, unless _skipOplogOps() says to skip, a standalone oplog entry is written via
 *      'logger' and onWriteOpCompleted() is invoked with the resulting optime.
 */
void _onContainerUpdate(OperationContext* opCtx,
                        StringData ident,
                        std::variant<int64_t, std::span<const char>> key,
                        std::span<const char> value,
                        OperationLogger& logger) {
    const auto& containerNss = NamespaceString::kContainerNamespace;
    const auto oplogDisabled =
        repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, containerNss);

    auto txnParticipant = TransactionParticipant::get(opCtx);
    const auto inMultiDocumentTransaction =
        txnParticipant && !oplogDisabled && txnParticipant.transactionIsOpen();

    auto&& batchedWrites = BatchedWriteContext::get(opCtx);
    const auto inBatchedWrite = batchedWrites.writesAreBatched();

    // Builds the ReplOperation used for both the batched and the transactional paths.
    auto buildReplOperation = [&] {
        repl::ReplOperation replOp;
        replOp.setOpType(repl::OpTypeEnum::kContainerUpdate);
        replOp.setVersionContextIfHasOperationFCV(VersionContext::getDecoration(opCtx));
        replOp.setTid(containerNss.tenantId());
        replOp.setNss(containerNss);
        replOp.setContainer(ident);
        replOp.setObject(buildContainerOpObject(
            key, value, container::UpdateOplogEntryVersion::kFullReplacementV1));
        return replOp;
    };

    if (inBatchedWrite) {
        batchedWrites.addBatchedOperation(opCtx, buildReplOperation());
        return;
    }

    if (inMultiDocumentTransaction) {
        txnParticipant.addTransactionOperation(opCtx, buildReplOperation());
        return;
    }

    const bool skipLogging = _skipOplogOps(oplogDisabled,
                                           inBatchedWrite,
                                           inMultiDocumentTransaction,
                                           containerNss,
                                           {kUninitializedStmtId});
    if (skipLogging) {
        return;
    }

    const auto loggedTimes = logContainerUpdate(opCtx, ident, key, value, logger);

    SessionTxnRecord txnRecord;
    txnRecord.setLastWriteOpTime(loggedTimes.writeOpTime);
    txnRecord.setLastWriteDate(loggedTimes.wallClockTime);
    onWriteOpCompleted(opCtx, {kUninitializedStmtId}, txnRecord, containerNss);
}
} // namespace
@ -1470,6 +1547,20 @@ void OpObserverImpl::onContainerInsert(OperationContext* opCtx,
_onContainerInsert(opCtx, ident, key, value, *_operationLogger);
}
// Integer-key overload: forwards to the shared _onContainerUpdate() helper using this observer's
// operation logger.
void OpObserverImpl::onContainerUpdate(OperationContext* opCtx,
                                       StringData ident,
                                       int64_t key,
                                       std::span<const char> value) {
    auto& operationLogger = *_operationLogger;
    _onContainerUpdate(opCtx, ident, key, value, operationLogger);
}
// Byte-string-key overload: forwards to the shared _onContainerUpdate() helper using this
// observer's operation logger.
void OpObserverImpl::onContainerUpdate(OperationContext* opCtx,
                                       StringData ident,
                                       std::span<const char> key,
                                       std::span<const char> value) {
    auto& operationLogger = *_operationLogger;
    _onContainerUpdate(opCtx, ident, key, value, operationLogger);
}
void OpObserverImpl::onContainerDelete(OperationContext* opCtx, StringData ident, int64_t key) {
_onContainerDelete(opCtx, ident, key, *_operationLogger);
}
@ -2200,6 +2291,7 @@ void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx,
[[fallthrough]];
}
case repl::OpTypeEnum::kContainerDelete:
case repl::OpTypeEnum::kContainerUpdate:
case repl::OpTypeEnum::kContainerInsert: {
auto opTime = logOperation(
opCtx, &oplogEntry, true /*assignCommonFields*/, _operationLogger.get());

View File

@ -146,6 +146,16 @@ public:
std::span<const char> key,
std::span<const char> value) final;
// Handles an update to an integer-keyed container; see the definition for how the operation is
// routed (batched write, multi-document transaction, or standalone oplog entry).
void onContainerUpdate(OperationContext* opCtx,
StringData ident,
int64_t key,
std::span<const char> value) final;
// Handles an update to a byte-string-keyed container; same routing as the integer-key overload.
void onContainerUpdate(OperationContext* opCtx,
StringData ident,
std::span<const char> key,
std::span<const char> value) final;
void onContainerDelete(OperationContext* opCtx, StringData ident, int64_t key) final;
void onContainerDelete(OperationContext* opCtx,

View File

@ -118,6 +118,16 @@ public:
std::span<const char> key,
std::span<const char> value) override {}
// Intentionally empty: this observer ignores integer-keyed container updates.
void onContainerUpdate(OperationContext* opCtx,
StringData ident,
int64_t key,
std::span<const char> value) override {}
// Intentionally empty: this observer ignores byte-string-keyed container updates.
void onContainerUpdate(OperationContext* opCtx,
StringData ident,
std::span<const char> key,
std::span<const char> value) override {}
void onContainerDelete(OperationContext* opCtx, StringData ident, int64_t key) override {}
void onContainerDelete(OperationContext* opCtx,

View File

@ -307,6 +307,26 @@ public:
}
}
// Fans the integer-key container update out to every registered observer, in registration order.
// A ReservedTimes instance is kept alive for the whole fan-out.
// NOTE(review): presumably ReservedTimes scopes optime reservation across observers - confirm.
void onContainerUpdate(OperationContext* opCtx,
                       StringData ident,
                       int64_t key,
                       std::span<const char> value) override {
    ReservedTimes times{opCtx};
    for (const auto& registered : _observers) {
        registered->onContainerUpdate(opCtx, ident, key, value);
    }
}
// Fans the byte-string-key container update out to every registered observer, in registration
// order. A ReservedTimes instance is kept alive for the whole fan-out.
// NOTE(review): presumably ReservedTimes scopes optime reservation across observers - confirm.
void onContainerUpdate(OperationContext* opCtx,
                       StringData ident,
                       std::span<const char> key,
                       std::span<const char> value) override {
    ReservedTimes times{opCtx};
    for (const auto& registered : _observers) {
        registered->onContainerUpdate(opCtx, ident, key, value);
    }
}
void onContainerDelete(OperationContext* opCtx, StringData ident, int64_t key) override {
ReservedTimes times{opCtx};
for (auto&& observer : _observers) {

View File

@ -616,6 +616,11 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// should have been filtered out by the change stream's oplog match filter already.
tasserted(11888300, "Change stream encountered unexpected 'ci' oplog entry");
}
case repl::OpTypeEnum::kContainerUpdate: {
// Container update ('cu') oplog entries should not show up in change streams. They
// should have been filtered out by the change stream's oplog match filter already.
tasserted(12178900, "Change stream encountered unexpected 'cu' oplog entry");
}
case repl::OpTypeEnum::kContainerDelete: {
// Container delete ('cd') oplog entries should not show up in change streams. They
// should have been filtered out by the change stream's oplog match filter already.
@ -633,7 +638,7 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// simply adjust the number of expected oplog entry types below. If the new oplog entry
// type needs to be handled in change streams, add it to the code below and also to the
// change stream oplog match filter.
constexpr size_t kExpectedOplogEntryTypes = 8;
constexpr size_t kExpectedOplogEntryTypes = 9;
static_assert(
idlEnumCount<repl::OpTypeEnum> == kExpectedOplogEntryTypes,
"unexpected number of oplog entry types - when adding a new oplog entry type, "

View File

@ -498,6 +498,20 @@ DEATH_TEST_REGEX(ChangeStreamEventTransformDeathTest,
ASSERT_THROWS_CODE(applyTransformation(oplogEntry), AssertionException, 11888300);
}
// The event transformer does not support container update ("cu") oplog entries: feeding it one
// must trip tassert 12178900.
DEATH_TEST_REGEX(ChangeStreamEventTransformDeathTest,
                 TestUnexpectedContainerUpdate,
                 "Tripwire assertion.*12178900") {
    const repl::OpTime opTime(Timestamp(10, 10), 1 /* term */);
    const auto newValue = BSONBinData("V", 1, BinDataGeneral);

    auto oplogEntry =
        repl::makeContainerUpdateOplogEntry(opTime, "containerIdent"_sd, 1LL, newValue);

    // The transformer tasserts here because it does not expect a 'cu' oplog entry.
    ASSERT_THROWS_CODE(applyTransformation(oplogEntry), AssertionException, 12178900);
}
DEATH_TEST_REGEX(ChangeStreamEventTransformDeathTest,
TestUnexpectedContainerDelete,
"Tripwire assertion.*11888301") {

View File

@ -67,7 +67,7 @@ BSONObj getCRUDOplogEntryTypesFilter() {
// expected oplog entry types below. If the new oplog entry type needs to be handled in change
// streams, add it to the code below and also to the change stream event transformer, which
// transforms oplog entries into change events.
constexpr size_t kExpectedOplogEntryTypes = 8;
constexpr size_t kExpectedOplogEntryTypes = 9;
static_assert(idlEnumCount<repl::OpTypeEnum> == kExpectedOplogEntryTypes,
"unexpected number of oplog entry types - when adding a new oplog entry type, "
"please make sure that the change stream oplog filter handles it correctly!");
@ -81,6 +81,7 @@ BSONObj getCRUDOplogEntryTypesFilter() {
// - "c": command
// - "n": no-op
// - "ci": container insert
// - "cu": container update
// - "cd": container delete
// - "km": key material
return BSON("$in" << BSON_ARRAY("d" << "i" << "u"));

View File

@ -168,6 +168,98 @@ TEST_F(ApplyContainerOpsTest, ContainerOpApplyByteKey) {
ASSERT_EQ(0, std::memcmp(g3.getValue().get(), v3.data, v3.length));
}
// Applies container inserts followed by container updates on a byte-string-keyed container and
// checks that reads return the updated values.
TEST_F(ApplyContainerOpsTest, ContainerOpUpdateByteKey) {
    const char k1[] = "K1";
    const char k2[] = "K2";
    const BSONBinData key1(k1, 2, BinDataGeneral);
    const BSONBinData key2(k2, 2, BinDataGeneral);

    const auto initial1 = BSONBinData("A", 1, BinDataGeneral);
    const auto initial2 = BSONBinData("B", 1, BinDataGeneral);
    const auto updated1 = BSONBinData("X", 1, BinDataGeneral);
    const auto updated2 = BSONBinData("Y", 1, BinDataGeneral);

    auto insertEntry = [&](BSONBinData k, BSONBinData v) {
        return makeContainerInsertOplogEntry(OpTime(), _bytesIdent, k, v);
    };
    auto updateEntry = [&](BSONBinData k, BSONBinData v) {
        return makeContainerUpdateOplogEntry(OpTime(), _bytesIdent, k, v);
    };
    auto read = [&](auto k) {
        return _get(_opCtx.get(), _bytesIdent, std::span<const char>(k, strlen(k)));
    };

    // Seed both keys with their initial values.
    ASSERT_OK(applyContainerOpHelper(_opCtx.get(), insertEntry(key1, initial1)));
    ASSERT_OK(applyContainerOpHelper(_opCtx.get(), insertEntry(key2, initial2)));

    // Overwrite both keys.
    ASSERT_OK(applyContainerOpHelper(_opCtx.get(), updateEntry(key1, updated1)));
    ASSERT_OK(applyContainerOpHelper(_opCtx.get(), updateEntry(key2, updated2)));

    // Both reads must succeed and return the new bytes.
    auto read1 = read(k1);
    auto read2 = read(k2);
    ASSERT_OK(read1.getStatus());
    ASSERT_OK(read2.getStatus());
    ASSERT_EQ(0, std::memcmp(read1.getValue().get(), updated1.data, updated1.length));
    ASSERT_EQ(0, std::memcmp(read2.getValue().get(), updated2.data, updated2.length));
}
// Applies container inserts followed by container updates on an integer-keyed container and
// checks that reads return the updated values.
TEST_F(ApplyContainerOpsTest, ContainerOpUpdateIntKey) {
    const int64_t key1 = 1;
    const int64_t key2 = 2;

    const auto initial1 = BSONBinData("A", 1, BinDataGeneral);
    const auto initial2 = BSONBinData("B", 1, BinDataGeneral);
    const auto updated1 = BSONBinData("X", 1, BinDataGeneral);
    const auto updated2 = BSONBinData("Y", 1, BinDataGeneral);

    auto insertEntry = [&](int64_t k, BSONBinData v) {
        return makeContainerInsertOplogEntry(OpTime(), _intIdent, k, v);
    };
    auto updateEntry = [&](int64_t k, BSONBinData v) {
        return makeContainerUpdateOplogEntry(OpTime(), _intIdent, k, v);
    };
    auto read = [&](int64_t k) {
        return _get(_opCtx.get(), _intIdent, k);
    };

    // Seed both keys with their initial values.
    ASSERT_OK(applyContainerOpHelper(_opCtx.get(), insertEntry(key1, initial1)));
    ASSERT_OK(applyContainerOpHelper(_opCtx.get(), insertEntry(key2, initial2)));

    // Overwrite both keys.
    ASSERT_OK(applyContainerOpHelper(_opCtx.get(), updateEntry(key1, updated1)));
    ASSERT_OK(applyContainerOpHelper(_opCtx.get(), updateEntry(key2, updated2)));

    // Both reads must succeed and return the new bytes.
    auto read1 = read(key1);
    auto read2 = read(key2);
    ASSERT_OK(read1.getStatus());
    ASSERT_OK(read2.getStatus());
    ASSERT_EQ(0, std::memcmp(read1.getValue().get(), updated1.data, updated1.length));
    ASSERT_EQ(0, std::memcmp(read2.getValue().get(), updated2.data, updated2.length));
}
// Applying a container update for a key that was never inserted must fail with NoSuchKey, even
// when the container holds other keys.
TEST_F(ApplyContainerOpsTest, ContainerOpUpdateNonexistentKeyFails) {
    const int64_t presentKey = 1;
    const int64_t absentKey = 2;
    const auto payload = BSONBinData("A", 1, BinDataGeneral);

    // Seed one key so the container is non-empty.
    auto seedEntry = makeContainerInsertOplogEntry(OpTime(), _intIdent, presentKey, payload);
    ASSERT_OK(applyContainerOpHelper(_opCtx.get(), seedEntry));

    // Target a different key that does not exist.
    auto updateEntry = makeContainerUpdateOplogEntry(OpTime(), _intIdent, absentKey, payload);
    const auto result = applyContainerOpHelper(_opCtx.get(), updateEntry);
    ASSERT_EQ(result.code(), ErrorCodes::NoSuchKey);
}
// The "$v" field of a generated container update oplog entry must equal the full-replacement
// format version.
TEST_F(ApplyContainerOpsTest, ContainerOpUpdateOplogVersion) {
    const auto expectedVersion =
        static_cast<int64_t>(container::UpdateOplogEntryVersion::kFullReplacementV1);

    const int64_t key = 1;
    const auto payload = BSONBinData("V", 1, BinDataGeneral);
    auto updateEntry = makeContainerUpdateOplogEntry(OpTime(), _intIdent, key, payload);

    ASSERT_EQ(updateEntry.getObject()["$v"].safeNumberInt(), expectedVersion);
}
TEST_F(ApplyContainerOpsTest, ContainerOpApplyIntKey) {
int64_t k1 = 1, k2 = 2, k3 = 3, k4 = 4;
auto v1 = BSONBinData("A", 1, BinDataGeneral);
@ -332,6 +424,57 @@ TEST_F(ApplyContainerOpsTest, ContainerOpsRejectMismatchedExistingCommitTimestam
ru->clearCommitTimestamp();
}
// Verifies that DurableOplogEntry rejects malformed container update entries, one defect at a
// time. Every case starts from a well-formed base entry and breaks exactly one thing; each case
// that keeps "$v" uses the shared 'version' constant so the other fields stay well-formed if the
// enum value ever changes.
TEST_F(ApplyContainerOpsTest, ParseContainerUpdateFormatFailures) {
    int64_t k = 1;
    auto v = BSONBinData("V", 1, BinDataGeneral);
    auto wrongTypeV = "notBinData";
    auto version = static_cast<int64_t>(container::UpdateOplogEntryVersion::kFullReplacementV1);

    // Factory for a well-formed set of parameters.
    auto base = [&]() {
        return makeBaseParams(_nss,
                              _intIdent,
                              OpTypeEnum::kContainerUpdate,
                              BSON("k" << k << "v" << v << "$v" << version));
    };

    // missing container
    {
        auto p = base();
        p.container = boost::none;
        ASSERT_THROWS_CODE(DurableOplogEntry(p), DBException, 10704701);
    }
    // missing key
    {
        auto p = base();
        p.oField = BSON("v" << v << "$v" << version);
        ASSERT_THROWS_CODE(DurableOplogEntry(p), DBException, 10704702);
    }
    // missing $v
    {
        auto p = base();
        p.oField = BSON("k" << k << "v" << v);
        ASSERT_THROWS_CODE(DurableOplogEntry(p), DBException, 12178904);
    }
    // $v must be numeric
    {
        auto p = base();
        p.oField = BSON("k" << k << "v" << v << "$v" << "notANumber");
        ASSERT_THROWS_CODE(DurableOplogEntry(p), DBException, 12178903);
    }
    // missing value
    {
        auto p = base();
        p.oField = BSON("k" << k << "$v" << version);
        ASSERT_THROWS_CODE(DurableOplogEntry(p), DBException, 12178902);
    }
    // value type must be binData
    {
        auto p = base();
        p.oField = BSON("k" << k << "v" << wrongTypeV << "$v" << version);
        ASSERT_THROWS_CODE(DurableOplogEntry(p), DBException, 12178901);
    }
}
TEST_F(ApplyContainerOpsTest, ParseContainerOpFormatFailures) {
int64_t k = 1;
auto v = BSONBinData("V", 1, BinDataGeneral);

View File

@ -132,6 +132,7 @@ Status _applyOps(OperationContext* opCtx,
switch (entry.getOpType()) {
case OpTypeEnum::kContainerInsert:
case OpTypeEnum::kContainerUpdate:
case OpTypeEnum::kContainerDelete: {
if (oplogApplicationMode == OplogApplication::Mode::kApplyOpsCmd) {
uassert(ErrorCodes::InvalidOptions,

View File

@ -2945,6 +2945,15 @@ Status applyContainerOperation(OperationContext* opCtx,
});
break;
}
case repl::OpTypeEnum::kContainerUpdate: {
int vlen = 0;
const char* v = o["v"].binData(vlen);
const std::span<const char> val{v, static_cast<size_t>(vlen)};
s = withKey(k, [&](auto key) {
return storage_engine_direct_crud::update(*engine, *ru, *ident, key, val);
});
break;
}
case repl::OpTypeEnum::kContainerDelete: {
s = withKey(k, [&](auto key) {
return storage_engine_direct_crud::remove(*engine, *ru, *ident, key);

View File

@ -362,6 +362,25 @@ DurableOplogEntry::DurableOplogEntry(BSONObj rawInput) : _raw(std::move(rawInput
vBSON.type() == BSONType::binData);
break;
}
case OpTypeEnum::kContainerUpdate: {
const BSONElement updateVersion = o["$v"];
uassert(12178904,
str::stream() << "missing $v element for update: " << redact(o),
updateVersion);
uassert(12178903,
str::stream()
<< "$v must be numeric, got " << typeName(updateVersion.type()),
updateVersion.isNumber());
const BSONElement vBSON = o["v"];
uassert(12178902,
str::stream() << "missing value element for update: " << redact(o),
vBSON);
uassert(12178901,
str::stream()
<< "value must be type binData, got " << typeName(vBSON.type()),
vBSON.type() == BSONType::binData);
break;
}
case OpTypeEnum::kContainerDelete: {
uassert(10704704,
str::stream() << "delete should not contain value: " << redact(o),
@ -433,6 +452,7 @@ bool DurableOplogEntry::isCrudOpType(OpTypeEnum opType) {
case OpTypeEnum::kUpdate:
return true;
case OpTypeEnum::kContainerInsert:
case OpTypeEnum::kContainerUpdate:
case OpTypeEnum::kContainerDelete:
case OpTypeEnum::kCommand:
case OpTypeEnum::kNoop:
@ -459,6 +479,7 @@ bool DurableOplogEntry::isUpdateOrDelete() const {
case OpTypeEnum::kInsert:
case OpTypeEnum::kCommand:
case OpTypeEnum::kContainerInsert:
case OpTypeEnum::kContainerUpdate:
case OpTypeEnum::kContainerDelete:
case OpTypeEnum::kNoop:
case OpTypeEnum::kKeyMaterial:
@ -468,7 +489,8 @@ bool DurableOplogEntry::isUpdateOrDelete() const {
}
bool DurableOplogEntry::isContainerOpType(OpTypeEnum opType) {
return opType == OpTypeEnum::kContainerInsert || opType == OpTypeEnum::kContainerDelete;
return opType == OpTypeEnum::kContainerInsert || opType == OpTypeEnum::kContainerUpdate ||
opType == OpTypeEnum::kContainerDelete;
}
bool DurableOplogEntry::shouldPrepare() const {

View File

@ -55,6 +55,7 @@ enums:
kUpdate: "u"
kDelete: "d"
kContainerInsert: "ci"
kContainerUpdate: "cu"
kContainerDelete: "cd"
kNoop: "n"
kKeyMaterial: "km"

View File

@ -35,6 +35,7 @@
#include "mongo/db/index_builds/index_builds_common.h"
#include "mongo/db/shard_role/shard_catalog/catalog_raii.h"
#include "mongo/db/shard_role/shard_catalog/collection_catalog.h"
#include "mongo/db/storage/container.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/unittest/assert.h"
@ -141,6 +142,38 @@ OplogEntry makeContainerInsertOplogEntry(OpTime opTime,
}}};
}
/**
 * Test helper: builds a container update oplog entry for an integer 'key' on the container
 * namespace. The object is {k: key, v: value, $v: <full-replacement version>} and the wall clock
 * time is the current time.
 */
OplogEntry makeContainerUpdateOplogEntry(OpTime opTime,
                                         StringData containerIdent,
                                         int64_t key,
                                         BSONBinData value) {
    const auto formatVersion =
        static_cast<int64_t>(container::UpdateOplogEntryVersion::kFullReplacementV1);
    const BSONObj updateObject = BSON("k" << key << "v" << value << "$v" << formatVersion);

    DurableOplogEntryParams params{
        .opTime = opTime,
        .opType = OpTypeEnum::kContainerUpdate,
        .nss = NamespaceString::kContainerNamespace,
        .container = containerIdent,
        .oField = updateObject,
        .wallClockTime = Date_t::now(),
    };
    return {DurableOplogEntry{params}};
}
/**
 * Test helper: builds a container update oplog entry for a binary 'key' on the container
 * namespace. The object is {k: key, v: value, $v: <full-replacement version>} and the wall clock
 * time is the current time.
 */
OplogEntry makeContainerUpdateOplogEntry(OpTime opTime,
                                         StringData containerIdent,
                                         BSONBinData key,
                                         BSONBinData value) {
    const auto formatVersion =
        static_cast<int64_t>(container::UpdateOplogEntryVersion::kFullReplacementV1);
    const BSONObj updateObject = BSON("k" << key << "v" << value << "$v" << formatVersion);

    DurableOplogEntryParams params{
        .opTime = opTime,
        .opType = OpTypeEnum::kContainerUpdate,
        .nss = NamespaceString::kContainerNamespace,
        .container = containerIdent,
        .oField = updateObject,
        .wallClockTime = Date_t::now(),
    };
    return {DurableOplogEntry{params}};
}
OplogEntry makeContainerDeleteOplogEntry(OpTime opTime, StringData containerIdent, int64_t key) {
return {DurableOplogEntry{DurableOplogEntryParams{
.opTime = opTime,

View File

@ -95,6 +95,16 @@ OplogEntry makeContainerInsertOplogEntry(OpTime opTime,
BSONBinData key,
BSONBinData value);
OplogEntry makeContainerUpdateOplogEntry(OpTime opTime,
StringData containerIdent,
int64_t key,
BSONBinData value);
OplogEntry makeContainerUpdateOplogEntry(OpTime opTime,
StringData containerIdent,
BSONBinData key,
BSONBinData value);
OplogEntry makeContainerDeleteOplogEntry(OpTime opTime, StringData containerIdent, int64_t key);
OplogEntry makeContainerDeleteOplogEntry(OpTime opTime, StringData containerIdent, BSONBinData key);

View File

@ -237,6 +237,7 @@ std::vector<OplogEntry> SessionUpdateTracker::_flush(const OplogEntry& entry) {
switch (entry.getOpType()) {
case OpTypeEnum::kInsert:
case OpTypeEnum::kContainerInsert:
case OpTypeEnum::kContainerUpdate:
case OpTypeEnum::kContainerDelete:
case OpTypeEnum::kNoop:
case OpTypeEnum::kKeyMaterial:

View File

@ -110,6 +110,14 @@ public:
return Status::OK();
}
// Replaces the stored value for 'key' with a copy of 'value'.
//
// The key must already exist. If it does not, NoSuchKey is returned and the entries are left
// untouched; find_if() yields end() in that case, which must never be dereferenced.
Status update(RecoveryUnit&, int64_t key, std::span<const char> value) override {
    auto it = std::find_if(_entries.begin(), _entries.end(), [key](const Entry& entry) {
        return entry.first == key;
    });
    if (it == _entries.end()) {
        return Status(ErrorCodes::NoSuchKey, "No such key exists in container");
    }
    it->second = std::string(value.begin(), value.end());
    return Status::OK();
}
Status remove(RecoveryUnit&, int64_t key) override {
_entries.erase(std::find_if(_entries.begin(), _entries.end(), [key](const Entry& entry) {
return entry.first == key;

View File

@ -50,6 +50,17 @@ enum class ExistingKeyPolicy {
reject,
};
/**
 * The format for container update oplog entries.
 *
 * The numeric value of the chosen enumerator is written as the "$v" field of a container update
 * oplog entry's object.
 */
enum class UpdateOplogEntryVersion {
// Full replacement semantics, currently the only supported oplog format.
kFullReplacementV1 = 1,
// Sentinel, not a real format version. Must be last so it stays one past the highest version.
kNumVersions
};
} // namespace container
/**
@ -94,6 +105,12 @@ public:
std::span<const char> value,
container::ExistingKeyPolicy policy) = 0;
/**
 * Updates the value at the given integer key to 'value'. The key must already exist. Must be in
 * an active storage transaction.
 *
 * Returns a non-OK Status on failure; the specific error codes are implementation-defined.
 */
virtual Status update(RecoveryUnit& ru, int64_t key, std::span<const char> value) = 0;
/**
* Removes the given key (and its corresponding value) from the container. Must be in an active
* storage transaction.
@ -148,6 +165,14 @@ public:
std::span<const char> value,
container::ExistingKeyPolicy policy) = 0;
/**
 * Updates the value at the given byte-string key to 'value'. The key must already exist. Must be
 * in an active storage transaction.
 *
 * Returns a non-OK Status on failure; the specific error codes are implementation-defined.
 */
virtual Status update(RecoveryUnit& ru,
std::span<const char> key,
std::span<const char> value) = 0;
/**
* Removes the given key (and its corresponding value) from the container. Must be in an active
* storage transaction.

View File

@ -250,6 +250,13 @@ public:
return Status::OK();
}
// Stub: performs no storage work and always reports success, regardless of whether 'key' exists.
Status updateInIdent(RecoveryUnit& ru,
StringData ident,
IdentKey key,
std::span<const char> value) override {
return Status::OK();
}
StatusWith<UniqueBuffer> getFromIdent(RecoveryUnit& ru,
StringData ident,
IdentKey key) override {

View File

@ -257,6 +257,13 @@ public:
MONGO_UNREACHABLE;
}
// This implementation is not expected to be called: reaching it trips MONGO_UNREACHABLE.
Status updateInIdent(RecoveryUnit& ru,
StringData ident,
IdentKey key,
std::span<const char> value) override {
MONGO_UNREACHABLE;
}
StatusWith<UniqueBuffer> getFromIdent(RecoveryUnit& ru,
StringData ident,
IdentKey key) override {

View File

@ -536,6 +536,18 @@ public:
IdentKey key,
std::span<const char> value) = 0;
/**
 * Updates the value associated with 'key' in the specified 'ident' to 'value'. The key must
 * already exist. Must be called from within a storage transaction.
 *
 * Returns OK on success, 'NoSuchKey' if the key does not exist, or the error returned by
 * the underlying storage engine on other failures.
 */
virtual Status updateInIdent(RecoveryUnit& ru,
StringData ident,
IdentKey key,
std::span<const char> value) = 0;
/**
* Retrieves the value associated with 'key' from the specified 'ident'.
*

View File

@ -56,6 +56,22 @@ Status insert(StorageEngine& engine,
return engine.getEngine()->insertIntoIdent(ru, ident, key, value);
}
// Byte-string-key overload: delegates to updateInIdent() on the KV engine owned by 'engine' and
// returns its status unchanged.
Status update(StorageEngine& engine,
              RecoveryUnit& ru,
              StringData ident,
              std::span<const char> key,
              std::span<const char> value) {
    auto&& kvEngine = engine.getEngine();
    return kvEngine->updateInIdent(ru, ident, key, value);
}
// Integer-key overload: delegates to updateInIdent() on the KV engine owned by 'engine' and
// returns its status unchanged.
Status update(StorageEngine& engine,
              RecoveryUnit& ru,
              StringData ident,
              int64_t key,
              std::span<const char> value) {
    auto&& kvEngine = engine.getEngine();
    return kvEngine->updateInIdent(ru, ident, key, value);
}
StatusWith<UniqueBuffer> get(StorageEngine& engine,
RecoveryUnit& ru,
StringData ident,

View File

@ -57,6 +57,20 @@ Status insert(StorageEngine& engine,
int64_t key,
std::span<const char> value);
// This function has the same behavior as KVEngine::updateInIdent(), for a byte-string key. It
// forwards to the KV engine owned by 'engine'.
Status update(StorageEngine& engine,
RecoveryUnit& ru,
StringData ident,
std::span<const char> key,
std::span<const char> value);
// This function has the same behavior as KVEngine::updateInIdent(), for an integer key. It
// forwards to the KV engine owned by 'engine'.
Status update(StorageEngine& engine,
RecoveryUnit& ru,
StringData ident,
int64_t key,
std::span<const char> value);
// This function has the same behavior as KVEngine::getFromIdent()
StatusWith<UniqueBuffer> get(StorageEngine& engine,
RecoveryUnit& ru,

View File

@ -43,6 +43,9 @@ public:
container::ExistingKeyPolicy policy) final {
return Status::OK();
}
// Mock: stores nothing and always reports success.
Status update(RecoveryUnit& ru, int64_t key, std::span<const char> value) final {
return Status::OK();
}
Status remove(RecoveryUnit& ru, int64_t key) final {
return Status::OK();
}
@ -60,6 +63,9 @@ public:
container::ExistingKeyPolicy policy) final {
return Status::OK();
}
// Mock: stores nothing and always reports success.
Status update(RecoveryUnit& ru, std::span<const char> key, std::span<const char> value) final {
return Status::OK();
}
Status remove(RecoveryUnit& ru, std::span<const char> key) final {
return Status::OK();
}

View File

@ -102,6 +102,25 @@ int WiredTigerIntegerKeyedContainer::insert(WiredTigerRecoveryUnit& ru,
return WT_OP_CHECK(wiredTigerCursorInsert(ru, &cursor));
}
/**
 * Updates the value stored under the integer 'key'. Opens a cursor on this container's table,
 * asserts that a storage transaction is active, performs the update through the cursor-level
 * overload and converts the WiredTiger return code into a Status.
 *
 * NOTE(review): WT_NOTFOUND is not special-cased here, whereas
 * WiredTigerKVEngineBase::updateInIdent() maps it to NoSuchKey - confirm that the difference in
 * error codes for a missing key is intended.
 */
Status WiredTigerIntegerKeyedContainer::update(RecoveryUnit& ru,
                                               int64_t key,
                                               std::span<const char> value) {
    auto& wiredTigerRu = WiredTigerRecoveryUnit::get(ru);
    WiredTigerCursor cursor{
        getWiredTigerCursorParams(wiredTigerRu, tableId()), uri(), *wiredTigerRu.getSession()};
    wiredTigerRu.assertInActiveTxn();

    WT_CURSOR& rawCursor = *cursor.get();
    const int rc = update(wiredTigerRu, rawCursor, key, value);
    return wtRCToStatus(rc, cursor->session);
}
/**
 * Cursor-level update: positions 'cursor' on the integer 'key', sets 'value' as the new value
 * and issues the update. Returns the raw WiredTiger return code.
 */
int WiredTigerIntegerKeyedContainer::update(WiredTigerRecoveryUnit& ru,
                                            WT_CURSOR& cursor,
                                            int64_t key,
                                            std::span<const char> value) {
    WT_CURSOR* const rawCursor = &cursor;
    rawCursor->set_key(rawCursor, key);

    WiredTigerItem valueItem{value};
    rawCursor->set_value(rawCursor, valueItem.get());

    return WT_OP_CHECK(wiredTigerCursorUpdate(ru, rawCursor));
}
Status WiredTigerIntegerKeyedContainer::remove(RecoveryUnit& ru, int64_t key) {
auto& wtRu = WiredTigerRecoveryUnit::get(ru);
WiredTigerCursor cursor{getWiredTigerCursorParams(wtRu, tableId()), uri(), *wtRu.getSession()};
@ -176,6 +195,25 @@ int WiredTigerStringKeyedContainer::insert(WiredTigerRecoveryUnit& ru,
return WT_OP_CHECK(wiredTigerCursorInsert(ru, &cursor));
}
/**
 * Updates the value stored under the byte-string 'key'. Opens a cursor on this container's
 * table, asserts that a storage transaction is active, performs the update through the
 * cursor-level overload and converts the WiredTiger return code into a Status.
 *
 * NOTE(review): WT_NOTFOUND is not special-cased here, whereas
 * WiredTigerKVEngineBase::updateInIdent() maps it to NoSuchKey - confirm that the difference in
 * error codes for a missing key is intended.
 */
Status WiredTigerStringKeyedContainer::update(RecoveryUnit& ru,
                                              std::span<const char> key,
                                              std::span<const char> value) {
    auto& wiredTigerRu = WiredTigerRecoveryUnit::get(ru);
    WiredTigerCursor cursor{
        getWiredTigerCursorParams(wiredTigerRu, tableId()), uri(), *wiredTigerRu.getSession()};
    wiredTigerRu.assertInActiveTxn();

    WT_CURSOR& rawCursor = *cursor.get();
    const int rc = update(wiredTigerRu, rawCursor, key, value);
    return wtRCToStatus(rc, cursor->session);
}
/**
 * Cursor-level update: positions 'cursor' on the byte-string 'key', sets 'value' as the new
 * value and issues the update. Returns the raw WiredTiger return code.
 */
int WiredTigerStringKeyedContainer::update(WiredTigerRecoveryUnit& ru,
                                           WT_CURSOR& cursor,
                                           std::span<const char> key,
                                           std::span<const char> value) {
    WT_CURSOR* const rawCursor = &cursor;

    WiredTigerItem keyItem{key};
    rawCursor->set_key(rawCursor, keyItem.get());

    WiredTigerItem valueItem{value};
    rawCursor->set_value(rawCursor, valueItem.get());

    return WT_OP_CHECK(wiredTigerCursorUpdate(ru, rawCursor));
}
Status WiredTigerStringKeyedContainer::remove(RecoveryUnit& ru, std::span<const char> key) {
auto& wtRu = WiredTigerRecoveryUnit::get(ru);
WiredTigerCursor cursor{getWiredTigerCursorParams(wtRu, tableId()), uri(), *wtRu.getSession()};

View File

@ -86,6 +86,13 @@ public:
int64_t key,
std::span<const char> value);
// Container interface entry point: opens its own cursor and returns a Status.
Status update(RecoveryUnit& ru, int64_t key, std::span<const char> value) final;
// Cursor-level variant: performs the update on the caller-supplied 'cursor' and returns the raw
// WiredTiger return code.
int update(WiredTigerRecoveryUnit& ru,
WT_CURSOR& cursor,
int64_t key,
std::span<const char> value);
Status remove(RecoveryUnit& ru, int64_t key) final;
int remove(WiredTigerRecoveryUnit& ru, WT_CURSOR& cursor, int64_t key);
@ -120,6 +127,13 @@ public:
std::span<const char> key,
std::span<const char> value);
// Container interface entry point: opens its own cursor and returns a Status.
Status update(RecoveryUnit& ru, std::span<const char> key, std::span<const char> value) final;
// Cursor-level variant: performs the update on the caller-supplied 'cursor' and returns the raw
// WiredTiger return code.
int update(WiredTigerRecoveryUnit& ru,
WT_CURSOR& cursor,
std::span<const char> key,
std::span<const char> value);
Status remove(RecoveryUnit& ru, std::span<const char> key) final;
int remove(WiredTigerRecoveryUnit& ru, WT_CURSOR& cursor, std::span<const char> key);

View File

@ -516,6 +516,30 @@ Status WiredTigerKVEngineBase::insertIntoIdent(RecoveryUnit& ru,
return wtRCToStatus(rc, cursor->session);
}
/**
 * Updates the value stored under 'key' (byte-string or integer) in the table backing 'ident'.
 *
 * Requires 'ru' to be in a unit of work (invariant). Returns NoSuchKey when the cursor update
 * reports WT_NOTFOUND; any other return code is converted with wtRCToStatus().
 */
Status WiredTigerKVEngineBase::updateInIdent(RecoveryUnit& ru,
                                             StringData ident,
                                             std::variant<std::span<const char>, int64_t> key,
                                             std::span<const char> value) {
    invariant(ru.inUnitOfWork());

    auto& wiredTigerRu = WiredTigerRecoveryUnit::get(ru);

    // TODO (SERVER-109454): `genTableId()` may be replaced with different cache logic.
    WiredTigerCursor cursor{getWiredTigerCursorParams(wiredTigerRu, WiredTigerUtil::genTableId()),
                            WiredTigerUtil::buildTableUri(ident),
                            *wiredTigerRu.getSession()};
    wiredTigerRu.assertInActiveTxn();

    WT_CURSOR* const rawCursor = cursor.get();
    setKeyOnCursor(rawCursor, key);

    WiredTigerItem valueItem{value};
    rawCursor->set_value(rawCursor, valueItem.get());

    const int rc = WT_OP_CHECK(wiredTigerCursorUpdate(wiredTigerRu, rawCursor));
    if (rc == WT_NOTFOUND) {
        // A missing key is reported with a dedicated error code instead of a generic one.
        return Status(ErrorCodes::NoSuchKey, "No such key exists in ident");
    }
    return wtRCToStatus(rc, cursor->session);
}
StatusWith<UniqueBuffer> WiredTigerKVEngineBase::getFromIdent(
RecoveryUnit& ru, StringData ident, std::variant<std::span<const char>, int64_t> key) {
auto& wtRu = WiredTigerRecoveryUnit::get(ru);

View File

@ -286,6 +286,11 @@ public:
IdentKey key,
std::span<const char> value) override;
// Updates the value for an existing 'key' in 'ident'; the definition returns NoSuchKey when the
// key is not found.
Status updateInIdent(RecoveryUnit& ru,
StringData ident,
IdentKey key,
std::span<const char> value) override;
StatusWith<UniqueBuffer> getFromIdent(RecoveryUnit& ru,
StringData ident,
IdentKey key) override;