PYTHON-1684 Support sharded transactions recovery token (#406)

Transient errors inside transaction unpins the session.
Add brief docs about sharded transactions and add 3.9 changelog.
Tests changes:
Add spec tests for sharded transaction recoveryToken.
Speed up txn tests by reducing SDAM waiting time after a network error.
Remove outdated test workaround for killAllSessions.
This commit is contained in:
Shane Harvey 2019-01-23 14:26:02 -08:00
parent 05565783d9
commit a886922157
34 changed files with 6017 additions and 75 deletions

View File

@ -1,23 +1,14 @@
Changelog
=========
Changes in Version 3.8.0
------------------------
Changes in Version 3.9.0.dev0
-----------------------------
.. warning:: PyMongo no longer supports Python 2.6. RHEL 6 users should install
Python 2.7 or newer from `Red Hat Software Collections
<https://www.softwarecollections.org>`_. CentOS 6 users should install Python
2.7 or newer from `SCL
<https://wiki.centos.org/AdditionalResources/Repositories/SCL>`_
Version 3.9 adds support for MongoDB 4.2. Highlights include:
.. warning:: PyMongo no longer supports PyPy3 versions older than 3.5. Users
must upgrade to PyPy3.5+.
- :class:`~bson.objectid.ObjectId` now implements the `ObjectID specification
version 0.2 <https://github.com/mongodb/specifications/blob/master/source/objectid.rst>`_.
- Version 3.8.0 implements the `URI options specification`_ in the
- Support for MongoDB 4.2 sharded transactions. Sharded transactions have
the same API as replica set transactions. See :ref:`transactions-ref`.
- Implement the `URI options specification`_ in the
:meth:`~pymongo.mongo_client.MongoClient` constructor. Consequently, there are
a number of changes in connection options:
@ -38,9 +29,32 @@ Changes in Version 3.8.0
- ``ssl_pem_passphrase`` has been deprecated in favor of ``tlsCertificateKeyFilePassword``.
.. _URI options specification: https://github.com/mongodb/specifications/blob/master/source/uri-options/uri-options.rst`
.. _URI options specification: https://github.com/mongodb/specifications/blob/master/source/uri-options/uri-options.rst
Issues Resolved
...............
See the `PyMongo 3.9 release notes in JIRA`_ for the list of resolved issues
in this release.
.. _PyMongo 3.9 release notes in JIRA: https://jira.mongodb.org/secure/ReleaseNote.jspa?projectId=10004&version=21787
Changes in Version 3.8.0.dev0
-----------------------------
.. warning:: PyMongo no longer supports Python 2.6. RHEL 6 users should install
Python 2.7 or newer from `Red Hat Software Collections
<https://www.softwarecollections.org>`_. CentOS 6 users should install Python
2.7 or newer from `SCL
<https://wiki.centos.org/AdditionalResources/Repositories/SCL>`_
.. warning:: PyMongo no longer supports PyPy3 versions older than 3.5. Users
must upgrade to PyPy3.5+.
- :class:`~bson.objectid.ObjectId` now implements the `ObjectID specification
version 0.2 <https://github.com/mongodb/specifications/blob/master/source/objectid.rst>`_.
Issues Resolved
...............

View File

@ -292,7 +292,7 @@ class _Bulk(object):
if not to_send:
raise InvalidOperation("cannot do an empty bulk write")
result = bwc.write_command(request_id, msg, to_send)
client._receive_cluster_time(result, session)
client._process_response(result, session)
# Retryable writeConcernErrors halt the execution of this run.
wce = result.get('writeConcernError', {})

View File

@ -75,6 +75,19 @@ transactions on the same session can be executed in sequence.
.. versionadded:: 3.7
Sharded Transactions
^^^^^^^^^^^^^^^^^^^^
PyMongo 3.9 adds support for transactions on sharded clusters running MongoDB
4.2. Sharded transactions have the same API as replica set transactions.
When running a transaction against a sharded cluster, the session is
pinned to the mongos server selected for the first operation in the
transaction. All subsequent operations that are part of the same transaction
are routed to the same mongos server. When the transaction is completed, by
running either commitTransaction or abortTransaction, the session is unpinned.
.. versionadded:: 3.9
.. mongodoc:: transactions
Classes
@ -88,6 +101,7 @@ import uuid
from bson.binary import Binary
from bson.int64 import Int64
from bson.py3compat import abc, reraise_instance
from bson.son import SON
from bson.timestamp import Timestamp
from pymongo import monotonic
@ -245,12 +259,19 @@ class _Transaction(object):
def __init__(self, opts):
self.opts = opts
self.state = _TxnState.NONE
self.transaction_id = 0
self.sharded = False
self.pinned_address = None
self.recovery_token = None
def active(self):
return self.state in (_TxnState.STARTING, _TxnState.IN_PROGRESS)
def reset(self):
self.state = _TxnState.NONE
self.sharded = False
self.pinned_address = None
self.recovery_token = None
def _reraise_with_unknown_commit(exc):
"""Re-raise an exception with the UnknownTransactionCommitResult label."""
@ -367,10 +388,9 @@ class ClientSession(object):
self._transaction.opts = TransactionOptions(
read_concern, write_concern, read_preference)
self._transaction.reset()
self._transaction.state = _TxnState.STARTING
self._start_retryable_write()
self._transaction.transaction_id = self._server_session.transaction_id
self._transaction.pinned_address = None
return _TransactionContext(self)
def commit_transaction(self):
@ -482,15 +502,19 @@ class ClientSession(object):
# subsequent commitTransaction commands should be upgraded to use
# w:"majority" and set a default value of 10 seconds for wtimeout.
wc = self._transaction.opts.write_concern
if retrying and command_name == "commitTransaction":
is_commit = command_name == "commitTransaction"
if retrying and is_commit:
wc_doc = wc.document
wc_doc["w"] = "majority"
wc_doc.setdefault("wtimeout", 10000)
wc = WriteConcern(**wc_doc)
cmd = SON([(command_name, 1)])
if self._transaction.recovery_token and is_commit:
cmd['recoveryToken'] = self._transaction.recovery_token
with self._client._socket_for_writes(self) as sock_info:
return self._client.admin._command(
sock_info,
command_name,
cmd,
session=self,
write_concern=wc,
parse_write_concern_error=True)
@ -539,6 +563,15 @@ class ClientSession(object):
"of bson.timestamp.Timestamp")
self._advance_operation_time(operation_time)
def _process_response(self, reply):
"""Process a response to a command that was run with this session."""
self._advance_cluster_time(reply.get('$clusterTime'))
self._advance_operation_time(reply.get('operationTime'))
if self._in_transaction and self._transaction.sharded:
recovery_token = reply.get('recoveryToken')
if recovery_token:
self._transaction.recovery_token = recovery_token
@property
def has_ended(self):
"""True if this session is finished."""
@ -558,8 +591,13 @@ class ClientSession(object):
def _pin_mongos(self, server):
"""Pin this session to the given mongos Server."""
self._transaction.sharded = True
self._transaction.pinned_address = server.description.address
def _unpin_mongos(self):
"""Unpin this session from any pinned mongos address."""
self._transaction.pinned_address = None
def _txn_read_preference(self):
"""Return read preference of this transaction or None."""
if self._in_transaction:
@ -573,8 +611,7 @@ class ClientSession(object):
command['lsid'] = self._server_session.session_id
if not self._in_transaction:
self._transaction.state = _TxnState.NONE
self._transaction.pinned_address = None
self._transaction.reset()
if is_retryable:
command['txnNumber'] = self._server_session.transaction_id

