SERVER-112122 Test that async oplog sampling doesn't block FCBIS (#42546)
GitOrigin-RevId: 2da4fbce585c1adae80c322da949a47ae4f5e2a7
This commit is contained in:
parent
9f2273839d
commit
2dc4bbb91a
@ -192,11 +192,14 @@ CollectionTruncateMarkers::InitialSetOfMarkers CollectionTruncateMarkers::create
|
||||
OperationContext* opCtx,
|
||||
CollectionIterator& collectionIterator,
|
||||
int64_t minBytesPerMarker,
|
||||
std::function<RecordIdAndWallTime(const Record&)> getRecordIdAndWallTime) {
|
||||
std::function<RecordIdAndWallTime(const Record&)> getRecordIdAndWallTime,
|
||||
TickSource* tickSource) {
|
||||
auto startTime = curTimeMicros64();
|
||||
const int64_t numRecordsTotal = collectionIterator.numRecords();
|
||||
LOGV2_INFO(7393212,
|
||||
"Scanning collection to determine where to place markers for truncation",
|
||||
"uuid"_attr = collectionIterator.getRecordStore()->uuid());
|
||||
"uuid"_attr = collectionIterator.getRecordStore()->uuid(),
|
||||
"numRecords"_attr = numRecordsTotal);
|
||||
|
||||
int64_t numRecords = 0;
|
||||
int64_t dataSize = 0;
|
||||
@ -204,6 +207,7 @@ CollectionTruncateMarkers::InitialSetOfMarkers CollectionTruncateMarkers::create
|
||||
int64_t currentBytes = 0;
|
||||
|
||||
std::deque<Marker> markers;
|
||||
Timer lastProgressTimer(tickSource);
|
||||
|
||||
while (auto nextRecord = collectionIterator.getNext()) {
|
||||
const auto& [rId, doc] = *nextRecord;
|
||||
@ -225,6 +229,17 @@ CollectionTruncateMarkers::InitialSetOfMarkers CollectionTruncateMarkers::create
|
||||
numRecords++;
|
||||
dataSize += doc.objsize();
|
||||
|
||||
const int samplingLogIntervalSeconds = gCollectionSamplingLogIntervalSeconds.load();
|
||||
if (samplingLogIntervalSeconds > 0 &&
|
||||
lastProgressTimer.elapsed() >= Seconds(samplingLogIntervalSeconds)) {
|
||||
LOGV2(11212203,
|
||||
"Collection scanning progress",
|
||||
"uuid"_attr = collectionIterator.getRecordStore()->uuid(),
|
||||
"completed"_attr = numRecords,
|
||||
"total"_attr = numRecordsTotal);
|
||||
lastProgressTimer.reset();
|
||||
}
|
||||
|
||||
// Force a call to next() only every ~ 1 second to simulate slowness.
|
||||
if (MONGO_unlikely(gUseSlowCollectionTruncateMarkerScanning)) {
|
||||
sleepFor(Seconds(1));
|
||||
|
||||
@ -242,7 +242,8 @@ public:
|
||||
OperationContext* opCtx,
|
||||
CollectionIterator& collIterator,
|
||||
int64_t minBytesPerMarker,
|
||||
std::function<RecordIdAndWallTime(const Record&)> getRecordIdAndWallTime);
|
||||
std::function<RecordIdAndWallTime(const Record&)> getRecordIdAndWallTime,
|
||||
TickSource* tickSource = globalSystemTickSource());
|
||||
|
||||
// Creates the initial set of markers by sampling the collection. The set of markers
|
||||
// returned will have approximate metrics. The metrics of each marker will be equal and contain
|
||||
|
||||
@ -69,7 +69,11 @@ AtomicWord<int64_t> totalTimeTruncating;
|
||||
// Cumulative number of truncates of the oplog.
|
||||
AtomicWord<int64_t> truncateCount;
|
||||
|
||||
// Cumulative number of times the oplog cap maintainer thread has been interrupted.
|
||||
AtomicWord<int64_t> interruptCount;
|
||||
|
||||
MONGO_FAIL_POINT_DEFINE(hangOplogCapMaintainerThread);
|
||||
MONGO_FAIL_POINT_DEFINE(hangBeforeOplogSampling);
|
||||
|
||||
class OplogTruncateMarkersServerStatusSection : public ServerStatusSection {
|
||||
public:
|
||||
@ -122,6 +126,7 @@ public:
|
||||
|
||||
builder.append("totalTimeTruncatingMicros", totalTimeTruncating.load());
|
||||
builder.append("truncateCount", truncateCount.load());
|
||||
builder.append("interruptCount", interruptCount.load());
|
||||
|
||||
return builder.obj();
|
||||
}
|
||||
@ -261,6 +266,11 @@ void OplogCapMaintainerThread::run() {
|
||||
}
|
||||
}
|
||||
|
||||
if (MONGO_unlikely(hangBeforeOplogSampling.shouldFail())) {
|
||||
LOGV2(11212200, "Hanging due to 'hangBeforeSampling' fail point");
|
||||
hangBeforeOplogSampling.pauseWhileSet(_uniqueCtx->get());
|
||||
}
|
||||
|
||||
// Need the oplog to have been created first before we proceed.
|
||||
do {
|
||||
// Create the initial set of truncate markers as part of this thread before we
|
||||
@ -293,16 +303,9 @@ void OplogCapMaintainerThread::run() {
|
||||
sleepFor(Milliseconds(100));
|
||||
LOGV2_DEBUG(10621109, 1, "OplogCapMaintainerThread is active");
|
||||
} while (true);
|
||||
} catch (const ExceptionFor<ErrorCategory::ShutdownError>& e) {
|
||||
LOGV2_DEBUG(9468100,
|
||||
1,
|
||||
"Interrupted due to shutdown. OplogCapMaintainerThread Exiting!",
|
||||
"error"_attr = e.what());
|
||||
return;
|
||||
} catch (const ExceptionFor<ErrorCodes::InterruptedDueToStorageChange>&) {
|
||||
LOGV2_DEBUG(10167201,
|
||||
1,
|
||||
"Interrupted due to storage change. OplogCapMaintainerThread Exiting!");
|
||||
} catch (ExceptionFor<ErrorCategory::Interruption>& ex) {
|
||||
LOGV2(11212201, "OplogCapMaintainerThread interrupted", "reason"_attr = ex.reason());
|
||||
interruptCount.fetchAndAdd(1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -329,16 +332,9 @@ void OplogCapMaintainerThread::run() {
|
||||
|
||||
_uniqueCtx->get()->sleepFor(
|
||||
Seconds(1)); // Back off in case there were problems deleting.
|
||||
} catch (const ExceptionFor<ErrorCategory::ShutdownError>& e) {
|
||||
LOGV2_DEBUG(9259900,
|
||||
1,
|
||||
"Interrupted due to shutdown. OplogCapMaintainerThread Exiting",
|
||||
"error"_attr = e);
|
||||
return;
|
||||
} catch (const ExceptionFor<ErrorCodes::InterruptedDueToStorageChange>&) {
|
||||
LOGV2_DEBUG(10167202,
|
||||
1,
|
||||
"Interrupted due to storage change. OplogCapMaintainerThread Exiting!");
|
||||
} catch (ExceptionFor<ErrorCategory::Interruption>& ex) {
|
||||
LOGV2(11212204, "OplogCapMaintainerThread interrupted", "reason"_attr = ex.reason());
|
||||
interruptCount.fetchAndAdd(1);
|
||||
return;
|
||||
} catch (...) {
|
||||
const auto& err = mongo::exceptionToStatus();
|
||||
@ -362,7 +358,7 @@ void OplogCapMaintainerThread::run() {
|
||||
}
|
||||
|
||||
void OplogCapMaintainerThread::shutdown(const Status& reason) {
|
||||
LOGV2_INFO(7474902, "Shutting down oplog cap maintainer thread");
|
||||
LOGV2_INFO(7474902, "Shutting down oplog cap maintainer thread", "reason"_attr = reason);
|
||||
{
|
||||
stdx::lock_guard<stdx::mutex> lk(_opCtxMutex);
|
||||
if (_uniqueCtx) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user