SERVER-103241 Resharding Coordinator DAO Handles Blocking Writes (#36623)
GitOrigin-RevId: 8b088058e5992a527f9ccc7ea1602baf2a984822
This commit is contained in:
parent
58de2372e5
commit
4164f376ba
@ -1281,17 +1281,22 @@ ExecutorFuture<void> ReshardingCoordinator::_awaitAllRecipientsFinishedApplying(
|
||||
}
|
||||
|
||||
// set the criticalSectionExpiresAt on the coordinator doc
|
||||
const auto now = (*executor)->now();
|
||||
const auto criticalSectionTimeout =
|
||||
Milliseconds(resharding::gReshardingCriticalSectionTimeoutMillis.load());
|
||||
const auto criticalSectionExpiresAt = (*executor)->now() + criticalSectionTimeout;
|
||||
const auto criticalSectionExpiresAt = now + criticalSectionTimeout;
|
||||
|
||||
ReshardingCoordinatorDocument updatedCSExpirationDoc = _coordinatorDoc;
|
||||
updatedCSExpirationDoc.setCriticalSectionExpiresAt(criticalSectionExpiresAt);
|
||||
this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kBlockingWrites,
|
||||
updatedCSExpirationDoc);
|
||||
|
||||
_metrics->setStartFor(ReshardingMetrics::TimedPhase::kCriticalSection,
|
||||
resharding::getCurrentTime());
|
||||
_updateCoordinatorDocStateAndCatalogEntries(
|
||||
[=, this](OperationContext* opCtx, resharding::DaoStorageClient* client) {
|
||||
auto updatedDocument = _coordinatorDao.transitionToBlockingWritesPhase(
|
||||
opCtx,
|
||||
client,
|
||||
now,
|
||||
criticalSectionExpiresAt,
|
||||
_coordinatorDoc.getReshardingUUID());
|
||||
_metrics->setStartFor(ReshardingMetrics::TimedPhase::kCriticalSection, now);
|
||||
return updatedDocument;
|
||||
});
|
||||
})
|
||||
.then([this] {
|
||||
return resharding::waitForMajority(_ctHolder->getAbortToken(),
|
||||
|
||||
@ -38,6 +38,7 @@
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding
|
||||
|
||||
namespace mongo {
|
||||
namespace resharding {
|
||||
|
||||
namespace {
|
||||
BSONObj executeConfigRequest(OperationContext* opCtx, const BatchedCommandRequest& request) {
|
||||
@ -54,9 +55,22 @@ void verifyUpdateResult(const BatchedCommandRequest& request, const BSONObj& res
|
||||
<< numDocsMatched << " for write request " << request.toString(),
|
||||
1 == numDocsMatched);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace resharding {
|
||||
ReshardingCoordinatorDocument buildAndExecuteRequest(OperationContext* opCtx,
|
||||
DaoStorageClient* client,
|
||||
const UUID& reshardingUUID,
|
||||
BSONObjBuilder& bob) {
|
||||
auto request =
|
||||
BatchedCommandRequest::buildUpdateOp(NamespaceString::kConfigReshardingOperationsNamespace,
|
||||
BSON("_id" << reshardingUUID),
|
||||
bob.obj(),
|
||||
false, // upsert
|
||||
false // multi
|
||||
);
|
||||
client->alterState(opCtx, request);
|
||||
return client->readState(opCtx, reshardingUUID);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
void DaoStorageClientImpl::alterState(OperationContext* opCtx,
|
||||
const BatchedCommandRequest& request) {
|
||||
@ -131,16 +145,35 @@ ReshardingCoordinatorDocument ReshardingCoordinatorDao::transitionToCloningPhase
|
||||
now);
|
||||
}
|
||||
|
||||
auto updateRequest =
|
||||
BatchedCommandRequest::buildUpdateOp(NamespaceString::kConfigReshardingOperationsNamespace,
|
||||
BSON("_id" << reshardingUUID),
|
||||
updateBuilder.obj(),
|
||||
false, // upsert
|
||||
false // multi
|
||||
);
|
||||
client->alterState(opCtx, updateRequest);
|
||||
return buildAndExecuteRequest(opCtx, client, reshardingUUID, updateBuilder);
|
||||
}
|
||||
|
||||
return client->readState(opCtx, reshardingUUID);
|
||||
ReshardingCoordinatorDocument ReshardingCoordinatorDao::transitionToBlockingWritesPhase(
|
||||
OperationContext* opCtx,
|
||||
DaoStorageClient* client,
|
||||
Date_t now,
|
||||
Date_t criticalSectionExpireTime,
|
||||
const UUID& reshardingUUID) {
|
||||
|
||||
auto doc = client->readState(opCtx, reshardingUUID);
|
||||
invariant(doc.getState() == CoordinatorStateEnum::kApplying);
|
||||
|
||||
BSONObjBuilder updateBuilder;
|
||||
{
|
||||
BSONObjBuilder setBuilder(updateBuilder.subobjStart("$set"));
|
||||
|
||||
setBuilder.append(ReshardingCoordinatorDocument::kStateFieldName,
|
||||
CoordinatorState_serializer(CoordinatorStateEnum::kBlockingWrites));
|
||||
|
||||
setBuilder.append(getIntervalEndFieldName<ReshardingCoordinatorDocument>(
|
||||
ReshardingRecipientMetrics::kOplogApplicationFieldName),
|
||||
now);
|
||||
|
||||
setBuilder.append(ReshardingCoordinatorDocument::kCriticalSectionExpiresAtFieldName,
|
||||
criticalSectionExpireTime);
|
||||
}
|
||||
|
||||
return buildAndExecuteRequest(opCtx, client, reshardingUUID, updateBuilder);
|
||||
}
|
||||
|
||||
ReshardingCoordinatorDocument ReshardingCoordinatorDao::transitionToApplyingPhase(
|
||||
@ -165,16 +198,7 @@ ReshardingCoordinatorDocument ReshardingCoordinatorDao::transitionToApplyingPhas
|
||||
now);
|
||||
}
|
||||
|
||||
auto updateRequest =
|
||||
BatchedCommandRequest::buildUpdateOp(NamespaceString::kConfigReshardingOperationsNamespace,
|
||||
BSON("_id" << reshardingUUID),
|
||||
updateBuilder.obj(),
|
||||
false, // upsert
|
||||
false // multi
|
||||
);
|
||||
client->alterState(opCtx, updateRequest);
|
||||
|
||||
return client->readState(opCtx, reshardingUUID);
|
||||
return buildAndExecuteRequest(opCtx, client, reshardingUUID, updateBuilder);
|
||||
}
|
||||
|
||||
} // namespace resharding
|
||||
|
||||
@ -78,6 +78,12 @@ public:
|
||||
DaoStorageClient* client,
|
||||
Date_t now,
|
||||
const UUID& reshardingUUID);
|
||||
|
||||
ReshardingCoordinatorDocument transitionToBlockingWritesPhase(OperationContext* opCtx,
|
||||
DaoStorageClient* client,
|
||||
Date_t now,
|
||||
Date_t criticalSectionExpireTime,
|
||||
const UUID& reshardingUUID);
|
||||
};
|
||||
|
||||
} // namespace resharding
|
||||
|
||||
@ -157,5 +157,46 @@ DEATH_TEST(ReshardingCoordinatorDaoTest,
|
||||
dao.transitionToApplyingPhase(opCtx, &updater, applyStartTime, uuid);
|
||||
}
|
||||
|
||||
TEST(ReshardingCoordinatorDaoTest, TransitionToBlockingWritesPhase) {
|
||||
ClockSourceMock clock;
|
||||
SpyingDocumentUpdater updater;
|
||||
updater.getOnDiskStateForModification().setState(CoordinatorStateEnum::kApplying);
|
||||
ReshardingCoordinatorDao dao;
|
||||
OperationContext* opCtx = nullptr;
|
||||
|
||||
auto uuid = UUID::gen();
|
||||
auto now = clock.now();
|
||||
auto criticalSectionExpiresAt = now + Seconds(5);
|
||||
dao.transitionToBlockingWritesPhase(opCtx, &updater, now, criticalSectionExpiresAt, uuid);
|
||||
const auto& lastRequest = updater.getLastRequest();
|
||||
|
||||
auto expectedUpdates = BSON_ARRAY(BSON(
|
||||
"q" << BSON("_id" << uuid) << "u"
|
||||
<< BSON("$set" << BSON("state" << "blocking-writes"
|
||||
<< "criticalSectionExpiresAt" << criticalSectionExpiresAt
|
||||
<< "metrics.oplogApplication.stop" << now))
|
||||
<< "multi" << false << "upsert" << false));
|
||||
|
||||
ASSERT_EQUALS(lastRequest.getStringField("update"),
|
||||
NamespaceString::kConfigReshardingOperationsNamespace.coll());
|
||||
auto updates = lastRequest.getObjectField("updates");
|
||||
ASSERT_BSONOBJ_EQ_UNORDERED(updates, expectedUpdates);
|
||||
}
|
||||
|
||||
DEATH_TEST(ReshardingCoordinatorDaoTest,
|
||||
TransitionToBlockingWritesPhasePreviousStateInvariant,
|
||||
"invariant") {
|
||||
ClockSourceMock clock;
|
||||
SpyingDocumentUpdater updater;
|
||||
updater.getOnDiskStateForModification().setState(CoordinatorStateEnum::kAborting);
|
||||
ReshardingCoordinatorDao dao;
|
||||
OperationContext* opCtx = nullptr;
|
||||
|
||||
auto uuid = UUID::gen();
|
||||
auto now = clock.now();
|
||||
auto criticalSectionExpiresAt = now + Seconds(5);
|
||||
dao.transitionToBlockingWritesPhase(opCtx, &updater, now, criticalSectionExpiresAt, uuid);
|
||||
}
|
||||
|
||||
} // namespace resharding
|
||||
} // namespace mongo
|
||||
|
||||
Loading…
Reference in New Issue
Block a user