View File

@ -149,13 +149,13 @@ class CommandCursor(object):
reply = response.data
try:
with client._reset_on_error(self.__address):
with client._reset_on_error(self.__address, self.__session):
docs = self._unpack_response(reply,
self.__id,
self.__collection.codec_options)
if from_command:
first = docs[0]
client._receive_cluster_time(first, self.__session)
client._process_response(first, self.__session)
helpers._check_command_response(first)
except OperationFailure as exc:

View File

@ -973,13 +973,13 @@ class Cursor(object):
raise
try:
with client._reset_on_error(self.__address):
with client._reset_on_error(self.__address, self.__session):
docs = self._unpack_response(reply,
self.__id,
self.__collection.codec_options)
if from_command:
first = docs[0]
client._receive_cluster_time(first, self.__session)
client._process_response(first, self.__session)
helpers._check_command_response(first)
except OperationFailure as exc:
self.__killed = True

View File

@ -1112,8 +1112,8 @@ class MongoClient(common.BaseObject):
return self._topology
@contextlib.contextmanager
def _get_socket(self, server):
with self._reset_on_error(server.description.address):
def _get_socket(self, server, session):
with self._reset_on_error(server.description.address, session):
with server.get_socket(self.__all_credentials) as sock_info:
yield sock_info
@ -1128,26 +1128,31 @@ class MongoClient(common.BaseObject):
- `address` (optional): Address when sending a message
to a specific server, used for getMore.
"""
topology = self._get_topology()
address = address or (session and session._pinned_address)
if address:
# We're running a getMore or this session is pinned to a mongos.
server = topology.select_server_by_address(address)
if not server:
raise AutoReconnect('server %s:%d no longer available'
% address)
else:
server = topology.select_server(server_selector)
# Pin this session to the selected server if it's performing a
# sharded transaction.
if server.description.mongos and (session and
session._in_transaction):
session._pin_mongos(server)
return server
try:
topology = self._get_topology()
address = address or (session and session._pinned_address)
if address:
# We're running a getMore or this session is pinned to a mongos.
server = topology.select_server_by_address(address)
if not server:
raise AutoReconnect('server %s:%d no longer available'
% address)
else:
server = topology.select_server(server_selector)
# Pin this session to the selected server if it's performing a
# sharded transaction.
if server.description.mongos and (session and
session._in_transaction):
session._pin_mongos(server)
return server
except PyMongoError as exc:
if session and exc.has_error_label("TransientTransactionError"):
session._unpin_mongos()
raise
def _socket_for_writes(self, session):
server = self._select_server(writable_server_selector, session)
return self._get_socket(server)
return self._get_socket(server, session)
@contextlib.contextmanager
def _socket_for_reads(self, read_preference, session):
@ -1162,7 +1167,7 @@ class MongoClient(common.BaseObject):
single = topology.description.topology_type == TOPOLOGY_TYPE.Single
server = self._select_server(read_preference, session)
with self._get_socket(server) as sock_info:
with self._get_socket(server, session) as sock_info:
slave_ok = (single and not sock_info.is_mongos) or (
read_preference != ReadPreference.PRIMARY)
yield sock_info, slave_ok
@ -1194,7 +1199,8 @@ class MongoClient(common.BaseObject):
and server.description.server_type != SERVER_TYPE.Mongos) or (
operation.read_preference != ReadPreference.PRIMARY)
with self._reset_on_error(server.description.address):
with self._reset_on_error(server.description.address,
operation.session):
return server.send_message_with_response(
operation,
set_slave_ok,
@ -1203,14 +1209,20 @@ class MongoClient(common.BaseObject):
exhaust)
@contextlib.contextmanager
def _reset_on_error(self, server_address):
def _reset_on_error(self, server_address, session):
"""On "not master" or "node is recovering" errors reset the server
according to the SDAM spec.
Unpin the session on transient transaction errors.
"""
try:
yield
try:
yield
except PyMongoError as exc:
if session and exc.has_error_label(
"TransientTransactionError"):
session._unpin_mongos()
raise
except NetworkTimeout:
# The socket has been closed. Don't reset the server.
# Server Discovery And Monitoring Spec: "When an application
@ -1264,7 +1276,7 @@ class MongoClient(common.BaseObject):
supports_session = (
session is not None and
server.description.retryable_writes_supported)
with self._get_socket(server) as sock_info:
with self._get_socket(server, session) as sock_info:
if retryable and not supports_session:
if is_retrying():
# A retry is not possible because this server does
@ -1674,12 +1686,10 @@ class MongoClient(common.BaseObject):
if cluster_time:
command['$clusterTime'] = cluster_time
def _receive_cluster_time(self, reply, session):
cluster_time = reply.get('$clusterTime')
self._topology.receive_cluster_time(cluster_time)
def _process_response(self, reply, session):
self._topology.receive_cluster_time(reply.get('$clusterTime'))
if session is not None:
session._advance_cluster_time(cluster_time)
session._advance_operation_time(reply.get("operationTime"))
session._process_response(reply)
def server_info(self, session=None):
"""Get information about the MongoDB server we're connected to.

