PYTHON-1562 Add transaction examples for docs
This commit is contained in:
parent
3d8155d87d
commit
96aaf2f527
@ -21,8 +21,9 @@ sys.path[0:0] = [""]
|
||||
|
||||
import pymongo
|
||||
from pymongo.errors import ConnectionFailure, OperationFailure
|
||||
from pymongo.read_concern import ReadConcern
|
||||
from pymongo.write_concern import WriteConcern
|
||||
from test import client_context, unittest
|
||||
from test import client_context, unittest, IntegrationTest
|
||||
from test.utils import rs_or_single_client
|
||||
|
||||
|
||||
@ -841,10 +842,204 @@ class TestSampleShellCommands(unittest.TestCase):
|
||||
{"$set": {"a.$[i].b": 2}},
|
||||
array_filters=[{"i.b": 0}])
|
||||
|
||||
|
||||
class TestTransactionExamples(IntegrationTest):
|
||||
|
||||
@classmethod
|
||||
@client_context.require_connection
|
||||
def setUpClass(cls):
|
||||
super(TestTransactionExamples, cls).setUpClass()
|
||||
cls.client = rs_or_single_client(w="majority")
|
||||
|
||||
@client_context.require_transactions
|
||||
def test_transactions(self):
|
||||
# Transaction examples
|
||||
client = self.client
|
||||
self.addCleanup(client.drop_database, "hr")
|
||||
self.addCleanup(client.drop_database, "reporting")
|
||||
|
||||
employees = client.hr.employees
|
||||
events = client.reporting.events
|
||||
employees.insert_one({"employee": 3, "status": "Active"})
|
||||
events.insert_one(
|
||||
{"employee": 3, "status": {"new": "Active", "old": None}})
|
||||
|
||||
# Start Transactions Intro Example 1
|
||||
|
||||
def update_employee_info(session):
|
||||
employees_coll = session.client.hr.employees
|
||||
events_coll = session.client.reporting.events
|
||||
|
||||
with session.start_transaction(
|
||||
read_concern=ReadConcern("snapshot"),
|
||||
write_concern=WriteConcern(w="majority")):
|
||||
employees_coll.update_one(
|
||||
{"employee": 3}, {"$set": {"status": "Inactive"}},
|
||||
session=session)
|
||||
events_coll.insert_one(
|
||||
{"employee": 3, "status": {
|
||||
"new": "Inactive", "old": "Active"}},
|
||||
session=session)
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Commit uses write concern set at transaction start.
|
||||
session.commit_transaction()
|
||||
print("Transaction committed.")
|
||||
break
|
||||
except (ConnectionFailure, OperationFailure) as exc:
|
||||
# Can retry commit
|
||||
if exc.has_error_label(
|
||||
"UnknownTransactionCommitResult"):
|
||||
print("UnknownTransactionCommitResult, retrying "
|
||||
"commit operation ...")
|
||||
continue
|
||||
else:
|
||||
print("Error during commit ...")
|
||||
raise
|
||||
# End Transactions Intro Example 1
|
||||
|
||||
with client.start_session() as session:
|
||||
update_employee_info(session)
|
||||
|
||||
employee = employees.find_one({"employee": 3})
|
||||
self.assertIsNotNone(employee)
|
||||
self.assertEqual(employee['status'], 'Inactive')
|
||||
|
||||
# Start Transactions Retry Example 1
|
||||
def run_transaction_with_retry(txn_func, session):
|
||||
while True:
|
||||
try:
|
||||
txn_func(session) # performs transaction
|
||||
break
|
||||
except (ConnectionFailure, OperationFailure) as exc:
|
||||
print("Transaction aborted. Caught exception during "
|
||||
"transaction.")
|
||||
|
||||
# If transient error, retry the whole transaction
|
||||
if exc.has_error_label("TransientTransactionError"):
|
||||
print("TransientTransactionError, retrying"
|
||||
"transaction ...")
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
# End Transactions Retry Example 1
|
||||
|
||||
with client.start_session() as session:
|
||||
run_transaction_with_retry(update_employee_info, session)
|
||||
|
||||
employee = employees.find_one({"employee": 3})
|
||||
self.assertIsNotNone(employee)
|
||||
self.assertEqual(employee['status'], 'Inactive')
|
||||
|
||||
# Start Transactions Retry Example 2
|
||||
def commit_with_retry(session):
|
||||
while True:
|
||||
try:
|
||||
# Commit uses write concern set at transaction start.
|
||||
session.commit_transaction()
|
||||
print("Transaction committed.")
|
||||
break
|
||||
except (ConnectionFailure, OperationFailure) as exc:
|
||||
# Can retry commit
|
||||
if exc.has_error_label("UnknownTransactionCommitResult"):
|
||||
print("UnknownTransactionCommitResult, retrying "
|
||||
"commit operation ...")
|
||||
continue
|
||||
else:
|
||||
print("Error during commit ...")
|
||||
raise
|
||||
# End Transactions Retry Example 2
|
||||
|
||||
# Test commit_with_retry from the previous examples
|
||||
def _insert_employee_retry_commit(session):
|
||||
with session.start_transaction():
|
||||
employees.insert_one(
|
||||
{"employee": 4, "status": "Active"},
|
||||
session=session)
|
||||
events.insert_one(
|
||||
{"employee": 4, "status": {"new": "Active", "old": None}},
|
||||
session=session)
|
||||
|
||||
commit_with_retry(session)
|
||||
|
||||
with client.start_session() as session:
|
||||
run_transaction_with_retry(_insert_employee_retry_commit, session)
|
||||
|
||||
employee = employees.find_one({"employee": 4})
|
||||
self.assertIsNotNone(employee)
|
||||
self.assertEqual(employee['status'], 'Active')
|
||||
|
||||
# Start Transactions Retry Example 3
|
||||
|
||||
def run_transaction_with_retry(txn_func, session):
|
||||
while True:
|
||||
try:
|
||||
txn_func(session) # performs transaction
|
||||
break
|
||||
except (ConnectionFailure, OperationFailure) as exc:
|
||||
# If transient error, retry the whole transaction
|
||||
if exc.has_error_label("TransientTransactionError"):
|
||||
print("TransientTransactionError, retrying "
|
||||
"transaction ...")
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
|
||||
def commit_with_retry(session):
|
||||
while True:
|
||||
try:
|
||||
# Commit uses write concern set at transaction start.
|
||||
session.commit_transaction()
|
||||
print("Transaction committed.")
|
||||
break
|
||||
except (ConnectionFailure, OperationFailure) as exc:
|
||||
# Can retry commit
|
||||
if exc.has_error_label("UnknownTransactionCommitResult"):
|
||||
print("UnknownTransactionCommitResult, retrying "
|
||||
"commit operation ...")
|
||||
continue
|
||||
else:
|
||||
print("Error during commit ...")
|
||||
raise
|
||||
|
||||
# Updates two collections in a transactions
|
||||
|
||||
def update_employee_info(session):
|
||||
employees_coll = session.client.hr.employees
|
||||
events_coll = session.client.reporting.events
|
||||
|
||||
with session.start_transaction(
|
||||
read_concern=ReadConcern("snapshot"),
|
||||
write_concern=WriteConcern(w="majority")):
|
||||
employees_coll.update_one(
|
||||
{"employee": 3}, {"$set": {"status": "Inactive"}},
|
||||
session=session)
|
||||
events_coll.insert_one(
|
||||
{"employee": 3, "status": {
|
||||
"new": "Inactive", "old": "Active"}},
|
||||
session=session)
|
||||
|
||||
commit_with_retry(session)
|
||||
|
||||
# Start a session.
|
||||
with client.start_session() as session:
|
||||
try:
|
||||
run_transaction_with_retry(update_employee_info, session)
|
||||
except Exception as exc:
|
||||
# Do something with error.
|
||||
raise
|
||||
|
||||
# End Transactions Retry Example 3
|
||||
|
||||
employee = employees.find_one({"employee": 3})
|
||||
self.assertIsNotNone(employee)
|
||||
self.assertEqual(employee['status'], 'Inactive')
|
||||
|
||||
@client_context.require_transactions
|
||||
def test_transactions_beta(self):
|
||||
# Transaction beta examples
|
||||
client = self.client
|
||||
self.addCleanup(client.drop_database, "test")
|
||||
|
||||
db = client.test
|
||||
|
||||
Loading…
Reference in New Issue
Block a user