diff --git a/modules_poc/modules.yaml b/modules_poc/modules.yaml index 219f0e57a55..51d6bd2d8ee 100644 --- a/modules_poc/modules.yaml +++ b/modules_poc/modules.yaml @@ -275,6 +275,7 @@ car_common: - src/mongo/s/type_collection_common_types.idl - src/mongo/db/s/operation_sharding_state* - src/mongo/db/persistent_task_store* + - src/mongo/db/local_executor* # Observers overlaps between shard_catalog, ddl and routing_and_topology. Putting them in this # module because it is "generic infrastructure", but likely their contents need to be moved diff --git a/src/mongo/db/BUILD.bazel b/src/mongo/db/BUILD.bazel index da050ab19e7..8c7ef12e579 100644 --- a/src/mongo/db/BUILD.bazel +++ b/src/mongo/db/BUILD.bazel @@ -610,6 +610,23 @@ mongo_cc_library( ], ) +mongo_cc_library( + name = "local_executor", + srcs = [ + "local_executor.cpp", + ], + hdrs = [ + "local_executor.h", + ], + deps = [ + ":service_context", + "//src/mongo:base", + "//src/mongo/executor:network_interface_factory", + "//src/mongo/executor:task_executor_interface", + "//src/mongo/executor:thread_pool_task_executor", + ], +) + mongo_cc_library( name = "not_primary_error_tracker", srcs = [ @@ -3624,6 +3641,7 @@ mongo_cc_library( "//src/mongo/db/ttl:ttl_d", "vector_clock", "server_lifecycle_monitor", + ":local_executor", ] + select({ "//bazel/config:enterprise_feature_audit_enabled": [ "//src/mongo/db/modules/enterprise/src/audit:audit_enterprise", @@ -3800,6 +3818,21 @@ mongo_cc_unit_test( ], ) +mongo_cc_unit_test( + name = "local_executor_test", + srcs = [ + "local_executor_test.cpp", + ], + tags = [ + "mongo_unittest_second_group", + "server-programmability", + ], + deps = [ + ":local_executor", + ":service_context_test_fixture", + ], +) + mongo_cc_unit_test( name = "server_base_test", srcs = [ diff --git a/src/mongo/db/OWNERS.yml b/src/mongo/db/OWNERS.yml index 164cfb45ce8..d92fcfe89a4 100644 --- a/src/mongo/db/OWNERS.yml +++ b/src/mongo/db/OWNERS.yml @@ -332,3 +332,6 @@ filters: - "find_one_bm*": approvers: - 10gen/performance + - "local_executor*": + approvers: + - 10gen/server-catalog-and-routing-routing-and-topology diff --git a/src/mongo/db/commands/BUILD.bazel b/src/mongo/db/commands/BUILD.bazel index 190b7c9409d..bf781ea59b5 100644 --- a/src/mongo/db/commands/BUILD.bazel +++ b/src/mongo/db/commands/BUILD.bazel @@ -1635,6 +1635,7 @@ mongo_cc_library( "//src/mongo/db:dbhelpers", "//src/mongo/db:fle_crud_mongod", "//src/mongo/db:index_commands_idl", + "//src/mongo/db:local_executor", "//src/mongo/db:multitenancy", "//src/mongo/db:profile_collection", "//src/mongo/db:rw_concern_d", diff --git a/src/mongo/db/commands/query_cmd/BUILD.bazel b/src/mongo/db/commands/query_cmd/BUILD.bazel index 94a7c5dd74c..0e4f1a0883a 100644 --- a/src/mongo/db/commands/query_cmd/BUILD.bazel +++ b/src/mongo/db/commands/query_cmd/BUILD.bazel @@ -149,6 +149,7 @@ mongo_cc_library( ], deps = [ ":update_metrics", + "//src/mongo/db:local_executor", "//src/mongo/db/auth", "//src/mongo/db/auth:authprivilege", # TODO(SERVER-93876): Remove. "//src/mongo/db/pipeline:lite_parsed_document_source", @@ -252,6 +253,7 @@ mongo_cc_library( "//src/mongo/db:commands", "//src/mongo/db:curop_metrics", "//src/mongo/db:fle_crud_mongod", + "//src/mongo/db:local_executor", "//src/mongo/db:not_primary_error_tracker", "//src/mongo/db:profile_collection", "//src/mongo/db:query_exec", diff --git a/src/mongo/db/commands/query_cmd/bulk_write.cpp b/src/mongo/db/commands/query_cmd/bulk_write.cpp index 8ea76cb4be9..f76cb710b90 100644 --- a/src/mongo/db/commands/query_cmd/bulk_write.cpp +++ b/src/mongo/db/commands/query_cmd/bulk_write.cpp @@ -68,13 +68,13 @@ #include "mongo/db/feature_flag.h" #include "mongo/db/fle_crud.h" #include "mongo/db/initialize_operation_session_info.h" +#include "mongo/db/local_executor.h" #include "mongo/db/namespace_string.h" #include "mongo/db/not_primary_error_tracker.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/legacy_runtime_constants_gen.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" -#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h" #include "mongo/db/pipeline/variables.h" #include "mongo/db/query/client_cursor/clientcursor.h" #include "mongo/db/query/client_cursor/cursor_manager.h" @@ -1614,10 +1614,7 @@ bool handleUpdateOp(OperationContext* opCtx, if (isTimeseriesViewRequest && opCtx->isRetryableWrite() && !opCtx->inMultiDocumentTransaction()) { write_ops_exec::WriteResult out; - auto executor = serverGlobalParams.clusterRole.has(ClusterRole::None) - ? ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor( - opCtx->getServiceContext()) - : Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + auto executor = getLocalExecutor(opCtx); auto updateRequest = bulk_write_common::makeUpdateCommandRequestFromUpdateOp( opCtx, op, req, currentOpIdx); diff --git a/src/mongo/db/commands/query_cmd/write_commands.cpp b/src/mongo/db/commands/query_cmd/write_commands.cpp index 8484c99a525..b50101c833b 100644 --- a/src/mongo/db/commands/query_cmd/write_commands.cpp +++ b/src/mongo/db/commands/query_cmd/write_commands.cpp @@ -48,10 +48,10 @@ #include "mongo/db/exec/mutable_bson/document.h" #include "mongo/db/exec/mutable_bson/element.h" #include "mongo/db/fle_crud.h" +#include "mongo/db/local_executor.h" #include "mongo/db/namespace_string.h" #include "mongo/db/not_primary_error_tracker.h" #include "mongo/db/operation_context.h" -#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h" #include "mongo/db/pipeline/variables.h" #include "mongo/db/query/explain.h" #include "mongo/db/query/explain_options.h" @@ -515,10 +515,7 @@ public: opCtx->isRetryableWrite() && !opCtx->inMultiDocumentTransaction() && !isRawDataOperation(opCtx); if (isTimeseriesRetryableUpdate) { - auto executor = serverGlobalParams.clusterRole.has(ClusterRole::None) - ? ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor( - opCtx->getServiceContext()) - : Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + auto executor = getLocalExecutor(opCtx); ON_BLOCK_EXIT([&] { // Increments the counter if the command contains retries. This is normally done // within write_ops_exec::performUpdates. But for retryable timeseries updates, diff --git a/src/mongo/db/commands/user_management_commands.cpp b/src/mongo/db/commands/user_management_commands.cpp index 7a29edecfb2..4e9e0c5f59b 100644 --- a/src/mongo/db/commands/user_management_commands.cpp +++ b/src/mongo/db/commands/user_management_commands.cpp @@ -74,11 +74,11 @@ #include "mongo/db/database_name.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbmessage.h" +#include "mongo/db/local_executor.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/aggregate_command_gen.h" #include "mongo/db/pipeline/aggregation_request_helper.h" -#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h" #include "mongo/db/query/client_cursor/cursor_response.h" #include "mongo/db/query/compiler/physical_model/query_solution/query_solution.h" #include "mongo/db/query/find_command.h" @@ -836,9 +836,7 @@ public: void run(OperationContext* opCtx, unique_function txnOpsCallback) final { auto inlineExecutor = std::make_shared(); - auto sleepAndCleanupExecutor = serverGlobalParams.clusterRole.has(ClusterRole::None) - ? ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(opCtx->getServiceContext()) - : Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + auto sleepAndCleanupExecutor = getLocalExecutor(opCtx); // Constructing a SyncTransactionWithRetries causes it to store the write concern from the // supplied OperationContext and then wait for that write concern when running/committing diff --git a/src/mongo/db/local_executor.cpp b/src/mongo/db/local_executor.cpp new file mode 100644 index 00000000000..2279d355410 --- /dev/null +++ b/src/mongo/db/local_executor.cpp @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2025-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/local_executor.h" + +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/rpc/metadata/egress_metadata_hook_list.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { + +namespace { + +const auto localExecutor = + ServiceContext::declareDecoration>(); + +} + +std::shared_ptr getLocalExecutor(ServiceContext* service) { + auto executor = localExecutor(service); + invariant(executor); + return executor; +} + +std::shared_ptr getLocalExecutor(OperationContext* opCtx) { + return getLocalExecutor(opCtx->getServiceContext()); +} + +void setLocalExecutor(ServiceContext* service, std::shared_ptr executor) { + localExecutor(service) = std::move(executor); +} + +std::shared_ptr createLocalExecutor(ServiceContext* serviceContext, + const std::string& name) { + ThreadPool::Options tpOptions; + tpOptions.threadNamePrefix = name + "-"; + tpOptions.poolName = name + "ThreadPool"; + tpOptions.maxThreads = ThreadPool::Options::kUnlimited; + tpOptions.onCreateThread = [serviceContext](const std::string& threadName) { + Client::initThread(threadName, + serviceContext->getService(ClusterRole::ShardServer), + Client::noSession(), + ClientOperationKillableByStepdown{false}); + }; + + class BlockerHook : public rpc::EgressMetadataHook { + public: + Status writeRequestMetadata(OperationContext*, BSONObjBuilder*) override { + tasserted(ErrorCodes::IllegalOperation, + "Remote task is prohibited to schedule on local task executor"); + } + Status readReplyMetadata(OperationContext*, const BSONObj&) override { + tasserted(ErrorCodes::IllegalOperation, + "Remote task is prohibited to schedule on local task executor"); + } + }; + + auto hookList = std::make_unique(); + hookList->addHook(std::make_unique()); + + return executor::ThreadPoolTaskExecutor::create( + std::make_unique(tpOptions), + executor::makeNetworkInterface(name + "Network", nullptr, std::move(hookList))); +} + +} // namespace mongo diff --git a/src/mongo/db/local_executor.h b/src/mongo/db/local_executor.h new file mode 100644 index 00000000000..c99f88879fb --- /dev/null +++ b/src/mongo/db/local_executor.h @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2025-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/executor/task_executor.h" + +#include +#include + +namespace mongo { + +/** + * NOTE: + * Local executor is a ThreadPoolTaskExecutor executor that prohibits remote execution. + * The prohibition is achieved by a hook on the network interface that asserts, so if the client + * code tries to schedule a remote task on this executor, it will fail. + * This executor is used in cases where we normally have to choose between ReplSetNodeExecutor and + * the Grid's one. In those cases usually we select based on the serverGlobalParam (the startup + * flag). But the fac that we have a cluster role doesn't mean the grid is already initialized (we + * might still wait for the shardIdentity to initialize). Since this executor is always initialized + * we can use this instead of the selection mechanism. + */ + +/** + * Returns the local executor. + */ +std::shared_ptr getLocalExecutor(ServiceContext* srvCtx); + +/** + * Returns the local executor. + */ +std::shared_ptr getLocalExecutor(OperationContext* opCtx); + +/** + * Sets the local executor. + */ +void setLocalExecutor(ServiceContext* srvCtx, std::shared_ptr executor); + +/** + * Creates a local executor. + */ +std::shared_ptr createLocalExecutor(ServiceContext* srvCtx, + const std::string& name); + +} // namespace mongo diff --git a/src/mongo/db/local_executor_test.cpp b/src/mongo/db/local_executor_test.cpp new file mode 100644 index 00000000000..5c42213d568 --- /dev/null +++ b/src/mongo/db/local_executor_test.cpp @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2025-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/local_executor.h" + +#include "mongo/db/service_context_test_fixture.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class ProcessInterfaceStandaloneTest : public ScopedGlobalServiceContextForTest, + public unittest::Test { +public: + ProcessInterfaceStandaloneTest() : ScopedGlobalServiceContextForTest(true) {} + +protected: + void setUp() override { + executor = createLocalExecutor(getServiceContext(), "test"); + executor->startup(); + } + + std::shared_ptr executor; +}; + +TEST_F(ProcessInterfaceStandaloneTest, StandaloneExecutorCanExecuteTasks) { + int ran = false; + ExecutorFuture(executor).then([&]() { ran = true; }).get(); + + ASSERT_TRUE(ran); +} + +DEATH_TEST_F(ProcessInterfaceStandaloneTest, StandaloneExecutorThrowsOnRemoteExecution, "") { + auto cr = executor::RemoteCommandRequest( + HostAndPort("localhost"), DatabaseName::kAdmin, BSON("isMaster" << 1), BSONObj(), nullptr); + + std::ignore = executor->scheduleRemoteCommand( + cr, [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {}); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index f08f519bb22..3d7da101a8d 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -98,6 +98,7 @@ #include "mongo/db/keys_collection_client_direct.h" #include "mongo/db/keys_collection_manager.h" #include "mongo/db/keys_collection_manager_gen.h" +#include "mongo/db/local_executor.h" #include "mongo/db/log_process_details.h" #include "mongo/db/logical_session_cache_factory_mongod.h" #include "mongo/db/logical_time_validator.h" @@ -816,6 +817,8 @@ ExitCode _initAndListen(ServiceContext* serviceContext) { audit::initializeManager(startupOpCtx.get()); } + getLocalExecutor(serviceContext)->startup(); + // This is for security on certain platforms (nonce generation) srand((unsigned)(curTimeMicros64()) ^ (unsigned(uintptr_t(&startupOpCtx)))); // NOLINT @@ -2009,6 +2012,15 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { stopMongoDFTDC(); } + { + SectionScopedTimer scopedTimer(serviceContext->getFastClockSource(), + TimedSectionId::shutDownReplicaSetNodeExecutor, + &shutdownTimeElapsedBuilder); + LOGV2_OPTIONS(10175800, {LogComponent::kDefault}, "Shutting down the standalone executor"); + getLocalExecutor(serviceContext)->shutdown(); + getLocalExecutor(serviceContext)->join(); + } + LOGV2(20565, "Now exiting"); audit::logShutdown(client); @@ -2107,6 +2119,8 @@ int mongod_main(int argc, char* argv[]) { quickExit(ExitCode::auditRotateError); } + setLocalExecutor(service, createLocalExecutor(service, "Standalone")); + setUpCatalog(service); setUpReplication(service); setUpObservers(service);