View File

@ -143,7 +143,7 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
response_doc = unpacked_docs[0]
if client:
client._receive_cluster_time(response_doc, session)
client._process_response(response_doc, session)
if check:
helpers._check_command_response(
response_doc, None, allowable_errors,

View File

@ -33,10 +33,11 @@ from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
from pymongo.results import _WriteResult, BulkWriteResult
from test import unittest, client_context, IntegrationTest
from test import unittest, client_context, IntegrationTest, client_knobs
from test.utils import (camel_to_snake, camel_to_upper_camel,
camel_to_snake_args, rs_client, single_client,
wait_until, OvertCommandListener, TestCreator)
wait_until, CompareType, OvertCommandListener,
TestCreator)
from test.utils_selection_tests import parse_read_preference
# Location of JSON test specifications.
@ -56,11 +57,19 @@ class TestTransactions(IntegrationTest):
@classmethod
def setUpClass(cls):
super(TestTransactions, cls).setUpClass()
# Speed up tests by reducing SDAM waiting time after a network error.
cls.knobs = client_knobs(min_heartbeat_interval=0.1)
cls.knobs.enable()
cls.mongos_clients = []
if client_context.supports_transactions():
for address in client_context.mongoses:
cls.mongos_clients.append(single_client('%s:%s' % address))
@classmethod
def tearDownClass(cls):
cls.knobs.disable()
super(TestTransactions, cls).tearDownClass()
def transaction_test_debug(self, msg):
if _TXN_TESTS_DEBUG:
print(msg)
@ -75,21 +84,45 @@ class TestTransactions(IntegrationTest):
exc.has_error_label(label),
msg='error labels should not contain %s' % (label,))
def _set_fail_point(self, client, command_args):
cmd = SON([('configureFailPoint', 'failCommand')])
cmd.update(command_args)
client.admin.command(cmd)
def set_fail_point(self, command_args):
cmd = SON([('configureFailPoint', 'failCommand')])
cmd.update(command_args)
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
client.admin.command(cmd)
self._set_fail_point(client, cmd)
def targeted_fail_point(self, session, fail_point):
"""Run the targetedFailPoint test operation.
Enable the fail point on the session's pinned mongos.
"""
clients = {c.address: c for c in self.mongos_clients}
client = clients[session._pinned_address]
self._set_fail_point(client, fail_point)
def assert_session_pinned(self, session):
"""Assert that the given session is pinned."""
self.assertIsNotNone(session._transaction.pinned_address)
def assert_session_unpinned(self, session):
"""Assert that the given session is not pinned."""
self.assertIsNone(session._pinned_address)
self.assertIsNone(session._transaction.pinned_address)
def kill_all_sessions(self):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
# Run killAllSessions without an implicit session to work
# around SERVER-38335.
with client._socket_for_writes(None) as sock_info:
spec = SON([('killAllSessions', [])])
sock_info.command('admin', spec, client=client)
try:
client.admin.command('killAllSessions', [])
except OperationFailure:
# "operation was interrupted" by killing the command's
# own session.
pass
@client_context.require_transactions
def test_transaction_options_validation(self):
@ -297,7 +330,11 @@ class TestTransactions(IntegrationTest):
collection = collection.with_options(
**dict(parse_options(operation['collectionOptions'])))
objects = {'database': database, 'collection': collection}
objects = {
'database': database,
'collection': collection,
'testRunner': self
}
objects.update(sessions)
obj = objects[operation['object']]
@ -385,6 +422,9 @@ class TestTransactions(IntegrationTest):
'readConcern', {}).get('afterClusterTime')
if actual_time is not None:
expected_read_concern['afterClusterTime'] = actual_time
recovery_token = expected_cmd.get('recoveryToken')
if recovery_token == 42:
expected_cmd['recoveryToken'] = CompareType(dict)
# Replace lsid with a name like "session0" to match test.
if 'lsid' in event.command:
@ -454,6 +494,8 @@ def end_sessions(sessions):
def create_test(scenario_def, test, name):
@client_context.require_transactions
@client_context.require_cluster_type(
scenario_def.get('topology', ['single', 'replicaset', 'sharded']))
def run_scenario(self):
if test.get('skipReason'):
raise unittest.SkipTest(test.get('skipReason'))
@ -463,7 +505,8 @@ def create_test(scenario_def, test, name):
# Convert test['clientOptions'] to dict to avoid a Jython bug using
# "**" with ScenarioDict.
client_options = dict(test['clientOptions'])
if client_context.is_mongos:
use_multi_mongos = test['useMultipleMongoses']
if client_context.is_mongos and use_multi_mongos:
client = rs_client(client_context.mongos_seeds(),
event_listeners=[listener], **client_options)
else:
@ -571,9 +614,8 @@ def create_test(scenario_def, test, name):
self.check_events(test, listener, session_ids)
# Disable fail points.
if 'failPoint' in test:
self.set_fail_point({
'configureFailPoint': 'failCommand', 'mode': 'off'})
self.set_fail_point({
'configureFailPoint': 'failCommand', 'mode': 'off'})
# Assert final state is expected.
expected_c = test['outcome'].get('collection')
@ -587,7 +629,8 @@ def create_test(scenario_def, test, name):
if 'secondary' in name:
run_scenario = client_context._require(
lambda: client_context.has_secondaries, 'No secondaries',
lambda: client_context.has_secondaries or client_context.is_mongos,
'No secondaries',
run_scenario)
return run_scenario

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],
@ -356,7 +360,6 @@
},
{
"description": "write concern error on commit",
"skipReason": "SERVER-37458 Mongos does not yet support writeConcern on commit",
"operations": [
{
"name": "startTransaction",

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,501 @@
{
"topology": [
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],
"tests": [
{
"description": "commitTransaction explicit retries include recoveryToken",
"useMultipleMongoses": true,
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"session": "session0",
"document": {
"_id": 1
}
},
"result": {
"insertedId": 1
}
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
}
],
"expectations": [
{
"command_started_event": {
"command": {
"insert": "test",
"documents": [
{
"_id": 1
}
],
"ordered": true,
"readConcern": null,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": true,
"autocommit": false,
"writeConcern": null
},
"command_name": "insert",
"database_name": "transaction-tests"
}
},
{
"command_started_event": {
"command": {
"commitTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": null,
"recoveryToken": 42
},
"command_name": "commitTransaction",
"database_name": "admin"
}
},
{
"command_started_event": {
"command": {
"commitTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": {
"w": "majority",
"wtimeout": 10000
},
"recoveryToken": 42
},
"command_name": "commitTransaction",
"database_name": "admin"
}
},
{
"command_started_event": {
"command": {
"commitTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": {
"w": "majority",
"wtimeout": 10000
},
"recoveryToken": 42
},
"command_name": "commitTransaction",
"database_name": "admin"
}
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
}
]
}
}
},
{
"description": "commitTransaction retry succeeds on new mongos",
"useMultipleMongoses": true,
"operations": [
{
"name": "startTransaction",
"object": "session0",
"arguments": {
"options": {
"writeConcern": {
"w": "majority"
}
}
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"session": "session0",
"document": {
"_id": 1
}
},
"result": {
"insertedId": 1
}
},
{
"name": "targetedFailPoint",
"object": "testRunner",
"arguments": {
"session": "session0",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"commitTransaction"
],
"writeConcernError": {
"code": 91,
"errmsg": "Replication is being shut down"
}
}
}
}
},
{
"name": "commitTransaction",
"object": "session0"
}
],
"expectations": [
{
"command_started_event": {
"command": {
"insert": "test",
"documents": [
{
"_id": 1
}
],
"ordered": true,
"readConcern": null,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": true,
"autocommit": false,
"writeConcern": null
},
"command_name": "insert",
"database_name": "transaction-tests"
}
},
{
"command_started_event": {
"command": {
"commitTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": {
"w": "majority"
},
"recoveryToken": 42
},
"command_name": "commitTransaction",
"database_name": "admin"
}
},
{
"command_started_event": {
"command": {
"commitTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": {
"w": "majority",
"wtimeout": 10000
},
"recoveryToken": 42
},
"command_name": "commitTransaction",
"database_name": "admin"
}
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
}
]
}
}
},
{
"description": "commitTransaction retry fails on new mongos",
"useMultipleMongoses": true,
"clientOptions": {
"heartbeatFrequencyMS": 5000
},
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"session": "session0",
"document": {
"_id": 1
}
},
"result": {
"insertedId": 1
}
},
{
"name": "targetedFailPoint",
"object": "testRunner",
"arguments": {
"session": "session0",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 7
},
"data": {
"failCommands": [
"commitTransaction",
"isMaster"
],
"closeConnection": true
}
}
}
},
{
"name": "commitTransaction",
"object": "session0",
"result": {
"errorLabelsContain": [
"TransientTransactionError"
],
"errorLabelsOmit": [
"UnknownTransactionCommitResult"
],
"errorCodeName": "NoSuchTransaction"
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"insert": "test",
"documents": [
{
"_id": 1
}
],
"ordered": true,
"readConcern": null,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": true,
"autocommit": false,
"writeConcern": null
},
"command_name": "insert",
"database_name": "transaction-tests"
}
},
{
"command_started_event": {
"command": {
"commitTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": null,
"recoveryToken": 42
},
"command_name": "commitTransaction",
"database_name": "admin"
}
},
{
"command_started_event": {
"command": {
"commitTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": {
"w": "majority",
"wtimeout": 10000
},
"recoveryToken": 42
},
"command_name": "commitTransaction",
"database_name": "admin"
}
}
],
"outcome": {
"collection": {
"data": []
}
}
},
{
"description": "abortTransaction does not send recoveryToken",
"useMultipleMongoses": true,
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"session": "session0",
"document": {
"_id": 1
}
},
"result": {
"insertedId": 1
}
},
{
"name": "targetedFailPoint",
"object": "testRunner",
"arguments": {
"session": "session0",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"abortTransaction"
],
"closeConnection": true
}
}
}
},
{
"name": "abortTransaction",
"object": "session0"
}
],
"expectations": [
{
"command_started_event": {
"command": {
"insert": "test",
"documents": [
{
"_id": 1
}
],
"ordered": true,
"readConcern": null,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": true,
"autocommit": false,
"writeConcern": null
},
"command_name": "insert",
"database_name": "transaction-tests"
}
},
{
"command_started_event": {
"command": {
"abortTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": null,
"recoveryToken": null
},
"command_name": "abortTransaction",
"database_name": "admin"
}
},
{
"command_started_event": {
"command": {
"abortTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": null,
"recoveryToken": null
},
"command_name": "abortTransaction",
"database_name": "admin"
}
}
],
"outcome": {
"collection": {
"data": []
}
}
}
]
}

