Co-authored-by: Evelyn Wu <evelyn.wu@mongodb.com> GitOrigin-RevId: 57ca066d7f2ef6bce7789053226034b7e32045f1
This commit is contained in:
parent
db35cc3354
commit
73dadb8ec4
@ -700,7 +700,8 @@ public:
|
||||
!failAfterReachingTransitioningState.shouldFail());
|
||||
|
||||
if (request.getPhase() == SetFCVPhaseEnum::kStart) {
|
||||
invariant(role && role->has(ClusterRole::ShardServer));
|
||||
invariant(role);
|
||||
invariant(role->has(ClusterRole::ShardServer));
|
||||
|
||||
// This helper function is only for any actions that should be done specifically on
|
||||
// shard servers during phase 1 of the 3-phase setFCV protocol for sharded clusters.
|
||||
@ -761,7 +762,8 @@ public:
|
||||
}
|
||||
|
||||
if (request.getPhase() == SetFCVPhaseEnum::kPrepare) {
|
||||
invariant(role && role->has(ClusterRole::ShardServer));
|
||||
invariant(role);
|
||||
invariant(role->has(ClusterRole::ShardServer));
|
||||
// If we are only running the 'prepare' phase, then we are done
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -108,7 +108,9 @@ void ReplicationStateTransitionLockGuard::_unlock() {
|
||||
// any exceptions to be thrown between _enqueueLock and waitForLockUntil because that would
|
||||
// delay cleaning up any failed RSTL lock attempt state from lock manager.
|
||||
invariant(
|
||||
!(_result == LOCK_WAITING && shard_role_details::getLocker(_opCtx)->inAWriteUnitOfWork()));
|
||||
!(_result == LOCK_WAITING && shard_role_details::getLocker(_opCtx)->inAWriteUnitOfWork()),
|
||||
str::stream() << "Lock result: " << _result << ". In a write unit of work: "
|
||||
<< shard_role_details::getLocker(_opCtx)->inAWriteUnitOfWork());
|
||||
shard_role_details::getLocker(_opCtx)->unlock(resourceIdReplicationStateTransitionLock);
|
||||
_result = LOCK_INVALID;
|
||||
}
|
||||
|
||||
@ -40,6 +40,8 @@
|
||||
#include "mongo/db/transaction_resources.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
|
||||
#include <string>
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
@ -93,7 +95,9 @@ void writeToImageCollection(OperationContext* opCtx, OpStateAccumulator* opAccum
|
||||
curOp->setNS(clientLock, existingNs);
|
||||
}
|
||||
|
||||
invariant(res.numDocsModified == 1 || !res.upsertedId.isEmpty());
|
||||
invariant(res.numDocsModified == 1 || !res.upsertedId.isEmpty(),
|
||||
str::stream() << "NumDocsModified: " << res.numDocsModified
|
||||
<< ". Upserted Id: " << res.upsertedId.toString());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
@ -52,7 +52,8 @@ OpObserver::ReservedTimes::ReservedTimes(OperationContext* const opCtx)
|
||||
}
|
||||
|
||||
invariant(_times._recursionDepth > 0);
|
||||
invariant(_times._recursionDepth == 1 || !opCtx->writesAreReplicated());
|
||||
invariant(_times._recursionDepth == 1 || !opCtx->writesAreReplicated(),
|
||||
str::stream() << "writes are replicated: " << opCtx->writesAreReplicated());
|
||||
}
|
||||
|
||||
OpObserver::ReservedTimes::~ReservedTimes() {
|
||||
|
||||
@ -215,7 +215,8 @@ std::pair<Future<void>, TaskExecutor::EventHandle> BaseCloner::runOnExecutorEven
|
||||
TaskExecutor* executor) {
|
||||
{
|
||||
stdx::lock_guard<stdx::mutex> lk(_mutex);
|
||||
invariant(!_active && !_startedAsync);
|
||||
invariant(!_active);
|
||||
invariant(!_startedAsync);
|
||||
_startedAsync = true;
|
||||
}
|
||||
auto pf = makePromiseFuture<void>();
|
||||
|
||||
@ -289,10 +289,20 @@ void createIndexForApplyOps(OperationContext* opCtx,
|
||||
OplogApplication::Mode mode) {
|
||||
// Uncommitted collections support creating indexes using relaxed locking if they are part of a
|
||||
// multi-document transaction.
|
||||
invariant(shard_role_details::getLocker(opCtx)->isCollectionLockedForMode(indexNss, MODE_X) ||
|
||||
(UncommittedCatalogUpdates::get(opCtx).isCreatedCollection(opCtx, indexNss) &&
|
||||
shard_role_details::getLocker(opCtx)->isCollectionLockedForMode(indexNss, MODE_IX) &&
|
||||
opCtx->inMultiDocumentTransaction()));
|
||||
invariant(
|
||||
shard_role_details::getLocker(opCtx)->isCollectionLockedForMode(indexNss, MODE_X) ||
|
||||
(UncommittedCatalogUpdates::get(opCtx).isCreatedCollection(opCtx, indexNss) &&
|
||||
shard_role_details::getLocker(opCtx)->isCollectionLockedForMode(indexNss, MODE_IX) &&
|
||||
opCtx->inMultiDocumentTransaction()),
|
||||
str::stream() << "isCollectionLockedForModeX: "
|
||||
<< shard_role_details::getLocker(opCtx)->isCollectionLockedForMode(indexNss,
|
||||
MODE_X)
|
||||
<< ", isCreatedCollection: "
|
||||
<< UncommittedCatalogUpdates::get(opCtx).isCreatedCollection(opCtx, indexNss)
|
||||
<< ", isCollectionLockedForModeIX: "
|
||||
<< shard_role_details::getLocker(opCtx)->isCollectionLockedForMode(indexNss,
|
||||
MODE_IX)
|
||||
<< ", inMultiDocumentTransaction: " << opCtx->inMultiDocumentTransaction());
|
||||
|
||||
// Check if collection exists.
|
||||
auto databaseHolder = DatabaseHolder::get(opCtx);
|
||||
|
||||
@ -332,7 +332,8 @@ void OplogApplierBatcher::_consume(OperationContext* opCtx, OplogBuffer* oplogBu
|
||||
// for us to consume here. Since our postcondition is already met, it is safe to return
|
||||
// successfully.
|
||||
BSONObj opToPopAndDiscard;
|
||||
invariant(oplogBuffer->tryPop(opCtx, &opToPopAndDiscard) || _oplogApplier->inShutdown());
|
||||
invariant(oplogBuffer->tryPop(opCtx, &opToPopAndDiscard) || _oplogApplier->inShutdown(),
|
||||
str::stream() << "Oplog Applier in shutdown: " << _oplogApplier->inShutdown());
|
||||
}
|
||||
|
||||
void OplogApplierBatcher::_run(StorageInterface* storageInterface) {
|
||||
|
||||
@ -95,7 +95,8 @@ StatusWith<OplogInterface::Iterator::Value> OplogIteratorLocal::next() {
|
||||
}
|
||||
|
||||
// Non-yielding collection scans from InternalPlanner will never error.
|
||||
invariant(PlanExecutor::ADVANCED == state || PlanExecutor::IS_EOF == state);
|
||||
invariant(PlanExecutor::ADVANCED == state || PlanExecutor::IS_EOF == state,
|
||||
str::stream() << "Plan Executor state: " << state);
|
||||
|
||||
return StatusWith<Value>(std::make_pair(obj.getOwned(), recordId));
|
||||
}
|
||||
|
||||
@ -94,7 +94,10 @@ OplogVisibilityManager::const_iterator OplogVisibilityManager::trackTimestamps(
|
||||
ON_BLOCK_EXIT(notifyCappedWaitersIfVisibilityChanged(visibilityChanged, _rs));
|
||||
|
||||
stdx::lock_guard<stdx::mutex> lock(_mutex);
|
||||
invariant(first <= last && first > _latestTimeSeen);
|
||||
invariant(first <= last && first > _latestTimeSeen,
|
||||
str::stream() << "first timestamp: " << first.toString()
|
||||
<< ", last timestamp: " << last.toString()
|
||||
<< ", lastestTimeSeen: " << _latestTimeSeen.toString());
|
||||
|
||||
if (_oplogTimestampList.empty()) {
|
||||
visibilityChanged = _setOplogVisibilityTimestamp(lock, first - 1);
|
||||
|
||||
@ -631,8 +631,14 @@ PrimaryOnlyService::lookupInstance(OperationContext* opCtx, const InstanceID& id
|
||||
// interrupted at stepdown to prevent deadlocks.
|
||||
invariant(
|
||||
!shard_role_details::getLocker(opCtx)->isLocked() ||
|
||||
opCtx->shouldAlwaysInterruptAtStepDownOrUp() ||
|
||||
shard_role_details::getLocker(opCtx)->wasGlobalLockTakenInModeConflictingWithWrites());
|
||||
opCtx->shouldAlwaysInterruptAtStepDownOrUp() ||
|
||||
shard_role_details::getLocker(opCtx)->wasGlobalLockTakenInModeConflictingWithWrites(),
|
||||
str::stream() << "isLocked: " << shard_role_details::getLocker(opCtx)->isLocked()
|
||||
<< ", interruptibleByStepDownOrUp: "
|
||||
<< opCtx->shouldAlwaysInterruptAtStepDownOrUp()
|
||||
<< ", globalLockConflictingWithWrites: "
|
||||
<< shard_role_details::getLocker(opCtx)
|
||||
->wasGlobalLockTakenInModeConflictingWithWrites());
|
||||
|
||||
stdx::unique_lock lk(_mutex);
|
||||
_waitForStateNotRebuilding(opCtx, lk);
|
||||
@ -660,8 +666,14 @@ std::vector<std::shared_ptr<PrimaryOnlyService::Instance>> PrimaryOnlyService::g
|
||||
// interrupted at stepdown to prevent deadlocks.
|
||||
invariant(
|
||||
!shard_role_details::getLocker(opCtx)->isLocked() ||
|
||||
opCtx->shouldAlwaysInterruptAtStepDownOrUp() ||
|
||||
shard_role_details::getLocker(opCtx)->wasGlobalLockTakenInModeConflictingWithWrites());
|
||||
opCtx->shouldAlwaysInterruptAtStepDownOrUp() ||
|
||||
shard_role_details::getLocker(opCtx)->wasGlobalLockTakenInModeConflictingWithWrites(),
|
||||
str::stream() << "isLocked: " << shard_role_details::getLocker(opCtx)->isLocked()
|
||||
<< ", interruptibleByStepDownOrUp: "
|
||||
<< opCtx->shouldAlwaysInterruptAtStepDownOrUp()
|
||||
<< ", globalLockConflictingWithWrites: "
|
||||
<< shard_role_details::getLocker(opCtx)
|
||||
->wasGlobalLockTakenInModeConflictingWithWrites());
|
||||
|
||||
std::vector<std::shared_ptr<PrimaryOnlyService::Instance>> instances;
|
||||
|
||||
|
||||
@ -261,7 +261,8 @@ public:
|
||||
* {level: "snapshot", atClusterTime: <ts>}.
|
||||
*/
|
||||
void setArgsAtClusterTimeForSnapshot(Timestamp ts) {
|
||||
invariant(_level && _level == ReadConcernLevel::kSnapshotReadConcern);
|
||||
invariant(_level);
|
||||
invariant(_level == ReadConcernLevel::kSnapshotReadConcern);
|
||||
// Only overwrite a server-selected atClusterTime, not user-supplied.
|
||||
invariant(_atClusterTime.is_initialized() == _atClusterTimeSelected);
|
||||
_afterClusterTime = boost::none;
|
||||
|
||||
@ -383,7 +383,9 @@ StatusWith<int> findSelfInConfig(ReplicationCoordinatorExternalState* externalSt
|
||||
}
|
||||
|
||||
int myIndex = std::distance(newConfig.membersBegin(), meConfigs.front());
|
||||
invariant(myIndex >= 0 && myIndex < newConfig.getNumMembers());
|
||||
invariant(myIndex >= 0 && myIndex < newConfig.getNumMembers(),
|
||||
str::stream() << "myIndex: " << myIndex
|
||||
<< ", numMembers: " << newConfig.getNumMembers());
|
||||
return StatusWith<int>(myIndex);
|
||||
}
|
||||
|
||||
|
||||
@ -48,7 +48,10 @@ bool getReplSetMemberInStandaloneMode(ServiceContext* serviceCtx) {
|
||||
void setReplSetMemberInStandaloneMode(ServiceContext* serviceCtx,
|
||||
bool isReplSetMemberInStandaloneMode) {
|
||||
auto& replSetMemberInStandaloneModeBool = replSetMemberInStandaloneMode(serviceCtx);
|
||||
invariant(!replSetMemberInStandaloneModeBool || isReplSetMemberInStandaloneMode);
|
||||
invariant(
|
||||
!replSetMemberInStandaloneModeBool || isReplSetMemberInStandaloneMode,
|
||||
str::stream() << "replSetMemberInStandaloneModeBool: " << replSetMemberInStandaloneModeBool
|
||||
<< ", isReplSetMemberInStandaloneMode: " << isReplSetMemberInStandaloneMode);
|
||||
replSetMemberInStandaloneModeBool = isReplSetMemberInStandaloneMode;
|
||||
}
|
||||
|
||||
|
||||
@ -152,14 +152,17 @@ int32_t ReplSetTagConfig::_findKeyIndex(StringData key) const {
|
||||
}
|
||||
|
||||
std::string ReplSetTagConfig::getTagKey(const ReplSetTag& tag) const {
|
||||
invariant(tag.isValid() && size_t(tag.getKeyIndex()) < _tagData.size());
|
||||
invariant(tag.isValid());
|
||||
invariant(size_t(tag.getKeyIndex()) < _tagData.size());
|
||||
return _tagData[tag.getKeyIndex()].first;
|
||||
}
|
||||
|
||||
std::string ReplSetTagConfig::getTagValue(const ReplSetTag& tag) const {
|
||||
invariant(tag.isValid() && size_t(tag.getKeyIndex()) < _tagData.size());
|
||||
invariant(tag.isValid());
|
||||
invariant(size_t(tag.getKeyIndex()) < _tagData.size());
|
||||
const ValueVector& values = _tagData[tag.getKeyIndex()].second;
|
||||
invariant(tag.getValueIndex() >= 0 && size_t(tag.getValueIndex()) < values.size());
|
||||
invariant(tag.getValueIndex() >= 0);
|
||||
invariant(size_t(tag.getValueIndex()) < values.size());
|
||||
return values[tag.getValueIndex()];
|
||||
}
|
||||
|
||||
|
||||
@ -1727,8 +1727,10 @@ bool ReplicationCoordinatorImpl::_setMyLastAppliedOpTimeAndWallTimeForward(
|
||||
// timestamp. So, in pv1, its not possible for us to get opTime with lower term and
|
||||
// timestamp higher than or equal to our current lastAppliedOptime.
|
||||
invariant(opTime.getTerm() == OpTime::kUninitializedTerm ||
|
||||
myLastAppliedOpTime.getTerm() == OpTime::kUninitializedTerm ||
|
||||
opTime.getTimestamp() < myLastAppliedOpTime.getTimestamp());
|
||||
myLastAppliedOpTime.getTerm() == OpTime::kUninitializedTerm ||
|
||||
opTime.getTimestamp() < myLastAppliedOpTime.getTimestamp(),
|
||||
str::stream() << "opTime: " << opTime.toString()
|
||||
<< ", myLastAppliedOpTime:" << myLastAppliedOpTime.toString());
|
||||
}
|
||||
|
||||
if (_readWriteAbility->canAcceptNonLocalWrites(lk) &&
|
||||
@ -2897,7 +2899,10 @@ bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(OperationContext* op
|
||||
const DatabaseName& dbName) {
|
||||
// The answer isn't meaningful unless we hold the ReplicationStateTransitionLock.
|
||||
invariant(opCtx->isLockFreeReadsOp() || shard_role_details::getLocker(opCtx)->isRSTLLocked() ||
|
||||
gFeatureFlagIntentRegistration.isEnabled());
|
||||
gFeatureFlagIntentRegistration.isEnabled(),
|
||||
str::stream() << "isLockFreeRead: " << opCtx->isLockFreeReadsOp()
|
||||
<< ", isRSTLLocked: "
|
||||
<< shard_role_details::getLocker(opCtx)->isRSTLLocked());
|
||||
return canAcceptWritesForDatabase_UNSAFE(opCtx, dbName);
|
||||
}
|
||||
|
||||
@ -3015,7 +3020,10 @@ Status ReplicationCoordinatorImpl::checkCanServeReadsFor(OperationContext* opCtx
|
||||
const NamespaceString& ns,
|
||||
bool secondaryOk) {
|
||||
invariant(opCtx->isLockFreeReadsOp() || shard_role_details::getLocker(opCtx)->isRSTLLocked() ||
|
||||
gFeatureFlagIntentRegistration.isEnabled());
|
||||
gFeatureFlagIntentRegistration.isEnabled(),
|
||||
str::stream() << "isLockFreeRead: " << opCtx->isLockFreeReadsOp()
|
||||
<< ", isRSTLLocked: "
|
||||
<< shard_role_details::getLocker(opCtx)->isRSTLLocked());
|
||||
return checkCanServeReadsFor_UNSAFE(opCtx, ns, secondaryOk);
|
||||
}
|
||||
|
||||
@ -3578,7 +3586,9 @@ Status ReplicationCoordinatorImpl::_doReplSetReconfig(OperationContext* opCtx,
|
||||
// reconfig, we should never have a config in an older term. If the current config was
|
||||
// installed via a force reconfig, we aren't concerned about this safety guarantee.
|
||||
invariant(_rsConfig.unsafePeek().getConfigTerm() == OpTime::kUninitializedTerm ||
|
||||
_rsConfig.unsafePeek().getConfigTerm() == topCoordTerm);
|
||||
_rsConfig.unsafePeek().getConfigTerm() == topCoordTerm,
|
||||
str::stream() << "config term: " << _rsConfig.unsafePeek().getConfigTerm()
|
||||
<< ", topology coordinator term: " << topCoordTerm);
|
||||
}
|
||||
|
||||
auto configWriteConcern = _getConfigReplicationWriteConcern();
|
||||
@ -5836,7 +5846,10 @@ bool ReplicationCoordinatorImpl::ReadWriteAbility::canServeNonLocalReads(
|
||||
// We must be holding the RSTL.
|
||||
invariant(opCtx);
|
||||
invariant(opCtx->isLockFreeReadsOp() || shard_role_details::getLocker(opCtx)->isRSTLLocked() ||
|
||||
gFeatureFlagIntentRegistration.isEnabled());
|
||||
gFeatureFlagIntentRegistration.isEnabled(),
|
||||
str::stream() << "isLockFreeRead: " << opCtx->isLockFreeReadsOp()
|
||||
<< ", isRSTLLocked: "
|
||||
<< shard_role_details::getLocker(opCtx)->isRSTLLocked());
|
||||
return _canServeNonLocalReads.loadRelaxed();
|
||||
}
|
||||
|
||||
|
||||
@ -733,9 +733,14 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk,
|
||||
|
||||
_setConfigState(lk, kConfigHBReconfiguring);
|
||||
auto rsc = _rsConfig.unsafePeek();
|
||||
invariant(!rsc.isInitialized() ||
|
||||
rsc.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() ||
|
||||
_selfIndex < 0);
|
||||
invariant(
|
||||
!rsc.isInitialized() ||
|
||||
rsc.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() || _selfIndex < 0,
|
||||
str::stream() << "initialized: " << rsc.isInitialized() << ", old config version and term: "
|
||||
<< rsc.getConfigVersionAndTerm().toString()
|
||||
<< ", new config version and term: "
|
||||
<< newConfig.getConfigVersionAndTerm().toString()
|
||||
<< ", selfIndex: " << _selfIndex);
|
||||
_replExecutor
|
||||
->scheduleWork([=, this](const executor::TaskExecutor::CallbackArgs& cbData) {
|
||||
_heartbeatReconfigStore(cbData, newConfig);
|
||||
@ -1014,9 +1019,14 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
|
||||
}
|
||||
|
||||
invariant(_rsConfigState == kConfigHBReconfiguring);
|
||||
invariant(!rsc.isInitialized() ||
|
||||
rsc.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() ||
|
||||
_selfIndex < 0);
|
||||
invariant(
|
||||
!rsc.isInitialized() ||
|
||||
rsc.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() || _selfIndex < 0,
|
||||
str::stream() << "initialized: " << rsc.isInitialized() << ", old config version and term: "
|
||||
<< rsc.getConfigVersionAndTerm().toString()
|
||||
<< ", new config version and term: "
|
||||
<< newConfig.getConfigVersionAndTerm().toString()
|
||||
<< ", selfIndex: " << _selfIndex);
|
||||
|
||||
if (!myIndex.isOK()) {
|
||||
switch (myIndex.getStatus().code()) {
|
||||
|
||||
@ -576,7 +576,8 @@ void RollbackImpl::_restoreTxnsTableEntryFromRetryableWrites(OperationContext* o
|
||||
const auto txnNumber = *opSessionInfo.getTxnNumber();
|
||||
const auto wallClockTime = entry.getWallClockTime();
|
||||
|
||||
invariant(!prevWriteOpTime.isNull() && prevWriteOpTime.getTimestamp() <= stableTimestamp);
|
||||
invariant(!prevWriteOpTime.isNull());
|
||||
invariant(prevWriteOpTime.getTimestamp() <= stableTimestamp);
|
||||
// This is a retryable writes oplog entry with a non-null 'prevWriteOpTime' value that
|
||||
// is less than or equal to the 'stableTimestamp'.
|
||||
LOGV2(5530501,
|
||||
|
||||
@ -1447,7 +1447,11 @@ void TopologyCoordinator::setMyLastWrittenOpTimeAndWallTime(OpTimeAndWallTime op
|
||||
bool isRollbackAllowed) {
|
||||
auto opTime = opTimeAndWallTime.opTime;
|
||||
auto& myMemberData = _selfMemberData();
|
||||
invariant(isRollbackAllowed || opTime >= myMemberData.getLastWrittenOpTime());
|
||||
invariant(isRollbackAllowed || opTime >= myMemberData.getLastWrittenOpTime(),
|
||||
str::stream() << "isRollbackAllowed: " << isRollbackAllowed
|
||||
<< ", current last written optime: "
|
||||
<< myMemberData.getLastWrittenOpTime().toString()
|
||||
<< ", setting last written optime to: " << opTime.toString());
|
||||
myMemberData.setLastWrittenOpTimeAndWallTime(opTimeAndWallTime, now);
|
||||
}
|
||||
|
||||
@ -1472,8 +1476,10 @@ void TopologyCoordinator::setMyLastAppliedOpTimeAndWallTime(OpTimeAndWallTime op
|
||||
// timestamp. So, in pv1, its not possible for us to get opTime with higher term and
|
||||
// timestamp lesser than or equal to our current lastAppliedOptime.
|
||||
invariant(opTime.getTerm() == OpTime::kUninitializedTerm ||
|
||||
myLastAppliedOpTime.getTerm() == OpTime::kUninitializedTerm ||
|
||||
opTime.getTimestamp() > myLastAppliedOpTime.getTimestamp());
|
||||
myLastAppliedOpTime.getTerm() == OpTime::kUninitializedTerm ||
|
||||
opTime.getTimestamp() > myLastAppliedOpTime.getTimestamp(),
|
||||
str::stream() << "current lastApplied opTime: " << myLastAppliedOpTime.toString()
|
||||
<< ", setting lastApplied opTime to: " << opTime.toString());
|
||||
}
|
||||
|
||||
myMemberData.setLastAppliedOpTimeAndWallTime(opTimeAndWallTime, now);
|
||||
@ -1492,7 +1498,11 @@ void TopologyCoordinator::setMyLastDurableOpTimeAndWallTime(OpTimeAndWallTime op
|
||||
bool isRollbackAllowed) {
|
||||
auto opTime = opTimeAndWallTime.opTime;
|
||||
auto& myMemberData = _selfMemberData();
|
||||
invariant(isRollbackAllowed || opTime >= myMemberData.getLastDurableOpTime());
|
||||
invariant(isRollbackAllowed || opTime >= myMemberData.getLastDurableOpTime(),
|
||||
str::stream() << "isRollbackAllowed: " << isRollbackAllowed
|
||||
<< ", current lastDurable opTime: "
|
||||
<< myMemberData.getLastDurableOpTime().toString()
|
||||
<< ", setting lastDurable opTime to: " << opTime.toString());
|
||||
myMemberData.setLastDurableOpTimeAndWallTime(opTimeAndWallTime, now);
|
||||
}
|
||||
|
||||
|
||||
@ -502,7 +502,8 @@ TransactionCoordinator::~TransactionCoordinator() {
|
||||
void TransactionCoordinator::runCommit(OperationContext* opCtx, std::vector<ShardId> participants) {
|
||||
if (!_reserveKickOffCommitPromise())
|
||||
return;
|
||||
invariant(opCtx != nullptr && opCtx->getClient() != nullptr);
|
||||
invariant(opCtx != nullptr);
|
||||
invariant(opCtx->getClient() != nullptr);
|
||||
_updateAssociatedClient(opCtx->getClient());
|
||||
_participants = std::move(participants);
|
||||
_kickOffCommitPromise.emplaceValue();
|
||||
|
||||
@ -275,7 +275,10 @@ void PrepareVoteConsensus::registerVote(const PrepareResponse& vote) {
|
||||
}
|
||||
|
||||
CoordinatorCommitDecision PrepareVoteConsensus::decision() const {
|
||||
invariant(_numShards == _numCommitVotes + _numAbortVotes + _numNoVotes);
|
||||
invariant(_numShards == _numCommitVotes + _numAbortVotes + _numNoVotes,
|
||||
str::stream() << "numShards: " << _numShards << ", numCommitVotes: "
|
||||
<< _numCommitVotes << ", numAbortVotes: " << _numAbortVotes
|
||||
<< ", numNoVotes: " << _numNoVotes);
|
||||
|
||||
CoordinatorCommitDecision decision;
|
||||
if (_numCommitVotes == _numShards) {
|
||||
|
||||
@ -75,7 +75,10 @@ SyncTransactionWithRetries::SyncTransactionWithRetries(
|
||||
std::make_unique<details::DefaultSEPTransactionClientBehaviors>(opCtx)))) {
|
||||
// Callers should always provide a yielder when using the API with a session checked out,
|
||||
// otherwise commands run by the API won't be able to check out that session.
|
||||
invariant(!OperationContextSession::get(opCtx) || _resourceYielder);
|
||||
invariant(!OperationContextSession::get(opCtx) || _resourceYielder,
|
||||
str::stream() << "session is not checked out by the opCtx: "
|
||||
<< !OperationContextSession::get(opCtx)
|
||||
<< ", yielder is not provided: " << !_resourceYielder);
|
||||
}
|
||||
|
||||
StatusWith<CommitResult> SyncTransactionWithRetries::runNoThrow(OperationContext* opCtx,
|
||||
|
||||
@ -323,8 +323,8 @@ Transaction::ErrorHandlingStep Transaction::handleError(const StatusWith<CommitR
|
||||
int attemptCounter) const noexcept {
|
||||
stdx::lock_guard<stdx::mutex> lg(_mutex);
|
||||
// Errors aborting are always ignored.
|
||||
invariant(!_state.is(TransactionState::kNeedsCleanup) &&
|
||||
!_state.is(TransactionState::kStartedAbort));
|
||||
invariant(!_state.is(TransactionState::kNeedsCleanup));
|
||||
invariant(!_state.is(TransactionState::kStartedAbort));
|
||||
|
||||
LOGV2_DEBUG(5875905,
|
||||
3,
|
||||
|
||||
@ -332,7 +332,9 @@ std::size_t TransactionOperations::logOplogEntries(
|
||||
}
|
||||
|
||||
// A 'prepare' oplog entry should never include a 'partialTxn' field.
|
||||
invariant(!(isPartialTxn && implicitPrepare));
|
||||
invariant(!(isPartialTxn && implicitPrepare),
|
||||
str::stream() << "isPartialTxn: " << isPartialTxn
|
||||
<< ", implicitPrepare: " << implicitPrepare);
|
||||
if (implicitPrepare) {
|
||||
applyOpsBuilder.append("prepare", true);
|
||||
}
|
||||
|
||||
@ -826,12 +826,19 @@ bool TransactionParticipant::Participant::_shouldRestartTransactionOnReuseActive
|
||||
|
||||
// We should only call this function if the request is attempting to reuse the active txnNumber
|
||||
// and retryCounter
|
||||
invariant(
|
||||
txnNumberAndRetryCounter.getTxnNumber() ==
|
||||
o().activeTxnNumberAndRetryCounter.getTxnNumber() &&
|
||||
(txnNumberAndRetryCounter.getTxnRetryCounter() ==
|
||||
o().activeTxnNumberAndRetryCounter.getTxnRetryCounter() ||
|
||||
o().activeTxnNumberAndRetryCounter.getTxnRetryCounter() == kUninitializedTxnRetryCounter));
|
||||
invariant(txnNumberAndRetryCounter.getTxnNumber() ==
|
||||
o().activeTxnNumberAndRetryCounter.getTxnNumber(),
|
||||
str::stream() << "active transaction number: "
|
||||
<< o().activeTxnNumberAndRetryCounter.getTxnNumber()
|
||||
<< ", transaction number: " << txnNumberAndRetryCounter.getTxnNumber());
|
||||
invariant(txnNumberAndRetryCounter.getTxnRetryCounter() ==
|
||||
o().activeTxnNumberAndRetryCounter.getTxnRetryCounter() ||
|
||||
o().activeTxnNumberAndRetryCounter.getTxnRetryCounter() ==
|
||||
kUninitializedTxnRetryCounter,
|
||||
str::stream() << "active transaction retry counter: "
|
||||
<< o().activeTxnNumberAndRetryCounter.getTxnRetryCounter()
|
||||
<< ", transaction retry counter: "
|
||||
<< txnNumberAndRetryCounter.getTxnRetryCounter());
|
||||
|
||||
if (serverGlobalParams.clusterRole.has(ClusterRole::None)) {
|
||||
// Servers in a sharded cluster can start a new transaction at the active transaction
|
||||
@ -1390,7 +1397,9 @@ TransactionParticipant::TxnResources::TxnResources(ClientLock& clientLock,
|
||||
|
||||
// On secondaries, max lock timeout must not be set.
|
||||
invariant(!(stashStyle == StashStyle::kSecondary &&
|
||||
shard_role_details::getLocker(opCtx)->hasMaxLockTimeout()));
|
||||
shard_role_details::getLocker(opCtx)->hasMaxLockTimeout()),
|
||||
str::stream() << "hasMaxLockTimeout: "
|
||||
<< shard_role_details::getLocker(opCtx)->hasMaxLockTimeout());
|
||||
|
||||
_recoveryUnit = shard_role_details::releaseAndReplaceRecoveryUnit(opCtx, clientLock);
|
||||
// The recovery unit is detached from the OperationContext, but keep the OperationContext in the
|
||||
@ -1556,7 +1565,10 @@ void TransactionParticipant::Participant::_stashActiveTransaction(OperationConte
|
||||
|
||||
invariant(!o().txnResourceStash);
|
||||
// If this is a prepared transaction, invariant that it does not hold the RSTL lock.
|
||||
invariant(!o().txnState.isPrepared() || !shard_role_details::getLocker(opCtx)->isRSTLLocked());
|
||||
invariant(!o().txnState.isPrepared() || !shard_role_details::getLocker(opCtx)->isRSTLLocked(),
|
||||
str::stream() << "transaction is prepared: " << o().txnState.isPrepared()
|
||||
<< ", isRSTLLocked: "
|
||||
<< shard_role_details::getLocker(opCtx)->isRSTLLocked());
|
||||
auto stashStyle = opCtx->writesAreReplicated() ? TxnResources::StashStyle::kPrimary
|
||||
: TxnResources::StashStyle::kSecondary;
|
||||
o(lk).txnResourceStash = TxnResources(lk, opCtx, stashStyle);
|
||||
@ -1754,7 +1766,10 @@ void TransactionParticipant::Participant::unstashTransactionResources(
|
||||
|
||||
// On secondaries, max lock timeout must not be set.
|
||||
invariant(opCtx->writesAreReplicated() ||
|
||||
!shard_role_details::getLocker(opCtx)->hasMaxLockTimeout());
|
||||
!shard_role_details::getLocker(opCtx)->hasMaxLockTimeout(),
|
||||
str::stream() << "writes are replicated: " << opCtx->writesAreReplicated()
|
||||
<< ", hasMaxLockTimeout: "
|
||||
<< shard_role_details::getLocker(opCtx)->hasMaxLockTimeout());
|
||||
|
||||
// Storage engine transactions may be started in a lazy manner. By explicitly
|
||||
// starting here we ensure that a point-in-time snapshot is established during the
|
||||
@ -2004,8 +2019,9 @@ void TransactionParticipant::Participant::addTransactionOperation(
|
||||
}
|
||||
invariant(o().txnState.isInProgress(), str::stream() << "Current state: " << o().txnState);
|
||||
|
||||
invariant(p().autoCommit && !*p().autoCommit &&
|
||||
o().activeTxnNumberAndRetryCounter.getTxnNumber() != kUninitializedTxnNumber);
|
||||
invariant(p().autoCommit);
|
||||
invariant(!*p().autoCommit);
|
||||
invariant(o().activeTxnNumberAndRetryCounter.getTxnNumber() != kUninitializedTxnNumber);
|
||||
invariant(shard_role_details::getLocker(opCtx)->inAWriteUnitOfWork());
|
||||
|
||||
auto transactionSizeLimitBytes = static_cast<std::size_t>(gTransactionSizeLimitBytes.load());
|
||||
@ -2250,7 +2266,10 @@ void TransactionParticipant::Participant::_commitStorageTransaction(OperationCon
|
||||
bool isSplitPreparedTxn) {
|
||||
invariant(shard_role_details::getWriteUnitOfWork(opCtx));
|
||||
invariant(shard_role_details::getLocker(opCtx)->isRSTLLocked() || isSplitPreparedTxn ||
|
||||
gFeatureFlagIntentRegistration.isEnabled());
|
||||
gFeatureFlagIntentRegistration.isEnabled(),
|
||||
str::stream() << "isRSTLLocked: "
|
||||
<< shard_role_details::getLocker(opCtx)->isRSTLLocked()
|
||||
<< ", is split prepared transaction: " << isSplitPreparedTxn);
|
||||
try {
|
||||
shard_role_details::getWriteUnitOfWork(opCtx)->commit();
|
||||
} catch (const ExceptionFor<ErrorCodes::WriteConflict>&) {
|
||||
@ -2406,7 +2425,9 @@ void TransactionParticipant::Participant::setLastWriteOpTime(OperationContext* o
|
||||
const repl::OpTime& lastWriteOpTime) {
|
||||
stdx::lock_guard<Client> lg(*opCtx->getClient());
|
||||
auto& curLastWriteOpTime = o(lg).lastWriteOpTime;
|
||||
invariant(lastWriteOpTime.isNull() || lastWriteOpTime > curLastWriteOpTime);
|
||||
invariant(lastWriteOpTime.isNull() || lastWriteOpTime > curLastWriteOpTime,
|
||||
str::stream() << "current lastWrite opTime: " << curLastWriteOpTime.toString()
|
||||
<< ", setting lastWrite opTime to: " << lastWriteOpTime.toString());
|
||||
curLastWriteOpTime = lastWriteOpTime;
|
||||
}
|
||||
|
||||
@ -2692,8 +2713,14 @@ void TransactionParticipant::Participant::_cleanUpTxnResourceOnOpCtx(
|
||||
// 2. We have failed trying to get the initial global lock, in which case we will have
|
||||
// a WriteUnitOfWork but not have allocated the storage transaction.
|
||||
invariant(shard_role_details::getLocker(opCtx)->isRSTLLocked() || isSplitPreparedTxn ||
|
||||
!shard_role_details::getRecoveryUnit(opCtx)->isActive() ||
|
||||
gFeatureFlagIntentRegistration.isEnabled());
|
||||
!shard_role_details::getRecoveryUnit(opCtx)->isActive() ||
|
||||
gFeatureFlagIntentRegistration.isEnabled(),
|
||||
str::stream() << "isRSTLLocked: "
|
||||
<< shard_role_details::getLocker(opCtx)->isRSTLLocked()
|
||||
<< ", is split prepared transaction: " << isSplitPreparedTxn
|
||||
<< ", recovery unit state: "
|
||||
<< RecoveryUnit::toString(
|
||||
shard_role_details::getRecoveryUnit(opCtx)->getState()));
|
||||
shard_role_details::setWriteUnitOfWork(opCtx, nullptr);
|
||||
}
|
||||
|
||||
@ -3454,7 +3481,8 @@ void TransactionParticipant::Participant::_resetRetryableWriteState(WithLock wl)
|
||||
|
||||
void TransactionParticipant::Participant::_resetTransactionStateAndUnlock(
|
||||
stdx::unique_lock<Client>* lk, OperationContext* opCtx, TransactionState::StateFlag state) {
|
||||
invariant(lk && lk->owns_lock());
|
||||
invariant(lk);
|
||||
invariant(lk->owns_lock());
|
||||
|
||||
// If we are transitioning to kNone, we are either starting a new transaction or aborting a
|
||||
// prepared transaction for rollback. In the latter case, we will need to relax the
|
||||
|
||||
Loading…
Reference in New Issue
Block a user