SERVER-101758 Adding a new local-only task executor for node local tasks (#40780) (#42121)

GitOrigin-RevId: 47e8eb91b9d556095429567ae598a11e8167a726
This commit is contained in:
wolfee 2025-10-07 16:25:53 +02:00 committed by MongoDB Bot
parent eb9af3a1da
commit bb2dadf241
12 changed files with 296 additions and 14 deletions

View File

@ -275,6 +275,7 @@ car_common:
- src/mongo/s/type_collection_common_types.idl
- src/mongo/db/s/operation_sharding_state*
- src/mongo/db/persistent_task_store*
- src/mongo/db/local_executor*
# Observers overlaps between shard_catalog, ddl and routing_and_topology. Putting them in this
# module because it is "generic infrastructure", but likely their contents need to be moved

View File

@ -610,6 +610,23 @@ mongo_cc_library(
],
)
mongo_cc_library(
name = "local_executor",
srcs = [
"local_executor.cpp",
],
hdrs = [
"local_executor.h",
],
deps = [
":service_context",
"//src/mongo:base",
"//src/mongo/executor:network_interface_factory",
"//src/mongo/executor:task_executor_interface",
"//src/mongo/executor:thread_pool_task_executor",
],
)
mongo_cc_library(
name = "not_primary_error_tracker",
srcs = [
@ -3624,6 +3641,7 @@ mongo_cc_library(
"//src/mongo/db/ttl:ttl_d",
"vector_clock",
"server_lifecycle_monitor",
":local_executor",
] + select({
"//bazel/config:enterprise_feature_audit_enabled": [
"//src/mongo/db/modules/enterprise/src/audit:audit_enterprise",
@ -3800,6 +3818,21 @@ mongo_cc_unit_test(
],
)
mongo_cc_unit_test(
name = "local_executor_test",
srcs = [
"local_executor_test.cpp",
],
tags = [
"mongo_unittest_second_group",
"server-programmability",
],
deps = [
":local_executor",
":service_context_test_fixture",
],
)
mongo_cc_unit_test(
name = "server_base_test",
srcs = [

View File

@ -332,3 +332,6 @@ filters:
- "find_one_bm*":
approvers:
- 10gen/performance
- "local_executor*":
approvers:
- 10gen/server-catalog-and-routing-routing-and-topology

View File

@ -1635,6 +1635,7 @@ mongo_cc_library(
"//src/mongo/db:dbhelpers",
"//src/mongo/db:fle_crud_mongod",
"//src/mongo/db:index_commands_idl",
"//src/mongo/db:local_executor",
"//src/mongo/db:multitenancy",
"//src/mongo/db:profile_collection",
"//src/mongo/db:rw_concern_d",

View File

@ -149,6 +149,7 @@ mongo_cc_library(
],
deps = [
":update_metrics",
"//src/mongo/db:local_executor",
"//src/mongo/db/auth",
"//src/mongo/db/auth:authprivilege", # TODO(SERVER-93876): Remove.
"//src/mongo/db/pipeline:lite_parsed_document_source",
@ -252,6 +253,7 @@ mongo_cc_library(
"//src/mongo/db:commands",
"//src/mongo/db:curop_metrics",
"//src/mongo/db:fle_crud_mongod",
"//src/mongo/db:local_executor",
"//src/mongo/db:not_primary_error_tracker",
"//src/mongo/db:profile_collection",
"//src/mongo/db:query_exec",

View File

@ -68,13 +68,13 @@
#include "mongo/db/feature_flag.h"
#include "mongo/db/fle_crud.h"
#include "mongo/db/initialize_operation_session_info.h"
#include "mongo/db/local_executor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/not_primary_error_tracker.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/legacy_runtime_constants_gen.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h"
#include "mongo/db/pipeline/variables.h"
#include "mongo/db/query/client_cursor/clientcursor.h"
#include "mongo/db/query/client_cursor/cursor_manager.h"
@ -1614,10 +1614,7 @@ bool handleUpdateOp(OperationContext* opCtx,
if (isTimeseriesViewRequest && opCtx->isRetryableWrite() &&
!opCtx->inMultiDocumentTransaction()) {
write_ops_exec::WriteResult out;
auto executor = serverGlobalParams.clusterRole.has(ClusterRole::None)
? ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(
opCtx->getServiceContext())
: Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto executor = getLocalExecutor(opCtx);
auto updateRequest = bulk_write_common::makeUpdateCommandRequestFromUpdateOp(
opCtx, op, req, currentOpIdx);

View File

@ -48,10 +48,10 @@
#include "mongo/db/exec/mutable_bson/document.h"
#include "mongo/db/exec/mutable_bson/element.h"
#include "mongo/db/fle_crud.h"
#include "mongo/db/local_executor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/not_primary_error_tracker.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h"
#include "mongo/db/pipeline/variables.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/explain_options.h"
@ -515,10 +515,7 @@ public:
opCtx->isRetryableWrite() && !opCtx->inMultiDocumentTransaction() &&
!isRawDataOperation(opCtx);
if (isTimeseriesRetryableUpdate) {
auto executor = serverGlobalParams.clusterRole.has(ClusterRole::None)
? ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(
opCtx->getServiceContext())
: Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto executor = getLocalExecutor(opCtx);
ON_BLOCK_EXIT([&] {
// Increments the counter if the command contains retries. This is normally done
// within write_ops_exec::performUpdates. But for retryable timeseries updates,

View File

@ -74,11 +74,11 @@
#include "mongo/db/database_name.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/local_executor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h"
#include "mongo/db/query/client_cursor/cursor_response.h"
#include "mongo/db/query/compiler/physical_model/query_solution/query_solution.h"
#include "mongo/db/query/find_command.h"
@ -836,9 +836,7 @@ public:
void run(OperationContext* opCtx,
unique_function<Status(UMCTransactionClient&)> txnOpsCallback) final {
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
auto sleepAndCleanupExecutor = serverGlobalParams.clusterRole.has(ClusterRole::None)
? ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(opCtx->getServiceContext())
: Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto sleepAndCleanupExecutor = getLocalExecutor(opCtx);
// Constructing a SyncTransactionWithRetries causes it to store the write concern from the
// supplied OperationContext and then wait for that write concern when running/committing

View File

@ -0,0 +1,93 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/local_executor.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
namespace {
const auto localExecutor =
ServiceContext::declareDecoration<std::shared_ptr<executor::TaskExecutor>>();
}
std::shared_ptr<executor::TaskExecutor> getLocalExecutor(ServiceContext* service) {
auto executor = localExecutor(service);
invariant(executor);
return executor;
}
std::shared_ptr<executor::TaskExecutor> getLocalExecutor(OperationContext* opCtx) {
return getLocalExecutor(opCtx->getServiceContext());
}
void setLocalExecutor(ServiceContext* service, std::shared_ptr<executor::TaskExecutor> executor) {
localExecutor(service) = std::move(executor);
}
std::shared_ptr<executor::TaskExecutor> createLocalExecutor(ServiceContext* serviceContext,
const std::string& name) {
ThreadPool::Options tpOptions;
tpOptions.threadNamePrefix = name + "-";
tpOptions.poolName = name + "ThreadPool";
tpOptions.maxThreads = ThreadPool::Options::kUnlimited;
tpOptions.onCreateThread = [serviceContext](const std::string& threadName) {
Client::initThread(threadName,
serviceContext->getService(ClusterRole::ShardServer),
Client::noSession(),
ClientOperationKillableByStepdown{false});
};
class BlockerHook : public rpc::EgressMetadataHook {
public:
Status writeRequestMetadata(OperationContext*, BSONObjBuilder*) override {
tasserted(ErrorCodes::IllegalOperation,
"Remote task is prohibited to schedule on local task executor");
}
Status readReplyMetadata(OperationContext*, const BSONObj&) override {
tasserted(ErrorCodes::IllegalOperation,
"Remote task is prohibited to schedule on local task executor");
}
};
auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
hookList->addHook(std::make_unique<BlockerHook>());
return executor::ThreadPoolTaskExecutor::create(
std::make_unique<ThreadPool>(tpOptions),
executor::makeNetworkInterface(name + "Network", nullptr, std::move(hookList)));
}
} // namespace mongo

View File

@ -0,0 +1,74 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/task_executor.h"
#include <memory>
#include <string>
namespace mongo {
/**
* NOTE:
* Local executor is a ThreadPoolTaskExecutor executor that prohibits remote execution.
* The prohibition is achieved by a hook on the network interface that asserts, so if the client
* code tries to schedule a remote task on this executor, it will fail.
* This executor is used in cases where we normally have to choose between ReplSetNodeExecutor and
* the Grid's one. In those cases usually we select based on the serverGlobalParam (the startup
* flag). But the fac that we have a cluster role doesn't mean the grid is already initialized (we
* might still wait for the shardIdentity to initialize). Since this executor is always initialized
* we can use this instead of the selection mechanism.
*/
/**
* Returns the local executor.
*/
std::shared_ptr<executor::TaskExecutor> getLocalExecutor(ServiceContext* srvCtx);
/**
* Returns the local executor.
*/
std::shared_ptr<executor::TaskExecutor> getLocalExecutor(OperationContext* opCtx);
/**
* Sets the local executor.
*/
void setLocalExecutor(ServiceContext* srvCtx, std::shared_ptr<executor::TaskExecutor> executor);
/**
* Creates a local executor.
*/
std::shared_ptr<executor::TaskExecutor> createLocalExecutor(ServiceContext* srvCtx,
const std::string& name);
} // namespace mongo

View File

@ -0,0 +1,69 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/local_executor.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
namespace {
class ProcessInterfaceStandaloneTest : public ScopedGlobalServiceContextForTest,
public unittest::Test {
public:
ProcessInterfaceStandaloneTest() : ScopedGlobalServiceContextForTest(true) {}
protected:
void setUp() override {
executor = createLocalExecutor(getServiceContext(), "test");
executor->startup();
}
std::shared_ptr<executor::TaskExecutor> executor;
};
TEST_F(ProcessInterfaceStandaloneTest, StandaloneExecutorCanExecuteTasks) {
int ran = false;
ExecutorFuture<void>(executor).then([&]() { ran = true; }).get();
ASSERT_TRUE(ran);
}
DEATH_TEST_F(ProcessInterfaceStandaloneTest, StandaloneExecutorThrowsOnRemoteExecution, "") {
auto cr = executor::RemoteCommandRequest(
HostAndPort("localhost"), DatabaseName::kAdmin, BSON("isMaster" << 1), BSONObj(), nullptr);
std::ignore = executor->scheduleRemoteCommand(
cr, [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {});
}
} // namespace
} // namespace mongo

View File

@ -98,6 +98,7 @@
#include "mongo/db/keys_collection_client_direct.h"
#include "mongo/db/keys_collection_manager.h"
#include "mongo/db/keys_collection_manager_gen.h"
#include "mongo/db/local_executor.h"
#include "mongo/db/log_process_details.h"
#include "mongo/db/logical_session_cache_factory_mongod.h"
#include "mongo/db/logical_time_validator.h"
@ -816,6 +817,8 @@ ExitCode _initAndListen(ServiceContext* serviceContext) {
audit::initializeManager(startupOpCtx.get());
}
getLocalExecutor(serviceContext)->startup();
// This is for security on certain platforms (nonce generation)
srand((unsigned)(curTimeMicros64()) ^ (unsigned(uintptr_t(&startupOpCtx)))); // NOLINT
@ -2009,6 +2012,15 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
stopMongoDFTDC();
}
{
SectionScopedTimer scopedTimer(serviceContext->getFastClockSource(),
TimedSectionId::shutDownReplicaSetNodeExecutor,
&shutdownTimeElapsedBuilder);
LOGV2_OPTIONS(10175800, {LogComponent::kDefault}, "Shutting down the standalone executor");
getLocalExecutor(serviceContext)->shutdown();
getLocalExecutor(serviceContext)->join();
}
LOGV2(20565, "Now exiting");
audit::logShutdown(client);
@ -2107,6 +2119,8 @@ int mongod_main(int argc, char* argv[]) {
quickExit(ExitCode::auditRotateError);
}
setLocalExecutor(service, createLocalExecutor(service, "Standalone"));
setUpCatalog(service);
setUpReplication(service);
setUpObservers(service);