SERVER-95570 Avoid deleting the same document twice in BatchedDeleteStage (#39247) (#39421)

Co-authored-by: Wei Hu <wei.hu@mongodb.com>
Co-authored-by: Max Hirschhorn <max.hirschhorn@mongodb.com>
GitOrigin-RevId: 0b11ebf1954025b0b191e80e0d61dede396db84e
This commit is contained in:
Yuhong Zhang 2025-10-07 16:07:35 -04:00 committed by MongoDB Bot
parent 781e2aa218
commit d8e609493f
9 changed files with 126 additions and 52 deletions

View File

@ -22,6 +22,9 @@ selector:
# These workloads use a verbose log level.
- jstests/concurrency/fsm_workloads/ddl/rename_collection/collection_uuid.js
# TODO BACKPORT-25689: reenable this workload after backport to 8.0.
- jstests/concurrency/fsm_workloads/query/remove/update_and_batched_delete.js
exclude_with_any_tags:
- requires_standalone
# The ability to shut down a node while it's in the middle of applying ops is required for

View File

@ -468,6 +468,8 @@ last-continuous:
ticket: SERVER-108341
- test_file: jstests/core/timeseries/write/timeseries_insert_mixed_schema_docs.js
ticket: SERVER-107361
- test_file: jstests/concurrency/fsm_workloads/query/remove/update_and_batched_delete.js
ticket: SERVER-95570
suites: null
last-lts:
all:
@ -993,4 +995,6 @@ last-lts:
ticket: SERVER-108341
- test_file: jstests/core/timeseries/write/timeseries_insert_mixed_schema_docs.js
ticket: SERVER-107361
- test_file: jstests/concurrency/fsm_workloads/query/remove/update_and_batched_delete.js
ticket: SERVER-95570
suites: null

View File

@ -0,0 +1,45 @@
/**
* update_and_batched_delete.js
*
* Concurrently run batched deletes and updates which can move the documents physically.
* Adapted from Justin's repro in SERVER-95570.
*
* @tags: [
* # limit: 0 deletes.
* requires_non_retryable_writes,
* ]
*/
const timeToLiveSeconds = 1;
const maxDocuments = 10; // Thanks, Max, for lending us these documents.
export const $config = (function() {
function setup(db, collName, cluster) {
assert.commandWorked(db[collName].createIndex({updateTime: 1}));
}
const states = {
upsert: function upsert(db, collName) {
const now = ISODate();
const documentId = Math.floor(Random.rand() * maxDocuments);
assert.commandWorked(db[collName].update(
{_id: documentId},
{$set: {updateTime: new Date(now - 10000)}, $setOnInsert: {creationTime: now}},
{upsert: true}));
},
delete: function doDelete(db, collName) {
assert.commandWorked(db[collName].deleteMany(
{updateTime: {$lt: new Date(Date.now() - timeToLiveSeconds * 1000)}}));
}
};
return {
threadCount: 12,
iterations: 3000,
startState: 'upsert',
states: states,
setup: setup,
transitions: {upsert: {upsert: 1.0, delete: 1.0}, delete: {upsert: 1.0, delete: 1.0}},
};
})();

View File

@ -69,8 +69,12 @@ function runTest(conn) {
// Verify the change stream emits events for the batched deletion, and capture the events so we
// can test resumability later.
for (let docKey = 0; docKey < totalNumDocs; docKey++) {
assertWriteVisibleWithCapture(changeStreamCursor, "delete", {_id: docKey}, changeList);
// The documents are processed in the reverse order for each batch of a batched delete.
for (let batch = 0; batch < serverStatusBatchesAfter; ++batch) {
const currBatchStart = Math.min(totalNumDocs - 1, (batch + 1) * docsPerBatch - 1);
for (let docKey = currBatchStart; docKey >= batch * docsPerBatch; --docKey) {
assertWriteVisibleWithCapture(changeStreamCursor, "delete", {_id: docKey}, changeList);
}
}
assertNoChanges(changeStreamCursor);

View File

@ -135,7 +135,7 @@ auto& batchedDeletesSSS =
*ServerStatusSectionBuilder<BatchedDeletesSSS>("batchedDeletes").forShard();
// Wrapper for write_stage_common::ensureStillMatches() which also updates the 'refetchesDueToYield'
// serverStatus metric. As with ensureStillMatches, if false is returned, the WoringSetMember
// serverStatus metric. As with ensureStillMatches, if false is returned, the WorkingSetMember
// referenced by 'id' is no longer valid, and must not be used except for freeing the WSM.
bool ensureStillMatchesAndUpdateStats(const CollectionPtr& collection,
OperationContext* opCtx,
@ -212,7 +212,7 @@ PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) {
if (!_params->isExplain && _commitStagedDeletes) {
// Overwriting 'planStageState' potentially means throwing away the result produced from
// staging. We expect to commit deletes after a new documet is staged and the batch targets
// staging. We expect to commit deletes after a new document is staged and the batch targets
// are met (planStageState = PlanStage::NEED_TIME), after there are no more documents to
// stage (planStageState = PlanStage::IS_EOF), or when resuming to commit deletes in the
// buffer before more can be staged (planStageState = PlanStage::NEED_TIME by default).
@ -281,7 +281,7 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
std::set<WorkingSetID> recordsToSkip;
unsigned int docsDeleted = 0;
unsigned int bytesDeleted = 0;
unsigned int bufferOffset = 0;
unsigned int rBufferOffset = 0;
long long timeInBatch = 0;
try {
@ -290,7 +290,7 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
"BatchedDeleteStage::_deleteBatch",
[&] {
timeInBatch =
_commitBatch(out, &recordsToSkip, &docsDeleted, &bytesDeleted, &bufferOffset);
_commitBatch(out, &recordsToSkip, &docsDeleted, &bytesDeleted, &rBufferOffset);
return PlanStage::NEED_TIME;
},
[&] {
@ -322,11 +322,13 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
_specificStats.docsDeleted += docsDeleted;
_specificStats.bytesDeleted += bytesDeleted;
if (bufferOffset < _stagedDeletesBuffer.size()) {
// Note: 'rBufferOffset' stores the 0-based index of the last successfully processed buffer
// entry, which is why the total number of documents processed is 1 more.
if (auto docsProcessed = rBufferOffset + 1; docsProcessed < _stagedDeletesBuffer.size()) {
// targetBatchTimeMS was met. Remove staged deletes that have been evaluated
// (executed or skipped because they no longer match the query) from the buffer. If any
// staged deletes remain in the buffer, they will be retried in a subsequent batch.
_stagedDeletesBuffer.eraseUpToOffsetInclusive(bufferOffset);
_stagedDeletesBuffer.removeLastN(docsProcessed);
} else {
// The individual deletes staged in the buffer are preserved until the batch is committed so
// they can be retried in case of a write conflict.
@ -342,7 +344,7 @@ long long BatchedDeleteStage::_commitBatch(WorkingSetID* out,
std::set<WorkingSetID>* recordsToSkip,
unsigned int* docsDeleted,
unsigned int* bytesDeleted,
unsigned int* bufferOffset) {
unsigned int* rBufferOffset) {
// Estimate the size of the oplog entry that would result from committing the batch,
// to ensure we emit an oplog entry that's within the 16MB BSON limit.
size_t applyOpsBytes = kApplyOpsNonArrayEntryPaddingBytes;
@ -354,14 +356,27 @@ long long BatchedDeleteStage::_commitBatch(WorkingSetID* out,
WriteUnitOfWork wuow(opCtx(),
_stagedDeletesBuffer.size() > 1U ? WriteUnitOfWork::kGroupForTransaction
: WriteUnitOfWork::kDontGroup);
for (; *bufferOffset < _stagedDeletesBuffer.size(); ++*bufferOffset) {
// We iterate pending deletes in reverse order of staging to work around duplicate deletions
// that can result when the same document gets staged twice. When the batch of documents to
// delete comes from an index scan, it can contain duplicates in the rare case that a yield
// during the index scan coincides with an update to a document that advances its position
// in the index. This loop accounts for that possibility by refetching the document to
// verify that it still exists and should be deleted, but only when the staged deletion is
// from a prior snapshot, which leaves open the possibility that the second of two deletions
// does not get verified because it is from the current snapshot. Iterating in reverse
// ensures that, in these rare cases, the second time we encounter a document, it will always
// be from a prior snapshot and checked. Avoiding duplicate deletions is important because
// 'collection_level::deleteDocument()' expects its target to exist, and validation errors
// can result when it does not.
for (; *rBufferOffset < _stagedDeletesBuffer.size(); ++*rBufferOffset) {
if (MONGO_unlikely(throwWriteConflictExceptionInBatchedDeleteStage.shouldFail())) {
throwWriteConflictException(
str::stream() << "Hit failpoint '"
<< throwWriteConflictExceptionInBatchedDeleteStage.getName() << "'.");
}
auto workingSetMemberID = _stagedDeletesBuffer.at(*bufferOffset);
auto workingSetMemberID =
_stagedDeletesBuffer.at(_stagedDeletesBuffer.size() - 1 - *rBufferOffset);
WorkingSetMember* member = _ws->get(workingSetMemberID);
bool writeToOrphan = _params->fromMigrate;
@ -401,18 +416,18 @@ long long BatchedDeleteStage::_commitBatch(WorkingSetID* out,
tassert(
6515700, "Expected document to have an _id field present", bsonObjDoc.hasField("_id"));
applyOpsBytes += bsonObjDoc.getField("_id").size();
if (applyOpsBytes > BSONObjMaxUserSize && ((*bufferOffset) > 0)) {
if (applyOpsBytes > BSONObjMaxUserSize && ((*rBufferOffset) > 0)) {
// There's no room to fit this deletion in the current batch, as doing so
// would exceed 16MB of oplog entry: put this deletion back into the staging
// buffer and commit the batch. Very large _id fields may exceed this threshold. In that
// case, put them in their own batch.
(*bufferOffset)--;
(*rBufferOffset)--;
wuow.commit();
return batchTimer.millis();
}
tassert(10118000,
"batch size may only exceed BSON cap for single, large documents",
applyOpsBytes <= BSONObjMaxUserSize || ((*bufferOffset) == 0));
applyOpsBytes <= BSONObjMaxUserSize || ((*rBufferOffset) == 0));
collection_internal::deleteDocument(
opCtx(),

View File

@ -141,16 +141,21 @@ private:
// successfully deleted. Returns NEED_YIELD otherwise.
PlanStage::StageState _deleteBatch(WorkingSetID* out);
// Attempts to delete the documents staged for deletion in a WriteUnitOfWork. Updates
// recordsToSkip, docsDeleted, and buffferOffset to reflect which document deletes are skipped,
// executed, or remaining when the WriteUnitOfWork is committed.
// Processes as many deletes in a WriteUnitOfWork as can fit in one applyOps oplog entry, always
// at least one document. Returns the number of docs deleted and their size in bytes in the
// 'docsDeleted' and 'bytesDeleted' output parameters, respectively.
//
// Documents are deleted from the _end_ of the buffer, and 'rBufferOffset' is an in/out
// parameter whose input is the index of the first document to process and whose output is the
// index of the last document processed. In both cases, the index counts _backwards_, with 0 as
// the last element.
//
// Returns the time spent (milliseconds) committing the batch.
long long _commitBatch(WorkingSetID* out,
std::set<WorkingSetID>* recordsToSkip,
unsigned int* docsDeleted,
unsigned int* bytesDeleted,
unsigned int* bufferOffset);
unsigned int* rBufferOffset);
// Attempts to stage a new delete in the _stagedDeletesBuffer. Returns the PlanStage::StageState
// fetched directly from the child except when there is a document to stage. Converts

View File

@ -47,19 +47,17 @@ void BatchedDeleteStageBuffer::append(WorkingSetID id) {
_buffer.emplace_back(id);
}
void BatchedDeleteStageBuffer::eraseUpToOffsetInclusive(size_t bufferOffset) {
tassert(
6515701,
fmt::format("Cannot erase offset '{}' - beyond the size of the BatchedDeleteStageBuffer {}",
bufferOffset,
_buffer.size()),
bufferOffset < _buffer.size());
for (unsigned int i = 0; i <= bufferOffset; i++) {
auto id = _buffer.at(i);
_ws->free(id);
void BatchedDeleteStageBuffer::removeLastN(size_t n) {
tassert(6515701,
fmt::format(
"Cannot remove '{}' elements - beyond the size of the BatchedDeleteStageBuffer {}",
n,
_buffer.size()),
n <= _buffer.size());
for (unsigned int i = 0; i < n; i++) {
_ws->free(_buffer.back());
_buffer.pop_back();
}
_buffer.erase(_buffer.begin(), _buffer.begin() + bufferOffset + 1);
}
void BatchedDeleteStageBuffer::erase(const std::set<WorkingSetID>& idsToRemove) {

View File

@ -63,10 +63,10 @@ public:
}
/**
* Erases up to 'bufferOffset' where 'bufferOffset' is inclusive. Frees up resources of
* WorkingSetMembers associated with the removed entries.
* Removes the last n elements in the buffer. Frees up resources of WorkingSetMembers associated
* with the removed entries.
*/
void eraseUpToOffsetInclusive(size_t bufferOffset);
void removeLastN(size_t n);
/**
* Erases the subset of 'idsToRemove' that exist in the buffer. Frees up resources of the

View File

@ -653,17 +653,17 @@ TEST_F(QueryStageBatchedDeleteTest, BatchedDeleteTargetBatchTimeMSWithTargetBatc
std::vector<std::pair<BSONObj, Milliseconds>> timedBatch0{
{BSON("_id" << 1 << "a" << 1), Milliseconds(1)},
{BSON("_id" << 2 << "a" << 2), Milliseconds(0)},
{BSON("_id" << 3 << "a" << 3), Milliseconds(0)},
{BSON("_id" << 2 << "a" << 2), Milliseconds(1)},
};
std::vector<std::pair<BSONObj, Milliseconds>> timedBatch1{
{BSON("_id" << 3 << "a" << 3), Milliseconds(4)},
{BSON("_id" << 4 << "a" << 4), Milliseconds(0)},
{BSON("_id" << 5 << "a" << 5), Milliseconds(0)},
{BSON("_id" << 6 << "a" << 6), Milliseconds(0)},
{BSON("_id" << 7 << "a" << 7), Milliseconds(0)},
{BSON("_id" << 8 << "a" << 8), Milliseconds(4)},
};
std::vector<std::pair<BSONObj, Milliseconds>> timedBatch1{
{BSON("_id" << 9 << "a" << 9), Milliseconds(1)},
{BSON("_id" << 8 << "a" << 8), Milliseconds(0)},
{BSON("_id" << 9 << "a" << 9), Milliseconds(0)},
{BSON("_id" << 10 << "a" << 10), Milliseconds(1)},
};
@ -706,16 +706,16 @@ TEST_F(QueryStageBatchedDeleteTest, BatchedDeleteTargetBatchTimeMSWithTargetBatc
}
}
// Batch0 deletions.
// Batch1 deletions.
{
Timer timer(tickSource());
state = deleteStage->work(&id);
ASSERT_EQ(stats->docsDeleted, batchSize0);
ASSERT_EQ(stats->docsDeleted, batchSize1);
ASSERT_EQ(state, PlanStage::NEED_TIME);
ASSERT_GTE(Milliseconds(timer.millis()), targetBatchTimeMS);
}
// Batch1 deletions.
// Batch0 deletions.
{
Timer timer(tickSource());
@ -994,23 +994,23 @@ TEST_F(QueryStageBatchedDeleteTest, BatchedDeleteTargetPassTimeMSReachedBeforeTa
auto targetBatchTimeMS = Milliseconds(5);
auto targetPassTimeMS = Milliseconds(10);
// Reaches 'targetBatchDocs'.
// Reaches 'targetBatchDocs'. First delete batch.
std::vector<std::pair<BSONObj, Milliseconds>> batch0{
{BSON("_id" << 1 << "a" << 1), Milliseconds(1)},
{BSON("_id" << 2 << "a" << 2), Milliseconds(0)},
{BSON("_id" << 3 << "a" << 3), Milliseconds(0)},
};
// Reaches 'targetBatchTimeMS'.
// 'targetPassTimeMS' is met, the buffer is partially drained, this is the last batch to commit
// before pass completion. Third delete batch.
std::vector<std::pair<BSONObj, Milliseconds>> batch1{
{BSON("_id" << 4 << "a" << 4), Milliseconds(4)},
{BSON("_id" << 5 << "a" << 5), Milliseconds(6)},
{BSON("_id" << 6 << "a" << 6), Milliseconds(0)},
};
// 'targetPassTimeMS' is met, the buffer is partilly drained, this is the last batch to commit
// before pass completion.
// Reaches 'targetBatchTimeMS'. Second delete batch.
std::vector<std::pair<BSONObj, Milliseconds>> batch2{
{BSON("_id" << 6 << "a" << 6), Milliseconds(0)},
{BSON("_id" << 5 << "a" << 5), Milliseconds(6)},
{BSON("_id" << 4 << "a" << 4), Milliseconds(4)},
};
// Populate the collection before executing the BatchedDeleteStage.
@ -1084,10 +1084,10 @@ TEST_F(QueryStageBatchedDeleteTest, BatchedDeleteTargetPassTimeMSReachedBeforeTa
}
}
// Batch1 deletions.
// Batch2 deletions.
{
state = deleteStage->work(&id);
ASSERT_EQ(stats->docsDeleted, batch0.size() + batch1.size());
ASSERT_EQ(stats->docsDeleted, batch0.size() + batch2.size());
ASSERT_TRUE(stats->passTargetMet);
// Despite reaching the 'targetPassTimeMS', the remaining deletes staged in the buffer still