From 87393ce9bcfe06f8aa93b856474fb77bfb3a5267 Mon Sep 17 00:00:00 2001 From: Cheahuychou Mao Date: Wed, 13 Apr 2022 20:14:37 +0000 Subject: [PATCH] SERVER-62479 Reap sessions for the same retryable write atomically --- ....js => internal_sessions_reaping_basic.js} | 54 +- ...ernal_sessions_reaping_retryable_writes.js | 923 ++++++++++++++++++ src/mongo/db/commands/write_commands.cpp | 9 + src/mongo/db/service_entry_point_common.cpp | 2 + src/mongo/db/session.h | 41 +- src/mongo/db/session_catalog.cpp | 353 ++++--- src/mongo/db/session_catalog.h | 183 ++-- src/mongo/db/session_catalog_mongod.cpp | 238 +++-- src/mongo/db/session_catalog_test.cpp | 378 +++++-- src/mongo/db/transaction_participant.cpp | 6 +- src/mongo/db/transaction_participant.h | 14 +- src/mongo/s/session_catalog_router.cpp | 54 +- src/mongo/s/transaction_router_test.cpp | 16 +- 13 files changed, 1793 insertions(+), 478 deletions(-) rename jstests/replsets/{internal_sessions_reaping.js => internal_sessions_reaping_basic.js} (76%) create mode 100644 jstests/replsets/internal_sessions_reaping_retryable_writes.js diff --git a/jstests/replsets/internal_sessions_reaping.js b/jstests/replsets/internal_sessions_reaping_basic.js similarity index 76% rename from jstests/replsets/internal_sessions_reaping.js rename to jstests/replsets/internal_sessions_reaping_basic.js index 09afcd9f67d..3069284020f 100644 --- a/jstests/replsets/internal_sessions_reaping.js +++ b/jstests/replsets/internal_sessions_reaping_basic.js @@ -28,19 +28,18 @@ rst.initiate(); const primary = rst.getPrimary(); -const kDbName = "testDb"; -const kCollName = "testColl"; - const kConfigSessionsNs = "config.system.sessions"; const kConfigTxnsNs = "config.transactions"; const kImageCollNs = "config.image_collection"; const kOplogCollNs = "local.oplog.rs"; +const sessionsColl = primary.getCollection(kConfigSessionsNs); +const transactionsColl = primary.getCollection(kConfigTxnsNs); +const imageColl = primary.getCollection(kImageCollNs); +const 
oplogColl = primary.getCollection(kOplogCollNs); -let sessionsCollOnPrimary = primary.getCollection(kConfigSessionsNs); -let transactionsCollOnPrimary = primary.getCollection(kConfigTxnsNs); -let imageCollOnPrimary = primary.getCollection(kImageCollNs); -let oplogCollOnPrimary = primary.getCollection(kOplogCollNs); -let testDB = primary.getDB(kDbName); +const kDbName = "testDb"; +const kCollName = "testColl"; +const testDB = primary.getDB(kDbName); assert.commandWorked(testDB.createCollection(kCollName)); assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); @@ -73,11 +72,11 @@ assert.commandWorked(testDB.runCommand({ assert.commandWorked(testDB.adminCommand( {commitTransaction: 1, lsid: childLsid0, txnNumber: kInternalTxnNumber, autocommit: false})); numTransactionsCollEntries++; -assert.eq(numTransactionsCollEntries, transactionsCollOnPrimary.find().itcount()); +assert.eq(numTransactionsCollEntries, transactionsColl.find().itcount()); jsTest.log("Verify that the config.transactions entry for the internal transaction for " + "the non-retryable update did not get reaped after command returned"); -assert.eq(numTransactionsCollEntries, transactionsCollOnPrimary.find().itcount()); +assert.eq(numTransactionsCollEntries, transactionsColl.find().itcount()); const parentTxnNumber1 = NumberLong(1); @@ -122,7 +121,7 @@ numImageCollEntries++; jsTest.log("Verify that the config.transactions entry for the retryable internal transaction for " + "the update did not get reaped although there is already a new retryable write"); -assert.eq(numTransactionsCollEntries, transactionsCollOnPrimary.find().itcount()); +assert.eq(numTransactionsCollEntries, transactionsColl.find().itcount()); const childLsid2 = { id: sessionUUID, @@ -156,8 +155,8 @@ assert.commandWorked(testDB.runCommand({ jsTest.log("Verify that the config.transactions entry for the retryable internal transaction for " + "the findAndModify did not get reaped although there is already a new 
retryable write"); -assert.eq(numTransactionsCollEntries, transactionsCollOnPrimary.find().itcount()); -assert.eq(numImageCollEntries, imageCollOnPrimary.find().itcount()); +assert.eq(numTransactionsCollEntries, transactionsColl.find().itcount()); +assert.eq(numImageCollEntries, imageColl.find().itcount()); assert.eq({_id: 0, a: 0, b: 0, c: 0, d: 0, e: 0}, testDB.getCollection(kCollName).findOne({_id: 0})); @@ -165,9 +164,9 @@ assert.eq({_id: 1}, testDB.getCollection(kCollName).findOne({_id: 1})); assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); -assert.eq(1, sessionsCollOnPrimary.find({"_id.id": sessionUUID}).itcount()); -assert.eq(numTransactionsCollEntries, transactionsCollOnPrimary.find().itcount()); -assert.eq(numImageCollEntries, imageCollOnPrimary.find().itcount()); +assert.eq(1, sessionsColl.find({"_id.id": sessionUUID}).itcount()); +assert.eq(numTransactionsCollEntries, transactionsColl.find().itcount()); +assert.eq(numImageCollEntries, imageColl.find().itcount()); assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); @@ -175,29 +174,26 @@ jsTest.log("Verify that the config.transactions entries for internal transaction "reaped although they are expired since the config.system.sessions entry for the " + "parent session still has not been deleted"); -assert.eq(1, sessionsCollOnPrimary.find({"_id.id": sessionUUID}).itcount()); +assert.eq(1, sessionsColl.find({"_id.id": sessionUUID}).itcount()); assert.eq(numTransactionsCollEntries, - transactionsCollOnPrimary.find().itcount(), - tojson(transactionsCollOnPrimary.find().toArray())); -assert.eq(numImageCollEntries, imageCollOnPrimary.find().itcount()); + transactionsColl.find().itcount(), + tojson(transactionsColl.find().toArray())); +assert.eq(numImageCollEntries, imageColl.find().itcount()); // Remove the session doc so the parent session gets reaped when reapLogicalSessionCacheNow is run. 
-assert.commandWorked(sessionsCollOnPrimary.remove({})); +assert.commandWorked(sessionsColl.remove({})); assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); jsTest.log("Verify that the config.transactions entries got reaped since the " + "config.system.sessions entry for the parent session had already been deleted"); -assert.eq(0, sessionsCollOnPrimary.find().itcount()); -assert.eq(0, - transactionsCollOnPrimary.find().itcount(), - tojson(transactionsCollOnPrimary.find().toArray())); -assert.eq(0, imageCollOnPrimary.find().itcount()); +assert.eq(0, sessionsColl.find().itcount()); +assert.eq(0, transactionsColl.find().itcount(), tojson(transactionsColl.find().toArray())); +assert.eq(0, imageColl.find().itcount()); // Validate that writes to config.transactions do not generate oplog entries, with the exception of // deletions. -assert.eq(numTransactionsCollEntries, - oplogCollOnPrimary.find({op: 'd', ns: kConfigTxnsNs}).itcount()); -assert.eq(0, oplogCollOnPrimary.find({op: {'$ne': 'd'}, ns: kConfigTxnsNs}).itcount()); +assert.eq(numTransactionsCollEntries, oplogColl.find({op: 'd', ns: kConfigTxnsNs}).itcount()); +assert.eq(0, oplogColl.find({op: {'$ne': 'd'}, ns: kConfigTxnsNs}).itcount()); rst.stopSet(); })(); diff --git a/jstests/replsets/internal_sessions_reaping_retryable_writes.js b/jstests/replsets/internal_sessions_reaping_retryable_writes.js new file mode 100644 index 00000000000..1e972ee72df --- /dev/null +++ b/jstests/replsets/internal_sessions_reaping_retryable_writes.js @@ -0,0 +1,923 @@ +/* + * Test that the logical cache reaper reaps Session/TransactionParticipant objects and the + * config.transactions and config.image_collection entries that correspond to the same retryable + * write atomically. 
+ * + * @tags: [requires_fcv_53, featureFlagInternalTransactions] + */ +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/parallelTester.js"); +load("jstests/libs/uuid_util.js"); +load("jstests/sharding/libs/sharded_transactions_helpers.js"); + +// This test runs the reapLogicalSessionCacheNow command. That can lead to direct writes to the +// config.transactions collection, which cannot be performed on a session. +TestData.disableImplicitSessions = true; + +const rst = new ReplSetTest({ + nodes: 2, + nodeOptions: { + setParameter: { + TransactionRecordMinimumLifetimeMinutes: 0, + } + } +}); +rst.startSet(); +rst.initiate(); + +const primary = rst.getPrimary(); + +const kConfigSessionsNs = "config.system.sessions"; +const kConfigTxnsNs = "config.transactions"; +const kConfigImageNs = "config.image_collection"; +const sessionsColl = primary.getCollection(kConfigSessionsNs); +const transactionsColl = primary.getCollection(kConfigTxnsNs); +const imageColl = primary.getCollection(kConfigImageNs); + +const kDbName = "testDb"; +const kCollName = "testColl"; +const kNs = kDbName + "." 
+ kCollName; +const testDB = primary.getDB(kDbName); +const testColl = testDB.getCollection(kCollName); + +assert.commandWorked(testDB.createCollection(kCollName)); + +function makeSessionOptsForTest() { + const sessionUUID = UUID(); + const parentLsid = {id: sessionUUID}; + const parentTxnNumber = NumberLong(35); + const childLsidForRetryableWrite = { + id: sessionUUID, + txnNumber: parentTxnNumber, + txnUUID: UUID() + }; + const childLsidForPrevRetryableWrite = { + id: sessionUUID, + txnNumber: NumberLong(parentTxnNumber.valueOf() - 1), + txnUUID: UUID() + }; + const childLsidForNonRetryableWrite = {id: sessionUUID, txnUUID: UUID()}; + const childTxnNumber = NumberLong(0); + return { + sessionUUID, + parentLsid, + parentTxnNumber, + childLsidForRetryableWrite, + childLsidForPrevRetryableWrite, + childLsidForNonRetryableWrite, + childTxnNumber, + }; +} + +function assertNumSessionsCollEntries(sessionOpts, expectedNum) { + const filter = {"_id.id": sessionOpts.parentLsid.id}; + assert.eq(expectedNum, + sessionsColl.find(filter).itcount(), + tojson(sessionsColl.find(filter).toArray())); +} + +function assertNumTransactionsCollEntries(sessionOpts, expectedNum) { + const filter = {"_id.id": sessionOpts.parentLsid.id}; + assert.eq(expectedNum, + transactionsColl.find(filter).itcount(), + tojson(transactionsColl.find(filter).toArray())); +} + +function assertNumImagesCollEntries(sessionOpts, expectedNum) { + const filter = {"_id.id": sessionOpts.parentLsid.id}; + assert.eq( + expectedNum, imageColl.find(filter).itcount(), tojson(imageColl.find(filter).toArray())); +} + +function assertNumEntries( + sessionOpts, {numSessionsCollEntries, numTransactionsCollEntries, numImageCollEntries}) { + assertNumSessionsCollEntries(sessionOpts, numSessionsCollEntries); + assertNumTransactionsCollEntries(sessionOpts, numTransactionsCollEntries); + assertNumImagesCollEntries(sessionOpts, numImageCollEntries); +} + +// Test reaping when neither the external session nor the internal 
sessions are checked out. + +{ + jsTest.log("Test reaping when there is an in-progress retryable-write internal transaction"); + const sessionOpts = makeSessionOptsForTest(); + + assert.commandWorked(testColl.insert([{x: 0, y: 0}])); + assert.commandWorked(testDB.runCommand({ + findAndModify: kCollName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: sessionOpts.parentLsid, + txnNumber: sessionOpts.parentTxnNumber, + stmtId: NumberInt(0), + })); + + assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 1, numTransactionsCollEntries: 1, numImageCollEntries: 1}); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: sessionOpts.childLsidForRetryableWrite, + txnNumber: sessionOpts.childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(1), + })); + + // Force the logical session cache to reap, and verify that the config.transactions entry and + // config.image_collection entry for the retryable write in the external session do not get + // reaped since there is an in-progress internal transaction for that retryable write. + assert.commandWorked(sessionsColl.remove({"_id.id": sessionOpts.sessionUUID})); + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 1, numImageCollEntries: 1}); + + // Retry the write statement executed in the external session. 
+ assert.commandWorked(testDB.runCommand({ + findAndModify: kCollName, + query: {x: 0}, + update: {$inc: {y: 1}}, + lsid: sessionOpts.childLsidForRetryableWrite, + txnNumber: sessionOpts.childTxnNumber, + autocommit: false, + stmtId: NumberInt(0), + })); + assert.commandWorked(primary.adminCommand(makeCommitTransactionCmdObj( + sessionOpts.childLsidForRetryableWrite, sessionOpts.childTxnNumber))); + + // Verify that the retried write statement did not re-execute. + assert.eq(testColl.find({x: 0, y: 1}).itcount(), 1); + assert.eq(testColl.find({x: 1}).itcount(), 1); + + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(testColl.remove({})); +} + +{ + jsTest.log("Test reaping when there is an in-progress and a committed retryable-write " + + "internal transaction"); + const sessionOpts = makeSessionOptsForTest(); + + assert.commandWorked(testColl.insert([{x: 0, y: 0}])); + assert.commandWorked(testDB.runCommand({ + findAndModify: kCollName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: sessionOpts.parentLsid, + txnNumber: sessionOpts.parentTxnNumber, + stmtId: NumberInt(0), + })); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: sessionOpts.childLsidForRetryableWrite, + txnNumber: sessionOpts.childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(1), + })); + assert.commandWorked(primary.adminCommand(makeCommitTransactionCmdObj( + sessionOpts.childLsidForRetryableWrite, sessionOpts.childTxnNumber))); + + assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 1, numTransactionsCollEntries: 2, numImageCollEntries: 1}); + + const runInternalTxn = + (primaryHost, parentLsidUUIDString, parentTxnNumber, dbName, collName) => 
{ + load("jstests/sharding/libs/sharded_transactions_helpers.js"); + + const primary = new Mongo(primaryHost); + const testDB = primary.getDB(dbName); + + const childLsid = { + id: UUID(parentLsidUUIDString), + txnNumber: NumberLong(parentTxnNumber), + txnUUID: UUID() + }; + const childTxnNumber = NumberLong(0); + + assert.commandWorked(testDB.runCommand({ + insert: collName, + documents: [{x: 2}], + lsid: childLsid, + txnNumber: childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(2), + })); + + // Retry the write statement executed in the external session. + assert.commandWorked(testDB.runCommand({ + findAndModify: collName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: childLsid, + txnNumber: childTxnNumber, + autocommit: false, + stmtId: NumberInt(0), + })); + + // Retry the write statement executed in the committed internal transaction. + assert.commandWorked(testDB.runCommand({ + insert: collName, + documents: [{x: 1}], + lsid: childLsid, + txnNumber: childTxnNumber, + autocommit: false, + stmtId: NumberInt(1), + })); + + assert.commandWorked( + primary.adminCommand(makeCommitTransactionCmdObj(childLsid, childTxnNumber))); + }; + + // Start another internal transaction in a separate thread, and make it hang right after it + // finishes executing the first statement. 
+ const fp = configureFailPoint(primary, "waitAfterCommandFinishesExecution", {ns: kNs}); + const internalTxnThread = new Thread(runInternalTxn, + primary.host, + extractUUIDFromObject(sessionOpts.sessionUUID), + sessionOpts.parentTxnNumber.valueOf(), + kDbName, + kCollName); + internalTxnThread.start(); + fp.wait(); + + // Force the logical session cache to reap, and verify that the config.transactions entry and + // config.image_collection entry for the retryable write in the external session and the + // config.transactions for the committed internal transaction for that retryable write do not + // get reaped since there is an in-progress internal transaction for the same retryable write. + assert.commandWorked(sessionsColl.remove({"_id.id": sessionOpts.sessionUUID})); + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 2, numImageCollEntries: 1}); + + fp.off(); + internalTxnThread.join(); + + // Verify that the retried write statements did not re-execute. 
+ assert.eq(testColl.find({x: 0, y: 1}).itcount(), 1); + assert.eq(testColl.find({x: 1}).itcount(), 1); + assert.eq(testColl.find({x: 2}).itcount(), 1); + + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(testColl.remove({})); +} + +{ + jsTest.log( + "Test reaping when there is an in-progress internal transaction for the current retryable" + + " write and a committed internal transaction for a previous retryable write"); + const sessionOpts = makeSessionOptsForTest(); + + assert.commandWorked(testColl.insert([{x: 0, y: 0}])); + assert.commandWorked(testDB.runCommand({ + findAndModify: kCollName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: sessionOpts.childLsidForPrevRetryableWrite, + txnNumber: sessionOpts.childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(0), + })); + assert.commandWorked(primary.adminCommand(makeCommitTransactionCmdObj( + sessionOpts.childLsidForPrevRetryableWrite, sessionOpts.childTxnNumber))); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: sessionOpts.parentLsid, + txnNumber: sessionOpts.parentTxnNumber, + stmtId: NumberInt(1), + })); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 2}], + lsid: sessionOpts.childLsidForRetryableWrite, + txnNumber: sessionOpts.childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(2), + })); + + assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 1, numTransactionsCollEntries: 2, numImageCollEntries: 1}); + + // Force the logical session cache to reap, and verify that the config.transactions entry and + // config.image_collection entry for the previous write do get reaped. 
+ assert.commandWorked(sessionsColl.remove({"_id.id": sessionOpts.sessionUUID})); + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 1, numImageCollEntries: 0}); + + assert.commandWorked(primary.adminCommand(makeCommitTransactionCmdObj( + sessionOpts.childLsidForRetryableWrite, sessionOpts.childTxnNumber))); + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: sessionOpts.parentLsid, + txnNumber: sessionOpts.parentTxnNumber, + stmtId: NumberInt(1), + })); + + assert.eq(testColl.find({x: 0, y: 1}).itcount(), 1); + assert.eq(testColl.find({x: 1}).itcount(), 1); + assert.eq(testColl.find({x: 2}).itcount(), 1); + + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(testColl.remove({})); +} + +{ + jsTest.log("Test reaping there is an in-progress transaction in the external session and a " + + "committed internal transaction for a previous retryable write"); + const sessionOpts = makeSessionOptsForTest(); + + assert.commandWorked(testColl.insert([{x: 0, y: 0}])); + assert.commandWorked(testDB.runCommand({ + findAndModify: kCollName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: sessionOpts.childLsidForPrevRetryableWrite, + txnNumber: sessionOpts.childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(0), + })); + assert.commandWorked(primary.adminCommand(makeCommitTransactionCmdObj( + sessionOpts.childLsidForPrevRetryableWrite, sessionOpts.childTxnNumber))); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: sessionOpts.parentLsid, + txnNumber: sessionOpts.parentTxnNumber, + startTransaction: true, + autocommit: false, + })); + + 
assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 1, numTransactionsCollEntries: 1, numImageCollEntries: 1}); + + // Force the logical session cache to reap, and verify that the config.transactions entry and + // config.image_collection entry for the previous write do get reaped. + assert.commandWorked(sessionsColl.remove({"_id.id": sessionOpts.sessionUUID})); + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(primary.adminCommand( + makeCommitTransactionCmdObj(sessionOpts.parentLsid, sessionOpts.parentTxnNumber))); + + assert.eq(testColl.find({x: 0, y: 1}).itcount(), 1); + assert.eq(testColl.find({x: 1}).itcount(), 1); + + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(testColl.remove({})); +} + +{ + jsTest.log( + "Test reaping when there is an in-progress non retryable-write internal transaction " + + "and a committed retryable-write internal transaction"); + const sessionOpts = makeSessionOptsForTest(); + + assert.commandWorked(testColl.insert([{x: 0, y: 0}])); + assert.commandWorked(testDB.runCommand({ + findAndModify: kCollName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: sessionOpts.childLsidForPrevRetryableWrite, + txnNumber: sessionOpts.childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(0), + })); + assert.commandWorked(primary.adminCommand(makeCommitTransactionCmdObj( + sessionOpts.childLsidForPrevRetryableWrite, sessionOpts.childTxnNumber))); + + assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + 
{numSessionsCollEntries: 1, numTransactionsCollEntries: 1, numImageCollEntries: 1}); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: sessionOpts.childLsidForNonRetryableWrite, + txnNumber: sessionOpts.childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(1), + })); + + // Force the logical session cache to reap, and verify that the config.transactions entry and + // config.image_collection entry for the retryable write in external session do not get reaped + // since there has not been a retryable write or transaction with a higher txnNumber in the + // logical session. + assert.commandWorked(sessionsColl.remove({"_id.id": sessionOpts.sessionUUID})); + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 1, numImageCollEntries: 1}); + + assert.commandWorked(primary.adminCommand(makeCommitTransactionCmdObj( + sessionOpts.childLsidForNonRetryableWrite, sessionOpts.childTxnNumber))); + + assert.eq(testColl.find({x: 0, y: 1}).itcount(), 1); + assert.eq(testColl.find({x: 1}).itcount(), 1); + + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(testColl.remove({})); +} + +{ + jsTest.log("Test reaping when there is an in-progress transaction in the external session " + + "and a committed non retryable-write internal transaction"); + const sessionOpts = makeSessionOptsForTest(); + + assert.commandWorked(testColl.insert([{x: 0, y: 0}])); + assert.commandWorked(testDB.runCommand({ + findAndModify: kCollName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: sessionOpts.childLsidForNonRetryableWrite, + txnNumber: sessionOpts.childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: 
NumberInt(0), + })); + assert.commandWorked(primary.adminCommand(makeCommitTransactionCmdObj( + sessionOpts.childLsidForNonRetryableWrite, sessionOpts.childTxnNumber))); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: sessionOpts.parentLsid, + txnNumber: sessionOpts.parentTxnNumber, + startTransaction: true, + autocommit: false, + })); + + assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 1, numTransactionsCollEntries: 1, numImageCollEntries: 0}); + + // Force the logical session cache to reap, and verify that the config.transactions entry for + // the committed non retryable-write internal transaction does get reaped since it is unrelated + // to the in-progress transaction in the external session. + assert.commandWorked(sessionsColl.remove({"_id.id": sessionOpts.sessionUUID})); + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(primary.adminCommand( + makeCommitTransactionCmdObj(sessionOpts.parentLsid, sessionOpts.parentTxnNumber))); + + assert.eq(testColl.find({x: 0, y: 1}).itcount(), 1); + assert.eq(testColl.find({x: 1}).itcount(), 1); + + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(testColl.remove({})); +} + +// Test reaping when there is a checked out internal session. 
+ +{ + jsTest.log("Test reaping when there is a checked out retryable-write internal session with " + + "an in-progress transaction"); + const sessionOpts = makeSessionOptsForTest(); + + assert.commandWorked(testColl.insert([{x: 0, y: 0}])); + assert.commandWorked(testDB.runCommand({ + findAndModify: kCollName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: sessionOpts.parentLsid, + txnNumber: sessionOpts.parentTxnNumber, + stmtId: NumberInt(0), + })); + + assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 1, numTransactionsCollEntries: 1, numImageCollEntries: 1}); + + const runInternalTxn = + (primaryHost, parentLsidUUIDString, parentTxnNumber, dbName, collName) => { + load("jstests/sharding/libs/sharded_transactions_helpers.js"); + + const primary = new Mongo(primaryHost); + const testDB = primary.getDB(dbName); + + const childLsid = { + id: UUID(parentLsidUUIDString), + txnNumber: NumberLong(parentTxnNumber), + txnUUID: UUID() + }; + const childTxnNumber = NumberLong(0); + + assert.commandWorked(testDB.runCommand({ + insert: collName, + documents: [{x: 1}], + lsid: childLsid, + txnNumber: childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(1), + })); + + // Retry the write statement executed in the external session. 
+ assert.commandWorked(testDB.runCommand({ + findAndModify: collName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: childLsid, + txnNumber: childTxnNumber, + autocommit: false, + stmtId: NumberInt(0), + })); + + assert.commandWorked( + primary.adminCommand(makeCommitTransactionCmdObj(childLsid, childTxnNumber))); + }; + + const fp = configureFailPoint(primary, "hangAfterSessionCheckOut", {}, {skip: 1}); + const internalTxnThread = new Thread(runInternalTxn, + primary.host, + extractUUIDFromObject(sessionOpts.sessionUUID), + sessionOpts.parentTxnNumber.valueOf(), + kDbName, + kCollName); + internalTxnThread.start(); + fp.wait(); + + // Force the logical session cache to reap, and verify that the config.transactions entry and + // config.image_collection entry for the retryable write in the external session do not get + // reaped. + assert.commandWorked(sessionsColl.remove({"_id.id": sessionOpts.sessionUUID})); + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 1, numImageCollEntries: 1}); + + fp.off(); + internalTxnThread.join(); + + // Verify that the retried write statement did not re-execute. 
+ assert.eq(testColl.find({x: 0, y: 1}).itcount(), 1); + assert.eq(testColl.find({x: 1}).itcount(), 1); + + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(testColl.remove({})); +} + +{ + jsTest.log("Test reaping when there is a checked out retryable-write internal session " + + "without an in-progress transaction"); + const sessionOpts = makeSessionOptsForTest(); + + assert.commandWorked(testColl.insert([{x: 0, y: 0}])); + assert.commandWorked(testDB.runCommand({ + findAndModify: kCollName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: sessionOpts.parentLsid, + txnNumber: sessionOpts.parentTxnNumber, + stmtId: NumberInt(0), + })); + + assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 1, numTransactionsCollEntries: 1, numImageCollEntries: 1}); + + const runInternalTxn = + (primaryHost, parentLsidUUIDString, parentTxnNumber, dbName, collName) => { + load("jstests/sharding/libs/sharded_transactions_helpers.js"); + + const primary = new Mongo(primaryHost); + const testDB = primary.getDB(dbName); + + const childLsid = { + id: UUID(parentLsidUUIDString), + txnNumber: NumberLong(parentTxnNumber), + txnUUID: UUID() + }; + const childTxnNumber = NumberLong(0); + + assert.commandWorked(testDB.runCommand({ + findAndModify: collName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: childLsid, + txnNumber: childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(0), + })); + assert.commandWorked( + primary.adminCommand(makeCommitTransactionCmdObj(childLsid, childTxnNumber))); + }; + + const fp = configureFailPoint(primary, "hangAfterSessionCheckOut"); + const internalTxnThread = new Thread(runInternalTxn, + primary.host, + 
extractUUIDFromObject(sessionOpts.sessionUUID), + sessionOpts.parentTxnNumber.valueOf(), + kDbName, + kCollName); + internalTxnThread.start(); + fp.wait(); + + // Force the logical session cache to reap, and verify that the config.transactions entry and + // config.image_collection entry for the retryable write in the external session do not get + // reaped. + assert.commandWorked(sessionsColl.remove({"_id.id": sessionOpts.sessionUUID})); + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 1, numImageCollEntries: 1}); + + fp.off(); + internalTxnThread.join(); + + // Verify that the retried write statement did not re-execute. + assert.eq(testColl.find({x: 0, y: 1}).itcount(), 1); + + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(testColl.remove({})); +} + +{ + jsTest.log("Test reaping when there are a checked out retryable-write internal session with " + + "an in-progress transaction and an unchecked out retryable-write internal " + + "session for the same retryable write with a committed transaction"); + const sessionOpts = makeSessionOptsForTest(); + + assert.commandWorked(testColl.insert([{x: 0, y: 0}])); + assert.commandWorked(testDB.runCommand({ + findAndModify: kCollName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: sessionOpts.parentLsid, + txnNumber: sessionOpts.parentTxnNumber, + stmtId: NumberInt(0), + })); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: sessionOpts.childLsidForRetryableWrite, + txnNumber: sessionOpts.childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(1), + })); + assert.commandWorked(primary.adminCommand(makeCommitTransactionCmdObj( + 
sessionOpts.childLsidForRetryableWrite, sessionOpts.childTxnNumber))); + + assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 1, numTransactionsCollEntries: 2, numImageCollEntries: 1}); + + const runInternalTxn = + (primaryHost, parentLsidUUIDString, parentTxnNumber, dbName, collName) => { + load("jstests/sharding/libs/sharded_transactions_helpers.js"); + + const primary = new Mongo(primaryHost); + const testDB = primary.getDB(dbName); + + const childLsid = { + id: UUID(parentLsidUUIDString), + txnNumber: NumberLong(parentTxnNumber), + txnUUID: UUID() + }; + const childTxnNumber = NumberLong(0); + + assert.commandWorked(testDB.runCommand({ + insert: collName, + documents: [{x: 2}], + lsid: childLsid, + txnNumber: childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(2), + })); + + // Retry the write statement executed in the external session. + assert.commandWorked(testDB.runCommand({ + findAndModify: collName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: childLsid, + txnNumber: childTxnNumber, + autocommit: false, + stmtId: NumberInt(0), + })); + + // Retry the write statement executed in the committed internal transaction. + assert.commandWorked(testDB.runCommand({ + insert: collName, + documents: [{x: 1}], + lsid: childLsid, + txnNumber: childTxnNumber, + autocommit: false, + stmtId: NumberInt(1), + })); + + assert.commandWorked( + primary.adminCommand(makeCommitTransactionCmdObj(childLsid, childTxnNumber))); + }; + + // Start another internal transaction in a separate thread, and make it hang right after it + // finishes executing the first statement. 
+ const fp = configureFailPoint(primary, "hangInsertBeforeWrite", {ns: kNs}); + const internalTxnThread = new Thread(runInternalTxn, + primary.host, + extractUUIDFromObject(sessionOpts.sessionUUID), + sessionOpts.parentTxnNumber.valueOf(), + kDbName, + kCollName); + internalTxnThread.start(); + fp.wait(); + + // Force the logical session cache to reap, and verify that the config.transactions and + // config.image_collection entry for the retryable write in the external session and for the + // committed internal transaction for that retryable write do not get reaped since there is an + // in-progress internal transaction for the same retryable write. + assert.commandWorked(sessionsColl.remove({"_id.id": sessionOpts.sessionUUID})); + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 2, numImageCollEntries: 1}); + + fp.off(); + internalTxnThread.join(); + + // Verify that the retried write statements did not re-execute. + assert.eq(testColl.find({x: 0, y: 1}).itcount(), 1); + assert.eq(testColl.find({x: 1}).itcount(), 1); + assert.eq(testColl.find({x: 2}).itcount(), 1); + + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(testColl.remove({})); +} + +// Test reaping when an internal session is about to be checked out. 
+ +{ + jsTest.log("Test reaping when a retryable-write internal session is about to be checked out"); + const sessionOpts = makeSessionOptsForTest(); + + assert.commandWorked(testColl.insert([{x: 0, y: 0}])); + assert.commandWorked(testDB.runCommand({ + findAndModify: kCollName, + query: {x: 0}, + update: {$inc: {y: 1}}, + new: true, + lsid: sessionOpts.parentLsid, + txnNumber: sessionOpts.parentTxnNumber, + stmtId: NumberInt(0), + })); + + assert.commandWorked(primary.adminCommand({refreshLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 1, numTransactionsCollEntries: 1, numImageCollEntries: 1}); + + const runInternalTxn = + (primaryHost, parentLsidUUIDString, parentTxnNumber, dbName, collName) => { + load("jstests/sharding/libs/sharded_transactions_helpers.js"); + + const primary = new Mongo(primaryHost); + const testDB = primary.getDB(dbName); + + const childLsid = { + id: UUID(parentLsidUUIDString), + txnNumber: NumberLong(parentTxnNumber), + txnUUID: UUID() + }; + const childTxnNumber = NumberLong(0); + + // Retry the statement executed in the external session. + assert.commandWorked(testDB.runCommand({ + insert: collName, + documents: [{y: 0}], + lsid: childLsid, + txnNumber: childTxnNumber, + startTransaction: true, + autocommit: false, + stmtId: NumberInt(0), + })); + assert.commandWorked( + testDB.adminCommand(makeCommitTransactionCmdObj(childLsid, childTxnNumber))); + }; + + const fp = configureFailPoint(primary, "hangBeforeSessionCheckOut"); + const internalTxnThread = new Thread(runInternalTxn, + primary.host, + extractUUIDFromObject(sessionOpts.sessionUUID), + sessionOpts.parentTxnNumber.valueOf(), + kDbName, + kCollName); + internalTxnThread.start(); + fp.wait(); + + // Force the logical session cache to reap, and verify that the config.transactions entry and + // config.image_collection entry for the retryable write in the external session do get reaped. 
+ assert.commandWorked(sessionsColl.remove({"_id.id": sessionOpts.sessionUUID})); + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + // Verify that the internal transaction did not get interrupted but that the retried write + // statement re-executes, i.e. retryability is violated because the retry occurs after the session + // got reaped. + fp.off(); + internalTxnThread.join(); + + assert.eq(testColl.find({x: 0, y: 1}).itcount(), 1); + + assert.commandWorked(primary.adminCommand({reapLogicalSessionCacheNow: 1})); + assertNumEntries( + sessionOpts, + {numSessionsCollEntries: 0, numTransactionsCollEntries: 0, numImageCollEntries: 0}); + + assert.commandWorked(testColl.remove({})); +} + +rst.stopSet(); +})(); diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index c9c46d7807a..9e6d189b4a3 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -87,6 +87,7 @@ namespace mongo { namespace { MONGO_FAIL_POINT_DEFINE(hangWriteBeforeWaitingForMigrationDecision); +MONGO_FAIL_POINT_DEFINE(hangInsertBeforeWrite); MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeCommit); MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeWrite); MONGO_FAIL_POINT_DEFINE(failUnorderedTimeseriesInsert); @@ -546,6 +547,14 @@ public: throw; } } + + if (hangInsertBeforeWrite.shouldFail([&](const BSONObj& data) { + const auto ns = data.getStringField("ns"); + return ns == request().getNamespace().toString(); + })) { + hangInsertBeforeWrite.pauseWhileSet(); + } + auto reply = write_ops_exec::performInserts(opCtx, request()); write_ops::InsertCommandReply insertReply; diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 1e8c6e9cbd6..9093d80ddc4 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++
b/src/mongo/db/service_entry_point_common.cpp @@ -122,6 +122,7 @@ MONGO_FAIL_POINT_DEFINE(waitAfterNewStatementBlocksBehindOpenInternalTransaction MONGO_FAIL_POINT_DEFINE(waitAfterCommandFinishesExecution); MONGO_FAIL_POINT_DEFINE(failWithErrorCodeInRunCommand); MONGO_FAIL_POINT_DEFINE(hangBeforeSessionCheckOut); +MONGO_FAIL_POINT_DEFINE(hangAfterSessionCheckOut); MONGO_FAIL_POINT_DEFINE(hangBeforeSettingTxnInterruptFlag); MONGO_FAIL_POINT_DEFINE(hangAfterCheckingWritabilityForMultiDocumentTransactions); @@ -861,6 +862,7 @@ void CheckoutSessionAndInvokeCommand::_checkOutSession() { hangBeforeSessionCheckOut.pauseWhileSet(); _sessionTxnState = std::make_unique(opCtx); _txnParticipant.emplace(TransactionParticipant::get(opCtx)); + hangAfterSessionCheckOut.pauseWhileSet(); // Used for waiting for an in-progress transaction to transition out of the conflicting state. auto waitForInProgressTxn = [](OperationContext* opCtx, auto& stateTransitionFuture) { diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index 079184747ad..256f7ff28b1 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -37,8 +37,8 @@ namespace mongo { /** - * A decorable container for state associated with an active session running on a MongoD or MongoS - * server. Refer to SessionCatalog for more information on the semantics of sessions. + * A decorable container for state associated with an active transaction session running on a MongoD + * or MongoS server. Refer to SessionCatalog for more information on the semantics of sessions. */ class Session : public Decorable { Session(const Session&) = delete; @@ -50,9 +50,10 @@ class Session : public Decorable { public: explicit Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} - /** - * The logical session id that this object represents. 
- */ + ~Session() { + invariant(!_numWaitingToCheckOut); + } + const LogicalSessionId& getSessionId() const { return _sessionId; } @@ -61,40 +62,18 @@ public: return _parentSession; } - OperationContext* currentOperation_forTest() const { - return _checkoutOpCtx; - } - private: - // The id of the session with which this object is associated + // The session id of the transaction session that this object represents. const LogicalSessionId _sessionId; // A pointer to the parent Session for this Session if there is one. Set at construction for // child sessions. Children and parents are reaped atomically, so this pointer should always be // valid if it is not null. - // - // TODO SERVER-62479: Verify the implementation of this ticket matches the assumption above. Session* _parentSession{nullptr}; - // These fields are only safe to read or write while holding the SessionCatalog::_mutex. In - // practice, it is only used inside of the SessionCatalog itself. - - // A pointer back to the currently running operation on this Session, or nullptr if there - // is no operation currently running for the Session. - OperationContext* _checkoutOpCtx{nullptr}; - - // A pointer to the operation currently running on one of the child Sessions of this Session, - // or nullptr if this is Session does not have any child Session or if there is no operation - // currently running on any of its child Sessions. Used to block this Session and other child - // Sessions from being checked out if there is already a checked-out child Session. - OperationContext* _childSessionCheckoutOpCtx{nullptr}; - - // Keeps the last time this session was checked-out - Date_t _lastCheckout{Date_t::now()}; - - // Counter indicating the number of times ObservableSession::kill has been called on this - // session, which have not yet had a corresponding call to checkOutSessionForKill. - int _killsRequested{0}; + // Counts how many threads are blocked waiting for this Session to become available. 
Used to + // block reaping of this Session from the SessionCatalog. + int _numWaitingToCheckOut{0}; }; } // namespace mongo diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index f9f0648002d..eaf13b6a4fa 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -53,10 +53,10 @@ const auto operationSessionDecoration = SessionCatalog::~SessionCatalog() { stdx::lock_guard lg(_mutex); - for (const auto& entry : _sessions) { - ObservableSession session(lg, entry.second->session); - invariant(!session.hasCurrentOperation()); - invariant(!session._killed()); + for (const auto& [_, sri] : _sessions) { + ObservableSession osession(lg, sri.get(), &sri->parentSession); + invariant(!osession.hasCurrentOperation()); + invariant(!osession._killed()); } } @@ -74,95 +74,39 @@ SessionCatalog* SessionCatalog::get(ServiceContext* service) { return &sessionTransactionTable; } -SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionWithParentSession( +SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionInner( OperationContext* opCtx, const LogicalSessionId& lsid, boost::optional killToken) { if (killToken) { invariant(killToken->lsidToKill == lsid); - invariant(killToken->parentLsidToKill); - invariant(*killToken->parentLsidToKill == *getParentSessionId(lsid)); } else { invariant(opCtx->getLogicalSessionId() == lsid); } stdx::unique_lock ul(_mutex); - auto parentSri = _getOrCreateSessionRuntimeInfo(ul, *getParentSessionId(lsid), nullptr); - auto childSri = _getOrCreateSessionRuntimeInfo(ul, lsid, parentSri); + auto sri = _getOrCreateSessionRuntimeInfo(ul, lsid); + auto session = sri->getSession(lsid); + invariant(session); if (killToken) { - invariant(ObservableSession(ul, childSri->session)._killed()); - invariant(ObservableSession(ul, parentSri->session)._killed()); - } - - // Wait until the session is no longer checked out and until the previously scheduled kill has - // completed - 
++parentSri->numWaitingToCheckOut; - ++childSri->numWaitingToCheckOut; - ON_BLOCK_EXIT([&] { - --parentSri->numWaitingToCheckOut; - --childSri->numWaitingToCheckOut; - }); - - // Wait on the parent session's condition variable since if the parent session is checked out - // prior to this, the child session's condition variable will not be notified when the parent - // session becomes available; on the other hand, if the child session is checked out prior to - // this, both parent session's and child session's condition variables will be notified when the - // child session and parent session become available. - opCtx->waitForConditionOrInterrupt( - parentSri->availableCondVar, - ul, - [&ul, &opCtx, &parentSri, &childSri, forKill = killToken.has_value()]() { - ObservableSession oParentSession(ul, parentSri->session); - ObservableSession oChildSession(ul, childSri->session); - auto isParentSessionAvailable = oParentSession._isAvailableForCheckOut(forKill); - auto isChildSessionAvailable = oChildSession._isAvailableForCheckOut(forKill); - if (isParentSessionAvailable) { - invariant(isChildSessionAvailable || oChildSession._killed()); - } - return isParentSessionAvailable && isChildSessionAvailable; - }); - - parentSri->session._childSessionCheckoutOpCtx = opCtx; - parentSri->session._lastCheckout = Date_t::now(); - - childSri->session._checkoutOpCtx = opCtx; - childSri->session._lastCheckout = Date_t::now(); - - return ScopedCheckedOutSession( - *this, std::move(childSri), std::move(parentSri), std::move(killToken)); -} - -SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionWithoutParentSession( - OperationContext* opCtx, const LogicalSessionId& lsid, boost::optional killToken) { - if (killToken) { - invariant(killToken->lsidToKill == lsid); - invariant(!killToken->parentLsidToKill); - } else { - invariant(opCtx->getLogicalSessionId() == lsid); - } - - stdx::unique_lock ul(_mutex); - - auto sri = _getOrCreateSessionRuntimeInfo(ul, lsid, nullptr); - 
if (killToken) { - invariant(ObservableSession(ul, sri->session)._killed()); + invariant(ObservableSession(ul, sri, session)._killed()); } // Wait until the session is no longer checked out and until the previously scheduled kill has // completed. - ++sri->numWaitingToCheckOut; - ON_BLOCK_EXIT([&] { --sri->numWaitingToCheckOut; }); + ++session->_numWaitingToCheckOut; + ON_BLOCK_EXIT([&] { --session->_numWaitingToCheckOut; }); opCtx->waitForConditionOrInterrupt( - sri->availableCondVar, ul, [&ul, &sri, forKill = killToken.has_value()]() { - ObservableSession osession(ul, sri->session); + sri->availableCondVar, ul, [&ul, &sri, &session, forKill = killToken.has_value()]() { + ObservableSession osession(ul, sri, session); return osession._isAvailableForCheckOut(forKill); }); - sri->session._checkoutOpCtx = opCtx; - sri->session._lastCheckout = Date_t::now(); + sri->checkoutOpCtx = opCtx; + sri->lastCheckout = Date_t::now(); - return ScopedCheckedOutSession(*this, std::move(sri), nullptr, std::move(killToken)); + return ScopedCheckedOutSession(*this, std::move(sri), session, std::move(killToken)); } SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(OperationContext* opCtx) { @@ -174,10 +118,7 @@ SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(Operati invariant(!opCtx->lockState()->isLocked()); auto lsid = *opCtx->getLogicalSessionId(); - if (getParentSessionId(lsid)) { - return _checkOutSessionWithParentSession(opCtx, lsid, boost::none /* killToken */); - } - return _checkOutSessionWithoutParentSession(opCtx, lsid, boost::none /* killToken */); + return _checkOutSessionInner(opCtx, lsid, boost::none /* killToken */); } SessionCatalog::SessionToKill SessionCatalog::checkOutSessionForKill(OperationContext* opCtx, @@ -188,58 +129,101 @@ SessionCatalog::SessionToKill SessionCatalog::checkOutSessionForKill(OperationCo invariant(!opCtx->getTxnNumber()); auto lsid = killToken.lsidToKill; - if (getParentSessionId(lsid)) { - return 
SessionToKill(_checkOutSessionWithParentSession(opCtx, lsid, std::move(killToken))); - } - return SessionToKill(_checkOutSessionWithoutParentSession(opCtx, lsid, std::move(killToken))); + return SessionToKill(_checkOutSessionInner(opCtx, lsid, std::move(killToken))); } void SessionCatalog::scanSession(const LogicalSessionId& lsid, const ScanSessionsCallbackFn& workerFn) { - std::unique_ptr sessionToReap; + stdx::lock_guard lg(_mutex); - { - stdx::lock_guard lg(_mutex); - auto it = _sessions.find(lsid); - if (it != _sessions.end()) { - auto& sri = it->second; - ObservableSession osession(lg, sri->session); - workerFn(osession); + if (auto sri = _getSessionRuntimeInfo(lg, lsid)) { + auto session = sri->getSession(lsid); + invariant(session); - if (osession._shouldBeReaped(sri->numWaitingToCheckOut)) { - sessionToReap = std::move(sri); - _sessions.erase(it); - } - } + ObservableSession osession(lg, sri, session); + workerFn(osession); + invariant(!osession._markedForReap, "Cannot reap a session via 'scanSession'"); } } void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, const ScanSessionsCallbackFn& workerFn) { - std::vector> sessionsToReap; + stdx::lock_guard lg(_mutex); + LOGV2_DEBUG(21976, + 2, + "Scanning {sessionCount} sessions", + "Scanning sessions", + "sessionCount"_attr = _sessions.size()); + + for (auto& [parentLsid, sri] : _sessions) { + if (matcher.match(parentLsid)) { + ObservableSession osession(lg, sri.get(), &sri->parentSession); + workerFn(osession); + invariant(!osession._markedForReap, "Cannot reap a session via 'scanSessions'"); + } + + for (auto& [childLsid, session] : sri->childSessions) { + if (matcher.match(childLsid)) { + ObservableSession osession(lg, sri.get(), &session); + workerFn(osession); + invariant(!osession._markedForReap, "Cannot reap a session via 'scanSessions'"); + } + } + } +} + +LogicalSessionIdSet SessionCatalog::scanSessionsForReap( + const LogicalSessionId& parentLsid, + const ScanSessionsCallbackFn& 
parentSessionWorkerFn, + const ScanSessionsCallbackFn& childSessionWorkerFn) { + invariant(!getParentSessionId(parentLsid)); + + std::unique_ptr sriToReap; { stdx::lock_guard lg(_mutex); - LOGV2_DEBUG(21976, - 2, - "Scanning {sessionCount} sessions", - "Scanning sessions", - "sessionCount"_attr = _sessions.size()); + auto sriIt = _sessions.find(parentLsid); + // The reaper should never try to reap a non-existent session id. + invariant(sriIt != _sessions.end()); + auto sri = sriIt->second.get(); - for (auto it = _sessions.begin(); it != _sessions.end(); ++it) { - if (matcher.match(it->first)) { - auto& sri = it->second; - ObservableSession osession(lg, sri->session); + LogicalSessionIdSet remainingSessions; + bool shouldReapRemaining = true; - workerFn(osession); + { + ObservableSession osession(lg, sri, &sri->parentSession); + parentSessionWorkerFn(osession); - if (osession._shouldBeReaped(sri->numWaitingToCheckOut)) { - sessionsToReap.emplace_back(std::move(sri)); - _sessions.erase(it++); + remainingSessions.insert(osession.getSessionId()); + shouldReapRemaining = osession._shouldBeReaped(); + } + + { + auto childSessionIt = sri->childSessions.begin(); + while (childSessionIt != sri->childSessions.end()) { + ObservableSession osession(lg, sri, &childSessionIt->second); + childSessionWorkerFn(osession); + + if (osession._shouldBeReaped() && + (osession._reapMode == ObservableSession::ReapMode::kExclusive)) { + sri->childSessions.erase(childSessionIt++); + continue; } + + remainingSessions.insert(osession.getSessionId()); + shouldReapRemaining &= osession._shouldBeReaped(); + ++childSessionIt; } } + + if (shouldReapRemaining) { + sriToReap = std::move(sriIt->second); + _sessions.erase(sriIt); + remainingSessions.clear(); + } + + return remainingSessions; } } @@ -248,7 +232,7 @@ SessionCatalog::KillToken SessionCatalog::killSession(const LogicalSessionId& ls auto sri = _getSessionRuntimeInfo(lg, lsid); uassert(ErrorCodes::NoSuchSession, "Session not found", sri); - 
return ObservableSession(lg, sri->session).kill(); + return ObservableSession(lg, sri, &sri->parentSession).kill(); } size_t SessionCatalog::size() const { @@ -258,133 +242,122 @@ size_t SessionCatalog::size() const { void SessionCatalog::createSessionIfDoesNotExist(const LogicalSessionId& lsid) { stdx::lock_guard lg(_mutex); - auto parentSri = [&]() -> SessionRuntimeInfo* { - if (auto parentLsid = getParentSessionId(lsid)) { - return _getOrCreateSessionRuntimeInfo(lg, *parentLsid, nullptr); - } - return nullptr; - }(); - _getOrCreateSessionRuntimeInfo(lg, lsid, parentSri); + _getOrCreateSessionRuntimeInfo(lg, lsid); } SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getSessionRuntimeInfo( WithLock, const LogicalSessionId& lsid) { - auto it = _sessions.find(lsid); - if (it == _sessions.end()) { + auto parentLsid = castToParentSessionId(lsid); + auto sriIt = _sessions.find(parentLsid); + + if (sriIt == _sessions.end()) { return nullptr; } - return it->second.get(); + + auto sri = sriIt->second.get(); + auto session = sri->getSession(lsid); + + if (session) { + return sri; + } + + return nullptr; } SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getOrCreateSessionRuntimeInfo( - WithLock lk, const LogicalSessionId& lsid, SessionRuntimeInfo* parentSri) { + WithLock lk, const LogicalSessionId& lsid) { if (auto sri = _getSessionRuntimeInfo(lk, lsid)) { return sri; } - auto it = _sessions.emplace(lsid, std::make_unique(lsid, parentSri)).first; - return it->second.get(); + auto parentLsid = castToParentSessionId(lsid); + auto sriIt = + _sessions.emplace(parentLsid, std::make_unique(parentLsid)).first; + auto sri = sriIt->second.get(); + + if (getParentSessionId(lsid)) { + auto [childSessionIt, inserted] = sri->childSessions.try_emplace(lsid, lsid); + // Insert should always succeed since the session did not exist prior to this. 
+ invariant(inserted); + + auto& childSession = childSessionIt->second; + childSession._parentSession = &sri->parentSession; + } + + return sri; } void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri, - SessionRuntimeInfo* parentSri, + Session* session, boost::optional killToken) { stdx::lock_guard lg(_mutex); // Make sure we have exactly the same session on the map and that it is still associated with an // operation context (meaning checked-out) - invariant(_sessions[sri->session.getSessionId()].get() == sri); - invariant(sri->session._checkoutOpCtx); + invariant(_sessions[sri->parentSession.getSessionId()].get() == sri); + invariant(sri->checkoutOpCtx); if (killToken) { - invariant(killToken->lsidToKill == sri->session.getSessionId()); - - if (parentSri) { - invariant(killToken->parentLsidToKill == parentSri->session.getSessionId()); - } else { - invariant(!killToken->parentLsidToKill); - } + invariant(killToken->lsidToKill == session->getSessionId()); } - auto parentLsid = getParentSessionId(sri->session.getSessionId()); - if (parentSri) { - invariant(parentLsid); - invariant(parentSri->session._childSessionCheckoutOpCtx == sri->session._checkoutOpCtx); - parentSri->session._childSessionCheckoutOpCtx = nullptr; - parentSri->availableCondVar.notify_all(); - } else { - invariant(!parentLsid); - } - sri->session._checkoutOpCtx = nullptr; + sri->checkoutOpCtx = nullptr; sri->availableCondVar.notify_all(); if (killToken) { - invariant(sri->session._killsRequested > 0); - --sri->session._killsRequested; - - if (parentSri) { - invariant(killToken->parentLsidToKill); - invariant(parentSri->session._killsRequested > 0); - --parentSri->session._killsRequested; - } + invariant(sri->killsRequested > 0); + --sri->killsRequested; } } -SessionCatalog::SessionRuntimeInfo::~SessionRuntimeInfo() { - invariant(!numWaitingToCheckOut); +Session* SessionCatalog::SessionRuntimeInfo::getSession(const LogicalSessionId& lsid) { + if (lsid == parentSession._sessionId) { + return 
&parentSession; + } + + invariant(getParentSessionId(lsid) == parentSession._sessionId); + auto it = childSessions.find(lsid); + if (it == childSessions.end()) { + return nullptr; + } + return &it->second; } SessionCatalog::KillToken ObservableSession::kill(ErrorCodes::Error reason) const { - const bool firstKiller = (0 == _session->_killsRequested); - ++_session->_killsRequested; + const bool firstKiller = (0 == _sri->killsRequested); + ++_sri->killsRequested; // For currently checked-out sessions, interrupt the operation context so that the current owner // can release the session if (firstKiller && hasCurrentOperation()) { - if (_session->_checkoutOpCtx) { - invariant(_clientLock.owns_lock()); - invariant(!_session->_childSessionCheckoutOpCtx); - const auto serviceContext = _session->_checkoutOpCtx->getServiceContext(); - serviceContext->killOperation(_clientLock, _session->_checkoutOpCtx, reason); - } else if (_session->_childSessionCheckoutOpCtx) { - // Both parent and child sessions can't be checked out at the same time, so _clientLock - // should be empty, and we'll never take the child operation context's client lock while - // already holding the parent's. 
- invariant(!_clientLock.owns_lock()); - stdx::unique_lock childOpClientLock{ - *_session->_childSessionCheckoutOpCtx->getClient()}; - const auto serviceContext = _session->_childSessionCheckoutOpCtx->getServiceContext(); - serviceContext->killOperation( - childOpClientLock, _session->_childSessionCheckoutOpCtx, reason); - } + invariant(_clientLock.owns_lock()); + const auto serviceContext = _sri->checkoutOpCtx->getServiceContext(); + serviceContext->killOperation(_clientLock, _sri->checkoutOpCtx, reason); } - auto parentSession = _session->_parentSession; - if (parentSession) { - const bool firstParentKiller = (0 == parentSession->_killsRequested); - ++parentSession->_killsRequested; - - if (firstParentKiller && parentSession->_checkoutOpCtx) { - // Both parent and child sessions can't be checked out at the same time, so _clientLock - // should be empty, and we'll never take the parent operation context's client lock - // while already holding the child's. - invariant(!_clientLock.owns_lock()); - stdx::unique_lock clientLock{*parentSession->_checkoutOpCtx->getClient()}; - const auto serviceContext = parentSession->_checkoutOpCtx->getServiceContext(); - serviceContext->killOperation(clientLock, parentSession->_checkoutOpCtx, reason); - } - } - - return SessionCatalog::KillToken( - getSessionId(), - parentSession ? boost::make_optional(parentSession->getSessionId()) : boost::none); + return SessionCatalog::KillToken(getSessionId()); } -void ObservableSession::markForReap() { +void ObservableSession::markForReap(ReapMode reapMode) { + if (!getParentSessionId(getSessionId())) { + // By design, parent sessions are only safe to be reaped if all of their child sessions are. 
+ invariant(reapMode == ReapMode::kNonExclusive); + } _markedForReap = true; + _reapMode.emplace(reapMode); +} + +bool ObservableSession::_shouldBeReaped() const { + bool isCheckedOut = [&] { + if (_sri->checkoutOpCtx) { + return _sri->checkoutOpCtx->getLogicalSessionId() == getSessionId(); + } + return false; + }(); + return _markedForReap && !isCheckedOut && !get()->_numWaitingToCheckOut && !_killed(); } bool ObservableSession::_killed() const { - return _session->_killsRequested > 0; + return _sri->killsRequested > 0; } OperationContextSession::OperationContextSession(OperationContext* opCtx) : _opCtx(opCtx) { diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index 9d4ec1aa890..339b50fa851 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -49,7 +49,8 @@ namespace mongo { class ObservableSession; /** - * Keeps track of the transaction runtime state for every active session on this instance. + * Keeps track of the transaction runtime state for every active transaction session on this + * instance. */ class SessionCatalog { SessionCatalog(const SessionCatalog&) = delete; @@ -63,13 +64,11 @@ public: class SessionToKill; struct KillToken { - KillToken(LogicalSessionId lsid, boost::optional parentLsid) - : lsidToKill(std::move(lsid)), parentLsidToKill(std::move(parentLsid)) {} + KillToken(LogicalSessionId lsid) : lsidToKill(std::move(lsid)) {} KillToken(KillToken&&) = default; KillToken& operator=(KillToken&&) = default; LogicalSessionId lsidToKill; - boost::optional parentLsidToKill; }; SessionCatalog() = default; @@ -93,20 +92,33 @@ public: */ SessionToKill checkOutSessionForKill(OperationContext* opCtx, KillToken killToken); + using ScanSessionsCallbackFn = std::function; + /** * Iterates through the SessionCatalog under the SessionCatalog mutex and applies 'workerFn' to - * each Session which matches the specified 'matcher'. + * each Session which matches the specified 'matcher'. 
Does not support reaping. * * NOTE: Since this method runs with the session catalog mutex, the work done by 'workerFn' is * not allowed to block, perform I/O or acquire any lock manager locks. * Iterates through the SessionCatalog and applies 'workerFn' to each Session. This locks the * SessionCatalog. */ - using ScanSessionsCallbackFn = std::function; void scanSession(const LogicalSessionId& lsid, const ScanSessionsCallbackFn& workerFn); void scanSessions(const SessionKiller::Matcher& matcher, const ScanSessionsCallbackFn& workerFn); + /** + * Same as the above but applies 'parentSessionWorkerFn' to the Session whose session id is + * equal to 'parentLsid' and then applies 'childSessionWorkerFn' to the Sessions whose parent + * session id is equal to 'parentLsid'. To be used with 'markForReap' for reaping sessions + * from the SessionCatalog. It enables transaction sessions that correspond to the same + * logical session to be reaped atomically. Returns the session ids for the matching Sessions + * that were not reaped after the scan. + */ + LogicalSessionIdSet scanSessionsForReap(const LogicalSessionId& parentLsid, + const ScanSessionsCallbackFn& parentSessionWorkerFn, + const ScanSessionsCallbackFn& childSessionWorkerFn); + /** * Shortcut to invoke 'kill' on the specified session under the SessionCatalog mutex. Throws a * NoSuchSession exception if the session doesn't exist. @@ -124,42 +136,50 @@ public: void createSessionIfDoesNotExist(const LogicalSessionId& lsid); private: + /** + * Tracks the runtime info for transaction sessions that correspond to the same logical + * session. Designed such that only one transaction session can be checked out at any given + * time. + */ struct SessionRuntimeInfo { - SessionRuntimeInfo(LogicalSessionId lsid, SessionRuntimeInfo* parentSri) - : session(std::move(lsid)) { - // If we're a child we must have been given a parent session, if not, we must not have.
- invariant(bool(getParentSessionId(lsid)) == bool(parentSri)); - if (parentSri) { - session._parentSession = &parentSri->session; - } + SessionRuntimeInfo(LogicalSessionId lsid) : parentSession(std::move(lsid)) { + // Requires a parent session id. 'lsid' is moved-from here, so check the stored copy. + invariant(!getParentSessionId(parentSession.getSessionId())); } - ~SessionRuntimeInfo(); - // Must only be accessed when the state is kInUse and only by the operation context, which - // currently has it checked out - Session session; + Session* getSession(const LogicalSessionId& lsid); - // Counts how many threads have called checkOutSession/checkOutSessionForKill and are - // blocked in it waiting for the session to become available. Used to block reaping of - // sessions entries from the map. - int numWaitingToCheckOut{0}; + // Must only be accessed by the OperationContext which currently has this logical session + // checked out. + Session parentSession; + LogicalSessionIdMap childSessions; // Signaled when the state becomes available. Uses the transaction table's mutex to protect // the state transitions. stdx::condition_variable availableCondVar; + + // Pointer to the OperationContext for the operation running on this logical session, or + // nullptr if there is no operation currently running on the session. + OperationContext* checkoutOpCtx{nullptr}; + + // Last check-out time for this logical session. Updated every time any of the transaction + // sessions gets checked out. + Date_t lastCheckout{Date_t::now()}; + + // Counter indicating the number of times ObservableSession::kill has been called on this + // SessionRuntimeInfo, which have not yet had a corresponding call to + // checkOutSessionForKill. + int killsRequested{0}; }; using SessionRuntimeInfoMap = LogicalSessionIdMap>; /** - * Blocking method, which checks-out the session with the given 'lsid'. + * Blocking method, which checks-out the session with the given 'lsid'.
Called inside + * '_checkOutSession' and 'checkOutSessionForKill'. */ - ScopedCheckedOutSession _checkOutSessionWithParentSession(OperationContext* opCtx, - const LogicalSessionId& lsid, - boost::optional killToken); - ScopedCheckedOutSession _checkOutSessionWithoutParentSession( - OperationContext* opCtx, - const LogicalSessionId& lsid, - boost::optional killToken); + ScopedCheckedOutSession _checkOutSessionInner(OperationContext* opCtx, + const LogicalSessionId& lsid, + boost::optional killToken); /** * Blocking method, which checks-out the session set on 'opCtx'. @@ -175,19 +195,14 @@ private: /** * Creates or returns the session runtime info for 'lsid' from the '_sessions' map. The * returned pointer is guaranteed to be linked on the map for as long as the mutex is held. - * - * If we're creating a child session, a pointer to the session runtime info of its parent must - * be provided. */ - SessionRuntimeInfo* _getOrCreateSessionRuntimeInfo(WithLock lk, - const LogicalSessionId& lsid, - SessionRuntimeInfo* parentSri); + SessionRuntimeInfo* _getOrCreateSessionRuntimeInfo(WithLock lk, const LogicalSessionId& lsid); /** * Makes a session, previously checked out through 'checkoutSession', available again. */ void _releaseSession(SessionRuntimeInfo* sri, - SessionRuntimeInfo* parentSri, + Session* session, boost::optional killToken); // Protects the state below @@ -199,32 +214,27 @@ private: }; /** - * Scoped object representing a checked-out session. This type is an implementation detail - * of the SessionCatalog. + * Scoped object representing a checked-out transaction session. This type is an implementation + * detail of the SessionCatalog. 
*/ class SessionCatalog::ScopedCheckedOutSession { public: ScopedCheckedOutSession(SessionCatalog& catalog, SessionCatalog::SessionRuntimeInfo* sri, - SessionCatalog::SessionRuntimeInfo* parentSri, + Session* session, boost::optional killToken) - : _catalog(catalog), _sri(sri), _parentSri(parentSri), _killToken(std::move(killToken)) { - if (_parentSri) { - invariant(getParentSessionId(_sri->session.getSessionId()) == - _parentSri->session.getSessionId()); - } + : _catalog(catalog), _sri(sri), _session(session), _killToken(std::move(killToken)) { if (_killToken) { - invariant(_sri->session.getSessionId() == _killToken->lsidToKill); + invariant(session->getSessionId() == _killToken->lsidToKill); } } ScopedCheckedOutSession(ScopedCheckedOutSession&& other) : _catalog(other._catalog), _sri(other._sri), - _parentSri(other._parentSri), + _session(other._session), _killToken(std::move(other._killToken)) { other._sri = nullptr; - other._parentSri = nullptr; } ScopedCheckedOutSession& operator=(ScopedCheckedOutSession&&) = delete; @@ -233,12 +243,16 @@ public: ~ScopedCheckedOutSession() { if (_sri) { - _catalog._releaseSession(_sri, _parentSri, std::move(_killToken)); + _catalog._releaseSession(_sri, _session, std::move(_killToken)); } } + OperationContext* currentOperation_forTest() const { + return _sri->checkoutOpCtx; + } + Session* get() const { - return &_sri->session; + return _session; } Session* operator->() const { @@ -258,7 +272,7 @@ private: SessionCatalog& _catalog; SessionCatalog::SessionRuntimeInfo* _sri; - SessionCatalog::SessionRuntimeInfo* _parentSri; + Session* _session; boost::optional _killToken; }; @@ -280,11 +294,13 @@ public: Session* get() const { return _scos.get(); } + const LogicalSessionId& getSessionId() const { return get()->getSessionId(); } + OperationContext* currentOperation_forTest() const { - return get()->currentOperation_forTest(); + return _scos.currentOperation_forTest(); } private: @@ -295,7 +311,7 @@ private: using SessionToKill = 
SessionCatalog::SessionToKill; /** - * This type represents access to a session inside of a scanSessions loop. + * This type represents access to a transaction session inside of a scanSessions loop. * If you have one of these, you're in a scanSessions callback context, and so * have locked the whole catalog and, if the observed session is bound to an operation context, * you hold that operation context's client's mutex, as well. @@ -308,31 +324,34 @@ public: ObservableSession& operator=(ObservableSession&&) = delete; /** - * The logical session id that this object represents. + * The session id for this transaction session. */ const LogicalSessionId& getSessionId() const { return _session->_sessionId; } /** - * Returns true if there is an operation currently running on this Session. + * Returns true if there is an operation currently running on the logical session that this + * transaction session corresponds to. */ bool hasCurrentOperation() const { - return _session->_checkoutOpCtx || _session->_childSessionCheckoutOpCtx; + return _sri->checkoutOpCtx; } /** - * Returns when is the last time this session was checked-out, for reaping purposes. + * Returns the last check-out time for the logical session that this transaction session + * corresponds to. Used for reaping purposes. */ Date_t getLastCheckout() const { - return _session->_lastCheckout; + return _sri->lastCheckout; } /** - * Increments the number of "killers" for this session and returns a 'kill token' to to be - * passed later on to 'checkOutSessionForKill' method of the SessionCatalog in order to permit - * the caller to execute any kill cleanup tasks. This token is later used to decrement the - * number of "killers". 
+ * Increments the number of "killers" for the logical session that this transaction session + * corresponds to and returns a 'kill token' to to be passed later on to + * 'checkOutSessionForKill' method of the SessionCatalog in order to permit the caller to + * execute any kill cleanup tasks. This token is later used to decrement the number of + * "killers". * * Marking session as killed is an internal property only that will cause any further calls to * 'checkOutSession' to block until 'checkOutSessionForKill' is called the same number of times @@ -344,14 +363,22 @@ public: SessionCatalog::KillToken kill(ErrorCodes::Error reason = ErrorCodes::Interrupted) const; /** - * Indicates to the SessionCatalog that the session tracked by this object is safe to be deleted - * from the map. It is up to the caller to provide the necessary checks that all the decorations - * they are using are prepared to be destroyed. + * To be used with 'scanSessionsForReap' to indicate to the SessionCatalog that, from the user + * perspective, this transaction session is safe to be reaped. That is, the reaper has checked + * that the session has expired and all the decorations they are using are prepared to be + * destroyed. There are two reap modes: + * - kExclusive indicates that the session is safe to be reaped independently of the other + * sessions matched by 'scanSessionsForReap'. + * - kNonExclusive indicates that the session is only safe to reaped if all the other sessions + * are also safe to be reaped. * - * Calling this method does not guarantee that the session will in fact be destroyed, which - * could happen if there are threads waiting for it to be checked-out. + * Calling this method does not guarantee that the session will in fact be reaped. The + * SessionCatalog performs additional checks to protect sessions that are still in use from + * being reaped. However, reaping will still obey the specified reap mode. See the comment for + * '_shouldBeReaped' for more info. 
*/ - void markForReap(); + enum class ReapMode { kExclusive, kNonExclusive }; + void markForReap(ReapMode reapMode); /** * Returns a pointer to the Session itself. @@ -363,15 +390,16 @@ public: private: friend class SessionCatalog; - static stdx::unique_lock _lockClientForSession(WithLock, Session* session) { - if (const auto opCtx = session->_checkoutOpCtx) { + static stdx::unique_lock _lockClientForSession( + WithLock, SessionCatalog::SessionRuntimeInfo* sri) { + if (const auto opCtx = sri->checkoutOpCtx) { return stdx::unique_lock{*opCtx->getClient()}; } return {}; } - ObservableSession(WithLock wl, Session& session) - : _session(&session), _clientLock(_lockClientForSession(std::move(wl), _session)) {} + ObservableSession(WithLock wl, SessionCatalog::SessionRuntimeInfo* sri, Session* session) + : _sri(sri), _session(session), _clientLock(_lockClientForSession(std::move(wl), _sri)) {} /** * Returns whether 'kill' has been called on this session. @@ -386,15 +414,20 @@ private: } /** - * Returns true if this Session should be be deleted from the map. + * Returns true if this transaction session should be be reaped from the SessionCatalog. + * That is, the session has been marked for reap and both of the following are true: + * - It is not checked out by any thread, and there are no threads waiting for it to be + * checked out. + * - It is not marked for kill (i.e. expected to be checked out for kill). 
*/ - bool _shouldBeReaped(int numWaitingToCheckOut) const { - return _markedForReap && !_killed() && !hasCurrentOperation() && !numWaitingToCheckOut; - } + bool _shouldBeReaped() const; + SessionCatalog::SessionRuntimeInfo* _sri; Session* _session; stdx::unique_lock _clientLock; + bool _markedForReap{false}; + boost::optional _reapMode; }; /** diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 9d4ead0a2a3..5cd0b76ad23 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -124,30 +124,119 @@ void disallowDirectWritesUnderSession(OperationContext* opCtx) { } } +/** + * Removes the transaction sessions that are expired and not in use from the in-memory catalog + * (i.e. SessionCatalog). Returns the session ids for the expired transaction sessions that were + * not removed because they were in use. + */ +LogicalSessionIdSet removeExpiredTransactionSessionsNotInUseFromMemory( + OperationContext* opCtx, SessionsCollection& sessionsCollection, Date_t possiblyExpired) { + const auto catalog = SessionCatalog::get(opCtx); + + // Find the possibly expired logical session ids in the in-memory catalog. + LogicalSessionIdSet possiblyExpiredLogicalSessionIds; + catalog->scanSessions( + SessionKiller::Matcher(KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}), + [&](const ObservableSession& session) { + const auto sessionId = session.getSessionId(); + + // Skip child transaction sessions since they correspond to the same logical session as + // their parent transaction session so they have the same last check-out time as the + // the parent's. + if (session.getLastCheckout() < possiblyExpired && !getParentSessionId(sessionId)) { + possiblyExpiredLogicalSessionIds.insert(sessionId); + } + }); + // From the possibly expired logical session ids, find the ones that have been removed from + // from the config.system.sessions collection. 
+ LogicalSessionIdSet expiredLogicalSessionIds = + sessionsCollection.findRemovedSessions(opCtx, possiblyExpiredLogicalSessionIds); + + // For each removed logical session id, removes all of its transaction session ids that are no + // longer in use from the in-memory catalog. + LogicalSessionIdSet expiredTransactionSessionIdsStillInUse; + for (const auto& expiredLogicalSessionId : expiredLogicalSessionIds) { + invariant(!getParentSessionId(expiredLogicalSessionId)); + + // Scan all the transaction sessions for this logical session at once so reaping can be done + // atomically. + TxnNumber parentSessionActiveTxnNumber; + const auto transactionSessionIdsNotReaped = catalog->scanSessionsForReap( + expiredLogicalSessionId, + [&](ObservableSession& parentSession) { + const auto transactionSessionId = parentSession.getSessionId(); + const auto txnParticipant = TransactionParticipant::get(parentSession); + const auto txnRouter = TransactionRouter::get(parentSession); + + parentSessionActiveTxnNumber = + txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber(); + if (txnParticipant.canBeReaped() && txnRouter.canBeReaped()) { + // This is an external session so it can be reaped if and only if all of its + // internal sessions can be reaped. + parentSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + } + }, + [&](ObservableSession& childSession) { + const auto transactionSessionId = childSession.getSessionId(); + const auto txnParticipant = TransactionParticipant::get(childSession); + const auto txnRouter = TransactionRouter::get(childSession); + + if (txnParticipant.canBeReaped() && txnRouter.canBeReaped()) { + if (isInternalSessionForNonRetryableWrite(transactionSessionId)) { + // This is an internal session for a non-retryable write so it can be reaped + // independently of the external session that write ran in. 
+ childSession.markForReap(ObservableSession::ReapMode::kExclusive); + } else if (isInternalSessionForRetryableWrite(transactionSessionId)) { + // This is an internal session for a retryable write so it must be reaped + // atomically with the external session and internal sessions for that + // retryable write, unless the write is no longer active (i.e. there is + // already a retryable write or transaction with a higher txnNumber). + childSession.markForReap(*transactionSessionId.getTxnNumber() < + parentSessionActiveTxnNumber + ? ObservableSession::ReapMode::kExclusive + : ObservableSession::ReapMode::kNonExclusive); + } else { + MONGO_UNREACHABLE; + } + } + }); + expiredTransactionSessionIdsStillInUse.insert(transactionSessionIdsNotReaped.begin(), + transactionSessionIdsNotReaped.end()); + } + + return expiredTransactionSessionIdsStillInUse; +} + const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1); const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1); const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName; /** - * Removes the specified set of session ids from the persistent sessions collection and returns the - * number of sessions actually removed. + * Removes the the config.transactions and the config.image_collection entries for the transaction + * sessions in 'possiblyExpiredTransactionSessionIds' that are actually expired. Returns the number + * of transaction sessions whose entries were removed. 
*/ -int removeSessionsTransactionRecords(OperationContext* opCtx, - SessionsCollection& sessionsCollection, - const LogicalSessionIdSet& sessionIdsToRemove) { - if (sessionIdsToRemove.empty()) +int removeSessionsTransactionRecords( + OperationContext* opCtx, + SessionsCollection& sessionsCollection, + const LogicalSessionIdSet& possiblyExpiredTransactionSessionIds) { + if (possiblyExpiredTransactionSessionIds.empty()) { return 0; + } - // From the passed-in sessions, find the ones which are actually expired/removed - auto expiredSessionIds = sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove); + // From the possibly expired transaction session ids, find the ones which are actually + // expired/removed. + auto expiredTransactionSessionIds = + sessionsCollection.findRemovedSessions(opCtx, possiblyExpiredTransactionSessionIds); - if (expiredSessionIds.empty()) + if (expiredTransactionSessionIds.empty()) { return 0; + } - // Remove findAndModify images that map to deleted sessions. We first delete any images - // belonging to sessions about to be reaped, followed by the sessions. This way if there's a - // failure, we'll only be left with sessions that have a dangling reference to an image. Session - // reaping will rediscover the sessions to delete and try again. + // Remove the config.image_collection entries for the expired transaction session ids. We first + // delete any images belonging to sessions about to be reaped, followed by the sessions. This + // way if there's a failure, we'll only be left with sessions that have a dangling reference + // to an image. Session reaping will rediscover the sessions to delete and try again. // // We opt for this rather than performing the two sets of deletes in a single transaction simply // to reduce code complexity. 
@@ -161,16 +250,17 @@ int removeSessionsTransactionRecords(OperationContext* opCtx, }()); imageDeleteOp.setDeletes([&] { std::vector entries; - for (const auto& lsid : expiredSessionIds) { - entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON()), - false /* multi = false */); + for (const auto& transactionSessionId : expiredTransactionSessionIds) { + entries.emplace_back( + BSON(LogicalSessionRecord::kIdFieldName << transactionSessionId.toBSON()), + false /* multi = false */); } return entries; }()); return imageDeleteOp; }())); - // Remove the session ids from the on-disk catalog + // Remove the config.transaction entries for the expired transaction session ids. auto sessionDeleteReply = write_ops::checkWriteErrors(client.remove([&] { write_ops::DeleteCommandRequest sessionDeleteOp( NamespaceString::kSessionTransactionsTableNamespace); @@ -181,9 +271,10 @@ int removeSessionsTransactionRecords(OperationContext* opCtx, }()); sessionDeleteOp.setDeletes([&] { std::vector entries; - for (const auto& lsid : expiredSessionIds) { - entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON()), - false /* multi = false */); + for (const auto& transactionSessionId : expiredTransactionSessionIds) { + entries.emplace_back( + BSON(LogicalSessionRecord::kIdFieldName << transactionSessionId.toBSON()), + false /* multi = false */); } return entries; }()); @@ -193,6 +284,53 @@ int removeSessionsTransactionRecords(OperationContext* opCtx, return sessionDeleteReply.getN(); } +/** + * Removes the transaction sessions that are expired and not in use from the on-disk catalog (i.e. + * the config.transactions collection and the config.image_collection collection). Returns the + * number of transaction sessions whose entries were removed. 
+ */ +int removeExpiredTransactionSessionsFromDisk( + OperationContext* opCtx, + SessionsCollection& sessionsCollection, + Date_t possiblyExpired, + const LogicalSessionIdSet& expiredTransactionSessionIdsStillInUse) { + // Scan for records older than the minimum lifetime and uses a sort to walk the '_id' index. + DBDirectClient client(opCtx); + FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace}; + findRequest.setFilter(BSON(kLastWriteDateFieldName << LT << possiblyExpired)); + findRequest.setSort(kSortById); + findRequest.setProjection(kIdProjection); + auto cursor = client.find(std::move(findRequest)); + + // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object size + // limit. + const int kMaxBatchSize = 10'000; + + LogicalSessionIdSet possiblyExpiredTransactionSessionIds; + int numReaped = 0; + while (cursor->more()) { + auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse( + "TransactionSession"_sd, cursor->next()); + const auto transactionSessionId = transactionSession.get_id(); + + if (expiredTransactionSessionIdsStillInUse.find(transactionSessionId) != + expiredTransactionSessionIdsStillInUse.end()) { + continue; + } + + possiblyExpiredTransactionSessionIds.insert(transactionSessionId); + if (possiblyExpiredTransactionSessionIds.size() > kMaxBatchSize) { + numReaped += removeSessionsTransactionRecords( + opCtx, sessionsCollection, possiblyExpiredTransactionSessionIds); + possiblyExpiredTransactionSessionIds.clear(); + } + } + numReaped += removeSessionsTransactionRecords( + opCtx, sessionsCollection, possiblyExpiredTransactionSessionIds); + + return numReaped; +} + void createTransactionTable(OperationContext* opCtx) { auto serviceCtx = opCtx->getServiceContext(); CollectionOptions options; @@ -422,33 +560,9 @@ void MongoDSessionCatalog::invalidateAllSessions(OperationContext* opCtx) { int MongoDSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx, 
SessionsCollection& sessionsCollection, Date_t possiblyExpired) { - { - const auto catalog = SessionCatalog::get(opCtx); - - // Capture the possbily expired in-memory session ids - LogicalSessionIdSet lsids; - catalog->scanSessions(SessionKiller::Matcher( - KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}), - [&](const ObservableSession& session) { - if (session.getLastCheckout() < possiblyExpired) { - lsids.insert(session.getSessionId()); - } - }); - - // From the passed-in sessions, find the ones which are actually expired/removed - auto expiredSessionIds = sessionsCollection.findRemovedSessions(opCtx, lsids); - - // Remove the session ids from the in-memory catalog - for (const auto& lsid : expiredSessionIds) { - catalog->scanSession(lsid, [](ObservableSession& session) { - const auto participant = TransactionParticipant::get(session); - const auto txnRouter = TransactionRouter::get(session); - if (!participant.transactionIsOpen() && txnRouter.canBeReaped()) { - session.markForReap(); - } - }); - } - } + const auto expiredTransactionSessionIdsStillInUse = + removeExpiredTransactionSessionsNotInUseFromMemory( + opCtx, sessionsCollection, possiblyExpired); // The "unsafe" check for primary below is a best-effort attempt to ensure that the on-disk // state reaping code doesn't run if the node is secondary and cause log spam. 
It is a work @@ -458,34 +572,8 @@ int MongoDSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx, if (!replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, NamespaceString::kConfigDb)) return 0; - // Scan for records older than the minimum lifetime and uses a sort to walk the '_id' index - DBDirectClient client(opCtx); - FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace}; - findRequest.setFilter(BSON(kLastWriteDateFieldName << LT << possiblyExpired)); - findRequest.setSort(kSortById); - findRequest.setProjection(kIdProjection); - auto cursor = client.find(std::move(findRequest)); - - // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object size - // limit - const int kMaxBatchSize = 10'000; - - LogicalSessionIdSet lsids; - int numReaped = 0; - while (cursor->more()) { - auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse( - "TransactionSession"_sd, cursor->next()); - - lsids.insert(transactionSession.get_id()); - if (lsids.size() > kMaxBatchSize) { - numReaped += removeSessionsTransactionRecords(opCtx, sessionsCollection, lsids); - lsids.clear(); - } - } - - numReaped += removeSessionsTransactionRecords(opCtx, sessionsCollection, lsids); - - return numReaped; + return removeExpiredTransactionSessionsFromDisk( + opCtx, sessionsCollection, possiblyExpired, expiredTransactionSessionIdsStillInUse); } MongoDOperationContextSession::MongoDOperationContextSession(OperationContext* opCtx) diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp index aeab5bf078a..0c9599e6529 100644 --- a/src/mongo/db/session_catalog_test.cpp +++ b/src/mongo/db/session_catalog_test.cpp @@ -79,6 +79,35 @@ protected: ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired); } + /** + * Creates the session with the given 'lsid' by checking it out from the SessionCatalog and then + * checking it back in. 
+ */ + void createSession(const LogicalSessionId& lsid) { + stdx::async(stdx::launch::async, + [this, lsid] { + ThreadClient tc(getServiceContext()); + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(opCtx.get()); + }) + .get(); + } + + /** + * Returns the session ids for all sessions in the SessionCatalog. + */ + std::vector getAllSessionIds(OperationContext* opCtx) { + std::vector lsidsFound; + SessionKiller::Matcher matcherAllSessions( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); + const auto getAllSessionIdsWorkerFn = [&lsidsFound](const ObservableSession& session) { + lsidsFound.push_back(session.getSessionId()); + }; + catalog()->scanSessions(matcherAllSessions, getAllSessionIdsWorkerFn); + return lsidsFound; + }; + RAIIServerParameterControllerForTest _controller{"featureFlagInternalTransactions", true}; }; @@ -274,7 +303,7 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) { TEST_F(SessionCatalogTest, ScanSession) { // Create sessions in the catalog. - const auto lsids = []() -> std::vector { + const auto& lsids = []() -> std::vector { auto lsid0 = makeLogicalSessionIdForTest(); auto lsid1 = makeLogicalSessionIdForTest(); auto lsid2 = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(lsid1); @@ -313,46 +342,113 @@ TEST_F(SessionCatalogTest, ScanSession) { }); } -TEST_F(SessionCatalogTest, ScanSessionMarkForReapWhenSessionIsIdle) { - // Create sessions in the catalog. 
- const auto lsids = []() -> std::vector { - auto lsid0 = makeLogicalSessionIdForTest(); - auto lsid1 = makeLogicalSessionIdForTest(); - auto lsid2 = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(lsid1); - auto lsid3 = makeLogicalSessionIdWithTxnUUIDForTest(lsid1); - return {lsid0, lsid1, lsid2, lsid3}; - }(); - for (const auto& lsid : lsids) { - stdx::async(stdx::launch::async, - [this, lsid] { - ThreadClient tc(getServiceContext()); - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(opCtx.get()); - }) - .get(); +TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsForReapWhenSessionIsIdle) { + auto parentLsid = makeLogicalSessionIdForTest(); + auto childLsid0 = makeLogicalSessionIdWithTxnUUIDForTest(parentLsid); + auto childLsid1 = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid); + auto otherParentLsid = makeLogicalSessionIdForTest(); + + createSession(parentLsid); + createSession(childLsid0); + createSession(childLsid1); + createSession(otherParentLsid); + auto lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(4U, lsidsFound.size()); + + // Mark otherParentSession for reap. The session should get reaped since it doesn't have any + // child session. + auto lsidsNotReaped = catalog()->scanSessionsForReap( + otherParentLsid, + [](ObservableSession& parentSession) { + parentSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + }, + [](ObservableSession& childSession) {}); + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(3U, lsidsFound.size()); + catalog()->scanSession(otherParentLsid, [](const ObservableSession&) { + FAIL("Found a session that should have been reaped"); + }); + ASSERT_EQ(0U, lsidsNotReaped.size()); + + // Mark parentSession for reap. The session should not get reaped since its child sessions + // (i.e. childSession0 and childSession1) are not marked for reaped. 
+ lsidsNotReaped = catalog()->scanSessionsForReap( + parentLsid, + [](ObservableSession& parentSession) { + parentSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + }, + [](ObservableSession& childSession) {}); + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(3U, lsidsFound.size()); + ASSERT_EQ(lsidsFound.size(), lsidsNotReaped.size()); + for (const auto& lsid : lsidsFound) { + ASSERT(lsidsNotReaped.find(lsid) != lsidsNotReaped.end()); } - catalog()->scanSession(lsids[0], - [&lsids](ObservableSession& session) { session.markForReap(); }); + // Mark childSession0 and childSession1 for reap with kNonExclusive mode. The sessions should + // not get reaped since parentSession is not marked for reaped. + lsidsNotReaped = catalog()->scanSessionsForReap( + parentLsid, + [](ObservableSession& parentSession) { - catalog()->scanSession(lsids[0], [](const ObservableSession&) { - FAIL("The callback was called for non-existent session"); + }, + [](ObservableSession& childSession) { + childSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + }); + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(3U, lsidsFound.size()); + ASSERT_EQ(lsidsFound.size(), lsidsNotReaped.size()); + for (const auto& lsid : lsidsFound) { + ASSERT(lsidsNotReaped.find(lsid) != lsidsNotReaped.end()); + } + + // Mark childSession0 for reap with kExclusive mode. The session should get reaped. 
+ lsidsNotReaped = catalog()->scanSessionsForReap( + parentLsid, + [](ObservableSession& parentSession) {}, + [&](ObservableSession& childSession) { + if (childSession.getSessionId() == childLsid0) { + childSession.markForReap(ObservableSession::ReapMode::kExclusive); + } + }); + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(2U, lsidsFound.size()); + catalog()->scanSession(childLsid0, [](const ObservableSession&) { + FAIL("Found a session that should have been reaped"); }); + ASSERT_EQ(lsidsFound.size(), lsidsNotReaped.size()); + for (const auto& lsid : lsidsFound) { + ASSERT(lsidsNotReaped.find(lsid) != lsidsNotReaped.end()); + } - catalog()->scanSession(lsids[1], [&lsids](const ObservableSession& session) { - ASSERT_EQ(lsids[1], session.get()->getSessionId()); - }); + // Mark parentSession and childSession1 for reap with kNonExclusive mode. Both sessions should + // get reaped since all sessions are now marked for reap. + lsidsNotReaped = catalog()->scanSessionsForReap( + parentLsid, + [](ObservableSession& parentSession) { + parentSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + }, + [](ObservableSession& childSession) { + childSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + }); - catalog()->scanSession(lsids[2], - [&lsids](ObservableSession& session) { session.markForReap(); }); + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(0U, lsidsFound.size()); + ASSERT_EQ(lsidsFound.size(), lsidsNotReaped.size()); +} - catalog()->scanSession(lsids[2], [](const ObservableSession&) { - FAIL("The callback was called for non-existent session"); - }); +DEATH_TEST_F(SessionCatalogTestWithDefaultOpCtx, + ScanSessionDoesNotSupportReaping, + "Cannot reap a session via 'scanSession'") { + auto lsid = makeLogicalSessionIdForTest(); - catalog()->scanSession(lsids[3], [&lsids](const ObservableSession& session) { - ASSERT_EQ(lsids[3], session.get()->getSessionId()); + { + _opCtx->setLogicalSessionId(lsid); + OperationContextSession 
ocs(_opCtx); + } + + catalog()->scanSession(lsid, [](ObservableSession& session) { + session.markForReap(ObservableSession::ReapMode::kNonExclusive); }); } @@ -370,7 +466,7 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { lsidsFound.clear(); // Create sessions in the catalog. - const auto lsids = []() -> std::vector { + const auto& lsids = []() -> std::vector { auto lsid0 = makeLogicalSessionIdForTest(); auto lsid1 = makeLogicalSessionIdForTest(); auto lsid2 = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(lsid1); @@ -417,48 +513,216 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { ErrorCodes::InvalidOptions); } -TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsMarkForReap) { - // Create sessions in the catalog. - const auto lsids = []() -> std::vector { - auto lsid0 = makeLogicalSessionIdForTest(); - auto lsid1 = makeLogicalSessionIdForTest(); - auto lsid2 = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(lsid1); - auto lsid3 = makeLogicalSessionIdWithTxnUUIDForTest(lsid1); - return {lsid0, lsid1, lsid2, lsid3}; - }(); +TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsForReapWhenParentSessionIsCheckedOut) { + auto parentLsid = makeLogicalSessionIdForTest(); + auto childLsid0 = makeLogicalSessionIdWithTxnUUIDForTest(parentLsid); + auto childLsid1 = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid); + + createSession(parentLsid); + createSession(childLsid0); + createSession(childLsid1); + auto lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(3U, lsidsFound.size()); unittest::Barrier sessionsCheckedOut(2); unittest::Barrier sessionsCheckedIn(2); + // Check out parentSession. 
auto f = stdx::async(stdx::launch::async, [&] { ThreadClient tc(getServiceContext()); auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsids[1]); + opCtx->setLogicalSessionId(parentLsid); OperationContextSession ocs(opCtx.get()); sessionsCheckedOut.countDownAndWait(); sessionsCheckedIn.countDownAndWait(); }); - - // After this wait, session 1 is checked-out and waiting on the barrier, because of which only - // sessions 0, 2 and 3 will be reaped. + // After this wait, parentSession is checked out and waiting on the barrier. sessionsCheckedOut.countDownAndWait(); - SessionKiller::Matcher matcherAllSessions( - KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}); + // Mark parentSession for reap, and additionally mark childSession0 and childSession1 for reap + // with kNonExclusive mode. parentSession should not get reaped because it is checked out. + // childSession0 and childSession1 also should not get reaped since they must be reaped with + // parentSession. + auto lsidsNotReaped = catalog()->scanSessionsForReap( + parentLsid, + [](ObservableSession& parentSession) { + parentSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + }, + [](ObservableSession& childSession) { + childSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + }); + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(3U, lsidsFound.size()); + ASSERT_EQ(lsidsFound.size(), lsidsNotReaped.size()); + for (const auto& lsid : lsidsFound) { + ASSERT(lsidsNotReaped.find(lsid) != lsidsNotReaped.end()); + } - catalog()->scanSessions(matcherAllSessions, - [&](ObservableSession& session) { session.markForReap(); }); - - catalog()->scanSessions(matcherAllSessions, [&](const ObservableSession& session) { - ASSERT_EQ(lsids[1], session.get()->getSessionId()); + // Mark childSession0 for reap with kExclusive mode. It should get reaped although parentSession + // is checked out. 
+ lsidsNotReaped = catalog()->scanSessionsForReap( + parentLsid, + [](ObservableSession& parentSession) {}, + [&](ObservableSession& childSession) { + if (childSession.getSessionId() == childLsid0) { + childSession.markForReap(ObservableSession::ReapMode::kExclusive); + } + }); + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(2U, lsidsFound.size()); + catalog()->scanSession(childLsid0, [](const ObservableSession&) { + FAIL("Found a session that should have been reaped"); }); + ASSERT_EQ(lsidsFound.size(), lsidsNotReaped.size()); + for (const auto& lsid : lsidsFound) { + ASSERT(lsidsNotReaped.find(lsid) != lsidsNotReaped.end()); + } - // After this point, session 1 is checked back in + // Mark childSession1 for reap with mode kExclusive. The session should get reaped although + // parentSession is checked out. + lsidsNotReaped = catalog()->scanSessionsForReap( + parentLsid, + [](ObservableSession& parentSession) {}, + [](ObservableSession& childSession) { + childSession.markForReap(ObservableSession::ReapMode::kExclusive); + }); + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(1U, lsidsFound.size()); + catalog()->scanSession(childLsid1, [](const ObservableSession&) { + FAIL("Found a session that should have been reaped"); + }); + ASSERT_EQ(lsidsFound.size(), lsidsNotReaped.size()); + for (const auto& lsid : lsidsFound) { + ASSERT(lsidsNotReaped.find(lsid) != lsidsNotReaped.end()); + } + + // After this point, parentSession is checked back in. sessionsCheckedIn.countDownAndWait(); f.get(); - catalog()->scanSessions(matcherAllSessions, [&](const ObservableSession& session) { - ASSERT_EQ(lsids[1], session.get()->getSessionId()); + // Mark parentSession for reap. The session should now get reaped. 
+ lsidsNotReaped = catalog()->scanSessionsForReap( + parentLsid, + [](ObservableSession& parentSession) { + parentSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + }, + [](ObservableSession& childSession) { + + }); + + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(0U, lsidsFound.size()); + ASSERT_EQ(lsidsFound.size(), lsidsNotReaped.size()); +} + +TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsForReapWhenChildSessionIsCheckedOut) { + auto parentLsid = makeLogicalSessionIdForTest(); + auto parentTxnNumber = TxnNumber{0}; + auto childLsid0 = makeLogicalSessionIdWithTxnUUIDForTest(parentLsid); + auto childLsid1 = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber++); + auto childLsid2 = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + + createSession(parentLsid); + createSession(childLsid0); + createSession(childLsid1); + createSession(childLsid2); + auto lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(4U, lsidsFound.size()); + + unittest::Barrier sessionsCheckedOut(2); + unittest::Barrier sessionsCheckedIn(2); + + // Check out childSession2. + auto f = stdx::async(stdx::launch::async, [&] { + ThreadClient tc(getServiceContext()); + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(childLsid2); + OperationContextSession ocs(opCtx.get()); + sessionsCheckedOut.countDownAndWait(); + sessionsCheckedIn.countDownAndWait(); + }); + // After this wait, childSession2 is checked out and waiting on the barrier. + sessionsCheckedOut.countDownAndWait(); + + // Mark childSession2 for reap with kExclusive mode. The session should not get reaped since it + // is checked out. 
+ auto lsidsNotReaped = catalog()->scanSessionsForReap( + parentLsid, + [](ObservableSession& parentSession) {}, + [&](ObservableSession& childSession) { + if (childSession.getSessionId() == childLsid2) { + childSession.markForReap(ObservableSession::ReapMode::kExclusive); + } + }); + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(4U, lsidsFound.size()); + ASSERT_EQ(lsidsFound.size(), lsidsNotReaped.size()); + for (const auto& lsid : lsidsFound) { + ASSERT(lsidsNotReaped.find(lsid) != lsidsNotReaped.end()); + } + + // Mark parentSession for reap, and additionally mark childSession0 and childSession1 with + // mode kExclusive. parentSession should not get reaped because childSession2 is checked out. + // childSession0 and childSession1 should get reaped since they are not checked out. + lsidsNotReaped = catalog()->scanSessionsForReap( + parentLsid, + [&](ObservableSession& parentSession) { + parentSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + }, + [&](ObservableSession& childSession) { + auto lsid = childSession.getSessionId(); + if (lsid == childLsid0 || lsid == childLsid1) { + childSession.markForReap(ObservableSession::ReapMode::kExclusive); + } + }); + + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(2U, lsidsFound.size()); + catalog()->scanSession(childLsid0, [](const ObservableSession&) { + FAIL("Found a session that should have been reaped"); + }); + catalog()->scanSession(childLsid1, [](const ObservableSession&) { + FAIL("Found a session that should have been reaped"); + }); + ASSERT_EQ(lsidsFound.size(), lsidsNotReaped.size()); + for (const auto& lsid : lsidsFound) { + ASSERT(lsidsNotReaped.find(lsid) != lsidsNotReaped.end()); + } + + // After this point, childSession2 is checked back in. + sessionsCheckedIn.countDownAndWait(); + f.get(); + + // Mark parentSession and childSession2 for reap with kNonExclusive mode. Both sessions should + // get reaped.
+ lsidsNotReaped = catalog()->scanSessionsForReap( + parentLsid, + [](ObservableSession& parentSession) { + parentSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + }, + [](ObservableSession& childSession) { + childSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + }); + + lsidsFound = getAllSessionIds(_opCtx); + ASSERT_EQ(0U, lsidsFound.size()); + ASSERT_EQ(lsidsFound.size(), lsidsNotReaped.size()); +} + +DEATH_TEST_F(SessionCatalogTestWithDefaultOpCtx, + ScanSessionsDoesNotSupportReaping, + "Cannot reap a session via 'scanSessions'") { + { + auto lsid = makeLogicalSessionIdForTest(); + _opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(_opCtx); + } + + SessionKiller::Matcher matcherAllSessions( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}); + catalog()->scanSessions(matcherAllSessions, [](ObservableSession& session) { + session.markForReap(ObservableSession::ReapMode::kNonExclusive); }); } diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 0d99dd45947..298cf5b3cc2 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -2698,7 +2698,11 @@ void RetryableWriteTransactionParticipantCatalog::addParticipant( _activeTxnNumber = *txnNumber; _participants.clear(); } - _participants.emplace(participant._sessionId(), participant); + if (auto it = _participants.find(participant._sessionId()); it != _participants.end()) { + invariant(it->second._tp == participant._tp); + } else { + _participants.emplace(participant._sessionId(), participant); + } } void RetryableWriteTransactionParticipantCatalog::reset() { diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index b1df1925627..26fb9b7ec7d 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -302,6 +302,14 @@ public: */ bool expiredAsOf(Date_t when) const; + /** + * Returns 
whether this TransactionParticipant instance can be reaped. Always true unless there + * is an open transaction on this session. + */ + auto canBeReaped() const { + return !transactionIsOpen(); + } + /** * Returns whether we are in an open multi-document transaction, which means we have an * active transaction which has autocommit:false and has not been committed or aborted. It @@ -1212,9 +1220,9 @@ public: } /** - * Adds the given participant to the catalog (overrides any existing participant with the same - * session id) and sets the txnNumber to that of the retryable write running on the participant. - * Throws an invariant error if the participant requires a refresh. + * Adds the given participant to the catalog and sets the txnNumber to that of the retryable + * write running on the participant. If a participant with the same session id already exists, + * invariants that it corresponds to the same TransactionParticipant. */ void addParticipant(const TransactionParticipant::Participant& participant); diff --git a/src/mongo/s/session_catalog_router.cpp b/src/mongo/s/session_catalog_router.cpp index e8b9470ca17..aef732d30b0 100644 --- a/src/mongo/s/session_catalog_router.cpp +++ b/src/mongo/s/session_catalog_router.cpp @@ -43,26 +43,54 @@ int RouterSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx, Date_t possiblyExpired) { const auto catalog = SessionCatalog::get(opCtx); - // Capture the possbily expired in-memory session ids - LogicalSessionIdSet lsids; + // Find the possibly expired logical session ids in the in-memory catalog.
+ LogicalSessionIdSet possiblyExpiredLogicalSessionIds; catalog->scanSessions( SessionKiller::Matcher(KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}), [&](const ObservableSession& session) { - if (session.getLastCheckout() < possiblyExpired) { - lsids.insert(session.getSessionId()); + const auto sessionId = session.getSessionId(); + + // Skip child transaction sessions since they correspond to the same logical session as + // their parent transaction session so they have the same last check-out time as + // the parent's. + if (session.getLastCheckout() < possiblyExpired && !getParentSessionId(sessionId)) { + possiblyExpiredLogicalSessionIds.insert(session.getSessionId()); } }); + // From the possibly expired logical session ids, find the ones that have been removed + // from the config.system.sessions collection. + auto expiredLogicalSessionIds = + sessionsCollection.findRemovedSessions(opCtx, possiblyExpiredLogicalSessionIds); - // From the passed-in sessions, find the ones which are actually expired/removed - auto expiredSessionIds = sessionsCollection.findRemovedSessions(opCtx, lsids); - - // Remove the session ids from the in-memory catalog + // For each removed logical session id, remove all of its transaction session ids that are no + // longer in use from the in-memory catalog. int numReaped = 0; - for (const auto& lsid : expiredSessionIds) { - catalog->scanSession(lsid, [&](ObservableSession& session) { - session.markForReap(); - ++numReaped; - }); + + for (const auto& logicalSessionId : expiredLogicalSessionIds) { + // Scan all the transaction sessions for this logical session at once so reaping can be done + // atomically.
+ int numTransactionSessions = 0; + const auto transactionSessionIdsNotReaped = catalog->scanSessionsForReap( + logicalSessionId, + [&](ObservableSession& parentSession) { + const auto txnRouter = TransactionRouter::get(parentSession); + if (txnRouter.canBeReaped()) { + // Only reap this transaction session if every other transaction session for + // this logical session is also safe to be reaped. + parentSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + } + ++numTransactionSessions; + }, + [&](ObservableSession& childSession) { + const auto txnRouter = TransactionRouter::get(childSession); + if (txnRouter.canBeReaped()) { + // Only reap this transaction session if every other transaction session for + // this logical session is also safe to be reaped. + childSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + } + ++numTransactionSessions; + }); + numReaped += numTransactionSessions - transactionSessionIdsNotReaped.size(); } return numReaped; diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index b132e05bdd7..94a57f1ac2d 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -4671,8 +4671,12 @@ TEST_F(TransactionRouterTest, RouterMetricsCurrent_ReapForInactiveTxn) { // Mark the session for reap which will also erase it from the catalog. auto catalog = SessionCatalog::get(operationContext()->getServiceContext()); - catalog->scanSession(*operationContext()->getLogicalSessionId(), - [](ObservableSession& session) { session.markForReap(); }); + catalog->scanSessionsForReap(*operationContext()->getLogicalSessionId(), + [](ObservableSession& parentSession) { + parentSession.markForReap( + ObservableSession::ReapMode::kNonExclusive); + }, + [](ObservableSession& childSession) {}); // Verify the session was reaped. 
catalog->scanSession(*operationContext()->getLogicalSessionId(), [](const ObservableSession&) { @@ -4703,8 +4707,12 @@ TEST_F(TransactionRouterTest, RouterMetricsCurrent_ReapForUnstartedTxn) { // Mark the session for reap which will also erase it from the catalog. auto catalog = SessionCatalog::get(operationContext()->getServiceContext()); - catalog->scanSession(*operationContext()->getLogicalSessionId(), - [](ObservableSession& session) { session.markForReap(); }); + catalog->scanSessionsForReap(*operationContext()->getLogicalSessionId(), + [](ObservableSession& parentSession) { + parentSession.markForReap( + ObservableSession::ReapMode::kNonExclusive); + }, + [](ObservableSession& childSession) {}); // Verify the session was reaped. catalog->scanSession(*operationContext()->getLogicalSessionId(), [](const ObservableSession&) {