Co-authored-by: Solomon Lifshits <solomon.lifshits@mongodb.com> GitOrigin-RevId: e8f95b94c4e9600662ebdc5654234e8f06e62310
This commit is contained in:
parent
3facd5a845
commit
2d5f499469
@ -267,6 +267,9 @@ void WriteConflictRetryAlgorithm::_handleStorageUnavailable(const Status& status
|
||||
*/
|
||||
void WriteConflictRetryAlgorithm::_handleWriteConflictException(const Status& s) {
|
||||
++_wceCount;
|
||||
if (MONGO_unlikely(_dumpStateRetryCount && (_wceCount % _dumpStateRetryCount) == 0)) {
|
||||
_opCtx->getServiceContext()->getStorageEngine()->dump();
|
||||
}
|
||||
recordWriteConflict(_opCtx);
|
||||
_recoveryUnit().abandonSnapshot();
|
||||
_emitLog(s.reason());
|
||||
|
||||
@ -32,6 +32,7 @@
|
||||
#include "mongo/base/error_codes.h"
|
||||
#include "mongo/base/string_data.h"
|
||||
#include "mongo/db/client.h"
|
||||
#include "mongo/db/concurrency/exception_util_gen.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/storage/recovery_unit.h"
|
||||
@ -122,8 +123,18 @@ public:
|
||||
RecoveryUnit& ru,
|
||||
StringData opStr,
|
||||
const NamespaceStringOrUUID& nssOrUUID,
|
||||
boost::optional<size_t> retryLimit)
|
||||
: _opCtx{opCtx}, _ru(ru), _opStr{opStr}, _nssOrUUID{nssOrUUID}, _retryLimit{retryLimit} {
|
||||
boost::optional<size_t> retryLimit,
|
||||
int dumpStateRetryCount)
|
||||
: _opCtx{opCtx},
|
||||
_ru(ru),
|
||||
_opStr{opStr},
|
||||
_nssOrUUID{nssOrUUID},
|
||||
_retryLimit{retryLimit},
|
||||
_dumpStateRetryCount(
|
||||
dumpStateRetryCount
|
||||
? std::max(dumpStateRetryCount,
|
||||
gMinimalWriteConflictRetryCountForStateDump.loadRelaxed())
|
||||
: 0) {
|
||||
invariant(_opCtx);
|
||||
invariant(shard_role_details::getLocker(_opCtx));
|
||||
}
|
||||
@ -131,12 +142,18 @@ public:
|
||||
std::function<RecoveryUnit&()> ru,
|
||||
StringData opStr,
|
||||
const NamespaceStringOrUUID& nssOrUUID,
|
||||
boost::optional<size_t> retryLimit)
|
||||
boost::optional<size_t> retryLimit,
|
||||
int dumpStateRetryCount)
|
||||
: _opCtx{opCtx},
|
||||
_ru(std::move(ru)),
|
||||
_opStr{opStr},
|
||||
_nssOrUUID{nssOrUUID},
|
||||
_retryLimit{retryLimit} {
|
||||
_retryLimit{retryLimit},
|
||||
_dumpStateRetryCount(
|
||||
dumpStateRetryCount
|
||||
? std::max(dumpStateRetryCount,
|
||||
gMinimalWriteConflictRetryCountForStateDump.loadRelaxed())
|
||||
: 0) {
|
||||
invariant(_opCtx);
|
||||
invariant(shard_role_details::getLocker(_opCtx));
|
||||
}
|
||||
@ -180,7 +197,6 @@ private:
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void _emitLog(StringData reason);
|
||||
void _assertRetryLimit() const;
|
||||
void _handleStorageUnavailable(const Status& e);
|
||||
@ -201,7 +217,7 @@ private:
|
||||
const StringData _opStr;
|
||||
const NamespaceStringOrUUID& _nssOrUUID;
|
||||
const boost::optional<size_t> _retryLimit;
|
||||
|
||||
const int _dumpStateRetryCount = 0;
|
||||
size_t _attemptCount = 0;
|
||||
size_t _wceCount = 0;
|
||||
size_t _tempUnavailableCount = 0;
|
||||
@ -228,26 +244,34 @@ private:
|
||||
* TODO (SERVER-105773): Remove the overload without RecoveryUnit.
|
||||
*/
|
||||
template <typename F>
|
||||
auto writeConflictRetry(OperationContext* opCtx,
|
||||
RecoveryUnit& ru,
|
||||
StringData opStr,
|
||||
const NamespaceStringOrUUID& nssOrUUID,
|
||||
F&& f,
|
||||
boost::optional<size_t> retryLimit = boost::none) {
|
||||
return WriteConflictRetryAlgorithm{opCtx, ru, opStr, nssOrUUID, retryLimit}(std::forward<F>(f));
|
||||
auto writeConflictRetry(
|
||||
OperationContext* opCtx,
|
||||
RecoveryUnit& ru,
|
||||
StringData opStr,
|
||||
const NamespaceStringOrUUID& nssOrUUID,
|
||||
F&& f,
|
||||
boost::optional<size_t> retryLimit = boost::none,
|
||||
/* Dump the WT state on every N times when you hit a WCE, where N == dumpStateRetryCount. */
|
||||
int dumpStateRetryCount = 0) {
|
||||
return WriteConflictRetryAlgorithm{
|
||||
opCtx, ru, opStr, nssOrUUID, retryLimit, dumpStateRetryCount}(std::forward<F>(f));
|
||||
}
|
||||
template <typename F>
|
||||
auto writeConflictRetry(OperationContext* opCtx,
|
||||
StringData opStr,
|
||||
const NamespaceStringOrUUID& nssOrUUID,
|
||||
F&& f,
|
||||
boost::optional<size_t> retryLimit = boost::none) {
|
||||
auto writeConflictRetry(
|
||||
OperationContext* opCtx,
|
||||
StringData opStr,
|
||||
const NamespaceStringOrUUID& nssOrUUID,
|
||||
F&& f,
|
||||
boost::optional<size_t> retryLimit = boost::none,
|
||||
/* Dump the WT state on every N times when you hit a WCE, where N == dumpStateRetryCount. */
|
||||
int dumpStateRetryCount = 0) {
|
||||
return WriteConflictRetryAlgorithm{
|
||||
opCtx,
|
||||
[opCtx]() -> RecoveryUnit& { return *shard_role_details::getRecoveryUnit(opCtx); },
|
||||
opStr,
|
||||
nssOrUUID,
|
||||
retryLimit}(std::forward<F>(f));
|
||||
retryLimit,
|
||||
dumpStateRetryCount}(std::forward<F>(f));
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@ -40,6 +40,16 @@ server_parameters:
|
||||
gte: 0
|
||||
redact: false
|
||||
|
||||
minimalWriteConflictRetryCountForStateDump:
|
||||
description: "We won't dump storage engine state more frequently than once in this many write conflict retries"
|
||||
set_at: [startup, runtime]
|
||||
cpp_varname: "gMinimalWriteConflictRetryCountForStateDump"
|
||||
cpp_vartype: AtomicWord<int>
|
||||
default: 10000
|
||||
validator:
|
||||
gte: 10000
|
||||
redact: false
|
||||
|
||||
temporarilyUnavailableBackoffBaseMs:
|
||||
description:
|
||||
"The base period of time to wait between each TemporarilyUnavailable retry
|
||||
|
||||
@ -198,7 +198,8 @@ Status _applyOps(OperationContext* opCtx,
|
||||
false, /* alwaysUpsert */
|
||||
oplogApplicationMode,
|
||||
isDataConsistent);
|
||||
});
|
||||
},
|
||||
oplogApplicationMode == repl::OplogApplication::Mode::kSecondary);
|
||||
} catch (const DBException& ex) {
|
||||
ab.append(false);
|
||||
result->append("applied", ++(*numApplied));
|
||||
|
||||
@ -1340,17 +1340,22 @@ Status applyOperation_inlock(OperationContext* opCtx,
|
||||
"mode should be in initialSync or recovering",
|
||||
mode == OplogApplication::Mode::kInitialSync ||
|
||||
OplogApplication::inRecovering(mode));
|
||||
writeConflictRetryWithLimit(opCtx, "applyOps_imageInvalidation", op.getNss(), [&] {
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
writeToImageCollection(opCtx,
|
||||
op.getSessionId().value(),
|
||||
op.getTxnNumber().value(),
|
||||
op.getApplyOpsTimestamp().value_or(op.getTimestamp()),
|
||||
op.getNeedsRetryImage().value(),
|
||||
BSONObj(),
|
||||
getInvalidatingReason(mode, isDataConsistent));
|
||||
wuow.commit();
|
||||
});
|
||||
writeConflictRetryWithLimit(
|
||||
opCtx,
|
||||
"applyOps_imageInvalidation",
|
||||
op.getNss(),
|
||||
[&] {
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
writeToImageCollection(opCtx,
|
||||
op.getSessionId().value(),
|
||||
op.getTxnNumber().value(),
|
||||
op.getApplyOpsTimestamp().value_or(op.getTimestamp()),
|
||||
op.getNeedsRetryImage().value(),
|
||||
BSONObj(),
|
||||
getInvalidatingReason(mode, isDataConsistent));
|
||||
wuow.commit();
|
||||
},
|
||||
mode == repl::OplogApplication::Mode::kSecondary);
|
||||
}
|
||||
uassert(ErrorCodes::NamespaceNotFound,
|
||||
str::stream() << "Failed to apply operation due to missing collection ("
|
||||
@ -1643,24 +1648,30 @@ Status applyOperation_inlock(OperationContext* opCtx,
|
||||
request.setFromOplogApplication(true);
|
||||
request.setYieldPolicy(PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY);
|
||||
|
||||
writeConflictRetryWithLimit(opCtx, "applyOps_upsert", op.getNss(), [&] {
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
// If `haveWrappingWriteUnitOfWork` is true, do not timestamp the write.
|
||||
if (assignOperationTimestamp && timestamp != Timestamp::min()) {
|
||||
uassertStatusOK(
|
||||
shard_role_details::getRecoveryUnit(opCtx)->setTimestamp(
|
||||
timestamp));
|
||||
}
|
||||
writeConflictRetryWithLimit(
|
||||
opCtx,
|
||||
"applyOps_upsert",
|
||||
op.getNss(),
|
||||
[&] {
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
// If `haveWrappingWriteUnitOfWork` is true, do not timestamp the write.
|
||||
if (assignOperationTimestamp && timestamp != Timestamp::min()) {
|
||||
uassertStatusOK(
|
||||
shard_role_details::getRecoveryUnit(opCtx)->setTimestamp(
|
||||
timestamp));
|
||||
}
|
||||
|
||||
UpdateResult res = update(opCtx, collectionAcquisition, request);
|
||||
if (res.numMatched == 0 && res.upsertedId.isEmpty()) {
|
||||
LOGV2_ERROR(21257,
|
||||
"No document was updated even though we got a DuplicateKey "
|
||||
"error when inserting");
|
||||
fassertFailedNoTrace(28750);
|
||||
}
|
||||
wuow.commit();
|
||||
});
|
||||
UpdateResult res = update(opCtx, collectionAcquisition, request);
|
||||
if (res.numMatched == 0 && res.upsertedId.isEmpty()) {
|
||||
LOGV2_ERROR(
|
||||
21257,
|
||||
"No document was updated even though we got a DuplicateKey "
|
||||
"error when inserting");
|
||||
fassertFailedNoTrace(28750);
|
||||
}
|
||||
wuow.commit();
|
||||
},
|
||||
mode == repl::OplogApplication::Mode::kSecondary);
|
||||
}
|
||||
|
||||
if (incrementOpsAppliedStats) {
|
||||
@ -1759,111 +1770,118 @@ Status applyOperation_inlock(OperationContext* opCtx,
|
||||
// the logical session ID being written is checked out when we do the write.
|
||||
// We can still get a write conflict on the primary as a delete done as part of expired
|
||||
// session cleanup can race with a use of the expired session.
|
||||
auto status = writeConflictRetryWithLimit(opCtx, "applyOps_update", op.getNss(), [&] {
|
||||
auto write_status = Status::OK();
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
if (timestamp != Timestamp::min()) {
|
||||
uassertStatusOK(
|
||||
shard_role_details::getRecoveryUnit(opCtx)->setTimestamp(timestamp));
|
||||
}
|
||||
auto status = writeConflictRetryWithLimit(
|
||||
opCtx,
|
||||
"applyOps_update",
|
||||
op.getNss(),
|
||||
[&] {
|
||||
auto write_status = Status::OK();
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
if (timestamp != Timestamp::min()) {
|
||||
uassertStatusOK(
|
||||
shard_role_details::getRecoveryUnit(opCtx)->setTimestamp(timestamp));
|
||||
}
|
||||
|
||||
if (recordChangeStreamPreImage && request.shouldReturnNewDocs()) {
|
||||
// Load the document version before update to be used as the change stream
|
||||
// pre-image since the update operation will load the new version of the
|
||||
// document.
|
||||
invariant(op.getObject2());
|
||||
auto&& documentId = *op.getObject2();
|
||||
if (recordChangeStreamPreImage && request.shouldReturnNewDocs()) {
|
||||
// Load the document version before update to be used as the change stream
|
||||
// pre-image since the update operation will load the new version of the
|
||||
// document.
|
||||
invariant(op.getObject2());
|
||||
auto&& documentId = *op.getObject2();
|
||||
|
||||
auto documentFound = Helpers::findById(
|
||||
opCtx, collection->ns(), documentId, changeStreamPreImage);
|
||||
invariant(documentFound);
|
||||
}
|
||||
auto documentFound = Helpers::findById(
|
||||
opCtx, collection->ns(), documentId, changeStreamPreImage);
|
||||
invariant(documentFound);
|
||||
}
|
||||
|
||||
UpdateResult ur = update(opCtx, collectionAcquisition, request);
|
||||
if (ur.numMatched == 0 && ur.upsertedId.isEmpty()) {
|
||||
if (collection && collection->isCapped() &&
|
||||
mode == OplogApplication::Mode::kSecondary) {
|
||||
// We can't assume there was a problem when the collection is capped,
|
||||
// because the item may have been deleted by the cappedDeleter. This
|
||||
// only matters for steady-state mode, because all errors on missing
|
||||
// updates are ignored at a higher level for recovery and initial sync.
|
||||
LOGV2_DEBUG(2170003,
|
||||
2,
|
||||
"couldn't find doc in capped collection",
|
||||
"op"_attr = redact(op.toBSONForLogging()));
|
||||
UpdateResult ur = update(opCtx, collectionAcquisition, request);
|
||||
if (ur.numMatched == 0 && ur.upsertedId.isEmpty()) {
|
||||
if (collection && collection->isCapped() &&
|
||||
mode == OplogApplication::Mode::kSecondary) {
|
||||
// We can't assume there was a problem when the collection is capped,
|
||||
// because the item may have been deleted by the cappedDeleter. This
|
||||
// only matters for steady-state mode, because all errors on missing
|
||||
// updates are ignored at a higher level for recovery and initial sync.
|
||||
LOGV2_DEBUG(2170003,
|
||||
2,
|
||||
"couldn't find doc in capped collection",
|
||||
"op"_attr = redact(op.toBSONForLogging()));
|
||||
|
||||
} else {
|
||||
// this could happen benignly on an oplog duplicate replay of an upsert
|
||||
// (because we are idempotent), if a regular non-mod update fails the
|
||||
// item is (presumably) missing.
|
||||
if (!upsert) {
|
||||
static constexpr char msg[] = "Update of non-mod failed";
|
||||
LOGV2_ERROR(21260, msg, "op"_attr = redact(op.toBSONForLogging()));
|
||||
write_status = Status(ErrorCodes::UpdateOperationFailed,
|
||||
str::stream() << msg << ": "
|
||||
<< redact(op.toBSONForLogging()));
|
||||
} else {
|
||||
// this could happen benignly on an oplog duplicate replay of an upsert
|
||||
// (because we are idempotent), if a regular non-mod update fails the
|
||||
// item is (presumably) missing.
|
||||
if (!upsert) {
|
||||
static constexpr char msg[] = "Update of non-mod failed";
|
||||
LOGV2_ERROR(21260, msg, "op"_attr = redact(op.toBSONForLogging()));
|
||||
write_status = Status(
|
||||
ErrorCodes::UpdateOperationFailed,
|
||||
str::stream() << msg << ": " << redact(op.toBSONForLogging()));
|
||||
}
|
||||
}
|
||||
} else if (!upsertOplogEntry && !ur.upsertedId.isEmpty() &&
|
||||
!(collection && collection->isCapped())) {
|
||||
// This indicates we upconverted an update to an upsert, and it did indeed
|
||||
// upsert. In steady state mode this is unexpected.
|
||||
if (mode == OplogApplication::Mode::kSecondary) {
|
||||
opCounters->gotUpdateOnMissingDoc();
|
||||
logOplogConstraintViolation(
|
||||
opCtx,
|
||||
op.getNss(),
|
||||
OplogConstraintViolationEnum::kUpdateOnMissingDoc,
|
||||
"update",
|
||||
redact(op.toBSONForLogging()),
|
||||
boost::none /* status */);
|
||||
|
||||
// We shouldn't be doing upserts in secondary mode when enforcing steady
|
||||
// state constraints.
|
||||
invariant(!oplogApplicationEnforcesSteadyStateConstraints);
|
||||
} else if (inStableRecovery) {
|
||||
repl::OplogApplication::checkOnOplogFailureForRecovery(
|
||||
opCtx,
|
||||
op.getNss(),
|
||||
redact(op.toBSONForLogging()),
|
||||
std::string{repl::kUpdateOnMissingDocMsg});
|
||||
} else if (mode == OplogApplication::Mode::kInitialSync) {
|
||||
// TODO (SERVER-87994): Revisit the verbosity of the logging.
|
||||
LOGV2_DEBUG(
|
||||
8776803,
|
||||
1,
|
||||
"INFO: InitialSync oplog application upconverted an update to "
|
||||
"an upsert.",
|
||||
"oplogEntry"_attr = redact(op.toBSONForLogging()));
|
||||
}
|
||||
}
|
||||
} else if (!upsertOplogEntry && !ur.upsertedId.isEmpty() &&
|
||||
!(collection && collection->isCapped())) {
|
||||
// This indicates we upconverted an update to an upsert, and it did indeed
|
||||
// upsert. In steady state mode this is unexpected.
|
||||
if (mode == OplogApplication::Mode::kSecondary) {
|
||||
opCounters->gotUpdateOnMissingDoc();
|
||||
logOplogConstraintViolation(
|
||||
|
||||
if (op.getNeedsRetryImage()) {
|
||||
writeToImageCollection(
|
||||
opCtx,
|
||||
op.getNss(),
|
||||
OplogConstraintViolationEnum::kUpdateOnMissingDoc,
|
||||
"update",
|
||||
redact(op.toBSONForLogging()),
|
||||
boost::none /* status */);
|
||||
|
||||
// We shouldn't be doing upserts in secondary mode when enforcing steady
|
||||
// state constraints.
|
||||
invariant(!oplogApplicationEnforcesSteadyStateConstraints);
|
||||
} else if (inStableRecovery) {
|
||||
repl::OplogApplication::checkOnOplogFailureForRecovery(
|
||||
opCtx,
|
||||
op.getNss(),
|
||||
redact(op.toBSONForLogging()),
|
||||
std::string{repl::kUpdateOnMissingDocMsg});
|
||||
} else if (mode == OplogApplication::Mode::kInitialSync) {
|
||||
// TODO (SERVER-87994): Revisit the verbosity of the logging.
|
||||
LOGV2_DEBUG(8776803,
|
||||
1,
|
||||
"INFO: InitialSync oplog application upconverted an update to "
|
||||
"an upsert.",
|
||||
"oplogEntry"_attr = redact(op.toBSONForLogging()));
|
||||
}
|
||||
}
|
||||
|
||||
if (op.getNeedsRetryImage()) {
|
||||
writeToImageCollection(opCtx,
|
||||
op.getSessionId().value(),
|
||||
op.getTxnNumber().value(),
|
||||
op.getApplyOpsTimestamp().value_or(op.getTimestamp()),
|
||||
op.getNeedsRetryImage().value(),
|
||||
// If we did not request an image because we're in
|
||||
// initial sync, the value passed in here is conveniently
|
||||
// the empty BSONObj.
|
||||
ur.requestedDocImage,
|
||||
getInvalidatingReason(mode, isDataConsistent));
|
||||
}
|
||||
|
||||
if (recordChangeStreamPreImage) {
|
||||
if (!request.shouldReturnNewDocs()) {
|
||||
// A document version before update was loaded by the update operation.
|
||||
invariant(!ur.requestedDocImage.isEmpty());
|
||||
changeStreamPreImage = ur.requestedDocImage;
|
||||
op.getSessionId().value(),
|
||||
op.getTxnNumber().value(),
|
||||
op.getApplyOpsTimestamp().value_or(op.getTimestamp()),
|
||||
op.getNeedsRetryImage().value(),
|
||||
// If we did not request an image because we're in
|
||||
// initial sync, the value passed in here is conveniently
|
||||
// the empty BSONObj.
|
||||
ur.requestedDocImage,
|
||||
getInvalidatingReason(mode, isDataConsistent));
|
||||
}
|
||||
|
||||
// Write a pre-image of a document for change streams.
|
||||
writeChangeStreamPreImage(opCtx, collection, op, changeStreamPreImage);
|
||||
}
|
||||
if (recordChangeStreamPreImage) {
|
||||
if (!request.shouldReturnNewDocs()) {
|
||||
// A document version before update was loaded by the update operation.
|
||||
invariant(!ur.requestedDocImage.isEmpty());
|
||||
changeStreamPreImage = ur.requestedDocImage;
|
||||
}
|
||||
|
||||
wuow.commit();
|
||||
return write_status;
|
||||
});
|
||||
// Write a pre-image of a document for change streams.
|
||||
writeChangeStreamPreImage(opCtx, collection, op, changeStreamPreImage);
|
||||
}
|
||||
|
||||
wuow.commit();
|
||||
return write_status;
|
||||
},
|
||||
mode == repl::OplogApplication::Mode::kSecondary);
|
||||
|
||||
if (!status.isOK()) {
|
||||
if (inStableRecovery) {
|
||||
@ -1911,133 +1929,143 @@ Status applyOperation_inlock(OperationContext* opCtx,
|
||||
// Determine if a change stream pre-image has to be recorded for the oplog entry.
|
||||
const bool recordChangeStreamPreImage = shouldRecordChangeStreamPreImage();
|
||||
|
||||
writeConflictRetryWithLimit(opCtx, "applyOps_delete", op.getNss(), [&] {
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
if (timestamp != Timestamp::min()) {
|
||||
uassertStatusOK(
|
||||
shard_role_details::getRecoveryUnit(opCtx)->setTimestamp(timestamp));
|
||||
}
|
||||
|
||||
DeleteRequest request;
|
||||
request.setNsString(requestNss);
|
||||
request.setQuery(deleteCriteria);
|
||||
request.setYieldPolicy(PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY);
|
||||
if (mode != OplogApplication::Mode::kInitialSync &&
|
||||
op.getNeedsRetryImage() == repl::RetryImageEnum::kPreImage &&
|
||||
isDataConsistent) {
|
||||
// When in initial sync, we'll pass an empty image into
|
||||
// `writeToImageCollection`.
|
||||
request.setReturnDeleted(true);
|
||||
}
|
||||
|
||||
if (recordChangeStreamPreImage) {
|
||||
// Request loading of the document version before delete operation to be
|
||||
// used as change stream pre-image.
|
||||
request.setReturnDeleted(true);
|
||||
}
|
||||
|
||||
DeleteResult result = deleteObject(opCtx, collectionAcquisition, request);
|
||||
if (op.getNeedsRetryImage()) {
|
||||
// Even if `result.nDeleted` is 0, we want to perform a write to the
|
||||
// imageCollection to advance the txnNumber/ts and invalidate the image.
|
||||
// This isn't strictly necessary for correctness -- the
|
||||
// `config.transactions` table is responsible for whether to retry. The
|
||||
// motivation here is to simply reduce the number of states related
|
||||
// documents in the two collections can be in.
|
||||
writeToImageCollection(opCtx,
|
||||
op.getSessionId().value(),
|
||||
op.getTxnNumber().value(),
|
||||
op.getApplyOpsTimestamp().value_or(op.getTimestamp()),
|
||||
repl::RetryImageEnum::kPreImage,
|
||||
result.requestedPreImage.value_or(BSONObj()),
|
||||
getInvalidatingReason(mode, isDataConsistent));
|
||||
}
|
||||
|
||||
if (recordChangeStreamPreImage) {
|
||||
invariant(result.requestedPreImage);
|
||||
writeChangeStreamPreImage(opCtx, collection, op, *(result.requestedPreImage));
|
||||
}
|
||||
|
||||
if (result.nDeleted == 0) {
|
||||
const auto errMsg = !collection
|
||||
? str::stream() << "(NamespaceNotFound): Failed to apply operation due "
|
||||
"to missing collection ("
|
||||
<< requestNss.toStringForErrorMsg() << ")"
|
||||
: "Applied a delete which did not delete anything."s;
|
||||
if (inStableRecovery) {
|
||||
repl::OplogApplication::checkOnOplogFailureForRecovery(
|
||||
opCtx, op.getNss(), redact(op.toBSONForLogging()), errMsg);
|
||||
} else if (mode == OplogApplication::Mode::kInitialSync) {
|
||||
// TODO (SERVER-87994): Revisit the verbosity of the logging.
|
||||
LOGV2_DEBUG(8776802,
|
||||
1,
|
||||
"INFO: Error applying operation while initialSync.",
|
||||
"oplogEntry"_attr = redact(op.toBSONForLogging()),
|
||||
"error"_attr = errMsg);
|
||||
writeConflictRetryWithLimit(
|
||||
opCtx,
|
||||
"applyOps_delete",
|
||||
op.getNss(),
|
||||
[&] {
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
if (timestamp != Timestamp::min()) {
|
||||
uassertStatusOK(
|
||||
shard_role_details::getRecoveryUnit(opCtx)->setTimestamp(timestamp));
|
||||
}
|
||||
}
|
||||
// It is legal for a delete operation on the pre-images collection to delete
|
||||
// zero documents - pre-image collections are not guaranteed to contain the same
|
||||
// set of documents at all times. The same holds for change-collections as they
|
||||
// both rely on unreplicated deletes when
|
||||
// "featureFlagUseUnreplicatedTruncatesForDeletions" is enabled.
|
||||
//
|
||||
// TODO SERVER-70591: Remove feature flag requirement in comment above.
|
||||
//
|
||||
// It is also legal for a delete operation on the config.image_collection (used
|
||||
// for find-and-modify retries) to delete zero documents. Since we do not write
|
||||
// updates to this collection which are in the same batch as later deletes, a
|
||||
// rollback to the middle of a batch with both an update and a delete may result
|
||||
// in a missing document, which may be later deleted.
|
||||
if (result.nDeleted == 0 && mode == OplogApplication::Mode::kSecondary &&
|
||||
!requestNss.isChangeStreamPreImagesCollection() &&
|
||||
!requestNss.isChangeCollection() && !requestNss.isConfigImagesCollection()) {
|
||||
// In FCV 4.4, each node is responsible for deleting the excess documents in
|
||||
// capped collections. This implies that capped deletes may not be
|
||||
// synchronized between nodes at times. When upgraded to FCV 5.0, the
|
||||
// primary will generate delete oplog entries for capped collections.
|
||||
// However, if any secondary was behind in deleting excess documents while
|
||||
// in FCV 4.4, the primary would have no way of knowing and it would delete
|
||||
// the first document it sees locally. Eventually, when secondaries step up
|
||||
// and start deleting capped documents, they will first delete previously
|
||||
// missed documents that may already be deleted on other nodes. For this
|
||||
// reason we skip returning NoSuchKey for capped collections when oplog
|
||||
// application is enforcing steady state constraints.
|
||||
bool isCapped = false;
|
||||
const auto& opObj = redact(op.toBSONForLogging());
|
||||
if (collection) {
|
||||
isCapped = collection->isCapped();
|
||||
opCounters->gotDeleteWasEmpty();
|
||||
logOplogConstraintViolation(opCtx,
|
||||
op.getNss(),
|
||||
OplogConstraintViolationEnum::kDeleteWasEmpty,
|
||||
"delete",
|
||||
opObj,
|
||||
boost::none /* status */);
|
||||
} else {
|
||||
opCounters->gotDeleteFromMissingNamespace();
|
||||
logOplogConstraintViolation(
|
||||
|
||||
DeleteRequest request;
|
||||
request.setNsString(requestNss);
|
||||
request.setQuery(deleteCriteria);
|
||||
request.setYieldPolicy(PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY);
|
||||
if (mode != OplogApplication::Mode::kInitialSync &&
|
||||
op.getNeedsRetryImage() == repl::RetryImageEnum::kPreImage &&
|
||||
isDataConsistent) {
|
||||
// When in initial sync, we'll pass an empty image into
|
||||
// `writeToImageCollection`.
|
||||
request.setReturnDeleted(true);
|
||||
}
|
||||
|
||||
if (recordChangeStreamPreImage) {
|
||||
// Request loading of the document version before delete operation to be
|
||||
// used as change stream pre-image.
|
||||
request.setReturnDeleted(true);
|
||||
}
|
||||
|
||||
DeleteResult result = deleteObject(opCtx, collectionAcquisition, request);
|
||||
if (op.getNeedsRetryImage()) {
|
||||
// Even if `result.nDeleted` is 0, we want to perform a write to the
|
||||
// imageCollection to advance the txnNumber/ts and invalidate the image.
|
||||
// This isn't strictly necessary for correctness -- the
|
||||
// `config.transactions` table is responsible for whether to retry. The
|
||||
// motivation here is to simply reduce the number of states related
|
||||
// documents in the two collections can be in.
|
||||
writeToImageCollection(
|
||||
opCtx,
|
||||
op.getNss(),
|
||||
OplogConstraintViolationEnum::kDeleteOnMissingNs,
|
||||
"delete",
|
||||
opObj,
|
||||
boost::none /* status */);
|
||||
op.getSessionId().value(),
|
||||
op.getTxnNumber().value(),
|
||||
op.getApplyOpsTimestamp().value_or(op.getTimestamp()),
|
||||
repl::RetryImageEnum::kPreImage,
|
||||
result.requestedPreImage.value_or(BSONObj()),
|
||||
getInvalidatingReason(mode, isDataConsistent));
|
||||
}
|
||||
|
||||
if (!isCapped) {
|
||||
// This error is fatal when we are enforcing steady state constraints
|
||||
// for non-capped collections.
|
||||
uassert(collection ? ErrorCodes::NoSuchKey : ErrorCodes::NamespaceNotFound,
|
||||
str::stream()
|
||||
<< "Applied a delete which did not delete anything in "
|
||||
"steady state replication : "
|
||||
<< redact(op.toBSONForLogging()),
|
||||
!oplogApplicationEnforcesSteadyStateConstraints);
|
||||
if (recordChangeStreamPreImage) {
|
||||
invariant(result.requestedPreImage);
|
||||
writeChangeStreamPreImage(
|
||||
opCtx, collection, op, *(result.requestedPreImage));
|
||||
}
|
||||
}
|
||||
wuow.commit();
|
||||
});
|
||||
|
||||
if (result.nDeleted == 0) {
|
||||
const auto errMsg = !collection
|
||||
? str::stream() << "(NamespaceNotFound): Failed to apply operation due "
|
||||
"to missing collection ("
|
||||
<< requestNss.toStringForErrorMsg() << ")"
|
||||
: "Applied a delete which did not delete anything."s;
|
||||
if (inStableRecovery) {
|
||||
repl::OplogApplication::checkOnOplogFailureForRecovery(
|
||||
opCtx, op.getNss(), redact(op.toBSONForLogging()), errMsg);
|
||||
} else if (mode == OplogApplication::Mode::kInitialSync) {
|
||||
// TODO (SERVER-87994): Revisit the verbosity of the logging.
|
||||
LOGV2_DEBUG(8776802,
|
||||
1,
|
||||
"INFO: Error applying operation while initialSync.",
|
||||
"oplogEntry"_attr = redact(op.toBSONForLogging()),
|
||||
"error"_attr = errMsg);
|
||||
}
|
||||
}
|
||||
// It is legal for a delete operation on the pre-images collection to delete
|
||||
// zero documents - pre-image collections are not guaranteed to contain the same
|
||||
// set of documents at all times. The same holds for change-collections as they
|
||||
// both rely on unreplicated deletes when
|
||||
// "featureFlagUseUnreplicatedTruncatesForDeletions" is enabled.
|
||||
//
|
||||
// TODO SERVER-70591: Remove feature flag requirement in comment above.
|
||||
//
|
||||
// It is also legal for a delete operation on the config.image_collection (used
|
||||
// for find-and-modify retries) to delete zero documents. Since we do not write
|
||||
// updates to this collection which are in the same batch as later deletes, a
|
||||
// rollback to the middle of a batch with both an update and a delete may result
|
||||
// in a missing document, which may be later deleted.
|
||||
if (result.nDeleted == 0 && mode == OplogApplication::Mode::kSecondary &&
|
||||
!requestNss.isChangeStreamPreImagesCollection() &&
|
||||
!requestNss.isChangeCollection() &&
|
||||
!requestNss.isConfigImagesCollection()) {
|
||||
// In FCV 4.4, each node is responsible for deleting the excess documents in
|
||||
// capped collections. This implies that capped deletes may not be
|
||||
// synchronized between nodes at times. When upgraded to FCV 5.0, the
|
||||
// primary will generate delete oplog entries for capped collections.
|
||||
// However, if any secondary was behind in deleting excess documents while
|
||||
// in FCV 4.4, the primary would have no way of knowing and it would delete
|
||||
// the first document it sees locally. Eventually, when secondaries step up
|
||||
// and start deleting capped documents, they will first delete previously
|
||||
// missed documents that may already be deleted on other nodes. For this
|
||||
// reason we skip returning NoSuchKey for capped collections when oplog
|
||||
// application is enforcing steady state constraints.
|
||||
bool isCapped = false;
|
||||
const auto& opObj = redact(op.toBSONForLogging());
|
||||
if (collection) {
|
||||
isCapped = collection->isCapped();
|
||||
opCounters->gotDeleteWasEmpty();
|
||||
logOplogConstraintViolation(
|
||||
opCtx,
|
||||
op.getNss(),
|
||||
OplogConstraintViolationEnum::kDeleteWasEmpty,
|
||||
"delete",
|
||||
opObj,
|
||||
boost::none /* status */);
|
||||
} else {
|
||||
opCounters->gotDeleteFromMissingNamespace();
|
||||
logOplogConstraintViolation(
|
||||
opCtx,
|
||||
op.getNss(),
|
||||
OplogConstraintViolationEnum::kDeleteOnMissingNs,
|
||||
"delete",
|
||||
opObj,
|
||||
boost::none /* status */);
|
||||
}
|
||||
|
||||
if (!isCapped) {
|
||||
// This error is fatal when we are enforcing steady state constraints
|
||||
// for non-capped collections.
|
||||
uassert(collection ? ErrorCodes::NoSuchKey
|
||||
: ErrorCodes::NamespaceNotFound,
|
||||
str::stream()
|
||||
<< "Applied a delete which did not delete anything in "
|
||||
"steady state replication : "
|
||||
<< redact(op.toBSONForLogging()),
|
||||
!oplogApplicationEnforcesSteadyStateConstraints);
|
||||
}
|
||||
}
|
||||
wuow.commit();
|
||||
},
|
||||
mode == repl::OplogApplication::Mode::kSecondary);
|
||||
|
||||
if (incrementOpsAppliedStats) {
|
||||
incrementOpsAppliedStats();
|
||||
|
||||
@ -339,9 +339,14 @@ template <typename F>
|
||||
auto writeConflictRetryWithLimit(OperationContext* opCtx,
|
||||
StringData opStr,
|
||||
const NamespaceStringOrUUID& nssOrUUID,
|
||||
F&& f) {
|
||||
return writeConflictRetry(
|
||||
opCtx, opStr, nssOrUUID, f, repl::writeConflictRetryLimit.loadRelaxed());
|
||||
F&& f,
|
||||
bool dump = false) {
|
||||
return writeConflictRetry(opCtx,
|
||||
opStr,
|
||||
nssOrUUID,
|
||||
f,
|
||||
repl::writeConflictRetryLimit.loadRelaxed(),
|
||||
dump ? repl::writeConflictRetryCountForDumpState.loadRelaxed() : 0);
|
||||
}
|
||||
|
||||
} // namespace repl
|
||||
|
||||
@ -722,6 +722,19 @@ server_parameters:
|
||||
gte: 0
|
||||
redact: false
|
||||
|
||||
writeConflictRetryCountForDumpState:
|
||||
description: >-
|
||||
The number of write conflict retries after which we dump a storage engine state
|
||||
to assist with diagnostics during secondary oplog application unexpected behavior. If this parameter is less than
|
||||
gMinimalWriteConflictRetryCountForStateDump, then the latter will be used to determine dump rate.
|
||||
set_at: [startup, runtime]
|
||||
cpp_vartype: AtomicWord<int>
|
||||
cpp_varname: writeConflictRetryCountForDumpState
|
||||
default: 1000
|
||||
validator:
|
||||
gte: 1000
|
||||
redact: false
|
||||
|
||||
allowEmptyOplogBatchesToPropagateCommitPoint:
|
||||
description: >-
|
||||
Whether oplog cursors should skip waiting for new data to return if the commit point advances.
|
||||
|
||||
@ -307,7 +307,10 @@ Status _applyTransactionFromOplogChain(OperationContext* opCtx,
|
||||
Status status = Status::OK();
|
||||
|
||||
repl::writeConflictRetryWithLimit(
|
||||
opCtx, "replaying prepared transaction", NamespaceString(dbName), [&] {
|
||||
opCtx,
|
||||
"replaying prepared transaction",
|
||||
NamespaceString(dbName),
|
||||
[&] {
|
||||
WriteUnitOfWork wunit(opCtx);
|
||||
|
||||
// We might replay a prepared transaction behind oldest timestamp.
|
||||
@ -335,7 +338,8 @@ Status _applyTransactionFromOplogChain(OperationContext* opCtx,
|
||||
shard_role_details::getRecoveryUnit(opCtx)->setDurableTimestamp(durableTimestamp);
|
||||
wunit.commit();
|
||||
}
|
||||
});
|
||||
},
|
||||
mode == repl::OplogApplication::Mode::kSecondary);
|
||||
|
||||
return status;
|
||||
}
|
||||
@ -637,7 +641,10 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
|
||||
}
|
||||
|
||||
return repl::writeConflictRetryWithLimit(
|
||||
opCtx, "applying prepare transaction", prepareOp.getNss(), [&] {
|
||||
opCtx,
|
||||
"applying prepare transaction",
|
||||
prepareOp.getNss(),
|
||||
[&] {
|
||||
// The write on transaction table may be applied concurrently, so refreshing
|
||||
// state from disk may read that write, causing starting a new transaction
|
||||
// on an existing txnNumber. Thus, we start a new transaction without
|
||||
@ -723,7 +730,8 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
|
||||
|
||||
txnParticipant.stashTransactionResources(opCtx);
|
||||
return Status::OK();
|
||||
});
|
||||
},
|
||||
mode == repl::OplogApplication::Mode::kSecondary);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Loading…
Reference in New Issue
Block a user