SERVER-109998 Add 'integration_tests_sharded' and 'run_unittests' tasks to UWE variant (#43207)

GitOrigin-RevId: 3f66dfefbed2111b57dd29b29f9d4299e1297456
This commit is contained in:
Mihai Andrei 2025-10-28 19:04:06 -04:00 committed by MongoDB Bot
parent 032eeefb4b
commit ddc9cd72a1
7 changed files with 322 additions and 105 deletions

View File

@ -592,6 +592,7 @@ buildvariants:
expansions:
<<: *amazon_linux2023_arm64_dynamic_expansions
has_packages: false
evergreen_remote_exec: on
target_resmoke_time: 30
max_sub_suites: 3
idle_timeout_factor: 1.5
@ -603,6 +604,14 @@ buildvariants:
--mongosSetParameters="{internalQueryUnifiedWriteExecutor: true}"
# For now just unified write executor suites but tasks will be added as development continues.
tasks:
# TODO SERVER-104147: Both 'DocumentSequenceLargeDocumentMultiInsertWorks' and
# 'DocumentSequenceMaxWriteBatchWorks' currently fails in the UWE because we do not check our
# batches for sizing, nor do we check that a batch is under the BSON size limit. Enable this
# task and verify that this passes once we have support for this.
# - name: integration_tests_sharded
- name: run_unit_tests_TG
distros:
- amazon2023-arm64-latest-large
- name: unified_write_executor
- name: unified_write_executor_sharding
- name: unified_write_executor_concurrency_sharded_multi_stmt_txn_gen

View File

@ -44,6 +44,7 @@
#include "mongo/db/session/logical_session_id.h"
#include "mongo/executor/network_test_env.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/idl/server_parameter_test_controller.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@ -87,23 +88,42 @@ TEST_F(SessionsCollectionShardedTest, RefreshOneSessionOKTest) {
// Set up routing table for the logical sessions collection.
loadRoutingTableWithTwoChunksAndTwoShardsImpl(NamespaceString::kLogicalSessionsNamespace,
BSON("_id" << 1));
auto future = launchAsync([&] {
auto now = Date_t::now();
auto thePast = now - Minutes(5);
for (auto uweKnobValue : {false, true}) {
RAIIServerParameterControllerForTest uweController("internalQueryUnifiedWriteExecutor",
uweKnobValue);
auto future = launchAsync([&] {
auto now = Date_t::now();
auto thePast = now - Minutes(5);
auto record1 = makeRecord(thePast);
_collection.refreshSessions(operationContext(), {record1});
});
auto record1 = makeRecord(thePast);
_collection.refreshSessions(operationContext(), {record1});
});
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
BatchedCommandResponse response;
response.setStatus(Status::OK());
response.setNModified(1);
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
if (internalQueryUnifiedWriteExecutor.load()) {
BulkWriteCommandReply reply(
BulkWriteCommandResponseCursor(0,
{BulkWriteReplyItem{0, Status::OK()}},
NamespaceString::kLogicalSessionsNamespace),
0,
0,
0,
0,
0,
0);
reply.setNModified(1);
return reply.toBSON();
} else {
BatchedCommandResponse response;
response.setStatus(Status::OK());
response.setNModified(1);
return response.toBSON();
});
return response.toBSON();
}
});
future.default_timed_get();
future.default_timed_get();
}
}
TEST_F(SessionsCollectionShardedTest, CheckReadConcern) {
@ -153,42 +173,85 @@ TEST_F(SessionsCollectionShardedTest, RefreshOneSessionWriteErrTest) {
// Set up routing table for the logical sessions collection.
loadRoutingTableWithTwoChunksAndTwoShardsImpl(NamespaceString::kLogicalSessionsNamespace,
BSON("_id" << 1));
auto future = launchAsync([&] {
auto now = Date_t::now();
auto thePast = now - Minutes(5);
for (auto uweKnobValue : {false, true}) {
RAIIServerParameterControllerForTest uweController("internalQueryUnifiedWriteExecutor",
uweKnobValue);
auto record1 = makeRecord(thePast);
_collection.refreshSessions(operationContext(), {record1});
});
auto future = launchAsync([&] {
auto now = Date_t::now();
auto thePast = now - Minutes(5);
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
BatchedCommandResponse response;
response.setStatus(Status::OK());
response.setNModified(0);
response.addToErrDetails(
write_ops::WriteError(0, {ErrorCodes::NotWritablePrimary, "not primary"}));
return response.toBSON();
});
auto record1 = makeRecord(thePast);
_collection.refreshSessions(operationContext(), {record1});
});
ASSERT_THROWS_CODE(future.default_timed_get(), DBException, ErrorCodes::NotWritablePrimary);
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
if (internalQueryUnifiedWriteExecutor.load()) {
BulkWriteCommandReply reply(
BulkWriteCommandResponseCursor(
0,
{BulkWriteReplyItem{0, {ErrorCodes::NotWritablePrimary, "not primary"}}},
NamespaceString::kLogicalSessionsNamespace),
0,
0,
0,
0,
0,
0);
reply.setNModified(0);
return reply.toBSON();
} else {
BatchedCommandResponse response;
response.setStatus(Status::OK());
response.setNModified(0);
response.addToErrDetails(
write_ops::WriteError(0, {ErrorCodes::NotWritablePrimary, "not primary"}));
return response.toBSON();
}
});
ASSERT_THROWS_CODE(future.default_timed_get(), DBException, ErrorCodes::NotWritablePrimary);
}
}
TEST_F(SessionsCollectionShardedTest, RemoveOneSessionOKTest) {
// Set up routing table for the logical sessions collection.
loadRoutingTableWithTwoChunksAndTwoShardsImpl(NamespaceString::kLogicalSessionsNamespace,
BSON("_id" << 1));
auto future = launchAsync(
[&] { _collection.removeRecords(operationContext(), {makeLogicalSessionIdForTest()}); });
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
BatchedCommandResponse response;
response.setStatus(Status::OK());
response.setNModified(0);
response.setNModified(1);
return response.toBSON();
});
for (auto uweKnobValue : {false, true}) {
RAIIServerParameterControllerForTest uweController("internalQueryUnifiedWriteExecutor",
uweKnobValue);
future.default_timed_get();
auto future = launchAsync([&] {
_collection.removeRecords(operationContext(), {makeLogicalSessionIdForTest()});
});
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
// TODO lol
if (internalQueryUnifiedWriteExecutor.load()) {
BulkWriteCommandReply reply(
BulkWriteCommandResponseCursor(0,
{BulkWriteReplyItem{0, Status::OK()}},
NamespaceString::kLogicalSessionsNamespace),
0,
0,
0,
0,
0,
0);
reply.setNModified(1);
return reply.toBSON();
} else {
BatchedCommandResponse response;
response.setStatus(Status::OK());
response.setNModified(1);
return response.toBSON();
}
});
future.default_timed_get();
}
}
TEST_F(SessionsCollectionShardedTest, RemoveOneSessionStatusErrTest) {
@ -209,19 +272,43 @@ TEST_F(SessionsCollectionShardedTest, RemoveOneSessionWriteErrTest) {
// Set up routing table for the logical sessions collection.
loadRoutingTableWithTwoChunksAndTwoShardsImpl(NamespaceString::kLogicalSessionsNamespace,
BSON("_id" << 1));
auto future = launchAsync(
[&] { _collection.removeRecords(operationContext(), {makeLogicalSessionIdForTest()}); });
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
BatchedCommandResponse response;
response.setStatus(Status::OK());
response.setNModified(0);
response.addToErrDetails(
write_ops::WriteError(0, {ErrorCodes::NotWritablePrimary, "not primary"}));
return response.toBSON();
});
for (auto uweKnobValue : {false, true}) {
RAIIServerParameterControllerForTest uweController("internalQueryUnifiedWriteExecutor",
uweKnobValue);
ASSERT_THROWS_CODE(future.default_timed_get(), DBException, ErrorCodes::NotWritablePrimary);
auto future = launchAsync([&] {
_collection.removeRecords(operationContext(), {makeLogicalSessionIdForTest()});
});
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
// TODO
if (internalQueryUnifiedWriteExecutor.load()) {
BulkWriteCommandReply reply(
BulkWriteCommandResponseCursor(
0,
{BulkWriteReplyItem{0, {ErrorCodes::NotWritablePrimary, "not primary"}}},
NamespaceString::kLogicalSessionsNamespace),
0,
0,
0,
0,
0,
0);
reply.setNModified(0);
return reply.toBSON();
} else {
BatchedCommandResponse response;
response.setStatus(Status::OK());
response.setNModified(0);
response.addToErrDetails(
write_ops::WriteError(0, {ErrorCodes::NotWritablePrimary, "not primary"}));
return response.toBSON();
}
});
ASSERT_THROWS_CODE(future.default_timed_get(), DBException, ErrorCodes::NotWritablePrimary);
}
}
} // namespace

