From 96aaf2f5279fb9eee5d0c1a2ce53d243b2772eee Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Tue, 12 Jun 2018 15:16:18 -0700 Subject: [PATCH] PYTHON-1562 Add transaction examples for docs --- test/test_examples.py | 197 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 196 insertions(+), 1 deletion(-) diff --git a/test/test_examples.py b/test/test_examples.py index 64a7282e7..bb2e2fd5b 100644 --- a/test/test_examples.py +++ b/test/test_examples.py @@ -21,8 +21,9 @@ sys.path[0:0] = [""] import pymongo from pymongo.errors import ConnectionFailure, OperationFailure +from pymongo.read_concern import ReadConcern from pymongo.write_concern import WriteConcern -from test import client_context, unittest +from test import client_context, unittest, IntegrationTest from test.utils import rs_or_single_client @@ -841,10 +842,204 @@ class TestSampleShellCommands(unittest.TestCase): {"$set": {"a.$[i].b": 2}}, array_filters=[{"i.b": 0}]) + +class TestTransactionExamples(IntegrationTest): + + @classmethod + @client_context.require_connection + def setUpClass(cls): + super(TestTransactionExamples, cls).setUpClass() + cls.client = rs_or_single_client(w="majority") + @client_context.require_transactions def test_transactions(self): # Transaction examples client = self.client + self.addCleanup(client.drop_database, "hr") + self.addCleanup(client.drop_database, "reporting") + + employees = client.hr.employees + events = client.reporting.events + employees.insert_one({"employee": 3, "status": "Active"}) + events.insert_one( + {"employee": 3, "status": {"new": "Active", "old": None}}) + + # Start Transactions Intro Example 1 + + def update_employee_info(session): + employees_coll = session.client.hr.employees + events_coll = session.client.reporting.events + + with session.start_transaction( + read_concern=ReadConcern("snapshot"), + write_concern=WriteConcern(w="majority")): + employees_coll.update_one( + {"employee": 3}, {"$set": {"status": "Inactive"}}, + session=session) + events_coll.insert_one( + {"employee": 3, "status": { + "new": "Inactive", "old": "Active"}}, + session=session) + + while True: + try: + # Commit uses write concern set at transaction start. + session.commit_transaction() + print("Transaction committed.") + break + except (ConnectionFailure, OperationFailure) as exc: + # Can retry commit + if exc.has_error_label( + "UnknownTransactionCommitResult"): + print("UnknownTransactionCommitResult, retrying " + "commit operation ...") + continue + else: + print("Error during commit ...") + raise + # End Transactions Intro Example 1 + + with client.start_session() as session: + update_employee_info(session) + + employee = employees.find_one({"employee": 3}) + self.assertIsNotNone(employee) + self.assertEqual(employee['status'], 'Inactive') + + # Start Transactions Retry Example 1 + def run_transaction_with_retry(txn_func, session): + while True: + try: + txn_func(session) # performs transaction + break + except (ConnectionFailure, OperationFailure) as exc: + print("Transaction aborted. Caught exception during " + "transaction.") + + # If transient error, retry the whole transaction + if exc.has_error_label("TransientTransactionError"): + print("TransientTransactionError, retrying" + "transaction ...") + continue + else: + raise + # End Transactions Retry Example 1 + + with client.start_session() as session: + run_transaction_with_retry(update_employee_info, session) + + employee = employees.find_one({"employee": 3}) + self.assertIsNotNone(employee) + self.assertEqual(employee['status'], 'Inactive') + + # Start Transactions Retry Example 2 + def commit_with_retry(session): + while True: + try: + # Commit uses write concern set at transaction start. + session.commit_transaction() + print("Transaction committed.") + break + except (ConnectionFailure, OperationFailure) as exc: + # Can retry commit + if exc.has_error_label("UnknownTransactionCommitResult"): + print("UnknownTransactionCommitResult, retrying " + "commit operation ...") + continue + else: + print("Error during commit ...") + raise + # End Transactions Retry Example 2 + + # Test commit_with_retry from the previous examples + def _insert_employee_retry_commit(session): + with session.start_transaction(): + employees.insert_one( + {"employee": 4, "status": "Active"}, + session=session) + events.insert_one( + {"employee": 4, "status": {"new": "Active", "old": None}}, + session=session) + + commit_with_retry(session) + + with client.start_session() as session: + run_transaction_with_retry(_insert_employee_retry_commit, session) + + employee = employees.find_one({"employee": 4}) + self.assertIsNotNone(employee) + self.assertEqual(employee['status'], 'Active') + + # Start Transactions Retry Example 3 + + def run_transaction_with_retry(txn_func, session): + while True: + try: + txn_func(session) # performs transaction + break + except (ConnectionFailure, OperationFailure) as exc: + # If transient error, retry the whole transaction + if exc.has_error_label("TransientTransactionError"): + print("TransientTransactionError, retrying " + "transaction ...") + continue + else: + raise + + def commit_with_retry(session): + while True: + try: + # Commit uses write concern set at transaction start. + session.commit_transaction() + print("Transaction committed.") + break + except (ConnectionFailure, OperationFailure) as exc: + # Can retry commit + if exc.has_error_label("UnknownTransactionCommitResult"): + print("UnknownTransactionCommitResult, retrying " + "commit operation ...") + continue + else: + print("Error during commit ...") + raise + + # Updates two collections in a transactions + + def update_employee_info(session): + employees_coll = session.client.hr.employees + events_coll = session.client.reporting.events + + with session.start_transaction( + read_concern=ReadConcern("snapshot"), + write_concern=WriteConcern(w="majority")): + employees_coll.update_one( + {"employee": 3}, {"$set": {"status": "Inactive"}}, + session=session) + events_coll.insert_one( + {"employee": 3, "status": { + "new": "Inactive", "old": "Active"}}, + session=session) + + commit_with_retry(session) + + # Start a session. + with client.start_session() as session: + try: + run_transaction_with_retry(update_employee_info, session) + except Exception as exc: + # Do something with error. + raise + + # End Transactions Retry Example 3 + + employee = employees.find_one({"employee": 3}) + self.assertIsNotNone(employee) + self.assertEqual(employee['status'], 'Inactive') + + @client_context.require_transactions + def test_transactions_beta(self): + # Transaction beta examples + client = self.client self.addCleanup(client.drop_database, "test") db = client.test