SERVER-126474: Move PDIB ident drop registration into onCommit (#53725)
GitOrigin-RevId: c462dbbda23a55b5ebf85c63d2bf3538a5f30555
This commit is contained in:
parent
229a1c0755
commit
4f35bda394
@ -44,7 +44,6 @@ mongo_cc_unit_test(
|
||||
deps = [
|
||||
":primary_driven_index_builds",
|
||||
"//src/mongo/db/collection_crud:container_write",
|
||||
"//src/mongo/db/repl:timestamp_block",
|
||||
"//src/mongo/db/shard_role/shard_catalog:catalog_test_fixture",
|
||||
],
|
||||
)
|
||||
|
||||
@ -54,6 +54,45 @@ namespace {
|
||||
|
||||
auto _registry = ServiceContext::declareDecoration<Registry>();
|
||||
|
||||
std::vector<std::string> getIndexBuildIdents(const std::vector<IndexBuildInfo>& indexes,
|
||||
const boost::optional<std::string>& indexBuildIdent) {
|
||||
std::vector<std::string> idents;
|
||||
for (auto& index : indexes) {
|
||||
if (index.sorterIdent) {
|
||||
idents.push_back(*index.sorterIdent);
|
||||
}
|
||||
if (index.sideWritesIdent) {
|
||||
idents.push_back(*index.sideWritesIdent);
|
||||
}
|
||||
if (index.skippedRecordsIdent) {
|
||||
idents.push_back(*index.skippedRecordsIdent);
|
||||
}
|
||||
if (index.constraintViolationsIdent) {
|
||||
idents.push_back(*index.constraintViolationsIdent);
|
||||
}
|
||||
}
|
||||
if (indexBuildIdent) {
|
||||
idents.push_back(*indexBuildIdent);
|
||||
}
|
||||
return idents;
|
||||
}
|
||||
|
||||
void dropIdentsAndDeregisterOnCommit(OperationContext* opCtx,
|
||||
const UUID& buildUUID,
|
||||
std::vector<std::string> idents) {
|
||||
shard_role_details::getRecoveryUnit(opCtx)->onCommit(
|
||||
[buildUUID, idents = std::move(idents)](OperationContext* opCtx,
|
||||
boost::optional<Timestamp> commitTs) {
|
||||
invariant(commitTs && !commitTs->isNull());
|
||||
auto dropTime = StorageEngine::DropTime{StorageEngine::StableTimestamp{*commitTs}};
|
||||
auto* storageEngine = opCtx->getServiceContext()->getStorageEngine();
|
||||
for (auto& ident : idents) {
|
||||
storageEngine->addDropPendingIdent(dropTime, std::make_shared<Ident>(ident));
|
||||
}
|
||||
_registry(opCtx->getServiceContext()).remove(buildUUID);
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<boost::optional<BSONObj>> multikeyPathsToObjs(
|
||||
const std::vector<IndexBuildInfo>& indexes,
|
||||
const std::vector<boost::optional<MultikeyPaths>>& multikeyPaths) {
|
||||
@ -163,21 +202,12 @@ Status commit(OperationContext* opCtx,
|
||||
WriteUnitOfWork wuow{opCtx};
|
||||
auto writableColl = writer.getWritableCollection(opCtx);
|
||||
|
||||
auto commitTs = shard_role_details::getRecoveryUnit(opCtx)->getCommitTimestamp();
|
||||
StorageEngine::DropTime dropTime = !commitTs.isNull()
|
||||
? StorageEngine::DropTime{StorageEngine::StableTimestamp{commitTs}}
|
||||
: StorageEngine::DropTime{StorageEngine::Immediate{}};
|
||||
|
||||
for (size_t i = 0; i < indexes.size(); ++i) {
|
||||
auto&& index = indexes[i];
|
||||
|
||||
auto entry = writableColl->getIndexCatalog()->getWritableEntryByName(
|
||||
opCtx, index.getIndexName(), IndexCatalog::InclusionPolicy::kUnfinished);
|
||||
|
||||
IndexBuildInterceptor interceptor{
|
||||
opCtx, index, LazyRecordStore::CreateMode::openExisting, entry->descriptor()->unique()};
|
||||
interceptor.dropTemporaryTables(opCtx, dropTime);
|
||||
|
||||
writableColl->indexBuildSuccess(opCtx, entry);
|
||||
if (multikey[i]) {
|
||||
entry->setMultikey(opCtx, writer.get(), {}, *multikey[i]);
|
||||
@ -221,11 +251,8 @@ Status commit(OperationContext* opCtx,
|
||||
}
|
||||
}
|
||||
|
||||
if (indexBuildIdent) {
|
||||
opCtx->getServiceContext()->getStorageEngine()->addDropPendingIdent(
|
||||
dropTime, std::make_shared<Ident>(*indexBuildIdent));
|
||||
}
|
||||
|
||||
dropIdentsAndDeregisterOnCommit(
|
||||
opCtx, buildUUID, getIndexBuildIdents(indexes, indexBuildIdent));
|
||||
opCtx->getServiceContext()->getOpObserver()->onCommitIndexBuild(
|
||||
opCtx,
|
||||
coll.nss(),
|
||||
@ -234,11 +261,6 @@ Status commit(OperationContext* opCtx,
|
||||
indexes,
|
||||
multikeyPathsToObjs(indexes, multikey),
|
||||
/*fromMigrate=*/false);
|
||||
shard_role_details::getRecoveryUnit(opCtx)->onCommit(
|
||||
[buildUUID](OperationContext* opCtx, boost::optional<Timestamp>) {
|
||||
_registry(opCtx->getServiceContext()).remove(buildUUID);
|
||||
});
|
||||
|
||||
wuow.commit();
|
||||
return Status::OK();
|
||||
}
|
||||
@ -259,19 +281,10 @@ Status abort(OperationContext* opCtx,
|
||||
WriteUnitOfWork wuow{opCtx};
|
||||
auto writableColl = writer.getWritableCollection(opCtx);
|
||||
|
||||
auto commitTs = shard_role_details::getRecoveryUnit(opCtx)->getCommitTimestamp();
|
||||
StorageEngine::DropTime dropTime = !commitTs.isNull()
|
||||
? StorageEngine::DropTime{StorageEngine::StableTimestamp{commitTs}}
|
||||
: StorageEngine::DropTime{StorageEngine::Immediate{}};
|
||||
|
||||
for (auto&& index : indexes) {
|
||||
auto entry = writableColl->getIndexCatalog()->getWritableEntryByName(
|
||||
opCtx, index.getIndexName(), IndexCatalog::InclusionPolicy::kUnfinished);
|
||||
|
||||
IndexBuildInterceptor interceptor{
|
||||
opCtx, index, LazyRecordStore::CreateMode::openExisting, entry->descriptor()->unique()};
|
||||
interceptor.dropTemporaryTables(opCtx, dropTime);
|
||||
|
||||
auto status = writableColl->getIndexCatalog()->dropIndexEntry(opCtx, writableColl, entry);
|
||||
if (!status.isOK()) {
|
||||
return status;
|
||||
@ -285,11 +298,8 @@ Status abort(OperationContext* opCtx,
|
||||
ErrorCodes::IndexBuildAborted);
|
||||
}
|
||||
|
||||
if (indexBuildIdent) {
|
||||
opCtx->getServiceContext()->getStorageEngine()->addDropPendingIdent(
|
||||
dropTime, std::make_shared<Ident>(*indexBuildIdent));
|
||||
}
|
||||
|
||||
dropIdentsAndDeregisterOnCommit(
|
||||
opCtx, buildUUID, getIndexBuildIdents(indexes, indexBuildIdent));
|
||||
opCtx->getServiceContext()->getOpObserver()->onAbortIndexBuild(opCtx,
|
||||
coll.nss(),
|
||||
collectionUUID,
|
||||
@ -297,11 +307,6 @@ Status abort(OperationContext* opCtx,
|
||||
indexes,
|
||||
cause,
|
||||
/*fromMigrate=*/false);
|
||||
shard_role_details::getRecoveryUnit(opCtx)->onCommit(
|
||||
[buildUUID](OperationContext* opCtx, boost::optional<Timestamp>) {
|
||||
_registry(opCtx->getServiceContext()).remove(buildUUID);
|
||||
});
|
||||
|
||||
wuow.commit();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -36,7 +36,6 @@
|
||||
#include "mongo/db/op_observer/op_observer_noop.h"
|
||||
#include "mongo/db/query/collection_index_usage_tracker_decoration.h"
|
||||
#include "mongo/db/repl/replication_coordinator_mock.h"
|
||||
#include "mongo/db/repl/timestamp_block.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/catalog_test_fixture.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/collection.h"
|
||||
#include "mongo/db/shard_role/shard_catalog/index_catalog.h"
|
||||
@ -110,6 +109,9 @@ public:
|
||||
const std::vector<boost::optional<BSONObj>>& multikey,
|
||||
bool fromMigrate,
|
||||
bool isTimeseries) override {
|
||||
if (throwOnCommit) {
|
||||
uasserted(ErrorCodes::InterruptedDueToReplStateChange, "simulated stepdown");
|
||||
}
|
||||
lastCommitArgs = CommitArgs{.ns = ns,
|
||||
.collUUID = collUUID,
|
||||
.buildUUID = buildUUID,
|
||||
@ -127,6 +129,9 @@ public:
|
||||
const Status& cause,
|
||||
bool fromMigrate,
|
||||
bool isTimeseries) override {
|
||||
if (throwOnAbort) {
|
||||
uasserted(ErrorCodes::InterruptedDueToReplStateChange, "simulated stepdown");
|
||||
}
|
||||
lastAbortArgs = AbortArgs{.ns = ns,
|
||||
.collUUID = collUUID,
|
||||
.buildUUID = buildUUID,
|
||||
@ -136,6 +141,9 @@ public:
|
||||
.isTimeseries = isTimeseries};
|
||||
}
|
||||
|
||||
bool throwOnAbort = false;
|
||||
bool throwOnCommit = false;
|
||||
|
||||
boost::optional<StartArgs> lastStartArgs;
|
||||
boost::optional<CommitArgs> lastCommitArgs;
|
||||
boost::optional<AbortArgs> lastAbortArgs;
|
||||
@ -260,23 +268,21 @@ TEST_F(UtilTest, Commit) {
|
||||
const auto indexBuildIdent = ident::generateNewIndexBuildIdent(buildUUID);
|
||||
ASSERT_OK(
|
||||
start(operationContext(), ns.dbName(), collUUID, buildUUID, indexes, indexBuildIdent));
|
||||
const Timestamp commitTs(1, 0);
|
||||
shard_role_details::getRecoveryUnit(operationContext())->setCommitTimestamp(commitTs);
|
||||
ASSERT_OK(commit(
|
||||
operationContext(), ns.dbName(), collUUID, buildUUID, indexes, multikey, indexBuildIdent));
|
||||
|
||||
const Timestamp dropTs(commitTs.getSecs() + 1, 0);
|
||||
for (auto&& index : indexes) {
|
||||
auto& engine = *operationContext()->getServiceContext()->getStorageEngine();
|
||||
ASSERT_OK(engine.immediatelyCompletePendingDrop(operationContext(), index.indexIdent));
|
||||
ASSERT_OK(engine.immediatelyCompletePendingDrop(operationContext(), *index.sorterIdent));
|
||||
ASSERT_OK(
|
||||
engine.immediatelyCompletePendingDrop(operationContext(), *index.sideWritesIdent));
|
||||
ASSERT_OK(
|
||||
engine.immediatelyCompletePendingDrop(operationContext(), *index.skippedRecordsIdent));
|
||||
ASSERT_OK(engine.immediatelyCompletePendingDrop(operationContext(),
|
||||
*index.constraintViolationsIdent));
|
||||
engine.dropIdentTimestamped(operationContext(), *index.sorterIdent, dropTs);
|
||||
engine.dropIdentTimestamped(operationContext(), *index.sideWritesIdent, dropTs);
|
||||
engine.dropIdentTimestamped(operationContext(), *index.skippedRecordsIdent, dropTs);
|
||||
engine.dropIdentTimestamped(operationContext(), *index.constraintViolationsIdent, dropTs);
|
||||
}
|
||||
ASSERT_OK(
|
||||
operationContext()->getServiceContext()->getStorageEngine()->immediatelyCompletePendingDrop(
|
||||
operationContext(), indexBuildIdent));
|
||||
operationContext()->getServiceContext()->getStorageEngine()->dropIdentTimestamped(
|
||||
operationContext(), indexBuildIdent, dropTs);
|
||||
|
||||
auto coll = acquireCollectionMaybeLockFree(
|
||||
operationContext(),
|
||||
@ -339,23 +345,22 @@ TEST_F(UtilTest, Abort) {
|
||||
const auto indexBuildIdent = ident::generateNewIndexBuildIdent(buildUUID);
|
||||
ASSERT_OK(
|
||||
start(operationContext(), ns.dbName(), collUUID, buildUUID, indexes, indexBuildIdent));
|
||||
const Timestamp commitTs(1, 0);
|
||||
shard_role_details::getRecoveryUnit(operationContext())->setCommitTimestamp(commitTs);
|
||||
ASSERT_OK(abort(
|
||||
operationContext(), ns.dbName(), collUUID, buildUUID, indexes, indexBuildIdent, cause));
|
||||
|
||||
const Timestamp dropTs(commitTs.getSecs() + 1, 0);
|
||||
for (auto&& index : indexes) {
|
||||
auto& engine = *operationContext()->getServiceContext()->getStorageEngine();
|
||||
ASSERT_OK(engine.immediatelyCompletePendingDrop(operationContext(), index.indexIdent));
|
||||
ASSERT_OK(engine.immediatelyCompletePendingDrop(operationContext(), *index.sorterIdent));
|
||||
ASSERT_OK(
|
||||
engine.immediatelyCompletePendingDrop(operationContext(), *index.sideWritesIdent));
|
||||
ASSERT_OK(
|
||||
engine.immediatelyCompletePendingDrop(operationContext(), *index.skippedRecordsIdent));
|
||||
ASSERT_OK(engine.immediatelyCompletePendingDrop(operationContext(),
|
||||
*index.constraintViolationsIdent));
|
||||
engine.dropIdentTimestamped(operationContext(), index.indexIdent, dropTs);
|
||||
engine.dropIdentTimestamped(operationContext(), *index.sorterIdent, dropTs);
|
||||
engine.dropIdentTimestamped(operationContext(), *index.sideWritesIdent, dropTs);
|
||||
engine.dropIdentTimestamped(operationContext(), *index.skippedRecordsIdent, dropTs);
|
||||
engine.dropIdentTimestamped(operationContext(), *index.constraintViolationsIdent, dropTs);
|
||||
}
|
||||
ASSERT_OK(
|
||||
operationContext()->getServiceContext()->getStorageEngine()->immediatelyCompletePendingDrop(
|
||||
operationContext(), indexBuildIdent));
|
||||
operationContext()->getServiceContext()->getStorageEngine()->dropIdentTimestamped(
|
||||
operationContext(), indexBuildIdent, dropTs);
|
||||
|
||||
auto coll = acquireCollectionMaybeLockFree(
|
||||
operationContext(),
|
||||
@ -400,16 +405,9 @@ TEST_F(UtilTest, CommitUsesCommitTimestampForTemporaryTableDrops) {
|
||||
start(operationContext(), ns.dbName(), collUUID, buildUUID, indexes, indexBuildIdent));
|
||||
|
||||
const Timestamp commitTs(200, 0);
|
||||
{
|
||||
TimestampBlock tsBlock(operationContext(), commitTs);
|
||||
ASSERT_OK(commit(operationContext(),
|
||||
ns.dbName(),
|
||||
collUUID,
|
||||
buildUUID,
|
||||
indexes,
|
||||
multikey,
|
||||
indexBuildIdent));
|
||||
}
|
||||
shard_role_details::getRecoveryUnit(operationContext())->setCommitTimestamp(commitTs);
|
||||
ASSERT_OK(commit(
|
||||
operationContext(), ns.dbName(), collUUID, buildUUID, indexes, multikey, indexBuildIdent));
|
||||
|
||||
auto storageEngine = operationContext()->getServiceContext()->getStorageEngine();
|
||||
for (auto&& index : indexes) {
|
||||
@ -435,11 +433,9 @@ TEST_F(UtilTest, AbortUsesCommitTimestampForTemporaryTableDrops) {
|
||||
start(operationContext(), ns.dbName(), collUUID, buildUUID, indexes, indexBuildIdent));
|
||||
|
||||
const Timestamp commitTs(300, 0);
|
||||
{
|
||||
TimestampBlock tsBlock(operationContext(), commitTs);
|
||||
ASSERT_OK(abort(
|
||||
operationContext(), ns.dbName(), collUUID, buildUUID, indexes, indexBuildIdent, cause));
|
||||
}
|
||||
shard_role_details::getRecoveryUnit(operationContext())->setCommitTimestamp(commitTs);
|
||||
ASSERT_OK(abort(
|
||||
operationContext(), ns.dbName(), collUUID, buildUUID, indexes, indexBuildIdent, cause));
|
||||
|
||||
auto storageEngine = operationContext()->getServiceContext()->getStorageEngine();
|
||||
for (auto&& index : indexes) {
|
||||
@ -453,23 +449,63 @@ TEST_F(UtilTest, AbortUsesCommitTimestampForTemporaryTableDrops) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(UtilTest, AbortWithNoCommitTimestampDropsImmediately) {
|
||||
|
||||
TEST_F(UtilTest, AbortWUOWRollbackAllowsRetry) {
|
||||
auto buildUUID = UUID::gen();
|
||||
auto indexes = makeIndexes({"a"});
|
||||
auto indexBuildIdent = ident::generateNewIndexBuildIdent(buildUUID);
|
||||
Status cause{ErrorCodes::Error{11130403}, "abort"};
|
||||
const Status cause{ErrorCodes::Error{11130404}, "abort"};
|
||||
|
||||
ASSERT_OK(
|
||||
start(operationContext(), ns.dbName(), collUUID, buildUUID, indexes, indexBuildIdent));
|
||||
|
||||
// Throw from the OpObserver to roll back the WUoW after dropIdentsAndDeregisterOnCommit
|
||||
// has registered its onCommit handler but before wuow.commit().
|
||||
opObserver().throwOnAbort = true;
|
||||
ASSERT_THROWS_CODE(
|
||||
abort(
|
||||
operationContext(), ns.dbName(), collUUID, buildUUID, indexes, indexBuildIdent, cause),
|
||||
DBException,
|
||||
ErrorCodes::InterruptedDueToReplStateChange);
|
||||
opObserver().throwOnAbort = false;
|
||||
|
||||
ASSERT_FALSE(registry(getServiceContext()).all().empty());
|
||||
EXPECT_EQ(getServiceContext()->getStorageEngine()->getNumDropPendingIdents(), 0U);
|
||||
// No mangled state.
|
||||
shard_role_details::getRecoveryUnit(operationContext())->setCommitTimestamp(Timestamp(1, 0));
|
||||
ASSERT_OK(abort(
|
||||
operationContext(), ns.dbName(), collUUID, buildUUID, indexes, indexBuildIdent, cause));
|
||||
}
|
||||
|
||||
auto& engine = *operationContext()->getServiceContext()->getStorageEngine();
|
||||
for (auto&& index : indexes) {
|
||||
// Without a commit timestamp, the drop is registered as Immediate.
|
||||
ASSERT_OK(
|
||||
engine.immediatelyCompletePendingDrop(operationContext(), *index.sideWritesIdent));
|
||||
}
|
||||
TEST_F(UtilTest, CommitWUOWRollbackAllowsRetry) {
|
||||
auto buildUUID = UUID::gen();
|
||||
auto indexes = makeIndexes({"a"});
|
||||
auto indexBuildIdent = ident::generateNewIndexBuildIdent(buildUUID);
|
||||
std::vector<boost::optional<MultikeyPaths>> multikey(indexes.size());
|
||||
|
||||
ASSERT_OK(
|
||||
start(operationContext(), ns.dbName(), collUUID, buildUUID, indexes, indexBuildIdent));
|
||||
|
||||
// Throw from the OpObserver to roll back the WUoW after dropIdentsAndDeregisterOnCommit
|
||||
// has registered its onCommit handler but before wuow.commit().
|
||||
opObserver().throwOnCommit = true;
|
||||
ASSERT_THROWS_CODE(commit(operationContext(),
|
||||
ns.dbName(),
|
||||
collUUID,
|
||||
buildUUID,
|
||||
indexes,
|
||||
multikey,
|
||||
indexBuildIdent),
|
||||
DBException,
|
||||
ErrorCodes::InterruptedDueToReplStateChange);
|
||||
opObserver().throwOnCommit = false;
|
||||
|
||||
ASSERT_FALSE(registry(getServiceContext()).all().empty());
|
||||
EXPECT_EQ(getServiceContext()->getStorageEngine()->getNumDropPendingIdents(), 0U);
|
||||
// No mangled state.
|
||||
shard_role_details::getRecoveryUnit(operationContext())->setCommitTimestamp(Timestamp(1, 0));
|
||||
ASSERT_OK(commit(
|
||||
operationContext(), ns.dbName(), collUUID, buildUUID, indexes, multikey, indexBuildIdent));
|
||||
}
|
||||
|
||||
TEST_F(UtilTest, ResumeInfoRequiresValidIdent) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user