SERVER-119085: Make networkCounters a function and duplicate some network counters into otel (#53511)
GitOrigin-RevId: 9c348da072ccd87ec6dc34b306b132f40b0d8977
This commit is contained in:
parent
31bfafd041
commit
333ce46dac
@ -301,7 +301,7 @@ Future<void> AsyncDBClient::_call(Message request,
|
||||
int32_t msgId,
|
||||
const BatonHandle& baton,
|
||||
const CancellationToken& token) {
|
||||
networkCounter.hitLogicalOut(NetworkCounter::ConnectionType::kEgress, request.size());
|
||||
globalNetworkCounter().hitLogicalOut(NetworkCounter::ConnectionType::kEgress, request.size());
|
||||
|
||||
auto swm = _compressorManager.compressMessage(request);
|
||||
if (!swm.isOK()) {
|
||||
@ -344,8 +344,8 @@ Future<Message> AsyncDBClient::_waitForResponse(boost::optional<int32_t> msgId,
|
||||
? _compressorManager.decompressMessage(response)
|
||||
: response;
|
||||
if (swMessage.isOK()) {
|
||||
networkCounter.hitLogicalIn(NetworkCounter::ConnectionType::kEgress,
|
||||
swMessage.getValue().size());
|
||||
globalNetworkCounter().hitLogicalIn(NetworkCounter::ConnectionType::kEgress,
|
||||
swMessage.getValue().size());
|
||||
}
|
||||
return swMessage;
|
||||
});
|
||||
|
||||
@ -536,7 +536,7 @@ Message DBClientSession::_call(Message& toSend, string* actualServer) {
|
||||
OpMsg::appendChecksum(&toSend);
|
||||
#endif
|
||||
}
|
||||
networkCounter.hitLogicalOut(NetworkCounter::ConnectionType::kEgress, toSend.size());
|
||||
globalNetworkCounter().hitLogicalOut(NetworkCounter::ConnectionType::kEgress, toSend.size());
|
||||
auto swm = _compressorManager.compressMessage(toSend);
|
||||
uassertStatusOK(swm.getStatus());
|
||||
|
||||
@ -568,7 +568,7 @@ Message DBClientSession::_call(Message& toSend, string* actualServer) {
|
||||
if (response.operation() == dbCompressed) {
|
||||
response = uassertStatusOK(_compressorManager.decompressMessage(response));
|
||||
}
|
||||
networkCounter.hitLogicalIn(NetworkCounter::ConnectionType::kEgress, response.size());
|
||||
globalNetworkCounter().hitLogicalIn(NetworkCounter::ConnectionType::kEgress, response.size());
|
||||
|
||||
killSessionOnError.dismiss();
|
||||
return response;
|
||||
|
||||
@ -67,7 +67,7 @@ public:
|
||||
BSONObj generateSection(OperationContext* opCtx,
|
||||
const BSONElement& configElement) const override {
|
||||
BSONObjBuilder b;
|
||||
networkCounter.append(b);
|
||||
globalNetworkCounter().append(b);
|
||||
appendMessageCompressionStats(&b);
|
||||
|
||||
|
||||
|
||||
@ -168,6 +168,7 @@ mongo_cc_library(
|
||||
"//src/mongo:base",
|
||||
"//src/mongo/db/commands/server_status:server_status_core",
|
||||
"//src/mongo/db/storage:index_entry_comparison",
|
||||
"//src/mongo/otel/metrics:otel_metrics_service",
|
||||
"//src/mongo/util/concurrency:spin_lock",
|
||||
],
|
||||
)
|
||||
@ -238,6 +239,18 @@ mongo_cc_unit_test(
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_unit_test(
|
||||
name = "counters_test",
|
||||
srcs = [
|
||||
"counters_test.cpp",
|
||||
],
|
||||
tags = ["mongo_unittest_seventh_group"],
|
||||
deps = [
|
||||
":counters",
|
||||
"//src/mongo/otel/metrics:metrics_test_util",
|
||||
],
|
||||
)
|
||||
|
||||
mongo_cc_unit_test(
|
||||
name = "db_stats_test",
|
||||
srcs = [
|
||||
|
||||
@ -34,6 +34,10 @@
|
||||
#include "mongo/client/authenticate.h"
|
||||
#include "mongo/db/commands/server_status/server_status.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/otel/metrics/metric_names.h"
|
||||
#include "mongo/otel/metrics/metric_unit.h"
|
||||
#include "mongo/otel/metrics/metrics_service.h"
|
||||
#include "mongo/util/static_immortal.h"
|
||||
|
||||
#include <tuple>
|
||||
|
||||
@ -43,6 +47,35 @@
|
||||
|
||||
namespace mongo {
|
||||
|
||||
using ::mongo::otel::metrics::MetricNames;
|
||||
using ::mongo::otel::metrics::MetricsService;
|
||||
using ::mongo::otel::metrics::MetricUnit;
|
||||
|
||||
NetworkCounter::NetworkCounter()
|
||||
: _ingressLogicalBytesIn(MetricsService::instance().createInt64Counter(
|
||||
MetricNames::kNetworkIngressBytesIn,
|
||||
"Total number of logical bytes received from ingress (wire-protocol) clients.",
|
||||
MetricUnit::kBytes)),
|
||||
_ingressNumRequests(MetricsService::instance().createInt64Counter(
|
||||
MetricNames::kNetworkIngressNumRequests,
|
||||
"Total number of requests received from ingress (wire-protocol) clients.",
|
||||
MetricUnit::kOperations)),
|
||||
_ingressLogicalBytesOut(MetricsService::instance().createInt64Counter(
|
||||
MetricNames::kNetworkIngressBytesOut,
|
||||
"Total number of logical bytes sent to ingress (wire-protocol) clients.",
|
||||
MetricUnit::kBytes)),
|
||||
_egressLogicalBytesIn(MetricsService::instance().createInt64Counter(
|
||||
MetricNames::kNetworkEgressBytesIn,
|
||||
"Total number of logical bytes received on egress (outbound client) connections.",
|
||||
MetricUnit::kBytes)),
|
||||
_egressNumRequests(MetricsService::instance().createInt64Counter(
|
||||
MetricNames::kNetworkEgressNumRequests,
|
||||
"Total number of requests sent on egress (outbound client) connections.",
|
||||
MetricUnit::kOperations)),
|
||||
_egressLogicalBytesOut(MetricsService::instance().createInt64Counter(
|
||||
MetricNames::kNetworkEgressBytesOut,
|
||||
"Total number of logical bytes sent on egress (outbound client) connections.",
|
||||
MetricUnit::kBytes)) {}
|
||||
|
||||
void NetworkCounter::hitPhysicalIn(ConnectionType connectionType, long long bytes) {
|
||||
static const int64_t MAX = 1ULL << 60;
|
||||
@ -75,36 +108,23 @@ void NetworkCounter::hitPhysicalOut(ConnectionType connectionType, long long byt
|
||||
}
|
||||
|
||||
void NetworkCounter::hitLogicalIn(ConnectionType connectionType, long long bytes) {
|
||||
static const int64_t MAX = 1ULL << 60;
|
||||
auto& ref = connectionType == ConnectionType::kIngress ? _ingressTogether : _egressTogether;
|
||||
|
||||
// don't care about the race as its just a counter
|
||||
const bool overflow = ref->logicalBytesIn.loadRelaxed() > MAX;
|
||||
|
||||
if (overflow) {
|
||||
ref->logicalBytesIn.store(bytes);
|
||||
// The requests field only gets incremented here (and not in hitPhysical) because the
|
||||
// hitLogical and hitPhysical are each called for each operation. Incrementing it in both
|
||||
// functions would double-count the number of operations.
|
||||
ref->requests.store(1);
|
||||
// The requests field only gets incremented here (and not in hitPhysical) because
|
||||
// hitLogical and hitPhysical are each called for each operation. Incrementing it in both
|
||||
// functions would double-count the number of operations.
|
||||
if (connectionType == ConnectionType::kIngress) {
|
||||
_ingressLogicalBytesIn.add(bytes);
|
||||
_ingressNumRequests.add(1);
|
||||
} else {
|
||||
ref->logicalBytesIn.fetchAndAdd(bytes);
|
||||
ref->requests.fetchAndAdd(1);
|
||||
_egressLogicalBytesIn.add(bytes);
|
||||
_egressNumRequests.add(1);
|
||||
}
|
||||
}
|
||||
|
||||
void NetworkCounter::hitLogicalOut(ConnectionType connectionType, long long bytes) {
|
||||
static const int64_t MAX = 1ULL << 60;
|
||||
auto& ref = connectionType == ConnectionType::kIngress ? _ingressLogicalBytesOut
|
||||
: _egressLogicalBytesOut;
|
||||
|
||||
// don't care about the race as its just a counter
|
||||
const bool overflow = ref->loadRelaxed() > MAX;
|
||||
|
||||
if (overflow) {
|
||||
ref->store(bytes);
|
||||
if (connectionType == ConnectionType::kIngress) {
|
||||
_ingressLogicalBytesOut.add(bytes);
|
||||
} else {
|
||||
ref->fetchAndAdd(bytes);
|
||||
_egressLogicalBytesOut.add(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
@ -121,26 +141,24 @@ void NetworkCounter::acceptedTFOIngress() {
|
||||
}
|
||||
|
||||
void NetworkCounter::append(BSONObjBuilder& b) {
|
||||
b.append("bytesIn", static_cast<long long>(_ingressTogether->logicalBytesIn.loadRelaxed()));
|
||||
b.append("bytesOut", static_cast<long long>(_ingressLogicalBytesOut->loadRelaxed()));
|
||||
b.append("bytesIn", _ingressLogicalBytesIn.valueForLegacyUse());
|
||||
b.append("bytesOut", _ingressLogicalBytesOut.valueForLegacyUse());
|
||||
b.append("physicalBytesIn", static_cast<long long>(_ingressPhysicalBytesIn->loadRelaxed()));
|
||||
b.append("physicalBytesOut", static_cast<long long>(_ingressPhysicalBytesOut->loadRelaxed()));
|
||||
|
||||
BSONObjBuilder egressBuilder(b.subobjStart("egress"));
|
||||
egressBuilder.append("bytesIn",
|
||||
static_cast<long long>(_egressTogether->logicalBytesIn.loadRelaxed()));
|
||||
egressBuilder.append("bytesOut", static_cast<long long>(_egressLogicalBytesOut->loadRelaxed()));
|
||||
egressBuilder.append("bytesIn", _egressLogicalBytesIn.valueForLegacyUse());
|
||||
egressBuilder.append("bytesOut", _egressLogicalBytesOut.valueForLegacyUse());
|
||||
egressBuilder.append("physicalBytesIn",
|
||||
static_cast<long long>(_egressPhysicalBytesIn->loadRelaxed()));
|
||||
egressBuilder.append("physicalBytesOut",
|
||||
static_cast<long long>(_egressPhysicalBytesOut->loadRelaxed()));
|
||||
egressBuilder.append("numRequests",
|
||||
static_cast<long long>(_egressTogether->requests.loadRelaxed()));
|
||||
egressBuilder.append("numRequests", _egressNumRequests.valueForLegacyUse());
|
||||
egressBuilder.done();
|
||||
|
||||
b.append("numSlowDNSOperations", static_cast<long long>(_numSlowDNSOperations->loadRelaxed()));
|
||||
b.append("numSlowSSLOperations", static_cast<long long>(_numSlowSSLOperations->loadRelaxed()));
|
||||
b.append("numRequests", static_cast<long long>(_ingressTogether->requests.loadRelaxed()));
|
||||
b.append("numRequests", _ingressNumRequests.valueForLegacyUse());
|
||||
|
||||
BSONObjBuilder tfo;
|
||||
#ifdef __linux__
|
||||
@ -152,6 +170,11 @@ void NetworkCounter::append(BSONObjBuilder& b) {
|
||||
b.append("tcpFastOpen", tfo.obj());
|
||||
}
|
||||
|
||||
NetworkCounter& globalNetworkCounter() {
|
||||
static StaticImmortal<NetworkCounter> instance;
|
||||
return *instance;
|
||||
}
|
||||
|
||||
const std::vector<std::string> kAllMechanisms{std::string(auth::kMechanismMongoX509),
|
||||
std::string(auth::kMechanismSaslPlain),
|
||||
std::string(auth::kMechanismGSSAPI),
|
||||
@ -325,7 +348,6 @@ void AuthCounter::append(BSONObjBuilder* b) {
|
||||
}
|
||||
|
||||
|
||||
NetworkCounter networkCounter;
|
||||
AuthCounter authCounter;
|
||||
AggStageCounters aggStageCounters{"aggStageCounters."};
|
||||
DotsAndDollarsFieldsCounters dotsAndDollarsFieldsCounters;
|
||||
|
||||
@ -37,6 +37,7 @@
|
||||
#include "mongo/db/query/plan_executor.h"
|
||||
#include "mongo/db/stats/opcounters.h"
|
||||
#include "mongo/db/topology/cluster_role.h"
|
||||
#include "mongo/otel/metrics/metrics_counter.h"
|
||||
#include "mongo/platform/atomic_word.h"
|
||||
#include "mongo/rpc/message.h"
|
||||
#include "mongo/util/aligned.h"
|
||||
@ -63,6 +64,12 @@ namespace MONGO_MOD_PUBLIC mongo {
|
||||
class NetworkCounter {
|
||||
public:
|
||||
enum class ConnectionType { kIngress = 1, kEgress = 2 };
|
||||
|
||||
NetworkCounter();
|
||||
|
||||
NetworkCounter(const NetworkCounter&) = delete;
|
||||
NetworkCounter& operator=(const NetworkCounter&) = delete;
|
||||
|
||||
// Increment the counters for the number of bytes read directly off the wire
|
||||
void hitPhysicalIn(ConnectionType connectionType, long long bytes);
|
||||
void hitPhysicalOut(ConnectionType connectionType, long long bytes);
|
||||
@ -96,28 +103,21 @@ public:
|
||||
void append(BSONObjBuilder& b);
|
||||
|
||||
private:
|
||||
// Physical byte counters — not OTel-exported.
|
||||
CacheExclusive<AtomicWord<long long>> _ingressPhysicalBytesIn{0};
|
||||
CacheExclusive<AtomicWord<long long>> _ingressPhysicalBytesOut{0};
|
||||
|
||||
CacheExclusive<AtomicWord<long long>> _egressPhysicalBytesIn{0};
|
||||
CacheExclusive<AtomicWord<long long>> _egressPhysicalBytesOut{0};
|
||||
|
||||
// These two counters are always incremented at the same time, so
|
||||
// we place them on the same cache line. We use
|
||||
// CacheCombinedExclusive to ensure that they are combined within
|
||||
// the scope of a constructive interference region, and protected
|
||||
// from false sharing by padding out to destructive interference
|
||||
// size.
|
||||
struct Together {
|
||||
AtomicWord<long long> logicalBytesIn{0};
|
||||
AtomicWord<long long> requests{0};
|
||||
};
|
||||
// Logical ingress counters.
|
||||
otel::metrics::Counter<int64_t>& _ingressLogicalBytesIn;
|
||||
otel::metrics::Counter<int64_t>& _ingressNumRequests;
|
||||
otel::metrics::Counter<int64_t>& _ingressLogicalBytesOut;
|
||||
|
||||
CacheCombinedExclusive<Together> _ingressTogether{};
|
||||
CacheExclusive<AtomicWord<long long>> _ingressLogicalBytesOut{0};
|
||||
|
||||
CacheCombinedExclusive<Together> _egressTogether{};
|
||||
CacheExclusive<AtomicWord<long long>> _egressLogicalBytesOut{0};
|
||||
// Logical egress counters.
|
||||
otel::metrics::Counter<int64_t>& _egressLogicalBytesIn;
|
||||
otel::metrics::Counter<int64_t>& _egressNumRequests;
|
||||
otel::metrics::Counter<int64_t>& _egressLogicalBytesOut;
|
||||
|
||||
CacheExclusive<AtomicWord<long long>> _numSlowDNSOperations{0};
|
||||
CacheExclusive<AtomicWord<long long>> _numSlowSSLOperations{0};
|
||||
@ -131,7 +131,8 @@ private:
|
||||
bool _tfoKernelSupportClient{false};
|
||||
};
|
||||
|
||||
extern NetworkCounter networkCounter;
|
||||
/** Returns the process-global NetworkCounter. */
|
||||
NetworkCounter& globalNetworkCounter();
|
||||
|
||||
class AuthCounter {
|
||||
struct MechanismData;
|
||||
|
||||
117
src/mongo/db/stats/counters_test.cpp
Normal file
117
src/mongo/db/stats/counters_test.cpp
Normal file
@ -0,0 +1,117 @@
|
||||
/**
|
||||
* Copyright (C) 2026-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/stats/counters.h"
|
||||
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/otel/metrics/metric_names.h"
|
||||
#include "mongo/otel/metrics/metrics_test_util.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
using ::mongo::otel::metrics::MetricNames;
|
||||
using ::mongo::otel::metrics::OtelMetricsCapturer;
|
||||
|
||||
BSONObj getNetworkBson(NetworkCounter& nc) {
|
||||
BSONObjBuilder bob;
|
||||
nc.append(bob);
|
||||
return bob.obj();
|
||||
}
|
||||
|
||||
TEST(NetworkCounterOtelTest, IngressCountersAreExported) {
|
||||
OtelMetricsCapturer capturer;
|
||||
if (!capturer.canReadMetrics()) {
|
||||
return;
|
||||
}
|
||||
|
||||
NetworkCounter nc;
|
||||
|
||||
nc.hitLogicalIn(NetworkCounter::ConnectionType::kIngress, 100);
|
||||
nc.hitLogicalIn(NetworkCounter::ConnectionType::kIngress, 50);
|
||||
nc.hitLogicalOut(NetworkCounter::ConnectionType::kIngress, 25);
|
||||
|
||||
EXPECT_EQ(150, capturer.readInt64Counter(MetricNames::kNetworkIngressBytesIn));
|
||||
EXPECT_EQ(25, capturer.readInt64Counter(MetricNames::kNetworkIngressBytesOut));
|
||||
EXPECT_EQ(2, capturer.readInt64Counter(MetricNames::kNetworkIngressNumRequests));
|
||||
}
|
||||
|
||||
TEST(NetworkCounterOtelTest, EgressCountersAreExported) {
|
||||
OtelMetricsCapturer capturer;
|
||||
if (!capturer.canReadMetrics()) {
|
||||
return;
|
||||
}
|
||||
|
||||
NetworkCounter nc;
|
||||
|
||||
nc.hitLogicalIn(NetworkCounter::ConnectionType::kEgress, 200);
|
||||
nc.hitLogicalOut(NetworkCounter::ConnectionType::kEgress, 80);
|
||||
|
||||
EXPECT_EQ(200, capturer.readInt64Counter(MetricNames::kNetworkEgressBytesIn));
|
||||
EXPECT_EQ(80, capturer.readInt64Counter(MetricNames::kNetworkEgressBytesOut));
|
||||
EXPECT_EQ(1, capturer.readInt64Counter(MetricNames::kNetworkEgressNumRequests));
|
||||
}
|
||||
|
||||
TEST(NetworkCounterBsonTest, IngressBytesInAndNumRequests) {
|
||||
NetworkCounter nc;
|
||||
|
||||
nc.hitLogicalIn(NetworkCounter::ConnectionType::kIngress, 300);
|
||||
nc.hitLogicalIn(NetworkCounter::ConnectionType::kIngress, 700);
|
||||
|
||||
BSONObj obj = getNetworkBson(nc);
|
||||
EXPECT_EQ(1000, obj["bytesIn"].numberLong());
|
||||
EXPECT_EQ(2, obj["numRequests"].numberLong());
|
||||
}
|
||||
|
||||
TEST(NetworkCounterBsonTest, IngressBytesOut) {
|
||||
NetworkCounter nc;
|
||||
|
||||
nc.hitLogicalOut(NetworkCounter::ConnectionType::kIngress, 512);
|
||||
|
||||
BSONObj obj = getNetworkBson(nc);
|
||||
EXPECT_EQ(512, obj["bytesOut"].numberLong());
|
||||
}
|
||||
|
||||
TEST(NetworkCounterBsonTest, EgressSubObject) {
|
||||
NetworkCounter nc;
|
||||
|
||||
nc.hitLogicalIn(NetworkCounter::ConnectionType::kEgress, 400);
|
||||
nc.hitLogicalOut(NetworkCounter::ConnectionType::kEgress, 100);
|
||||
|
||||
BSONObj obj = getNetworkBson(nc);
|
||||
ASSERT_TRUE(obj.hasField("egress"));
|
||||
BSONObj egress = obj["egress"].Obj();
|
||||
EXPECT_EQ(400, egress["bytesIn"].numberLong());
|
||||
EXPECT_EQ(100, egress["bytesOut"].numberLong());
|
||||
EXPECT_EQ(1, egress["numRequests"].numberLong());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
@ -91,6 +91,18 @@ class MONGO_MOD_FILE_PRIVATE MetricNameMaker{public : static constexpr MetricNam
|
||||
class MetricNames {
|
||||
public:
|
||||
// Networking & Observability Team Metrics
|
||||
static constexpr MetricName kNetworkIngressBytesIn =
|
||||
MetricNameMaker::make("serverStatus.network.bytesIn");
|
||||
static constexpr MetricName kNetworkIngressBytesOut =
|
||||
MetricNameMaker::make("serverStatus.network.bytesOut");
|
||||
static constexpr MetricName kNetworkIngressNumRequests =
|
||||
MetricNameMaker::make("serverStatus.network.numRequests");
|
||||
static constexpr MetricName kNetworkEgressBytesIn =
|
||||
MetricNameMaker::make("serverStatus.network.egress.bytesIn");
|
||||
static constexpr MetricName kNetworkEgressBytesOut =
|
||||
MetricNameMaker::make("serverStatus.network.egress.bytesOut");
|
||||
static constexpr MetricName kNetworkEgressNumRequests =
|
||||
MetricNameMaker::make("serverStatus.network.egress.numRequests");
|
||||
static constexpr MetricName kPrometheusFileExporterWrites =
|
||||
MetricNameMaker::make("metrics.prometheus_file_exporter.writes");
|
||||
static constexpr MetricName kPrometheusFileExporterWritesFailed =
|
||||
|
||||
@ -406,7 +406,7 @@ Future<void> CommonAsioSession::sinkMessageImpl(Message message, const BatonHand
|
||||
.then([this, message /*keep the buffer alive*/]() {
|
||||
auto connectionType = _isIngressSession ? NetworkCounter::ConnectionType::kIngress
|
||||
: NetworkCounter::ConnectionType::kEgress;
|
||||
networkCounter.hitPhysicalOut(connectionType, message.size());
|
||||
globalNetworkCounter().hitPhysicalOut(connectionType, message.size());
|
||||
})
|
||||
.onCompletion([this](Status status) {
|
||||
_asyncOpState.complete();
|
||||
@ -712,7 +712,7 @@ Future<Message> CommonAsioSession::sourceMessageImpl(const BatonHandle& baton) {
|
||||
: NetworkCounter::ConnectionType::kEgress;
|
||||
if (msgLen == kHeaderSize) {
|
||||
// This probably isn't a real case since all (current) messages have bodies.
|
||||
networkCounter.hitPhysicalIn(connectionType, msgLen);
|
||||
globalNetworkCounter().hitPhysicalIn(connectionType, msgLen);
|
||||
return Future<Message>::makeReady(Message(std::move(headerBuffer)));
|
||||
}
|
||||
|
||||
@ -722,7 +722,7 @@ Future<Message> CommonAsioSession::sourceMessageImpl(const BatonHandle& baton) {
|
||||
MsgData::View msgView(buffer.get());
|
||||
return read(asio::buffer(msgView.data(), msgView.dataLen()), baton)
|
||||
.then([this, buffer = std::move(buffer), connectionType, msgLen]() mutable {
|
||||
networkCounter.hitPhysicalIn(connectionType, msgLen);
|
||||
globalNetworkCounter().hitPhysicalIn(connectionType, msgLen);
|
||||
return Message(std::move(buffer));
|
||||
});
|
||||
})
|
||||
|
||||
@ -138,14 +138,14 @@ bool tryTcpSockOpt(int opt, int val) {
|
||||
|
||||
/**
|
||||
* Probe the socket API support for TFO-related options on TCP sockets, and
|
||||
* record the results in the global `networkCounter` object.
|
||||
* record the results in the global network counter object.
|
||||
*/
|
||||
void checkRelevantSocketOptionsAccepted() {
|
||||
#ifdef TCP_FASTOPEN
|
||||
networkCounter.setTFOServerSupport(tryTcpSockOpt(TCP_FASTOPEN, 1));
|
||||
globalNetworkCounter().setTFOServerSupport(tryTcpSockOpt(TCP_FASTOPEN, 1));
|
||||
#endif
|
||||
#ifdef TCP_FASTOPEN_CONNECT
|
||||
networkCounter.setTFOClientSupport(tryTcpSockOpt(TCP_FASTOPEN_CONNECT, 1));
|
||||
globalNetworkCounter().setTFOClientSupport(tryTcpSockOpt(TCP_FASTOPEN_CONNECT, 1));
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -204,7 +204,7 @@ void checkEnabledByKernel(bool srv, bool cli) {
|
||||
|
||||
int64_t k; // The kernel setting.
|
||||
f >> k;
|
||||
networkCounter.setTFOKernelSetting(k);
|
||||
globalNetworkCounter().setTFOKernelSetting(k);
|
||||
|
||||
// Return an integer composed of all bits from 'm' that are missing from 'x'.
|
||||
auto maskBitsMissing = [](uint64_t x, uint64_t m) {
|
||||
|
||||
@ -925,7 +925,7 @@ StatusWith<std::shared_ptr<Session>> AsioTransportLayer::connect(
|
||||
Milliseconds dnsResolveLatency = Date_t::now() - timeBefore;
|
||||
_dnsResolveStatsMillis.record(durationCount<Milliseconds>(dnsResolveLatency));
|
||||
if (dnsResolveLatency > kSlowOperationThreshold) {
|
||||
networkCounter.incrementNumSlowDNSOperations();
|
||||
globalNetworkCounter().incrementNumSlowDNSOperations();
|
||||
}
|
||||
|
||||
if (!swEndpoints.isOK()) {
|
||||
@ -995,7 +995,7 @@ StatusWith<std::shared_ptr<Session>> AsioTransportLayer::connect(
|
||||
Date_t timeAfter = Date_t::now();
|
||||
|
||||
if (timeAfter - timeBefore > kSlowOperationThreshold) {
|
||||
networkCounter.incrementNumSlowSSLOperations();
|
||||
globalNetworkCounter().incrementNumSlowSSLOperations();
|
||||
}
|
||||
|
||||
if (finishLine->arriveStrongly()) {
|
||||
@ -1210,7 +1210,7 @@ Future<std::shared_ptr<Session>> AsioTransportLayer::asyncConnect(
|
||||
"DNS resolution while connecting to peer was slow",
|
||||
"peer"_attr = connector->peer,
|
||||
"duration"_attr = resolveLatency);
|
||||
networkCounter.incrementNumSlowDNSOperations();
|
||||
globalNetworkCounter().incrementNumSlowDNSOperations();
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lk(connector->mutex);
|
||||
@ -1278,7 +1278,7 @@ Future<std::shared_ptr<Session>> AsioTransportLayer::asyncConnect(
|
||||
connectionMetrics->onTLSHandshakeFinished();
|
||||
|
||||
if (duration > kSlowOperationThreshold) {
|
||||
networkCounter.incrementNumSlowSSLOperations();
|
||||
globalNetworkCounter().incrementNumSlowSSLOperations();
|
||||
}
|
||||
return Status::OK();
|
||||
});
|
||||
@ -1642,7 +1642,7 @@ void AsioTransportLayer::_acceptConnection(GenericAcceptor& acceptor) {
|
||||
TcpInfoOption tcpi{};
|
||||
peerSocket.get_option(tcpi);
|
||||
if (tcpi->tcpi_options & TCPI_OPT_SYN_DATA)
|
||||
networkCounter.acceptedTFOIngress();
|
||||
globalNetworkCounter().acceptedTFOIngress();
|
||||
} catch (const asio::system_error&) {
|
||||
}
|
||||
#endif
|
||||
|
||||
@ -978,7 +978,7 @@ void runTfoScenario(bool serverOn, bool clientOn, bool expectTfo) {
|
||||
|
||||
auto extractTfoAccepted = [] {
|
||||
BSONObjBuilder bob;
|
||||
networkCounter.append(bob);
|
||||
globalNetworkCounter().append(bob);
|
||||
auto obj = bob.obj();
|
||||
auto accepted = obj["tcpFastOpen"]["accepted"].numberInt();
|
||||
return std::pair{accepted, obj};
|
||||
|
||||
@ -177,8 +177,8 @@ IngressSession::~IngressSession() {
|
||||
|
||||
StatusWith<Message> IngressSession::_readFromStream() {
|
||||
if (auto maybeBuffer = _stream->read()) {
|
||||
networkCounter.hitPhysicalIn(NetworkCounter::ConnectionType::kIngress,
|
||||
MsgData::ConstView(maybeBuffer->get()).getLen());
|
||||
globalNetworkCounter().hitPhysicalIn(NetworkCounter::ConnectionType::kIngress,
|
||||
MsgData::ConstView(maybeBuffer->get()).getLen());
|
||||
return Message(std::move(*maybeBuffer));
|
||||
}
|
||||
|
||||
@ -194,7 +194,8 @@ StatusWith<Message> IngressSession::_readFromStream() {
|
||||
|
||||
Status IngressSession::_writeToStream(Message message) {
|
||||
if (_stream->write(message.sharedBuffer())) {
|
||||
networkCounter.hitPhysicalOut(NetworkCounter::ConnectionType::kIngress, message.size());
|
||||
globalNetworkCounter().hitPhysicalOut(NetworkCounter::ConnectionType::kIngress,
|
||||
message.size());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -293,8 +294,8 @@ Future<Message> EgressSession::_asyncReadFromStream() {
|
||||
})
|
||||
.then([this, msg = std::move(msg)]() {
|
||||
_updateWireVersion();
|
||||
networkCounter.hitPhysicalIn(NetworkCounter::ConnectionType::kEgress,
|
||||
MsgData::ConstView(msg->get()).getLen());
|
||||
globalNetworkCounter().hitPhysicalIn(NetworkCounter::ConnectionType::kEgress,
|
||||
MsgData::ConstView(msg->get()).getLen());
|
||||
return Message(std::move(*msg));
|
||||
});
|
||||
}
|
||||
@ -318,7 +319,7 @@ Future<void> EgressSession::_asyncWriteToStream(Message message) {
|
||||
});
|
||||
})
|
||||
.then([msgLen]() {
|
||||
networkCounter.hitPhysicalOut(NetworkCounter::ConnectionType::kEgress, msgLen);
|
||||
globalNetworkCounter().hitPhysicalOut(NetworkCounter::ConnectionType::kEgress, msgLen);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -811,7 +811,8 @@ Future<DbResponse> SessionWorkflow::Impl::_dispatchWork() {
|
||||
return makeDbResponseErrorForRateLimiting(_work->in(), status);
|
||||
}
|
||||
|
||||
networkCounter.hitLogicalIn(NetworkCounter::ConnectionType::kIngress, _work->in().size());
|
||||
globalNetworkCounter().hitLogicalIn(NetworkCounter::ConnectionType::kIngress,
|
||||
_work->in().size());
|
||||
|
||||
// Pass sourced Message to handler to generate response.
|
||||
_work->initOperation();
|
||||
@ -871,7 +872,7 @@ void SessionWorkflow::Impl::_acceptResponse(DbResponse response) {
|
||||
// the dbresponses continue to indicate the exhaust stream should continue.
|
||||
_nextWork = work.synthesizeExhaust(response);
|
||||
|
||||
networkCounter.hitLogicalOut(NetworkCounter::ConnectionType::kIngress, toSink.size());
|
||||
globalNetworkCounter().hitLogicalOut(NetworkCounter::ConnectionType::kIngress, toSink.size());
|
||||
|
||||
beforeCompressingExhaustResponse.executeIf(
|
||||
[&](auto&&) {}, [&](auto&&) { return work.hasCompressorId() && _nextWork; });
|
||||
|
||||
@ -340,7 +340,7 @@ struct NetworkConnectionStats {
|
||||
|
||||
static NetworkConnectionStats get(NetworkCounter::ConnectionType type) {
|
||||
BSONObjBuilder bob;
|
||||
networkCounter.append(bob);
|
||||
globalNetworkCounter().append(bob);
|
||||
BSONObj metrics = bob.obj();
|
||||
if (type == NetworkCounter::ConnectionType::kEgress) {
|
||||
metrics = metrics.getField("egress").Obj().getOwned();
|
||||
|
||||
Loading…
Reference in New Issue
Block a user