View File

@ -36,6 +36,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/sharding_environment/cluster_command_test_fixture.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/idl/server_parameter_test_controller.h"
#include "mongo/unittest/unittest.h"
#include <functional>
@ -58,30 +59,64 @@ protected:
void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
cb(request);
if (internalQueryUnifiedWriteExecutor.load()) {
cb(request);
BulkWriteCommandReply reply(
BulkWriteCommandResponseCursor(0, {BulkWriteReplyItem{0, Status::OK()}}, kNss),
0,
0,
0,
0,
0,
0);
reply.setNInserted(1);
BSONObjBuilder bob(reply.toBSON().addFields(BSON("ok" << 1)));
appendTxnResponseMetadata(bob);
return bob.obj();
} else {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
cb(request);
BSONObjBuilder bob;
bob.append("n", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
BSONObjBuilder bob;
bob.append("n", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
}
});
}
void expectReturnsSuccess(int shardIndex) override {
onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
BSONObjBuilder bob;
bob.append("n", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
if (internalQueryUnifiedWriteExecutor.load()) {
BulkWriteCommandReply reply(
BulkWriteCommandResponseCursor(0, {BulkWriteReplyItem{0, Status::OK()}}, kNss),
0,
0,
0,
0,
0,
0);
reply.setNDeleted(1);
BSONObjBuilder bob(reply.toBSON().addFields(BSON("ok" << 1)));
appendTxnResponseMetadata(bob);
return bob.obj();
} else {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
BSONObjBuilder bob;
bob.append("n", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
}
});
}
};
TEST_F(ClusterDeleteTest, NoErrors) {
testNoErrors(kDeleteCmdTargeted, kDeleteCmdScatterGather);
for (auto uweKnobValue : {false, true}) {
RAIIServerParameterControllerForTest uweController("internalQueryUnifiedWriteExecutor",
uweKnobValue);
testNoErrors(kDeleteCmdTargeted, kDeleteCmdScatterGather);
}
}
TEST_F(ClusterDeleteTest, AttachesAtClusterTimeForSnapshotReadConcern) {
@ -100,8 +135,12 @@ TEST_F(ClusterDeleteTest, CorrectMetrics) {
b.append("delete", 1);
b.append("getmore", 0);
b.append("command", 0);
testOpcountersAreCorrect(kDeleteCmdTargeted, /* expectedValue */ b.obj());
const BSONObj obj = b.obj();
for (auto uweKnobValue : {false, true}) {
RAIIServerParameterControllerForTest uweController("internalQueryUnifiedWriteExecutor",
uweKnobValue);
testOpcountersAreCorrect(kDeleteCmdTargeted, /* expectedValue */ obj);
}
}
TEST_F(ClusterDeleteTest, RejectsCmdAggregateNamespace) {

View File

@ -36,6 +36,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/sharding_environment/cluster_command_test_fixture.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/idl/server_parameter_test_controller.h"
#include "mongo/unittest/unittest.h"
#include <functional>
@ -57,30 +58,64 @@ protected:
void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
cb(request);
if (internalQueryUnifiedWriteExecutor.load()) {
cb(request);
BulkWriteReplyItem item(0, Status::OK());
item.setN(1);
BulkWriteCommandReply reply(
BulkWriteCommandResponseCursor(0, {std::move(item)}, kNss), 0, 0, 0, 0, 0, 0);
reply.setNInserted(1);
BSONObjBuilder bob(reply.toBSON());
appendTxnResponseMetadata(bob);
return bob.obj();
} else {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
cb(request);
BSONObjBuilder bob;
bob.append("nInserted", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
BSONObjBuilder bob;
bob.append("nInserted", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
}
});
}
void expectReturnsSuccess(int shardIndex) override {
onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
if (internalQueryUnifiedWriteExecutor.load()) {
auto cmd = request.cmdObj;
auto ops = cmd["ops"].Array();
auto size = ops.size();
std::vector<BulkWriteReplyItem> items;
for (size_t i = 0; i < size; i++) {
BulkWriteReplyItem item(i, Status::OK());
item.setN(1);
items.emplace_back(std::move(item));
}
BulkWriteCommandReply reply(
BulkWriteCommandResponseCursor(0, std::move(items), kNss), 0, 0, 0, 0, 0, 0);
reply.setNInserted(size);
BSONObjBuilder bob(reply.toBSON());
appendTxnResponseMetadata(bob);
return bob.obj();
} else {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
BSONObjBuilder bob;
bob.append("nInserted", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
BSONObjBuilder bob;
bob.append("nInserted", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
}
});
}
};
TEST_F(ClusterInsertTest, NoErrors) {
testNoErrors(kInsertCmdTargeted, kInsertCmdScatterGather);
for (auto uweKnobValue : {false, true}) {
RAIIServerParameterControllerForTest uweController("internalQueryUnifiedWriteExecutor",
uweKnobValue);
testNoErrors(kInsertCmdTargeted, kInsertCmdScatterGather);
}
}
TEST_F(ClusterInsertTest, AttachesAtClusterTimeForSnapshotReadConcern) {
@ -100,7 +135,13 @@ TEST_F(ClusterInsertTest, CorrectMetricsSingleInsert) {
b.append("getmore", 0);
b.append("command", 0);
testOpcountersAreCorrect(kInsertCmdTargeted, /* expectedValue */ b.obj());
const BSONObj obj = b.obj();
for (auto uweKnobValue : {false, true}) {
RAIIServerParameterControllerForTest uweController("internalQueryUnifiedWriteExecutor",
uweKnobValue);
testOpcountersAreCorrect(kInsertCmdTargeted, /* expectedValue */ obj);
}
}
TEST_F(ClusterInsertTest, CorrectMetricsBulkInsert) {
@ -114,8 +155,13 @@ TEST_F(ClusterInsertTest, CorrectMetricsBulkInsert) {
const BSONObj bulkInsertCmd{
fromjson("{insert: 'coll', documents: [{'_id': -1}, {'_id': -2}]}")};
const BSONObj obj = b.obj();
testOpcountersAreCorrect(bulkInsertCmd, /* expectedValue */ b.obj());
for (auto uweKnobValue : {false, true}) {
RAIIServerParameterControllerForTest uweController("internalQueryUnifiedWriteExecutor",
uweKnobValue);
testOpcountersAreCorrect(bulkInsertCmd, /* expectedValue */ obj);
}
}
TEST_F(ClusterInsertTest, RejectsCmdAggregateNamespace) {

View File

@ -36,6 +36,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/sharding_environment/cluster_command_test_fixture.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/idl/server_parameter_test_controller.h"
#include "mongo/unittest/unittest.h"
#include <functional>
@ -58,34 +59,71 @@ protected:
void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
cb(request);
if (internalQueryUnifiedWriteExecutor.load()) {
cb(request);
BulkWriteCommandReply reply(
BulkWriteCommandResponseCursor(0, {BulkWriteReplyItem{0, Status::OK()}}, kNss),
0,
0,
0,
0,
0,
0);
reply.setNMatched(1);
reply.setNModified(1);
BSONObjBuilder bob(reply.toBSON().addFields(BSON("ok" << 1)));
appendTxnResponseMetadata(bob);
return bob.obj();
} else {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
cb(request);
BSONObjBuilder bob;
bob.append("nMatched", 1);
bob.append("nUpserted", 0);
bob.append("nModified", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
BSONObjBuilder bob;
bob.append("nMatched", 1);
bob.append("nUpserted", 0);
bob.append("nModified", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
}
});
}
void expectReturnsSuccess(int shardIndex) override {
onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
if (internalQueryUnifiedWriteExecutor.load()) {
BulkWriteCommandReply reply(
BulkWriteCommandResponseCursor(0, {BulkWriteReplyItem{0, Status::OK()}}, kNss),
0,
0,
0,
0,
0,
0);
reply.setNMatched(1);
reply.setNModified(1);
BSONObjBuilder bob(reply.toBSON().addFields(BSON("ok" << 1)));
appendTxnResponseMetadata(bob);
return bob.obj();
} else {
ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
BSONObjBuilder bob;
bob.append("nMatched", 1);
bob.append("nUpserted", 0);
bob.append("nModified", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
BSONObjBuilder bob;
bob.append("nMatched", 1);
bob.append("nUpserted", 0);
bob.append("nModified", 1);
appendTxnResponseMetadata(bob);
return bob.obj();
}
});
}
};
TEST_F(ClusterUpdateTest, NoErrors) {
testNoErrors(kUpdateCmdTargeted, kUpdateCmdScatterGather);
for (auto uweKnobValue : {false, true}) {
RAIIServerParameterControllerForTest uweController("internalQueryUnifiedWriteExecutor",
uweKnobValue);
testNoErrors(kUpdateCmdTargeted, kUpdateCmdScatterGather);
}
}
TEST_F(ClusterUpdateTest, AttachesAtClusterTimeForSnapshotReadConcern) {
@ -105,7 +143,12 @@ TEST_F(ClusterUpdateTest, CorrectMetrics) {
b.append("getmore", 0);
b.append("command", 0);
testOpcountersAreCorrect(kUpdateCmdTargeted, /* expectedValue */ b.obj());
const BSONObj obj = b.obj();
for (auto uweKnobValue : {false, true}) {
RAIIServerParameterControllerForTest uweController("internalQueryUnifiedWriteExecutor",
uweKnobValue);
testOpcountersAreCorrect(kUpdateCmdTargeted, /* expectedValue */ obj);
}
}
TEST_F(ClusterUpdateTest, RejectsCmdAggregateNamespace) {

View File

@ -142,15 +142,9 @@ FindAndModifyCommandResponse findAndModify(OperationContext* opCtx,
executeWriteCommand(opCtx, WriteCommandRef{request}, originalCommand));
}
// TODO SERVER-106306: Convert the knob below to an IFR flag.
bool isEnabled(OperationContext* opCtx) {
auto fcvSnapshot = serverGlobalParams.featureCompatibility.acquireFCVSnapshot();
// (Generic FCV reference): isUpgradingOrDowngrading() must be false since during upgrades the
// viewless featureflag could be on but there are still viewful collections being converted and
// UWE doesn't support viewful collections.
return internalQueryUnifiedWriteExecutor.load() &&
gFeatureFlagCreateViewlessTimeseriesCollections.isEnabled(
VersionContext::getDecoration(opCtx), fcvSnapshot) &&
!fcvSnapshot.isUpgradingOrDowngrading();
return internalQueryUnifiedWriteExecutor.load();
}
} // namespace unified_write_executor

View File

@ -73,8 +73,7 @@ FindAndModifyCommandResponse findAndModify(OperationContext* opCtx,
BSONObj originalCommand = BSONObj());
/**
* Unified write executor feature flag check. Also ensures we only have viewless timeseries
* collections.
* Unified write executor query knob check.
*/
bool isEnabled(OperationContext* opCtx);