SERVER-108256 Fix untracked secondary collections potentially being considered as local on non db-primary shards (#44527)
GitOrigin-RevId: 19a8975cd5cc88a2709161ba781c0d7ae3a19afd
This commit is contained in:
parent
3145debd5e
commit
cb23179906
@ -0,0 +1,229 @@
|
||||
/**
|
||||
* Tests $lookup with untracked-unsharded, tracked-unsharded, and sharded collections while
|
||||
* movePrimary operations are occurring on the database. On suites with random balancer enabled,
|
||||
* additionally moveChunk and moveCollection operations are occurring.
|
||||
*
|
||||
* @tags: [requires_sharding]
|
||||
*/
|
||||
import {interruptedQueryErrors} from "jstests/concurrency/fsm_libs/assert.js";
|
||||
|
||||
export const $config = (function() {
|
||||
const data = {numDocs: 100};
|
||||
|
||||
function retryOnInterruptedQueryError(fn, numRetries, sleepMs) {
|
||||
for (let i = 0; i < numRetries; ++i) {
|
||||
try {
|
||||
return fn();
|
||||
} catch (e) {
|
||||
if (interruptedQueryErrors.includes(e.code)) {
|
||||
jsTestLog(`Caught interrupted query error ${e.code}. Retrying ${i + 1}/${
|
||||
numRetries}...`);
|
||||
sleep(sleepMs);
|
||||
continue;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function runAggWithRetries(db, collName, pipeline) {
|
||||
return retryOnInterruptedQueryError(
|
||||
() => {
|
||||
return db[collName].aggregate(pipeline).toArray();
|
||||
},
|
||||
10 /* numRetries */,
|
||||
10 /* sleepMs */,
|
||||
);
|
||||
}
|
||||
|
||||
const states = (function() {
|
||||
function lookupTwoCollections(db, _) {
|
||||
const results = runAggWithRetries(db, this.localCollName, [
|
||||
{
|
||||
$match: {x: Random.randInt(this.numDocs)},
|
||||
},
|
||||
{
|
||||
$lookup: {
|
||||
from: this.foreignColl1Name,
|
||||
localField: "_id",
|
||||
foreignField: "_id",
|
||||
as: "out",
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
||||
assert.eq(results.length, 1, results);
|
||||
const resultDoc = results[0];
|
||||
assert.eq(resultDoc.out.length, 1, results);
|
||||
assert.eq(resultDoc._id, resultDoc.out[0]._id, results);
|
||||
assert.eq(resultDoc.x, resultDoc.out[0].y, results);
|
||||
}
|
||||
|
||||
function lookupThreeCollections(db, _) {
|
||||
const x_val = Random.randInt(this.numDocs);
|
||||
const results = runAggWithRetries(db, this.localCollName, [
|
||||
{$match: {x: x_val}},
|
||||
{
|
||||
$lookup: {
|
||||
from: this.foreignColl1Name,
|
||||
localField: "x",
|
||||
foreignField: "y",
|
||||
as: "j",
|
||||
},
|
||||
},
|
||||
{$unwind: "$j"},
|
||||
{
|
||||
$lookup: {
|
||||
from: this.foreignColl2Name,
|
||||
localField: "j.y",
|
||||
foreignField: "z",
|
||||
as: "j.j2",
|
||||
},
|
||||
},
|
||||
{$unwind: "$j.j2"},
|
||||
]);
|
||||
|
||||
assert.eq(results.length, 1, results);
|
||||
const resultDoc = results[0];
|
||||
assert.eq(resultDoc.x, x_val, results);
|
||||
assert.eq(resultDoc.j.y, x_val, results);
|
||||
assert.eq(resultDoc.j.j2.z, x_val, results);
|
||||
}
|
||||
|
||||
function movePrimary(db, _) {
|
||||
// Let only one thread do movePrimary to avoid threads stalling behind each other.
|
||||
if (this.tid !== 0) {
|
||||
jsTestLog("Skipping movePrimary on thread " + this.threadId);
|
||||
return;
|
||||
}
|
||||
|
||||
const toShard = this.shards[Random.randInt(this.shards.length)];
|
||||
|
||||
jsTestLog("Executing movePrimary to shard: " + toShard);
|
||||
retryOnRetryableError(
|
||||
() => {
|
||||
assert.commandWorked(db.adminCommand({movePrimary: db.getName(), to: toShard}));
|
||||
},
|
||||
10 /* numRetries */,
|
||||
100 /* sleepMs */,
|
||||
[
|
||||
// The cloning phase has failed (e.g. as a result of a stepdown). When a failure
|
||||
// occurs at this phase, the movePrimary operation does not recover.
|
||||
7120202,
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
return {lookupTwoCollections, lookupThreeCollections, movePrimary};
|
||||
})();
|
||||
|
||||
const transitions = {
|
||||
lookupTwoCollections: {
|
||||
lookupTwoCollections: 0.45,
|
||||
lookupThreeCollections: 0.45,
|
||||
movePrimary: 0.1,
|
||||
},
|
||||
lookupThreeCollections: {
|
||||
lookupTwoCollections: 0.45,
|
||||
lookupThreeCollections: 0.45,
|
||||
movePrimary: 0.1,
|
||||
},
|
||||
movePrimary: {
|
||||
lookupTwoCollections: 0.45,
|
||||
lookupThreeCollections: 0.45,
|
||||
movePrimary: 0.1,
|
||||
},
|
||||
};
|
||||
|
||||
function setup(db, _, cluster) {
|
||||
this.shards = Object.keys(cluster.getSerializedCluster().shards);
|
||||
const shards = this.shards;
|
||||
|
||||
this.localCollName = "localColl";
|
||||
this.foreignColl1Name = "foreignColl1";
|
||||
this.foreignColl2Name = "foreignColl2";
|
||||
|
||||
function createShardedCollection(collName) {
|
||||
assert.commandWorked(db.adminCommand(
|
||||
{shardCollection: db[collName].getFullName(), key: {_id: "hashed"}}));
|
||||
}
|
||||
|
||||
function createTrackedCollection(collName) {
|
||||
db.createCollection(collName);
|
||||
// move collection to a random shard
|
||||
const toShard = shards[Random.randInt(shards.length)];
|
||||
jsTestLog("Creating tracked collection: " + collName + " on shard " + toShard);
|
||||
assert.soonRetryOnAcceptableErrors(() => {
|
||||
assert.commandWorked(db.adminCommand(
|
||||
{moveCollection: db.getName() + "." + collName, toShard: toShard}));
|
||||
return true;
|
||||
}, [ErrorCodes.ConflictingOperationInProgress, ErrorCodes.ReshardCollectionInProgress]);
|
||||
}
|
||||
|
||||
function createRandomCollectionType(collName) {
|
||||
const randType = Random.randInt(3);
|
||||
if (randType === 0) {
|
||||
jsTestLog("Creating sharded collection: " + collName);
|
||||
createShardedCollection(collName);
|
||||
} else if (randType === 1) {
|
||||
createTrackedCollection(collName);
|
||||
} else {
|
||||
jsTestLog("Creating untracked collection: " + collName);
|
||||
// noop
|
||||
}
|
||||
}
|
||||
|
||||
// Load local collection data.
|
||||
{
|
||||
createRandomCollectionType(this.localCollName);
|
||||
|
||||
const bulk = db[this.localCollName].initializeUnorderedBulkOp();
|
||||
for (let i = 0; i < this.numDocs; ++i) {
|
||||
bulk.insert({_id: i, x: i});
|
||||
}
|
||||
const res = bulk.execute();
|
||||
assert.commandWorked(res);
|
||||
assert.eq(this.numDocs, res.nInserted);
|
||||
}
|
||||
|
||||
// Load foreign collection data.
|
||||
{
|
||||
createRandomCollectionType(this.foreignColl1Name);
|
||||
|
||||
const bulk = db[this.foreignColl1Name].initializeUnorderedBulkOp();
|
||||
for (let i = 0; i < this.numDocs; ++i) {
|
||||
bulk.insert({_id: i, y: i});
|
||||
}
|
||||
const res = bulk.execute();
|
||||
assert.commandWorked(res);
|
||||
assert.eq(this.numDocs, res.nInserted);
|
||||
}
|
||||
|
||||
// Load foreign collection 2 data.
|
||||
{
|
||||
createRandomCollectionType(this.foreignColl2Name);
|
||||
|
||||
const bulk = db[this.foreignColl2Name].initializeUnorderedBulkOp();
|
||||
for (let i = 0; i < this.numDocs; ++i) {
|
||||
bulk.insert({_id: i, z: i});
|
||||
}
|
||||
const res = bulk.execute();
|
||||
assert.commandWorked(res);
|
||||
assert.eq(this.numDocs, res.nInserted);
|
||||
}
|
||||
}
|
||||
|
||||
function teardown(db, collName, cluster) {
|
||||
}
|
||||
|
||||
return {
|
||||
threadCount: 5,
|
||||
iterations: 50,
|
||||
states: states,
|
||||
startState: "lookupTwoCollections",
|
||||
transitions: transitions,
|
||||
data: data,
|
||||
setup: setup,
|
||||
teardown: teardown,
|
||||
};
|
||||
})();
|
||||
@ -488,7 +488,7 @@ if (!isAuthoritativeShardEnabled) {
|
||||
// collection is unsharded.
|
||||
toplevelExec: [true, false],
|
||||
subPipelineLocal: [true, false],
|
||||
subPipelineRemote: [true, false],
|
||||
subPipelineRemote: [false, false],
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -1598,6 +1598,20 @@ mongo_cc_unit_test(
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_unit_test(
|
||||
name = "initialize_auto_get_helper_test",
|
||||
srcs = [
|
||||
"initialize_auto_get_helper_test.cpp",
|
||||
],
|
||||
tags = ["mongo_unittest_eighth_group"],
|
||||
deps = [
|
||||
"//src/mongo/db:shard_role_api",
|
||||
"//src/mongo/db/s:shard_server_test_fixture",
|
||||
"//src/mongo/db/s:sharding_runtime_d",
|
||||
"//src/mongo/s:sharding_router_api",
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_benchmark(
|
||||
name = "document_source_group_bm",
|
||||
srcs = [
|
||||
|
||||
@ -183,7 +183,7 @@ filters:
|
||||
- "javascript_execution*":
|
||||
approvers:
|
||||
- 10gen/query-integration-extensions
|
||||
- "initialize_auto_get_helper.h":
|
||||
- "initialize_auto_get_helper*":
|
||||
approvers:
|
||||
- 10gen/query-execution-classic
|
||||
- "lookup_set_cache*":
|
||||
|
||||
@ -47,6 +47,13 @@ inline std::vector<ScopedSetShardRole> createScopedShardRoles(
|
||||
std::vector<ScopedSetShardRole> scopedShardRoles;
|
||||
scopedShardRoles.reserve(nssList.size());
|
||||
const auto myShardId = ShardingState::get(opCtx)->shardId();
|
||||
const auto placementConflictTime = [&] {
|
||||
const auto txnRouter = TransactionRouter::get(opCtx);
|
||||
return txnRouter && opCtx->inMultiDocumentTransaction()
|
||||
? txnRouter.getPlacementConflictTime()
|
||||
: boost::none;
|
||||
}();
|
||||
|
||||
for (const auto& nss : nssList) {
|
||||
const auto nssCri = criMap.find(nss);
|
||||
tassert(8322004,
|
||||
@ -54,22 +61,44 @@ inline std::vector<ScopedSetShardRole> createScopedShardRoles(
|
||||
nssCri != criMap.end());
|
||||
|
||||
bool isTracked = nssCri->second.hasRoutingTable();
|
||||
|
||||
auto shardVersion = [&] {
|
||||
auto sv =
|
||||
isTracked ? nssCri->second.getShardVersion(myShardId) : ShardVersion::UNSHARDED();
|
||||
|
||||
if (auto txnRouter = TransactionRouter::get(opCtx);
|
||||
txnRouter && opCtx->inMultiDocumentTransaction()) {
|
||||
if (auto optOriginalPlacementConflictTime = txnRouter.getPlacementConflictTime()) {
|
||||
sv.setPlacementConflictTime(*optOriginalPlacementConflictTime);
|
||||
}
|
||||
if (placementConflictTime) {
|
||||
sv.setPlacementConflictTime(*placementConflictTime);
|
||||
}
|
||||
return sv;
|
||||
}();
|
||||
const auto dbVersion =
|
||||
isTracked ? boost::none : OperationShardingState::get(opCtx).getDbVersion(nss.dbName());
|
||||
|
||||
scopedShardRoles.emplace_back(opCtx, nss, shardVersion, dbVersion);
|
||||
// For UNTRACKED collections, the collection will only be potentially considered local if
|
||||
// this shard is the dbPrimary shard.
|
||||
//
|
||||
// If the routing info tells this shard is, then attach the DatabaseVersion to validate
|
||||
// that. For the opposite case, where the routing info says that this shard is not the
|
||||
// dbPrimary shard, we cannot attach the DatabaseVersion because the protocol does not allow
|
||||
// a way to express that, so it won't be validated. If the routing info was stale, this will
|
||||
// potentially result in executing a correct but sub-optimal query plan (only this time,
|
||||
// because the next executions will see an updated routing info as a side effect of this
|
||||
// execution having targeted a "remotely" and therefore will choose the optimal plan).
|
||||
const bool isDbPrimaryShard = nssCri->second.getDbPrimaryShardId() == myShardId;
|
||||
auto dbVersion = !isTracked && isDbPrimaryShard
|
||||
? boost::optional<DatabaseVersion>(nssCri->second.getDbVersion())
|
||||
: boost::none;
|
||||
|
||||
if (placementConflictTime && dbVersion) {
|
||||
dbVersion->setPlacementConflictTime(*placementConflictTime);
|
||||
}
|
||||
|
||||
try {
|
||||
scopedShardRoles.emplace_back(opCtx, nss, shardVersion, dbVersion);
|
||||
} catch (const ExceptionFor<ErrorCodes::IllegalChangeToExpectedDatabaseVersion>&) {
|
||||
// Only one can be correct. Check css and the new one.
|
||||
const auto scopedDss = DatabaseShardingState::acquire(opCtx, nss.dbName());
|
||||
scopedDss->checkDbVersionOrThrow(opCtx);
|
||||
scopedDss->checkDbVersionOrThrow(opCtx, *dbVersion);
|
||||
MONGO_UNREACHABLE_TASSERT(10825600);
|
||||
}
|
||||
}
|
||||
return scopedShardRoles;
|
||||
}
|
||||
|
||||
334
src/mongo/db/pipeline/initialize_auto_get_helper_test.cpp
Normal file
334
src/mongo/db/pipeline/initialize_auto_get_helper_test.cpp
Normal file
@ -0,0 +1,334 @@
|
||||
/**
|
||||
* Copyright (C) 2025-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/pipeline/initialize_auto_get_helper.h"
|
||||
|
||||
#include "mongo/db/s/shard_server_test_fixture.h"
|
||||
#include "mongo/s/session_catalog_router.h"
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
class InitializeAutoGetHelperTest : public ShardServerTestFixtureWithCatalogCacheMock {
|
||||
protected:
|
||||
void setUp() override {
|
||||
ShardServerTestFixtureWithCatalogCacheMock::setUp();
|
||||
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
|
||||
Grid::get(operationContext())->setShardingInitialized();
|
||||
}
|
||||
|
||||
const ShardId otherShard{"shardB"};
|
||||
};
|
||||
|
||||
TEST_F(InitializeAutoGetHelperTest, TrackedUnshardedSecondaryCollections) {
|
||||
auto opCtx = operationContext();
|
||||
|
||||
// nssA and nssC are colocated. nssB is on a different shard.
|
||||
const DatabaseName dbName1 = DatabaseName::createDatabaseName_forTest(boost::none, "db1");
|
||||
const NamespaceString nssA = NamespaceString::createNamespaceString_forTest(dbName1, "collA");
|
||||
const NamespaceString nssB = NamespaceString::createNamespaceString_forTest(dbName1, "collB");
|
||||
const NamespaceString nssC = NamespaceString::createNamespaceString_forTest(dbName1, "collC");
|
||||
|
||||
const auto db1PrimaryShard = kMyShardName;
|
||||
const auto db1Version = DatabaseVersion(UUID::gen(), Timestamp(1, 0));
|
||||
|
||||
const auto criNssA = CatalogCacheMock::makeCollectionRoutingInfoUnsplittable(
|
||||
nssA, db1PrimaryShard, db1Version, kMyShardName);
|
||||
const auto criNssB = CatalogCacheMock::makeCollectionRoutingInfoUnsplittable(
|
||||
nssB, db1PrimaryShard, db1Version, otherShard);
|
||||
const auto criNssC = CatalogCacheMock::makeCollectionRoutingInfoUnsplittable(
|
||||
nssC, db1PrimaryShard, db1Version, kMyShardName);
|
||||
|
||||
getCatalogCacheMock()->setCollectionReturnValue(nssA, criNssA);
|
||||
getCatalogCacheMock()->setCollectionReturnValue(nssB, criNssB);
|
||||
getCatalogCacheMock()->setCollectionReturnValue(nssC, criNssC);
|
||||
|
||||
ScopedSetShardRole shardRoleA(opCtx, nssA, criNssA.getShardVersion(kMyShardName), boost::none);
|
||||
|
||||
// test with secondaryNss={nssB}.
|
||||
bool anySecondaryCollectionIsNotLocal = initializeAutoGet(opCtx, nssA, {nssB}, [&]() {
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
ASSERT_EQ(criNssA.getShardVersion(kMyShardName), oss.getShardVersion(nssA));
|
||||
ASSERT_EQ(criNssB.getShardVersion(kMyShardName), oss.getShardVersion(nssB));
|
||||
ASSERT_EQ(boost::none, oss.getDbVersion(dbName1));
|
||||
});
|
||||
ASSERT_EQ(true, anySecondaryCollectionIsNotLocal);
|
||||
|
||||
// test with secondaryNss={nssC}.
|
||||
anySecondaryCollectionIsNotLocal = initializeAutoGet(opCtx, nssA, {nssC}, [&]() {
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
ASSERT_EQ(criNssA.getShardVersion(kMyShardName), oss.getShardVersion(nssA));
|
||||
ASSERT_EQ(criNssC.getShardVersion(kMyShardName), oss.getShardVersion(nssC));
|
||||
ASSERT_EQ(boost::none, oss.getDbVersion(dbName1));
|
||||
});
|
||||
ASSERT_EQ(false, anySecondaryCollectionIsNotLocal);
|
||||
|
||||
// test with secondaryNss={nssB, nssC}.
|
||||
anySecondaryCollectionIsNotLocal = initializeAutoGet(opCtx, nssA, {nssB, nssC}, [&]() {
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
ASSERT_EQ(criNssA.getShardVersion(kMyShardName), oss.getShardVersion(nssA));
|
||||
ASSERT_EQ(criNssB.getShardVersion(kMyShardName), oss.getShardVersion(nssB));
|
||||
ASSERT_EQ(criNssC.getShardVersion(kMyShardName), oss.getShardVersion(nssC));
|
||||
ASSERT_EQ(boost::none, oss.getDbVersion(dbName1));
|
||||
});
|
||||
ASSERT_EQ(true, anySecondaryCollectionIsNotLocal);
|
||||
|
||||
// test with secondaryNss={nssA, nssC}.
|
||||
anySecondaryCollectionIsNotLocal = initializeAutoGet(opCtx, nssA, {nssA, nssC}, [&]() {
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
ASSERT_EQ(criNssA.getShardVersion(kMyShardName), oss.getShardVersion(nssA));
|
||||
ASSERT_EQ(criNssC.getShardVersion(kMyShardName), oss.getShardVersion(nssC));
|
||||
ASSERT_EQ(boost::none, oss.getDbVersion(dbName1));
|
||||
});
|
||||
ASSERT_EQ(false, anySecondaryCollectionIsNotLocal);
|
||||
}
|
||||
|
||||
TEST_F(InitializeAutoGetHelperTest, UntrackedSecondaryCollections) {
|
||||
auto opCtx = operationContext();
|
||||
|
||||
// db1 is on shardA. db2 is on shardB.
|
||||
const DatabaseName dbName1 = DatabaseName::createDatabaseName_forTest(boost::none, "db1");
|
||||
const DatabaseName dbName2 = DatabaseName::createDatabaseName_forTest(boost::none, "db2");
|
||||
|
||||
// Untracked collections. nssA and nssB are in db1. nssC is in db2.
|
||||
const NamespaceString nssA = NamespaceString::createNamespaceString_forTest(dbName1, "collA");
|
||||
const NamespaceString nssB = NamespaceString::createNamespaceString_forTest(dbName1, "collB");
|
||||
const NamespaceString nssC = NamespaceString::createNamespaceString_forTest(dbName1, "collC");
|
||||
|
||||
const auto db1PrimaryShard = kMyShardName;
|
||||
const auto db1Version = DatabaseVersion(UUID::gen(), Timestamp(1, 0));
|
||||
|
||||
const auto db2PrimaryShard = otherShard;
|
||||
const auto db2Version = DatabaseVersion(UUID::gen(), Timestamp(2, 0));
|
||||
|
||||
const auto criNssA =
|
||||
CatalogCacheMock::makeCollectionRoutingInfoUntracked(nssA, db1PrimaryShard, db1Version);
|
||||
const auto criNssB =
|
||||
CatalogCacheMock::makeCollectionRoutingInfoUntracked(nssB, db1PrimaryShard, db1Version);
|
||||
const auto criNssC =
|
||||
CatalogCacheMock::makeCollectionRoutingInfoUntracked(nssC, db2PrimaryShard, db2Version);
|
||||
|
||||
getCatalogCacheMock()->setCollectionReturnValue(nssA, criNssA);
|
||||
getCatalogCacheMock()->setCollectionReturnValue(nssB, criNssB);
|
||||
getCatalogCacheMock()->setCollectionReturnValue(nssC, criNssC);
|
||||
|
||||
ScopedSetShardRole shardRoleA(opCtx, nssA, ShardVersion::UNSHARDED(), boost::none);
|
||||
|
||||
// test with secondaryNss={nssB}.
|
||||
bool anySecondaryCollectionIsNotLocal = initializeAutoGet(opCtx, nssA, {nssB}, [&]() {
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
ASSERT_EQ(ShardVersion::UNSHARDED(), oss.getShardVersion(nssB));
|
||||
// Expect DatabaseVersion for dbName1 to be set since the cache believes this shard to be
|
||||
// its dbPrimary shard.
|
||||
ASSERT_EQ(db1Version, oss.getDbVersion(dbName1));
|
||||
});
|
||||
ASSERT_EQ(false, anySecondaryCollectionIsNotLocal);
|
||||
|
||||
// test with secondaryNss={nssC}.
|
||||
anySecondaryCollectionIsNotLocal = initializeAutoGet(opCtx, nssA, {nssC}, [&]() {
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
ASSERT_EQ(ShardVersion::UNSHARDED(), oss.getShardVersion(nssC));
|
||||
// Do not expect DatabaseVersion for dbName2 to be set since the cache believes this shard
|
||||
// to not be its dbPrimary shard.
|
||||
ASSERT_EQ(boost::none, oss.getDbVersion(dbName2));
|
||||
});
|
||||
ASSERT_EQ(true, anySecondaryCollectionIsNotLocal);
|
||||
|
||||
// test with secondaryNss={nssB, nssC}.
|
||||
anySecondaryCollectionIsNotLocal = initializeAutoGet(opCtx, nssA, {nssB, nssC}, [&]() {
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
ASSERT_EQ(ShardVersion::UNSHARDED(), oss.getShardVersion(nssB));
|
||||
ASSERT_EQ(ShardVersion::UNSHARDED(), oss.getShardVersion(nssC));
|
||||
|
||||
// Expect DatabaseVersion for dbName1 to be set since the cache believes this shard to be
|
||||
// its dbPrimary shard.
|
||||
ASSERT_EQ(db1Version, oss.getDbVersion(dbName1));
|
||||
// Do not expect DatabaseVersion for dbName2 to be set since the cache believes this shard
|
||||
// to not be its dbPrimary shard.
|
||||
ASSERT_EQ(boost::none, oss.getDbVersion(dbName2));
|
||||
});
|
||||
ASSERT_EQ(true, anySecondaryCollectionIsNotLocal);
|
||||
}
|
||||
|
||||
TEST_F(InitializeAutoGetHelperTest, ShardedSecondaryCollections) {
|
||||
auto opCtx = operationContext();
|
||||
|
||||
const DatabaseName dbName1 = DatabaseName::createDatabaseName_forTest(boost::none, "db1");
|
||||
const NamespaceString nssA = NamespaceString::createNamespaceString_forTest(dbName1, "collA");
|
||||
const NamespaceString nssB = NamespaceString::createNamespaceString_forTest(dbName1, "collB");
|
||||
|
||||
const auto db1PrimaryShard = kMyShardName;
|
||||
const auto db1Version = DatabaseVersion(UUID::gen(), Timestamp(1, 0));
|
||||
|
||||
const auto criNssB = CatalogCacheMock::makeCollectionRoutingInfoSharded(
|
||||
nssB,
|
||||
db1PrimaryShard,
|
||||
db1Version,
|
||||
BSON("skey" << 1),
|
||||
{{ChunkRange(BSON("skey" << MINKEY), BSON("skey" << 0)), kMyShardName},
|
||||
{ChunkRange(BSON("skey" << 0), BSON("skey" << MAXKEY)), otherShard}});
|
||||
|
||||
getCatalogCacheMock()->setCollectionReturnValue(nssB, criNssB);
|
||||
|
||||
ScopedSetShardRole shardRoleA(opCtx, nssA, ShardVersion::UNSHARDED(), db1Version);
|
||||
|
||||
// test with secondaryNss={nssB}.
|
||||
bool anySecondaryCollectionIsNotLocal = initializeAutoGet(opCtx, nssA, {nssB}, [&]() {
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
ASSERT_EQ(db1Version, oss.getDbVersion(dbName1));
|
||||
ASSERT_EQ(ShardVersion::UNSHARDED(), oss.getShardVersion(nssA));
|
||||
ASSERT_EQ(criNssB.getShardVersion(kMyShardName), oss.getShardVersion(nssB));
|
||||
});
|
||||
ASSERT_EQ(true, anySecondaryCollectionIsNotLocal);
|
||||
}
|
||||
|
||||
TEST_F(InitializeAutoGetHelperTest, NoSecondaryCollections) {
|
||||
auto opCtx = operationContext();
|
||||
|
||||
const DatabaseName dbName1 = DatabaseName::createDatabaseName_forTest(boost::none, "db1");
|
||||
const auto db1Version = DatabaseVersion(UUID::gen(), Timestamp(1, 0));
|
||||
const NamespaceString nssA = NamespaceString::createNamespaceString_forTest(dbName1, "collA");
|
||||
|
||||
|
||||
ScopedSetShardRole shardRoleA(opCtx, nssA, ShardVersion::UNSHARDED(), db1Version);
|
||||
bool anySecondaryCollectionIsNotLocal = initializeAutoGet(opCtx, nssA, {}, [&]() {
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
ASSERT_EQ(db1Version, oss.getDbVersion(dbName1));
|
||||
ASSERT_EQ(ShardVersion::UNSHARDED(), oss.getShardVersion(nssA));
|
||||
});
|
||||
ASSERT_EQ(false, anySecondaryCollectionIsNotLocal);
|
||||
}
|
||||
|
||||
TEST_F(InitializeAutoGetHelperTest, MixedTypeSecondaryCollections) {
|
||||
auto opCtx = operationContext();
|
||||
|
||||
// nssA is tracked-unsplittable on this shard. nssB is sharded. nssC is untracked on this shard.
|
||||
const DatabaseName dbName1 = DatabaseName::createDatabaseName_forTest(boost::none, "db1");
|
||||
|
||||
const NamespaceString nssA = NamespaceString::createNamespaceString_forTest(dbName1, "collA");
|
||||
const NamespaceString nssB = NamespaceString::createNamespaceString_forTest(dbName1, "collB");
|
||||
const NamespaceString nssC = NamespaceString::createNamespaceString_forTest(dbName1, "collC");
|
||||
|
||||
const auto db1PrimaryShard = kMyShardName;
|
||||
const auto db1Version = DatabaseVersion(UUID::gen(), Timestamp(1, 0));
|
||||
|
||||
const auto criNssA = CatalogCacheMock::makeCollectionRoutingInfoUnsplittable(
|
||||
nssA, db1PrimaryShard, db1Version, kMyShardName);
|
||||
|
||||
const auto criNssB = CatalogCacheMock::makeCollectionRoutingInfoSharded(
|
||||
nssB,
|
||||
db1PrimaryShard,
|
||||
db1Version,
|
||||
BSON("skey" << 1),
|
||||
{{ChunkRange(BSON("skey" << MINKEY), BSON("skey" << MAXKEY)), otherShard}});
|
||||
|
||||
const auto criNssC =
|
||||
CatalogCacheMock::makeCollectionRoutingInfoUntracked(nssC, db1PrimaryShard, db1Version);
|
||||
|
||||
getCatalogCacheMock()->setCollectionReturnValue(nssB, criNssB);
|
||||
getCatalogCacheMock()->setCollectionReturnValue(nssC, criNssC);
|
||||
|
||||
ScopedSetShardRole shardRoleA(opCtx, nssA, criNssA.getShardVersion(kMyShardName), boost::none);
|
||||
|
||||
// test with secondaryNss={nssB, nssC}.
|
||||
bool anySecondaryCollectionIsNotLocal = initializeAutoGet(opCtx, nssA, {nssB, nssC}, [&]() {
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
|
||||
ASSERT_EQ(criNssA.getShardVersion(kMyShardName), oss.getShardVersion(nssA));
|
||||
|
||||
ASSERT_EQ(criNssB.getShardVersion(kMyShardName), oss.getShardVersion(nssB));
|
||||
|
||||
ASSERT_EQ(db1Version, oss.getDbVersion(dbName1));
|
||||
ASSERT_EQ(ShardVersion::UNSHARDED(), oss.getShardVersion(nssC));
|
||||
});
|
||||
ASSERT_EQ(true, anySecondaryCollectionIsNotLocal);
|
||||
}
|
||||
|
||||
// Check placementConflict timestamp is set correctly on shardVersion and dbVersion.
|
||||
TEST_F(InitializeAutoGetHelperTest, InTransactionRcLocal) {
|
||||
auto opCtx = operationContext();
|
||||
opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
|
||||
opCtx->setTxnNumber(10);
|
||||
opCtx->setActiveTransactionParticipant();
|
||||
opCtx->setInMultiDocumentTransaction();
|
||||
const auto afterClusterTime = LogicalTime(Timestamp(1000, 0));
|
||||
repl::ReadConcernArgs::get(opCtx) =
|
||||
repl::ReadConcernArgs(afterClusterTime, repl::ReadConcernLevel::kLocalReadConcern);
|
||||
RouterOperationContextSession rocs(opCtx);
|
||||
|
||||
// nssA is tracked-unsplittable on this shard. nssB is sharded. nssC is untracked on this shard.
|
||||
const DatabaseName dbName1 = DatabaseName::createDatabaseName_forTest(boost::none, "db1");
|
||||
|
||||
const NamespaceString nssA = NamespaceString::createNamespaceString_forTest(dbName1, "collA");
|
||||
const NamespaceString nssB = NamespaceString::createNamespaceString_forTest(dbName1, "collB");
|
||||
const NamespaceString nssC = NamespaceString::createNamespaceString_forTest(dbName1, "collC");
|
||||
|
||||
const auto db1PrimaryShard = kMyShardName;
|
||||
const auto db1Version = DatabaseVersion(UUID::gen(), Timestamp(1, 0));
|
||||
|
||||
const auto criNssA = CatalogCacheMock::makeCollectionRoutingInfoUnsplittable(
|
||||
nssA, db1PrimaryShard, db1Version, kMyShardName);
|
||||
|
||||
const auto criNssB = CatalogCacheMock::makeCollectionRoutingInfoSharded(
|
||||
nssB,
|
||||
db1PrimaryShard,
|
||||
db1Version,
|
||||
BSON("skey" << 1),
|
||||
{{ChunkRange(BSON("skey" << MINKEY), BSON("skey" << MAXKEY)), otherShard}});
|
||||
|
||||
const auto criNssC =
|
||||
CatalogCacheMock::makeCollectionRoutingInfoUntracked(nssC, db1PrimaryShard, db1Version);
|
||||
|
||||
getCatalogCacheMock()->setCollectionReturnValue(nssB, criNssB);
|
||||
getCatalogCacheMock()->setCollectionReturnValue(nssC, criNssC);
|
||||
|
||||
ScopedSetShardRole shardRoleA(opCtx, nssA, criNssA.getShardVersion(kMyShardName), boost::none);
|
||||
|
||||
// test with secondaryNss={nssB, nssC}.
|
||||
bool anySecondaryCollectionIsNotLocal = initializeAutoGet(opCtx, nssA, {nssB, nssC}, [&]() {
|
||||
auto txnRouter = TransactionRouter::get(opCtx);
|
||||
ASSERT(txnRouter);
|
||||
|
||||
auto& oss = OperationShardingState::get(opCtx);
|
||||
|
||||
ASSERT_EQ(criNssA.getShardVersion(kMyShardName), oss.getShardVersion(nssA));
|
||||
|
||||
ASSERT_EQ(criNssB.getShardVersion(kMyShardName), oss.getShardVersion(nssB));
|
||||
ASSERT_EQ(afterClusterTime, oss.getShardVersion(nssB)->placementConflictTime());
|
||||
|
||||
ASSERT_EQ(db1Version, oss.getDbVersion(dbName1));
|
||||
ASSERT_EQ(afterClusterTime, oss.getDbVersion(dbName1)->getPlacementConflictTime());
|
||||
ASSERT_EQ(ShardVersion::UNSHARDED(), oss.getShardVersion(nssC));
|
||||
ASSERT_EQ(afterClusterTime, oss.getShardVersion(nssC)->placementConflictTime());
|
||||
});
|
||||
ASSERT_EQ(true, anySecondaryCollectionIsNotLocal);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
Loading…
Reference in New Issue
Block a user