View File

@ -1,4 +1,7 @@
{
"topology": [
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [
@ -12,6 +15,7 @@
"tests": [
{
"description": "countDocuments",
"useMultipleMongoses": true,
"operations": [
{
"name": "startTransaction",
@ -125,6 +129,7 @@
},
{
"description": "distinct",
"useMultipleMongoses": true,
"operations": [
{
"name": "startTransaction",
@ -246,6 +251,7 @@
},
{
"description": "find",
"useMultipleMongoses": true,
"operations": [
{
"name": "startTransaction",
@ -391,6 +397,7 @@
},
{
"description": "insertOne",
"useMultipleMongoses": true,
"operations": [
{
"name": "startTransaction",
@ -544,6 +551,7 @@
},
{
"description": "mixed read write operations",
"useMultipleMongoses": true,
"operations": [
{
"name": "startTransaction",
@ -704,6 +712,7 @@
},
{
"description": "multiple commits",
"useMultipleMongoses": true,
"operations": [
{
"name": "startTransaction",
@ -730,6 +739,24 @@
}
}
},
{
"name": "assertSessionPinned",
"object": "testRunner",
"arguments": {
"session": "session0"
}
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "assertSessionPinned",
"object": "testRunner",
"arguments": {
"session": "session0"
}
},
{
"name": "commitTransaction",
"object": "session0"
@ -767,8 +794,11 @@
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
"name": "assertSessionPinned",
"object": "testRunner",
"arguments": {
"session": "session0"
}
}
],
"outcome": {
@ -789,6 +819,404 @@
]
}
}
},
{
"description": "remain pinned after non-transient error on commit",
"useMultipleMongoses": true,
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertMany",
"object": "collection",
"arguments": {
"documents": [
{
"_id": 3
},
{
"_id": 4
}
],
"session": "session0"
},
"result": {
"insertedIds": {
"0": 3,
"1": 4
}
}
},
{
"name": "assertSessionPinned",
"object": "testRunner",
"arguments": {
"session": "session0"
}
},
{
"name": "targetedFailPoint",
"object": "testRunner",
"arguments": {
"session": "session0",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"commitTransaction"
],
"errorCode": 50
}
}
}
},
{
"name": "commitTransaction",
"object": "session0",
"result": {
"errorLabelsOmit": [
"TransientTransactionError"
],
"errorCode": 50
}
},
{
"name": "assertSessionPinned",
"object": "testRunner",
"arguments": {
"session": "session0"
}
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "assertSessionPinned",
"object": "testRunner",
"arguments": {
"session": "session0"
}
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
]
}
}
},
{
"description": "unpin after transient error within a transaction",
"useMultipleMongoses": true,
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"session": "session0",
"document": {
"_id": 3
}
},
"result": {
"insertedId": 3
}
},
{
"name": "targetedFailPoint",
"object": "testRunner",
"arguments": {
"session": "session0",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 1
},
"data": {
"failCommands": [
"insert"
],
"closeConnection": true
}
}
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"session": "session0",
"document": {
"_id": 4
}
},
"result": {
"errorLabelsContain": [
"TransientTransactionError"
],
"errorLabelsOmit": [
"UnknownTransactionCommitResult"
]
}
},
{
"name": "abortTransaction",
"object": "session0"
}
],
"expectations": [
{
"command_started_event": {
"command": {
"insert": "test",
"documents": [
{
"_id": 3
}
],
"ordered": true,
"readConcern": null,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": true,
"autocommit": false,
"writeConcern": null
},
"command_name": "insert",
"database_name": "transaction-tests"
}
},
{
"command_started_event": {
"command": {
"insert": "test",
"documents": [
{
"_id": 4
}
],
"ordered": true,
"readConcern": null,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": null
},
"command_name": "insert",
"database_name": "transaction-tests"
}
},
{
"command_started_event": {
"command": {
"abortTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": null,
"recoveryToken": null
},
"command_name": "abortTransaction",
"database_name": "admin"
}
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
},
{
"_id": 2
}
]
}
}
},
{
"description": "unpin after transient error within a transaction and commit",
"useMultipleMongoses": true,
"clientOptions": {
"heartbeatFrequencyMS": 5000
},
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"session": "session0",
"document": {
"_id": 3
}
},
"result": {
"insertedId": 3
}
},
{
"name": "targetedFailPoint",
"object": "testRunner",
"arguments": {
"session": "session0",
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 7
},
"data": {
"failCommands": [
"insert",
"isMaster"
],
"closeConnection": true
}
}
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"session": "session0",
"document": {
"_id": 4
}
},
"result": {
"errorLabelsContain": [
"TransientTransactionError"
],
"errorLabelsOmit": [
"UnknownTransactionCommitResult"
]
}
},
{
"name": "commitTransaction",
"object": "session0",
"result": {
"errorLabelsContain": [
"TransientTransactionError"
],
"errorLabelsOmit": [
"UnknownTransactionCommitResult"
],
"errorCodeName": "NoSuchTransaction"
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"insert": "test",
"documents": [
{
"_id": 3
}
],
"ordered": true,
"readConcern": null,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": true,
"autocommit": false,
"writeConcern": null
},
"command_name": "insert",
"database_name": "transaction-tests"
}
},
{
"command_started_event": {
"command": {
"insert": "test",
"documents": [
{
"_id": 4
}
],
"ordered": true,
"readConcern": null,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": null
},
"command_name": "insert",
"database_name": "transaction-tests"
}
},
{
"command_started_event": {
"command": {
"commitTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"writeConcern": null,
"recoveryToken": 42
},
"command_name": "commitTransaction",
"database_name": "admin"
}
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
},
{
"_id": 2
}
]
}
}
}
]
}

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [

View File

@ -1,4 +1,8 @@
{
"topology": [
"replicaset",
"sharded"
],
"database_name": "transaction-tests",
"collection_name": "test",
"data": [],

View File

@ -147,6 +147,19 @@ class ScenarioDict(dict):
return ScenarioDict({})
class CompareType(object):
"""Class that compares equal to any object of the given type."""
def __init__(self, type):
self.type = type
def __eq__(self, other):
return isinstance(other, self.type)
def __ne__(self, other):
"""Needed for Python 2."""
return not self.__eq__(other)
class TestCreator(object):
"""Class to create test cases from specifications."""
def __init__(self, create_test, test_class, test_path):