SERVER-78495 Throw out vestiges of enableFinerGrainedCatalogCacheRefresh from the router role (#17693)
GitOrigin-RevId: 277f3bdecb1f20663b1d2e0259e01571849e08a5
This commit is contained in:
parent
72cf858a50
commit
e377614369
23
debian/mongodb-parameters.5
vendored
23
debian/mongodb-parameters.5
vendored
@ -3812,29 +3812,6 @@ For example, the following sets the interval at 300000 milliseconds
|
||||
.RE
|
||||
.RE
|
||||
.PP
|
||||
\fBenableFinerGrainedCatalogCacheRefresh\f1
|
||||
.RS
|
||||
.PP
|
||||
Available for both \fBmongod\f1\f1 and \fBmongos\f1\f1\&.
|
||||
.PP
|
||||
\fIType\f1: boolean
|
||||
.PP
|
||||
\fIDefault\f1: true
|
||||
.PP
|
||||
This parameter allows the catalog cache to be refreshed only if the
|
||||
shard needs to be refreshed. If disabled, any stale chunk will cause
|
||||
the entire chunk distribution for a collection to be considered stale
|
||||
and force all \fBrouters\f1 who
|
||||
contact the shard to refresh their shard catalog cache.
|
||||
.PP
|
||||
You can only set this parameter during start\-up and cannot change
|
||||
this setting using the \fBsetParameter\f1\f1 database command.
|
||||
.PP
|
||||
.EX
|
||||
mongod \-\-setParameter enableFinerGrainedCatalogCacheRefresh=true
|
||||
mongos \-\-setParameter enableFinerGrainedCatalogCacheRefresh=true
|
||||
.EE
|
||||
.RS
|
||||
.IP \(bu 2
|
||||
\fBSharding\f1
|
||||
.IP \(bu 2
|
||||
|
||||
@ -84,7 +84,6 @@ export const $config = extendWorkload($baseConfig, function($config, $super) {
|
||||
ErrorCodes.WriteConflict,
|
||||
ErrorCodes.LockTimeout,
|
||||
ErrorCodes.PreparedTransactionInProgress,
|
||||
ErrorCodes.ShardInvalidatedForTargeting,
|
||||
ErrorCodes.NoSuchTransaction
|
||||
];
|
||||
|
||||
|
||||
@ -1,108 +0,0 @@
|
||||
/**
|
||||
* Test that chunk operations don't cause the mongos to refresh unless an affected chunk is
|
||||
* targeted.
|
||||
*/
|
||||
|
||||
import {
|
||||
flushRoutersAndRefreshShardMetadata
|
||||
} from "jstests/sharding/libs/sharded_transactions_helpers.js";
|
||||
|
||||
let st = new ShardingTest({
|
||||
mongos: 1,
|
||||
shards: 3,
|
||||
other: {mongosOptions: {setParameter: {enableFinerGrainedCatalogCacheRefresh: true}}}
|
||||
});
|
||||
const dbName = "test";
|
||||
const collName = "foo";
|
||||
const ns = dbName + "." + collName;
|
||||
let testDB = st.s.getDB(dbName);
|
||||
let testColl = testDB.foo;
|
||||
|
||||
let getMongosCollVersion = (ns) => {
|
||||
return st.s.adminCommand({getShardVersion: ns}).version;
|
||||
};
|
||||
|
||||
let setUp = () => {
|
||||
/**
|
||||
* Sets up a test by moving chunks to such that one chunk is on each
|
||||
* shard, with the following distribution:
|
||||
* shard0: [-inf, -10)
|
||||
* shard1: [-10, 10)
|
||||
* shard2: [10, inf)
|
||||
*/
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
|
||||
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
|
||||
assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: -10}}));
|
||||
assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: 10}}));
|
||||
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({moveChunk: ns, find: {x: -100}, to: st.shard0.shardName}));
|
||||
assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {x: 0}, to: st.shard1.shardName}));
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({moveChunk: ns, find: {x: 1000}, to: st.shard2.shardName}));
|
||||
flushRoutersAndRefreshShardMetadata(st, {ns});
|
||||
};
|
||||
|
||||
// Verify that a split doesn't update the mongos' catalog cache unless an affected chunk is
|
||||
// targeted.
|
||||
let testSplit = () => {
|
||||
const mongosCollectionVersion = getMongosCollVersion(ns);
|
||||
|
||||
assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: -500}}));
|
||||
assert.eq(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
|
||||
testColl.findOne({x: 0});
|
||||
testColl.findOne({x: 1000});
|
||||
assert.eq(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
|
||||
testColl.findOne({x: -1000});
|
||||
assert.lt(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
};
|
||||
|
||||
// Verify that a merge doesn't update the mongos' catalog cache unless an affected chunk is
|
||||
// targeted.
|
||||
let testMerge = () => {
|
||||
const mongosCollectionVersion = getMongosCollVersion(ns);
|
||||
|
||||
assert.commandWorked(st.s.adminCommand({mergeChunks: ns, bounds: [{x: MinKey}, {x: -10}]}));
|
||||
testColl.findOne({x: 0});
|
||||
testColl.findOne({x: 1000});
|
||||
assert.eq(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
|
||||
testColl.findOne({x: -1000});
|
||||
assert.lt(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
};
|
||||
|
||||
// Verify that a chunk move doesn't update the mongos' catalog cache unless an affected chunk is
|
||||
// targeted.
|
||||
let testMoveChunk = () => {
|
||||
// Contact the donor shard to trigger update.
|
||||
let mongosCollectionVersion = getMongosCollVersion(ns);
|
||||
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({moveChunk: ns, find: {x: -100}, to: st.shard1.shardName}));
|
||||
testColl.findOne({x: 1000});
|
||||
assert.eq(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
|
||||
testColl.findOne({x: -1000});
|
||||
assert.lt(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
|
||||
// Contact the recipient shard to trigger update.
|
||||
mongosCollectionVersion = getMongosCollVersion(ns);
|
||||
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({moveChunk: ns, find: {x: -100}, to: st.shard0.shardName}));
|
||||
testColl.findOne({x: 1000});
|
||||
assert.eq(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
|
||||
testColl.findOne({x: -1000});
|
||||
assert.lt(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
};
|
||||
|
||||
setUp();
|
||||
testSplit();
|
||||
testMerge();
|
||||
testMoveChunk();
|
||||
|
||||
st.stop();
|
||||
@ -1,79 +0,0 @@
|
||||
/**
|
||||
* Test that chunk operations cause the mongos to refresh if the finer grained catalog cache
|
||||
* refresh flag is set to false.
|
||||
*/
|
||||
|
||||
import {
|
||||
flushRoutersAndRefreshShardMetadata
|
||||
} from "jstests/sharding/libs/sharded_transactions_helpers.js";
|
||||
|
||||
let st = new ShardingTest({
|
||||
mongos: 1,
|
||||
shards: 3,
|
||||
mongosOptions: {setParameter: {enableFinerGrainedCatalogCacheRefresh: false}}
|
||||
});
|
||||
const dbName = "test";
|
||||
const collName = "foo";
|
||||
const ns = dbName + "." + collName;
|
||||
|
||||
let getMongosCollVersion = (ns) => {
|
||||
return st.s.adminCommand({getShardVersion: ns}).version;
|
||||
};
|
||||
|
||||
let setUp = () => {
|
||||
/**
|
||||
* Sets up a test by moving chunks to such that one chunk is on each
|
||||
* shard, with the following distribution:
|
||||
* shard0: [-inf, -10)
|
||||
* shard1: [-10, 10)
|
||||
* shard2: [10, inf)
|
||||
*/
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
|
||||
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
|
||||
assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: -10}}));
|
||||
assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: 10}}));
|
||||
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({moveChunk: ns, find: {x: -100}, to: st.shard0.shardName}));
|
||||
assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {x: 0}, to: st.shard1.shardName}));
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({moveChunk: ns, find: {x: 1000}, to: st.shard2.shardName}));
|
||||
flushRoutersAndRefreshShardMetadata(st, {ns});
|
||||
};
|
||||
|
||||
// Verify that a split updates the mongos' catalog cache.
|
||||
let testSplit = () => {
|
||||
const mongosCollectionVersion = getMongosCollVersion(ns);
|
||||
assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: -500}}));
|
||||
assert.lt(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
};
|
||||
|
||||
// Verify that a merge updates the mongos' catalog cache.
|
||||
let testMerge = () => {
|
||||
const mongosCollectionVersion = getMongosCollVersion(ns);
|
||||
assert.commandWorked(st.s.adminCommand({mergeChunks: ns, bounds: [{x: MinKey}, {x: -10}]}));
|
||||
assert.lt(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
};
|
||||
|
||||
// Verify that a chunk move updates the mongos' catalog cache.
|
||||
let testMoveChunk = () => {
|
||||
let mongosCollectionVersion = getMongosCollVersion(ns);
|
||||
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({moveChunk: ns, find: {x: -100}, to: st.shard1.shardName}));
|
||||
assert.lt(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
|
||||
mongosCollectionVersion = getMongosCollVersion(ns);
|
||||
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({moveChunk: ns, find: {x: -100}, to: st.shard0.shardName}));
|
||||
assert.lt(mongosCollectionVersion, getMongosCollVersion(ns));
|
||||
};
|
||||
|
||||
setUp();
|
||||
testSplit();
|
||||
testMerge();
|
||||
testMoveChunk();
|
||||
|
||||
st.stop();
|
||||
@ -14,13 +14,7 @@ const nodeOptions = {
|
||||
setParameter: {enableShardedIndexConsistencyCheck: false}
|
||||
};
|
||||
|
||||
const st = new ShardingTest({
|
||||
shards: 3,
|
||||
other: {
|
||||
configOptions: nodeOptions,
|
||||
mongosOptions: {setParameter: {enableFinerGrainedCatalogCacheRefresh: true}}
|
||||
}
|
||||
});
|
||||
const st = new ShardingTest({shards: 3, other: {configOptions: nodeOptions}});
|
||||
const dbName = "test";
|
||||
const collName = "user";
|
||||
const ns = dbName + "." + collName;
|
||||
@ -54,10 +48,8 @@ assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: null}}));
|
||||
ShardVersioningUtil.moveChunkNotRefreshRecipient(st.s, ns, st.shard1, st.shard2, {_id: MinKey});
|
||||
|
||||
const latestCollectionVersion = ShardVersioningUtil.getMetadataOnShard(st.shard1, ns).collVersion;
|
||||
const mongosCollectionVersion = st.s.adminCommand({getShardVersion: ns}).version;
|
||||
|
||||
// Assert that the mongos and all non-donor shards have a stale collection version.
|
||||
assert.lt(mongosCollectionVersion, latestCollectionVersion);
|
||||
// Assert that all non-donor shards have a stale collection version.
|
||||
ShardVersioningUtil.assertCollectionVersionOlderThan(st.shard0, ns, latestCollectionVersion);
|
||||
ShardVersioningUtil.assertCollectionVersionEquals(st.shard1, ns, latestCollectionVersion);
|
||||
ShardVersioningUtil.assertCollectionVersionOlderThan(st.shard2, ns, latestCollectionVersion);
|
||||
|
||||
@ -17,10 +17,7 @@ if (isStepdownSuite && isCodeCoverageEnabled) {
|
||||
// query collecting the routing table metadata to fail due to max document size exceeded).
|
||||
TestData.skipCheckRoutingTableConsistency = true;
|
||||
|
||||
const st = new ShardingTest({
|
||||
shards: 2,
|
||||
other: {mongosOptions: {setParameter: {enableFinerGrainedCatalogCacheRefresh: true}}}
|
||||
});
|
||||
const st = new ShardingTest({shards: 2});
|
||||
const dbName = "test";
|
||||
const collName = "foo";
|
||||
const ns = dbName + "." + collName;
|
||||
@ -106,17 +103,13 @@ st.configRS.awaitLastOpCommitted();
|
||||
// last two splits were performed directly on the shards.
|
||||
assert.neq(null, st.s.getDB('config').databases.findOne());
|
||||
|
||||
// Verify that moving a chunk won't trigger mongos's routing entry to get marked as stale until
|
||||
// a request comes in to target that chunk.
|
||||
// Trigger a refresh on the mongos through a moveChunk command.
|
||||
assert.commandWorked(
|
||||
st.s.adminCommand({moveChunk: ns, find: splitPoint, to: otherShard.shardName}));
|
||||
|
||||
// Chunks should not be included in the response regardless of the mongos version
|
||||
// because the chunk size exceeds the limit.
|
||||
// Chunks should not be included in the response because the chunk size exceeds the limit.
|
||||
res = st.s.adminCommand({getShardVersion: ns, fullMetadata: true});
|
||||
assert.commandWorked(res);
|
||||
assert.eq(res.version.t, 1);
|
||||
assert.eq(res.version.i, 20002);
|
||||
assert.eq(undefined, res.chunks);
|
||||
|
||||
st.stop();
|
||||
st.stop();
|
||||
|
||||
@ -360,11 +360,6 @@ error_codes:
|
||||
- {code: 302,name: OCSPCertificateStatusUnknown}
|
||||
- {code: 303,name: SplitHorizonChange,categories: [CloseConnectionError]}
|
||||
|
||||
# This code should only be used upon finding that a shard has been marked stale in the sharding
|
||||
# catalog cache, and as such does not belong in the StaleShardVersionError or
|
||||
# NeedRetargettingError categories.
|
||||
- {code: 304,name: ShardInvalidatedForTargeting,extra: ShardInvalidatedForTargetingInfo}
|
||||
|
||||
- {code: 306,name: ReadThroughCacheLookupCanceled,categories: [InternalOnly]}
|
||||
|
||||
- {code: 307,name: RangeDeletionAbandonedBecauseTaskDocumentDoesNotExist}
|
||||
|
||||
@ -266,7 +266,6 @@ bool isTransientTransactionError(ErrorCodes::Error code,
|
||||
case ErrorCodes::LockTimeout:
|
||||
case ErrorCodes::PreparedTransactionInProgress:
|
||||
case ErrorCodes::ShardCannotRefreshDueToLocksHeld:
|
||||
case ErrorCodes::ShardInvalidatedForTargeting:
|
||||
case ErrorCodes::StaleDbVersion:
|
||||
case ErrorCodes::TenantMigrationAborted:
|
||||
case ErrorCodes::TenantMigrationCommitted:
|
||||
|
||||
@ -93,12 +93,6 @@ TEST(IsTransientTransactionErrorTest, ShardCannotRefreshDueToLocksHeldIsTransien
|
||||
false /* isCommitOrAbort */));
|
||||
}
|
||||
|
||||
TEST(IsTransientTransactionErrorTest, ShardInvalidatedForTargetingIsTransient) {
|
||||
ASSERT_TRUE(isTransientTransactionError(ErrorCodes::ShardInvalidatedForTargeting,
|
||||
false /* hasWriteConcernError */,
|
||||
false /* isCommitOrAbort */));
|
||||
}
|
||||
|
||||
TEST(IsTransientTransactionErrorTest, StaleDbVersionIsTransient) {
|
||||
ASSERT_TRUE(isTransientTransactionError(
|
||||
ErrorCodes::StaleDbVersion, false /* hasWriteConcernError */, false /* isCommitOrAbort */));
|
||||
|
||||
@ -115,9 +115,6 @@ public:
|
||||
/**
|
||||
* Returns the current shard's placement version for the collection or UNSHARDED if it is not
|
||||
* sharded.
|
||||
*
|
||||
* Will throw ShardInvalidatedForTargeting if _thisShardId is marked as stale by
|
||||
* the CollectionMetadata's current chunk manager.
|
||||
*/
|
||||
ChunkVersion getShardPlacementVersion() const {
|
||||
return (hasRoutingTable() ? _cm->getVersion(_thisShardId) : ChunkVersion::UNSHARDED());
|
||||
@ -128,9 +125,6 @@ public:
|
||||
* sharded. This value indicates the commit time of the latest placement change that this shard
|
||||
* participated in and is used to answer the question of "did any chunks move since some
|
||||
* timestamp".
|
||||
*
|
||||
* Will throw ShardInvalidatedForTargeting if _thisShardId is marked as stale by
|
||||
* the CollectionMetadata's current chunk manager.
|
||||
*/
|
||||
Timestamp getShardMaxValidAfter() const {
|
||||
return (hasRoutingTable() ? _cm->getMaxValidAfter(_thisShardId) : Timestamp(0, 0));
|
||||
|
||||
@ -292,7 +292,6 @@ env.Library(
|
||||
'resharding/resume_token.idl',
|
||||
'resharding/type_collection_fields.idl',
|
||||
'shard_cannot_refresh_due_to_locks_held_exception.cpp',
|
||||
'shard_invalidated_for_targeting_exception.cpp',
|
||||
'shard_key_pattern.cpp',
|
||||
'shard_version.cpp',
|
||||
'shard_version.idl',
|
||||
|
||||
@ -77,7 +77,6 @@
|
||||
#include "mongo/s/shard_version_factory.h"
|
||||
#include "mongo/s/sharding_feature_flags_gen.h"
|
||||
#include "mongo/s/type_collection_common_types_gen.h"
|
||||
#include "mongo/util/decorable.h"
|
||||
#include "mongo/util/duration.h"
|
||||
#include "mongo/util/fail_point.h"
|
||||
#include "mongo/util/future.h"
|
||||
@ -102,9 +101,6 @@ const int kDatabaseCacheSize = 10000;
|
||||
const int kCollectionCacheSize = 10000;
|
||||
const int kIndexCacheSize = 10000;
|
||||
|
||||
const OperationContext::Decoration<bool> operationShouldBlockBehindCatalogCacheRefresh =
|
||||
OperationContext::declareDecoration<bool>();
|
||||
|
||||
std::shared_ptr<RoutingTableHistory> createUpdatedRoutingTableHistory(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
@ -384,12 +380,8 @@ StatusWith<ChunkManager> CatalogCache::_getCollectionPlacementInfoAt(
|
||||
|
||||
const auto dbInfo = std::move(swDbInfo.getValue());
|
||||
|
||||
const auto cacheConsistency = gEnableFinerGrainedCatalogCacheRefresh &&
|
||||
!operationShouldBlockBehindCatalogCacheRefresh(opCtx)
|
||||
? CacheCausalConsistency::kLatestCached
|
||||
: CacheCausalConsistency::kLatestKnown;
|
||||
|
||||
auto collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency);
|
||||
auto collEntryFuture =
|
||||
_collectionCache.acquireAsync(nss, CacheCausalConsistency::kLatestKnown);
|
||||
|
||||
if (allowLocks) {
|
||||
// When allowLocks is true we may be holding a lock, so we don't
|
||||
@ -397,7 +389,6 @@ StatusWith<ChunkManager> CatalogCache::_getCollectionPlacementInfoAt(
|
||||
// use it, otherwise return an error
|
||||
|
||||
if (collEntryFuture.isReady()) {
|
||||
setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false);
|
||||
return ChunkManager(dbInfo->getPrimary(),
|
||||
dbInfo->getVersion(),
|
||||
collEntryFuture.get(opCtx),
|
||||
@ -417,8 +408,6 @@ StatusWith<ChunkManager> CatalogCache::_getCollectionPlacementInfoAt(
|
||||
auto collEntry = collEntryFuture.get(opCtx);
|
||||
_stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
|
||||
|
||||
setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false);
|
||||
|
||||
return ChunkManager(dbInfo->getPrimary(),
|
||||
dbInfo->getVersion(),
|
||||
std::move(collEntry),
|
||||
@ -443,7 +432,8 @@ StatusWith<ChunkManager> CatalogCache::_getCollectionPlacementInfoAt(
|
||||
}
|
||||
}
|
||||
|
||||
collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency);
|
||||
collEntryFuture =
|
||||
_collectionCache.acquireAsync(nss, CacheCausalConsistency::kLatestKnown);
|
||||
t.reset();
|
||||
}
|
||||
} catch (const DBException& ex) {
|
||||
@ -596,18 +586,14 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationCon
|
||||
return getDatabase(opCtx, dbName);
|
||||
}
|
||||
|
||||
void CatalogCache::_triggerPlacementVersionRefresh(OperationContext* opCtx,
|
||||
const NamespaceString& nss) {
|
||||
void CatalogCache::_triggerPlacementVersionRefresh(const NamespaceString& nss) {
|
||||
_collectionCache.advanceTimeInStore(
|
||||
nss, ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh());
|
||||
setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
|
||||
}
|
||||
|
||||
void CatalogCache::_triggerIndexVersionRefresh(OperationContext* opCtx,
|
||||
const NamespaceString& nss) {
|
||||
void CatalogCache::_triggerIndexVersionRefresh(const NamespaceString& nss) {
|
||||
_indexCache.advanceTimeInStore(
|
||||
nss, ComparableIndexVersion::makeComparableIndexVersionForForcedRefresh());
|
||||
setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
|
||||
}
|
||||
|
||||
StatusWith<CollectionRoutingInfo> CatalogCache::_retryUntilConsistentRoutingInfo(
|
||||
@ -636,8 +622,8 @@ StatusWith<CollectionRoutingInfo> CatalogCache::_retryUntilConsistentRoutingInfo
|
||||
StatusWith<CollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWithRefresh(
|
||||
OperationContext* opCtx, const NamespaceString& nss) {
|
||||
try {
|
||||
_triggerPlacementVersionRefresh(opCtx, nss);
|
||||
_triggerIndexVersionRefresh(opCtx, nss);
|
||||
_triggerPlacementVersionRefresh(nss);
|
||||
_triggerIndexVersionRefresh(nss);
|
||||
return _getCollectionRoutingInfoWithoutOptimization(opCtx, nss);
|
||||
} catch (const DBException& ex) {
|
||||
return ex.toStatus();
|
||||
@ -647,7 +633,7 @@ StatusWith<CollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWithRefr
|
||||
StatusWith<CollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWithPlacementRefresh(
|
||||
OperationContext* opCtx, const NamespaceString& nss) {
|
||||
try {
|
||||
_triggerPlacementVersionRefresh(opCtx, nss);
|
||||
_triggerPlacementVersionRefresh(nss);
|
||||
return getCollectionRoutingInfo(opCtx, nss, false);
|
||||
} catch (const DBException& ex) {
|
||||
return ex.toStatus();
|
||||
@ -657,7 +643,7 @@ StatusWith<CollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWithPlac
|
||||
StatusWith<CollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWithIndexRefresh(
|
||||
OperationContext* opCtx, const NamespaceString& nss) {
|
||||
try {
|
||||
_triggerIndexVersionRefresh(opCtx, nss);
|
||||
_triggerIndexVersionRefresh(nss);
|
||||
return _getCollectionRoutingInfoWithoutOptimization(opCtx, nss);
|
||||
} catch (const DBException& ex) {
|
||||
return ex.toStatus();
|
||||
@ -756,41 +742,21 @@ void CatalogCache::onStaleDatabaseVersion(const DatabaseName& dbName,
|
||||
}
|
||||
}
|
||||
|
||||
void CatalogCache::setOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx,
|
||||
bool shouldBlock) {
|
||||
if (gEnableFinerGrainedCatalogCacheRefresh) {
|
||||
operationShouldBlockBehindCatalogCacheRefresh(opCtx) = shouldBlock;
|
||||
}
|
||||
}
|
||||
|
||||
void CatalogCache::invalidateShardOrEntireCollectionEntryForShardedCollection(
|
||||
const NamespaceString& nss,
|
||||
const boost::optional<ShardVersion>& wantedVersion,
|
||||
const ShardId& shardId) {
|
||||
_stats.countStaleConfigErrors.addAndFetch(1);
|
||||
|
||||
auto collectionEntry = _collectionCache.peekLatestCached(nss);
|
||||
|
||||
const auto newChunkVersion = wantedVersion
|
||||
? ComparableChunkVersion::makeComparableChunkVersion(wantedVersion->placementVersion())
|
||||
: ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh();
|
||||
|
||||
const bool routingInfoTimeAdvanced = _collectionCache.advanceTimeInStore(nss, newChunkVersion);
|
||||
_collectionCache.advanceTimeInStore(nss, newChunkVersion);
|
||||
|
||||
const auto newIndexVersion = wantedVersion
|
||||
? ComparableIndexVersion::makeComparableIndexVersion(wantedVersion->indexVersion())
|
||||
: ComparableIndexVersion::makeComparableIndexVersionForForcedRefresh();
|
||||
|
||||
_indexCache.advanceTimeInStore(nss, newIndexVersion);
|
||||
|
||||
if (routingInfoTimeAdvanced && collectionEntry && collectionEntry->optRt) {
|
||||
// Shards marked stale will be reset on the next refresh.
|
||||
// We can mark the shard stale only if the time advanced, otherwise no refresh would happen
|
||||
// and the shard will remain marked stale.
|
||||
// Even if a concurrent refresh is happening this is still the old collectionEntry,
|
||||
// so it is safe to call setShardStale.
|
||||
collectionEntry->optRt->setShardStale(shardId);
|
||||
}
|
||||
}
|
||||
|
||||
void CatalogCache::advanceCollectionTimeInStore(const NamespaceString& nss,
|
||||
@ -1035,8 +1001,6 @@ CatalogCache::CollectionCache::LookupResult CatalogCache::CollectionCache::_look
|
||||
opCtx, nss, isIncremental, existingHistory, collectionAndChunks);
|
||||
invariant(newRoutingHistory);
|
||||
|
||||
newRoutingHistory->setAllShardsRefreshed();
|
||||
|
||||
// Check that the shards all match with what is on the config server
|
||||
std::set<ShardId> shardIds;
|
||||
newRoutingHistory->getAllShardIds(&shardIds);
|
||||
|
||||
@ -298,12 +298,6 @@ public:
|
||||
void onStaleDatabaseVersion(const DatabaseName& dbName,
|
||||
const boost::optional<DatabaseVersion>& wantedVersion);
|
||||
|
||||
/**
|
||||
* Sets whether this operation should block behind a catalog cache refresh.
|
||||
*/
|
||||
static void setOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx,
|
||||
bool shouldBlock);
|
||||
|
||||
/**
|
||||
* Invalidates a single shard for the current collection if the epochs given in the chunk
|
||||
* versions match. Otherwise, invalidates the entire collection, causing any future targetting
|
||||
@ -454,9 +448,9 @@ private:
|
||||
boost::optional<ShardingIndexesCatalogCache> _getCollectionIndexInfoAt(
|
||||
OperationContext* opCtx, const NamespaceString& nss, bool allowLocks = false);
|
||||
|
||||
void _triggerPlacementVersionRefresh(OperationContext* opCtx, const NamespaceString& nss);
|
||||
void _triggerPlacementVersionRefresh(const NamespaceString& nss);
|
||||
|
||||
void _triggerIndexVersionRefresh(OperationContext* opCtx, const NamespaceString& nss);
|
||||
void _triggerIndexVersionRefresh(const NamespaceString& nss);
|
||||
|
||||
// Same as getCollectionRoutingInfo but will fetch the index information from the cache even if
|
||||
// the placement information is not sharded. Used internally when the a refresh is requested for
|
||||
|
||||
@ -51,7 +51,6 @@
|
||||
#include "mongo/bson/util/builder_fwd.h"
|
||||
#include "mongo/db/query/collation/collation_index_key.h"
|
||||
#include "mongo/s/mongod_and_mongos_server_parameters_gen.h"
|
||||
#include "mongo/s/shard_invalidated_for_targeting_exception.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
#include "mongo/util/str.h"
|
||||
|
||||
@ -646,25 +645,7 @@ RoutingTableHistory::RoutingTableHistory(
|
||||
_timeseriesFields(std::move(timeseriesFields)),
|
||||
_reshardingFields(std::move(reshardingFields)),
|
||||
_allowMigrations(allowMigrations),
|
||||
_chunkMap(std::move(chunkMap)),
|
||||
_placementVersions(_chunkMap.getShardPlacementVersionMap()) {}
|
||||
|
||||
void RoutingTableHistory::setShardStale(const ShardId& shardId) {
|
||||
if (gEnableFinerGrainedCatalogCacheRefresh) {
|
||||
auto it = _placementVersions.find(shardId);
|
||||
if (it != _placementVersions.end()) {
|
||||
it->second.isStale.store(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void RoutingTableHistory::setAllShardsRefreshed() {
|
||||
if (gEnableFinerGrainedCatalogCacheRefresh) {
|
||||
for (auto& [shard, targetingInfo] : _placementVersions) {
|
||||
targetingInfo.isStale.store(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
_chunkMap(std::move(chunkMap)) {}
|
||||
|
||||
Chunk ChunkManager::findIntersectingChunk(const BSONObj& shardKey,
|
||||
const BSONObj& collation,
|
||||
@ -748,7 +729,7 @@ void ChunkManager::getShardIdsForRange(const BSONObj& min,
|
||||
// because _placementVersions contains shards with chunks and is built based on the last
|
||||
// refresh. Therefore, it is possible for _placementVersions to have fewer entries if a
|
||||
// shard no longer owns chunks when it used to at _clusterTime.
|
||||
if (!_clusterTime && shardIds->size() == _rt->optRt->_placementVersions.size()) {
|
||||
if (!_clusterTime && shardIds->size() == _rt->optRt->getNShardsOwningChunks()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -802,8 +783,9 @@ ShardId ChunkManager::getMinKeyShardIdWithSimpleCollation() const {
|
||||
void RoutingTableHistory::getAllShardIds(std::set<ShardId>* all) const {
|
||||
invariant(all->empty());
|
||||
|
||||
std::transform(_placementVersions.begin(),
|
||||
_placementVersions.end(),
|
||||
const auto& shardPlacementVersionMap = _chunkMap.getShardPlacementVersionMap();
|
||||
std::transform(shardPlacementVersionMap.begin(),
|
||||
shardPlacementVersionMap.end(),
|
||||
std::inserter(*all, all->begin()),
|
||||
[](const ShardPlacementVersionMap::value_type& pair) { return pair.first; });
|
||||
}
|
||||
@ -829,10 +811,9 @@ std::string ChunkManager::toString() const {
|
||||
return _rt->optRt ? _rt->optRt->toString() : "UNSHARDED";
|
||||
}
|
||||
|
||||
PlacementVersionTargetingInfo RoutingTableHistory::_getVersion(const ShardId& shardName,
|
||||
bool throwOnStaleShard) const {
|
||||
auto it = _placementVersions.find(shardName);
|
||||
if (it == _placementVersions.end()) {
|
||||
PlacementVersionTargetingInfo RoutingTableHistory::_getVersion(const ShardId& shardName) const {
|
||||
auto it = _chunkMap.getShardPlacementVersionMap().find(shardName);
|
||||
if (it == _chunkMap.getShardPlacementVersionMap().end()) {
|
||||
// Shards without explicitly tracked placement versions (meaning they have no chunks) always
|
||||
// have a version of (epoch, timestamp, 0, 0)
|
||||
auto collPlacementVersion = _chunkMap.getVersion();
|
||||
@ -840,12 +821,6 @@ PlacementVersionTargetingInfo RoutingTableHistory::_getVersion(const ShardId& sh
|
||||
Timestamp(0, 0));
|
||||
}
|
||||
|
||||
if (throwOnStaleShard && gEnableFinerGrainedCatalogCacheRefresh) {
|
||||
uassert(ShardInvalidatedForTargetingInfo(_nss),
|
||||
"shard has been marked stale",
|
||||
!it->second.isStale.load());
|
||||
}
|
||||
|
||||
const auto& placementVersionTargetingInfo = it->second;
|
||||
return PlacementVersionTargetingInfo(placementVersionTargetingInfo.placementVersion,
|
||||
placementVersionTargetingInfo.validAfter);
|
||||
@ -858,12 +833,6 @@ std::string RoutingTableHistory::toString() const {
|
||||
|
||||
sb << _chunkMap.toString();
|
||||
|
||||
sb << "Shard placement versions:\n";
|
||||
for (const auto& entry : _placementVersions) {
|
||||
sb << "\t" << entry.first << ": " << entry.second.placementVersion.toString() << " @ "
|
||||
<< entry.second.validAfter.toString() << '\n';
|
||||
}
|
||||
|
||||
return sb.str();
|
||||
}
|
||||
|
||||
|
||||
@ -66,11 +66,6 @@ namespace mongo {
|
||||
class ChunkManager;
|
||||
|
||||
struct PlacementVersionTargetingInfo {
|
||||
|
||||
PlacementVersionTargetingInfo(const PlacementVersionTargetingInfo& other)
|
||||
: placementVersion(other.placementVersion),
|
||||
validAfter(other.validAfter),
|
||||
isStale(other.isStale.load()) {}
|
||||
/**
|
||||
* Constructs a placement information for a collection with the specified generation, starting
|
||||
* at placementVersion {0, 0} and maxValidAfter of Timestamp{0, 0}. The expectation is that the
|
||||
@ -86,9 +81,6 @@ struct PlacementVersionTargetingInfo {
|
||||
// Max validAfter for the shard, effectively this is the timestamp of the latest placement
|
||||
// change that occurred on a particular shard.
|
||||
Timestamp validAfter;
|
||||
|
||||
// Indicates whether the shard is stale and thus needs a catalog cache refresh
|
||||
AtomicWord<bool> isStale{false};
|
||||
};
|
||||
|
||||
// Map from a shard to a struct indicating both the max chunk version on that shard and whether the
|
||||
@ -346,18 +338,6 @@ public:
|
||||
return _unique;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark the given shard as stale, indicating that requests targetted to this shard (for this
|
||||
* namespace) need to block on a catalog cache refresh.
|
||||
*/
|
||||
void setShardStale(const ShardId& shardId);
|
||||
|
||||
/**
|
||||
* Mark all shards as not stale, indicating that a refresh has happened and requests targeted
|
||||
* to all shards (for this namespace) do not currently need to block on a catalog cache refresh.
|
||||
*/
|
||||
void setAllShardsRefreshed();
|
||||
|
||||
/**
|
||||
* Returns the maximum version across all shards (also known as the "collection placement
|
||||
* version").
|
||||
@ -367,11 +347,10 @@ public:
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the placement version for the given shard. Will throw a
|
||||
* ShardInvalidatedForTargeting exception if the shard is marked as stale.
|
||||
* Retrieves the placement version for the given shard.
|
||||
*/
|
||||
ChunkVersion getVersion(const ShardId& shardId) const {
|
||||
return _getVersion(shardId, true).placementVersion;
|
||||
return _getVersion(shardId).placementVersion;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -380,15 +359,14 @@ public:
|
||||
* based on the returned version, use getVersion() instead.
|
||||
*/
|
||||
ChunkVersion getVersionForLogging(const ShardId& shardId) const {
|
||||
return _getVersion(shardId, false).placementVersion;
|
||||
return _getVersion(shardId).placementVersion;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the maximum validAfter timestamp for the given shard. Will throw a
|
||||
* ShardInvalidatedForTargeting exception if the shard is marked as stale.
|
||||
* Retrieves the maximum validAfter timestamp for the given shard.
|
||||
*/
|
||||
Timestamp getMaxValidAfter(const ShardId& shardId) const {
|
||||
return _getVersion(shardId, true).validAfter;
|
||||
return _getVersion(shardId).validAfter;
|
||||
}
|
||||
|
||||
size_t numChunks() const {
|
||||
@ -427,7 +405,7 @@ public:
|
||||
* Returns the number of shards on which the collection has any chunks
|
||||
*/
|
||||
size_t getNShardsOwningChunks() const {
|
||||
return _placementVersions.size();
|
||||
return _chunkMap.getShardPlacementVersionMap().size();
|
||||
}
|
||||
|
||||
std::string toString() const;
|
||||
@ -466,7 +444,7 @@ private:
|
||||
bool allowMigrations,
|
||||
ChunkMap chunkMap);
|
||||
|
||||
PlacementVersionTargetingInfo _getVersion(const ShardId& shardId, bool throwOnStaleShard) const;
|
||||
PlacementVersionTargetingInfo _getVersion(const ShardId& shardId) const;
|
||||
|
||||
// Namespace to which this routing information corresponds
|
||||
NamespaceString _nss;
|
||||
@ -500,11 +478,6 @@ private:
|
||||
// Map from the max for each chunk to an entry describing the chunk. The union of all chunks'
|
||||
// ranges must cover the complete space from [MinKey, MaxKey).
|
||||
ChunkMap _chunkMap;
|
||||
|
||||
// The representation of shards' placement versions and staleness indicators for this namespace.
|
||||
// If a shard does not exist, it will not have an entry in the map. Note: this declaration must
|
||||
// not be moved before _chunkMap since it is initialized by using the _chunkMap instance.
|
||||
ShardPlacementVersionMap _placementVersions;
|
||||
};
|
||||
|
||||
/**
|
||||
@ -733,8 +706,7 @@ public:
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the placement version for the given shard. Will throw a
|
||||
* ShardInvalidatedForTargeting exception if the shard is marked as stale.
|
||||
* Retrieves the placement version for the given shard.
|
||||
*/
|
||||
ChunkVersion getVersion(const ShardId& shardId) const {
|
||||
tassert(7626404, "Expected routing table to be initialized", _rt->optRt);
|
||||
@ -742,8 +714,7 @@ public:
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the maximum validAfter timestamp for the given shard. Will throw a
|
||||
* ShardInvalidatedForTargeting exception if the shard is marked as stale.
|
||||
* Retrieves the maximum validAfter timestamp for the given shard.
|
||||
*/
|
||||
Timestamp getMaxValidAfter(const ShardId& shardId) const {
|
||||
tassert(7626405, "Expected routing table to be initialized", _rt->optRt);
|
||||
@ -751,8 +722,8 @@ public:
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the placement version for the given shard. Will not throw if the shard is marked as
|
||||
* stale. Only use when logging the given chunk version -- if the caller must execute logic
|
||||
* Retrieves the placement version for the given shard.
|
||||
* Only use when logging the given chunk version -- if the caller must execute logic
|
||||
* based on the returned version, use getVersion() instead.
|
||||
*/
|
||||
ChunkVersion getVersionForLogging(const ShardId& shardId) const {
|
||||
|
||||
@ -396,8 +396,6 @@ private:
|
||||
} // namespace
|
||||
|
||||
Value ExpressionInternalOwningShard::evaluate(const Document& root, Variables* variables) const {
|
||||
// TODO SERVER-71519: Add support for handling stale exception from mongos with
|
||||
// enableFinerGrainedCatalogCacheRefresh.
|
||||
uassert(6868600,
|
||||
"$_internalOwningShard is currently not supported on mongos",
|
||||
!serverGlobalParams.clusterRole.hasExclusively(ClusterRole::RouterServer));
|
||||
|
||||
@ -110,7 +110,6 @@
|
||||
#include "mongo/s/mongos_topology_coordinator.h"
|
||||
#include "mongo/s/query_analysis_sampler.h"
|
||||
#include "mongo/s/session_catalog_router.h"
|
||||
#include "mongo/s/shard_invalidated_for_targeting_exception.h"
|
||||
#include "mongo/s/stale_exception.h"
|
||||
#include "mongo/s/transaction_router.h"
|
||||
#include "mongo/transport/hello_metrics.h"
|
||||
@ -188,9 +187,7 @@ Future<void> invokeInTransactionRouter(TransactionRouter::Router& txnRouter,
|
||||
return runCommandInvocation(rec, std::move(invocation))
|
||||
.tapError([rec = std::move(rec)](Status status) {
|
||||
if (auto code = status.code(); ErrorCodes::isSnapshotError(code) ||
|
||||
ErrorCodes::isNeedRetargettingError(code) ||
|
||||
code == ErrorCodes::ShardInvalidatedForTargeting ||
|
||||
code == ErrorCodes::StaleDbVersion ||
|
||||
ErrorCodes::isNeedRetargettingError(code) || code == ErrorCodes::StaleDbVersion ||
|
||||
code == ErrorCodes::ShardCannotRefreshDueToLocksHeld ||
|
||||
code == ErrorCodes::WouldChangeOwningShard) {
|
||||
// Don't abort on possibly retryable errors.
|
||||
@ -443,7 +440,6 @@ private:
|
||||
// Exception handler for error codes that may trigger a retry. All methods will throw `status`
|
||||
// unless an attempt to retry is possible.
|
||||
void _checkRetryForTransaction(Status& status);
|
||||
void _onShardInvalidatedForTargeting(Status& status);
|
||||
void _onNeedRetargetting(Status& status);
|
||||
void _onStaleDbVersion(Status& status);
|
||||
void _onSnapshotError(Status& status);
|
||||
@ -1027,17 +1023,10 @@ void ParseAndRunCommand::RunAndRetry::_checkRetryForTransaction(Status& status)
|
||||
txnRouter.onSnapshotError(opCtx, status);
|
||||
} else {
|
||||
invariant(ErrorCodes::isA<ErrorCategory::NeedRetargettingError>(status) ||
|
||||
status.code() == ErrorCodes::ShardInvalidatedForTargeting ||
|
||||
status.code() == ErrorCodes::StaleDbVersion ||
|
||||
status.code() == ErrorCodes::ShardCannotRefreshDueToLocksHeld);
|
||||
|
||||
if (!txnRouter.canContinueOnStaleShardOrDbError(_parc->_commandName, status)) {
|
||||
if (status.code() == ErrorCodes::ShardInvalidatedForTargeting) {
|
||||
auto catalogCache = Grid::get(opCtx)->catalogCache();
|
||||
(void)catalogCache->getCollectionRoutingInfoWithPlacementRefresh(
|
||||
opCtx, status.extraInfo<ShardInvalidatedForTargetingInfo>()->getNss());
|
||||
}
|
||||
|
||||
addContextForTransactionAbortingError(txnRouter.txnIdToString(),
|
||||
txnRouter.getLatestStmtId(),
|
||||
status,
|
||||
@ -1052,19 +1041,6 @@ void ParseAndRunCommand::RunAndRetry::_checkRetryForTransaction(Status& status)
|
||||
abortGuard.dismiss();
|
||||
}
|
||||
|
||||
void ParseAndRunCommand::RunAndRetry::_onShardInvalidatedForTargeting(Status& status) {
|
||||
invariant(status.code() == ErrorCodes::ShardInvalidatedForTargeting);
|
||||
|
||||
auto opCtx = _parc->_rec->getOpCtx();
|
||||
auto catalogCache = Grid::get(opCtx)->catalogCache();
|
||||
catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
|
||||
|
||||
_checkRetryForTransaction(status);
|
||||
|
||||
if (!_canRetry())
|
||||
iassert(status);
|
||||
}
|
||||
|
||||
void ParseAndRunCommand::RunAndRetry::_onNeedRetargetting(Status& status) {
|
||||
invariant(ErrorCodes::isA<ErrorCategory::NeedRetargettingError>(status));
|
||||
|
||||
@ -1089,8 +1065,6 @@ void ParseAndRunCommand::RunAndRetry::_onNeedRetargetting(Status& status) {
|
||||
originalNs, boost::none, staleInfo->getShardId());
|
||||
}
|
||||
|
||||
catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
|
||||
|
||||
_checkRetryForTransaction(status);
|
||||
|
||||
if (!_canRetry())
|
||||
@ -1168,10 +1142,6 @@ Future<void> ParseAndRunCommand::RunAndRetry::run() {
|
||||
_setup();
|
||||
return _run();
|
||||
})
|
||||
.onError<ErrorCodes::ShardInvalidatedForTargeting>([this](Status status) {
|
||||
_onShardInvalidatedForTargeting(status);
|
||||
return run(); // Retry
|
||||
})
|
||||
.onErrorCategory<ErrorCategory::NeedRetargettingError>([this](Status status) {
|
||||
_onNeedRetargetting(status);
|
||||
return run(); // Retry
|
||||
|
||||
@ -64,7 +64,7 @@ server_parameters:
|
||||
|
||||
enableFinerGrainedCatalogCacheRefresh:
|
||||
description: >-
|
||||
Enables the finer grained catalog cache refresh behavior.
|
||||
DEPRECATED: This parameter has been deprecated and is here only for maintaining backward compatibility.
|
||||
set_at: [ startup ]
|
||||
cpp_vartype: bool
|
||||
cpp_varname: "gEnableFinerGrainedCatalogCacheRefresh"
|
||||
|
||||
@ -769,7 +769,6 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx,
|
||||
<< "Failed to run query after " << kMaxRetries << " retries");
|
||||
throw;
|
||||
} else if (!ErrorCodes::isStaleShardVersionError(ex.code()) &&
|
||||
ex.code() != ErrorCodes::ShardInvalidatedForTargeting &&
|
||||
ex.code() != ErrorCodes::ShardNotFound) {
|
||||
|
||||
if (ErrorCodes::isRetriableError(ex.code())) {
|
||||
@ -791,23 +790,15 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx,
|
||||
"maxRetries"_attr = kMaxRetries,
|
||||
"error"_attr = redact(ex));
|
||||
|
||||
if (ex.code() != ErrorCodes::ShardInvalidatedForTargeting) {
|
||||
if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
|
||||
catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
|
||||
query.nss(), staleInfo->getVersionWanted(), staleInfo->getShardId());
|
||||
} else {
|
||||
catalogCache->invalidateCollectionEntry_LINEARIZABLE(query.nss());
|
||||
}
|
||||
if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
|
||||
catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
|
||||
query.nss(), staleInfo->getVersionWanted(), staleInfo->getShardId());
|
||||
} else {
|
||||
catalogCache->invalidateCollectionEntry_LINEARIZABLE(query.nss());
|
||||
}
|
||||
|
||||
catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
|
||||
|
||||
if (auto txnRouter = TransactionRouter::get(opCtx)) {
|
||||
if (!txnRouter.canContinueOnStaleShardOrDbError(kFindCmdName, ex.toStatus())) {
|
||||
if (ex.code() == ErrorCodes::ShardInvalidatedForTargeting) {
|
||||
(void)catalogCache->getCollectionRoutingInfoWithPlacementRefresh(
|
||||
opCtx, query.nss());
|
||||
}
|
||||
throw;
|
||||
}
|
||||
|
||||
|
||||
@ -1,59 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2020-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/s/shard_invalidated_for_targeting_exception.h"
|
||||
#include "mongo/base/init.h" // IWYU pragma: keep
|
||||
#include "mongo/base/string_data.h"
|
||||
#include "mongo/bson/bsonelement.h"
|
||||
#include "mongo/util/namespace_string_util.h"
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(ShardInvalidatedForTargetingInfo);
|
||||
|
||||
constexpr StringData kNss = "nss"_sd;
|
||||
|
||||
} // namespace
|
||||
|
||||
void ShardInvalidatedForTargetingInfo::serialize(BSONObjBuilder* bob) const {
|
||||
bob->append(kNss, NamespaceStringUtil::serialize(_nss, SerializationContext::stateDefault()));
|
||||
}
|
||||
|
||||
std::shared_ptr<const ErrorExtraInfo> ShardInvalidatedForTargetingInfo::parse(const BSONObj& obj) {
|
||||
return std::make_shared<ShardInvalidatedForTargetingInfo>(parseFromCommandError(obj));
|
||||
}
|
||||
|
||||
ShardInvalidatedForTargetingInfo ShardInvalidatedForTargetingInfo::parseFromCommandError(
|
||||
const BSONObj& obj) {
|
||||
return ShardInvalidatedForTargetingInfo(NamespaceStringUtil::deserialize(
|
||||
boost::none, obj["nss"].String(), SerializationContext::stateDefault()));
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
@ -1,77 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2020-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "mongo/base/error_codes.h"
|
||||
#include "mongo/base/error_extra_info.h"
|
||||
#include "mongo/bson/bsonobj.h"
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
/**
|
||||
* This error is thrown when a stale shard is found when attempting to retrieve a shard's shard
|
||||
* version for a namespace. The router (mongos) will retry a command when encountering this error,
|
||||
* and will block on a catalog cache refresh.
|
||||
*
|
||||
* TODO SERVER-39704 Allow transactions to retry upon receiving a ShardInvalidatedForTargeting
|
||||
* error.
|
||||
*/
|
||||
class ShardInvalidatedForTargetingInfo final : public ErrorExtraInfo {
|
||||
public:
|
||||
static constexpr auto code = ErrorCodes::ShardInvalidatedForTargeting;
|
||||
|
||||
explicit ShardInvalidatedForTargetingInfo(NamespaceString nss) : _nss(nss){};
|
||||
|
||||
const auto& getNss() const {
|
||||
return _nss;
|
||||
}
|
||||
|
||||
BSONObj toBSON() const {
|
||||
BSONObjBuilder bob;
|
||||
serialize(&bob);
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
void serialize(BSONObjBuilder* bob) const override;
|
||||
static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj&);
|
||||
static ShardInvalidatedForTargetingInfo parseFromCommandError(const BSONObj& commandError);
|
||||
|
||||
private:
|
||||
NamespaceString _nss;
|
||||
};
|
||||
using ShardInvalidatedForTargetingException =
|
||||
ExceptionFor<ErrorCodes::ShardInvalidatedForTargeting>;
|
||||
|
||||
} // namespace mongo
|
||||
@ -89,11 +89,6 @@ void checkErrorStatusAndMaxRetries(const Status& status,
|
||||
return;
|
||||
}
|
||||
|
||||
if (status == ErrorCodes::ShardInvalidatedForTargeting) {
|
||||
logAndTestMaxRetries(status);
|
||||
return;
|
||||
}
|
||||
|
||||
uassertStatusOK(status);
|
||||
}
|
||||
|
||||
|
||||
@ -97,8 +97,6 @@ auto shardVersionRetry(OperationContext* opCtx,
|
||||
size_t numAttempts = 0;
|
||||
|
||||
while (true) {
|
||||
catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, numAttempts);
|
||||
|
||||
try {
|
||||
return callbackFn();
|
||||
} catch (const DBException& ex) {
|
||||
@ -140,7 +138,6 @@ auto shardVersionRetry(ServiceContext* service,
|
||||
auto cancelableOpCtx = opCtxFactory.makeOperationContext(&cc());
|
||||
auto opCtx = cancelableOpCtx.get();
|
||||
|
||||
catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, *numAttempts);
|
||||
return _callbackFn(opCtx);
|
||||
};
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user