PYTHON-1673 Mongos pinning for sharded transactions

In a sharded transaction, a session is pinned to the mongos server
selected for the initial command. All subsequent commands in the same
transaction are routed to the pinned mongos server.
This commit is contained in:
Shane Harvey 2018-10-01 15:49:34 -07:00
parent 9902d239b4
commit 6bab444bd7
14 changed files with 1002 additions and 60 deletions

View File

@ -515,7 +515,7 @@ class _Bulk(object):
client = self.collection.database.client
if not write_concern.acknowledged:
with client._socket_for_writes() as sock_info:
with client._socket_for_writes(session) as sock_info:
self.execute_no_results(sock_info, generator)
else:
return self.execute_command(generator, write_concern, session)

View File

@ -107,7 +107,8 @@ class ChangeStream(object):
"""
read_preference = self._target._read_preference_for(session)
client = self._database.client
with client._socket_for_reads(read_preference) as (sock_info, slave_ok):
with client._socket_for_reads(
read_preference, session) as (sock_info, slave_ok):
pipeline = self._full_pipeline()
cmd = SON([("aggregate", self._aggregation_target),
("pipeline", pipeline),

View File

@ -246,6 +246,7 @@ class _Transaction(object):
self.opts = opts
self.state = _TxnState.NONE
self.transaction_id = 0
self.pinned_address = None
def active(self):
return self.state in (_TxnState.STARTING, _TxnState.IN_PROGRESS)
@ -369,6 +370,7 @@ class ClientSession(object):
self._transaction.state = _TxnState.STARTING
self._start_retryable_write()
self._transaction.transaction_id = self._server_session.transaction_id
self._transaction.pinned_address = None
return _TransactionContext(self)
def commit_transaction(self):
@ -388,6 +390,10 @@ class ClientSession(object):
elif state is _TxnState.ABORTED:
raise InvalidOperation(
"Cannot call commitTransaction after calling abortTransaction")
elif state is _TxnState.COMMITTED:
# We're rerunning the commit, move the state back to "in progress"
# so that _in_transaction returns true.
self._transaction.state = _TxnState.IN_PROGRESS
try:
self._finish_transaction_with_retry("commitTransaction")
@ -441,12 +447,10 @@ class ClientSession(object):
self._transaction.state = _TxnState.ABORTED
def _finish_transaction(self, command_name):
with self._client._socket_for_writes() as sock_info:
with self._client._socket_for_writes(self) as sock_info:
return self._client.admin._command(
sock_info,
command_name,
txnNumber=self._transaction.transaction_id,
autocommit=False,
session=self,
write_concern=self._transaction.opts.write_concern,
parse_write_concern_error=True)
@ -528,6 +532,17 @@ class ClientSession(object):
"""True if this session has an active multi-statement transaction."""
return self._transaction.active()
@property
def _pinned_address(self):
"""The mongos address this transaction was created on."""
if self._transaction.active():
return self._transaction.pinned_address
return None
def _pin_mongos(self, server):
"""Pin this session to the given mongos Server."""
self._transaction.pinned_address = server.description.address
def _txn_read_preference(self):
"""Return read preference of this transaction or None."""
if self._in_transaction:
@ -542,6 +557,7 @@ class ClientSession(object):
if not self._in_transaction:
self._transaction.state = _TxnState.NONE
self._transaction.pinned_address = None
if is_retryable:
command['txnNumber'] = self._server_session.transaction_id

View File

@ -185,15 +185,16 @@ class Collection(common.BaseObject):
def _socket_for_reads(self, session):
return self.__database.client._socket_for_reads(
self._read_preference_for(session))
self._read_preference_for(session), session)
def _socket_for_primary_reads(self, session):
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
return self.__database.client._socket_for_reads(read_pref), read_pref
return self.__database.client._socket_for_reads(
read_pref, session), read_pref
def _socket_for_writes(self):
return self.__database.client._socket_for_writes()
def _socket_for_writes(self, session):
return self.__database.client._socket_for_writes(session)
def _command(self, sock_info, command, slave_ok=False,
read_preference=None,
@ -251,7 +252,7 @@ class Collection(common.BaseObject):
if "size" in options:
options["size"] = float(options["size"])
cmd.update(options)
with self._socket_for_writes() as sock_info:
with self._socket_for_writes(session) as sock_info:
self._command(
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
write_concern=self._write_concern_for(session),
@ -1804,7 +1805,7 @@ class Collection(common.BaseObject):
"""
common.validate_list('indexes', indexes)
names = []
with self._socket_for_writes() as sock_info:
with self._socket_for_writes(session) as sock_info:
supports_collations = sock_info.max_wire_version >= 5
def gen_indexes():
for index in indexes:
@ -1844,7 +1845,7 @@ class Collection(common.BaseObject):
index_options.pop('collation', None))
index.update(index_options)
with self._socket_for_writes() as sock_info:
with self._socket_for_writes(session) as sock_info:
if collation is not None:
if sock_info.max_wire_version < 5:
raise ConfigurationError(
@ -2070,7 +2071,7 @@ class Collection(common.BaseObject):
self.__database.name, self.__name, name)
cmd = SON([("dropIndexes", self.__name), ("index", name)])
cmd.update(kwargs)
with self._socket_for_writes() as sock_info:
with self._socket_for_writes(session) as sock_info:
self._command(sock_info,
cmd,
read_preference=ReadPreference.PRIMARY,
@ -2106,7 +2107,7 @@ class Collection(common.BaseObject):
"""
cmd = SON([("reIndex", self.__name)])
cmd.update(kwargs)
with self._socket_for_writes() as sock_info:
with self._socket_for_writes(session) as sock_info:
return self._command(
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
session=session)
@ -2606,7 +2607,7 @@ class Collection(common.BaseObject):
cmd.update(kwargs)
write_concern = self._write_concern_for_cmd(cmd, session)
with self._socket_for_writes() as sock_info:
with self._socket_for_writes(session) as sock_info:
with self.__database.client._tmp_session(session) as s:
return sock_info.command(
'admin', cmd,

View File

@ -608,7 +608,7 @@ class Database(common.BaseObject):
read_preference = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
with self.__client._socket_for_reads(
read_preference) as (sock_info, slave_ok):
read_preference, session) as (sock_info, slave_ok):
return self._command(sock_info, command, slave_ok, value,
check, allowable_errors, read_preference,
codec_options, session=session, **kwargs)
@ -671,7 +671,7 @@ class Database(common.BaseObject):
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
with self.__client._socket_for_reads(
read_pref) as (sock_info, slave_okay):
read_pref, session) as (sock_info, slave_okay):
return self._list_collections(
sock_info, slave_okay, session, read_preference=read_pref,
**kwargs)
@ -745,7 +745,7 @@ class Database(common.BaseObject):
self.__client._purge_index(self.__name, name)
with self.__client._socket_for_writes() as sock_info:
with self.__client._socket_for_writes(session) as sock_info:
return self._command(
sock_info, 'drop', value=_unicode(name),
allowable_errors=['ns not found'],
@ -826,7 +826,7 @@ class Database(common.BaseObject):
Added ``session`` parameter.
"""
cmd = SON([("currentOp", 1), ("$all", include_all)])
with self.__client._socket_for_writes() as sock_info:
with self.__client._socket_for_writes(session) as sock_info:
if sock_info.max_wire_version >= 4:
with self.__client._tmp_session(session) as s:
return sock_info.command("admin", cmd, session=s,

View File

@ -1000,7 +1000,8 @@ class MongoClient(common.BaseObject):
# Use SocketInfo.command directly to avoid implicitly creating
# another session.
with self._socket_for_reads(
ReadPreference.PRIMARY_PREFERRED) as (sock_info, slave_ok):
ReadPreference.PRIMARY_PREFERRED,
None) as (sock_info, slave_ok):
if not sock_info.supports_sessions:
return
@ -1105,12 +1106,40 @@ class MongoClient(common.BaseObject):
self.__reset_server(server.description.address)
raise
def _socket_for_writes(self):
server = self._get_topology().select_server(writable_server_selector)
def _select_server(self, server_selector, session, address=None):
"""Select a server to run an operation on this client.
:Parameters:
- `server_selector`: The server selector to use if the session is
not pinned and no address is given.
- `session`: The ClientSession for the next operation, or None. May
be pinned to a mongos server address.
- `address` (optional): Address when sending a message
to a specific server, used for getMore.
"""
topology = self._get_topology()
address = address or (session and session._pinned_address)
if address:
# We're running a getMore or this session is pinned to a mongos.
server = topology.select_server_by_address(address)
if not server:
raise AutoReconnect('server %s:%d no longer available'
% address)
else:
server = topology.select_server(server_selector)
# Pin this session to the selected server if it's performing a
# sharded transaction.
if server.description.mongos and (session and
session._in_transaction):
session._pin_mongos(server)
return server
def _socket_for_writes(self, session):
server = self._select_server(writable_server_selector, session)
return self._get_socket(server)
@contextlib.contextmanager
def _socket_for_reads(self, read_preference):
def _socket_for_reads(self, read_preference, session):
assert read_preference is not None, "read_preference must not be None"
# Get a socket for a server matching the read preference, and yield
# sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to
@ -1120,7 +1149,7 @@ class MongoClient(common.BaseObject):
# Thread safe: if the type is single it cannot change.
topology = self._get_topology()
single = topology.description.topology_type == TOPOLOGY_TYPE.Single
server = topology.select_server(read_preference)
server = self._select_server(read_preference, session)
with self._get_socket(server) as sock_info:
slave_ok = (single and not sock_info.is_mongos) or (
@ -1139,14 +1168,9 @@ class MongoClient(common.BaseObject):
- `address` (optional): Optional address when sending a message
to a specific server, used for getMore.
"""
server = self._select_server(
operation.read_preference, operation.session, address=address)
topology = self._get_topology()
if address:
server = topology.select_server_by_address(address)
if not server:
raise AutoReconnect('server %s:%d no longer available'
% address)
else:
server = topology.select_server(operation.read_preference)
# If this is a direct connection to a mongod, *always* set the slaveOk
# bit. See bullet point 2 in server-selection.rst#topology-type-single.
@ -1206,8 +1230,7 @@ class MongoClient(common.BaseObject):
while True:
try:
server = self._get_topology().select_server(
writable_server_selector)
server = self._select_server(writable_server_selector, session)
supports_session = (
session is not None and
server.description.retryable_writes_supported)
@ -1736,7 +1759,7 @@ class MongoClient(common.BaseObject):
"of %s or a Database" % (string_type.__name__,))
self._purge_index(name)
with self._socket_for_writes() as sock_info:
with self._socket_for_writes(session) as sock_info:
self[name]._command(
sock_info,
"dropDatabase",
@ -1877,7 +1900,7 @@ class MongoClient(common.BaseObject):
Added ``session`` parameter.
"""
cmd = SON([("fsyncUnlock", 1)])
with self._socket_for_writes() as sock_info:
with self._socket_for_writes(session) as sock_info:
if sock_info.max_wire_version >= 4:
try:
with self._tmp_session(session) as s:

View File

@ -187,6 +187,10 @@ class ServerDescription(object):
def is_readable(self):
return self._is_readable
@property
def mongos(self):
return self._server_type == SERVER_TYPE.Mongos
@property
def is_server_type_known(self):
return self.server_type != SERVER_TYPE.Unknown

View File

@ -162,6 +162,7 @@ class ClientContext(object):
self.auth_enabled = False
self.test_commands_enabled = False
self.is_mongos = False
self.mongoses = []
self.is_rs = False
self.has_ipv6 = False
self.ssl = False
@ -295,6 +296,17 @@ class ClientContext(object):
self.is_mongos = (self.ismaster.get('msg') == 'isdbgrid')
self.has_ipv6 = self._server_started_with_ipv6()
if self.is_mongos:
# Check for another mongos on the next port.
address = self.client.address
next_address = address[0], address[1] + 1
self.mongoses.append(address)
mongos_client = self._connect(*next_address,
**self.default_client_options)
if mongos_client:
ismaster = mongos_client.admin.command('ismaster')
if ismaster.get('msg') == 'isdbgrid':
self.mongoses.append(next_address)
def init(self):
with self.conn_lock:
@ -507,6 +519,13 @@ class ClientContext(object):
"Must be connected to a mongos",
func=func)
def require_multiple_mongoses(self, func):
"""Run a test only if the client is connected to a sharded cluster
that has 2 mongos nodes."""
return self._require(lambda: len(self.mongoses) > 1,
"Must have multiple mongoses available",
func=func)
def require_standalone(self, func):
"""Run a test only if the client is connected to a standalone."""
return self._require(lambda: not (self.is_mongos or self.is_rs),
@ -586,13 +605,25 @@ class ClientContext(object):
"Sessions not supported",
func=func)
def supports_transactions(self):
if self.version.at_least(4, 1, 6):
return self.is_mongos or self.is_rs
if self.version.at_least(4, 0):
return self.is_rs
return False
def require_transactions(self, func):
"""Run a test only if the deployment might support transactions.
*Might* because this does not test the storage engine or FCV.
"""
new_func = self.require_version_min(4, 0, 0, -1)(func)
return self.require_replica_set(new_func)
return self._require(self.supports_transactions,
"Transactions are not supported",
func=func)
def mongos_seeds(self):
return ','.join('%s:%s' % address for address in self.mongoses)
@property
def supports_reindex(self):

View File

@ -318,9 +318,9 @@ class ReadPrefTester(MongoClient):
super(ReadPrefTester, self).__init__(*args, **client_options)
@contextlib.contextmanager
def _socket_for_reads(self, read_preference):
def _socket_for_reads(self, read_preference, session):
context = super(ReadPrefTester, self)._socket_for_reads(
read_preference)
read_preference, session)
with context as (sock_info, slave_ok):
self.record_a_read(sock_info.address)
yield sock_info, slave_ok

View File

@ -38,7 +38,7 @@ from pymongo.read_preferences import ReadPreference
from pymongo.results import _WriteResult, BulkWriteResult
from test import unittest, client_context, IntegrationTest
from test.utils import OvertCommandListener, rs_client
from test.utils import OvertCommandListener, rs_client, single_client
from test.utils_selection_tests import parse_read_preference
# Location of JSON test specifications.
@ -47,6 +47,11 @@ _TEST_PATH = os.path.join(
_TXN_TESTS_DEBUG = os.environ.get('TRANSACTION_TESTS_DEBUG')
# Max number of operations to perform after a transaction to prove unpinning
# occurs. Chosen so that there's a low false positive rate. With 2 mongoses,
# 20 attempts yields a 1 in 1048576 chance of a false positive (1/(0.5^20)).
UNPIN_TEST_MAX_ATTEMPTS = 20
# TODO: factor the following functions with test_crud.py.
def camel_to_snake(camel):
@ -67,6 +72,15 @@ def camel_to_snake_args(arguments):
class TestTransactions(IntegrationTest):
@classmethod
def setUpClass(cls):
super(TestTransactions, cls).setUpClass()
cls.mongos_clients = []
if client_context.supports_transactions():
for address in client_context.mongoses:
cls.mongos_clients.append(single_client('%s:%s' % address))
def transaction_test_debug(self, msg):
if _TXN_TESTS_DEBUG:
print(msg)
@ -84,7 +98,18 @@ class TestTransactions(IntegrationTest):
def set_fail_point(self, command_args):
cmd = SON([('configureFailPoint', 'failCommand')])
cmd.update(command_args)
self.client.admin.command(cmd)
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
client.admin.command(cmd)
def kill_all_sessions(self):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
# Run killAllSessions without an implicit session to work
# around SERVER-38335.
with client._socket_for_writes(None) as sock_info:
spec = SON([('killAllSessions', [])])
sock_info.command('admin', spec, client=client)
@client_context.require_transactions
def test_transaction_options_validation(self):
@ -111,6 +136,7 @@ class TestTransactions(IntegrationTest):
def test_transaction_write_concern_override(self):
"""Test txn overrides Client/Database/Collection write_concern."""
client = rs_client(w=0)
self.addCleanup(client.close)
db = client.test
coll = db.test
coll.insert_one({})
@ -158,6 +184,49 @@ class TestTransactions(IntegrationTest):
op(*args, **kwargs)
s.abort_transaction()
@client_context.require_transactions
@client_context.require_multiple_mongoses
def test_unpin_for_next_transaction(self):
client = rs_client(client_context.mongos_seeds())
self.addCleanup(client.close)
with client.start_session() as s:
# Session is pinned to Mongos.
with s.start_transaction():
client.test.test.insert_one({}, session=s)
addresses = set()
for _ in range(UNPIN_TEST_MAX_ATTEMPTS):
with s.start_transaction():
cursor = client.test.test.find({}, session=s)
self.assertTrue(next(cursor))
addresses.add(cursor.address)
# Break early if we can.
if len(addresses) > 1:
break
self.assertGreater(len(addresses), 1)
@client_context.require_transactions
@client_context.require_multiple_mongoses
def test_unpin_for_non_transaction_operation(self):
client = rs_client(client_context.mongos_seeds())
self.addCleanup(client.close)
with client.start_session() as s:
# Session is pinned to Mongos.
with s.start_transaction():
client.test.test.insert_one({}, session=s)
addresses = set()
for _ in range(UNPIN_TEST_MAX_ATTEMPTS):
cursor = client.test.test.find({}, session=s)
self.assertTrue(next(cursor))
addresses.add(cursor.address)
# Break early if we can.
if len(addresses) > 1:
break
self.assertGreater(len(addresses), 1)
def check_command_result(self, expected_result, result):
# Only compare the keys in the expected result.
filtered_result = {}
@ -395,26 +464,23 @@ def create_test(scenario_def, test):
raise unittest.SkipTest(test.get('skipReason'))
listener = OvertCommandListener()
# New client, to avoid interference from pooled sessions.
# Convert test['clientOptions'] to dict to avoid a Jython bug using "**"
# with ScenarioDict.
client = rs_client(event_listeners=[listener],
**dict(test['clientOptions']))
# Create a new client, to avoid interference from pooled sessions.
# Convert test['clientOptions'] to dict to avoid a Jython bug using
# "**" with ScenarioDict.
client_options = dict(test['clientOptions'])
if client_context.is_mongos:
client = rs_client(client_context.mongos_seeds(),
event_listeners=[listener], **client_options)
else:
client = rs_client(event_listeners=[listener], **client_options)
# Close the client explicitly to avoid having too many threads open.
self.addCleanup(client.close)
# Kill all sessions before and after each test to prevent an open
# transaction (from a test failure) from blocking collection/database
# operations during test set up and tear down.
def kill_all_sessions():
try:
client.admin.command('killAllSessions', [])
except OperationFailure:
# "operation was interrupted" by killing the command's
# own session.
pass
kill_all_sessions()
self.addCleanup(kill_all_sessions)
self.kill_all_sessions()
self.addCleanup(self.kill_all_sessions)
database_name = scenario_def['database_name']
collection_name = scenario_def['collection_name']
@ -509,6 +575,11 @@ def create_test(scenario_def, test):
self.check_events(test, listener, session_ids)
# Disable fail points.
if 'failPoint' in test:
self.set_fail_point({
'configureFailPoint': 'failCommand', 'mode': 'off'})
# Assert final state is expected.
expected_c = test['outcome'].get('collection')
if expected_c is not None:

View File

@ -350,6 +350,7 @@
},
{
"description": "write concern error on commit",
"skipReason": "SERVER-37458 Mongos does not yet support writeConcern on commit",
"operations": [
{
"name": "startTransaction",

View File

@ -0,0 +1,794 @@
{
"database_name": "transaction-tests",
"collection_name": "test",
"data": [
{
"_id": 1
},
{
"_id": 2
}
],
"tests": [
{
"description": "countDocuments",
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": 1
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": 1
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": 1
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": 1
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": 1
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": 1
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": 1
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": 1
},
{
"name": "commitTransaction",
"object": "session0"
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
},
{
"_id": 2
}
]
}
}
},
{
"description": "distinct",
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "distinct",
"object": "collection",
"arguments": {
"fieldName": "_id",
"session": "session0"
},
"result": [
1,
2
]
},
{
"name": "distinct",
"object": "collection",
"arguments": {
"fieldName": "_id",
"session": "session0"
},
"result": [
1,
2
]
},
{
"name": "distinct",
"object": "collection",
"arguments": {
"fieldName": "_id",
"session": "session0"
},
"result": [
1,
2
]
},
{
"name": "distinct",
"object": "collection",
"arguments": {
"fieldName": "_id",
"session": "session0"
},
"result": [
1,
2
]
},
{
"name": "distinct",
"object": "collection",
"arguments": {
"fieldName": "_id",
"session": "session0"
},
"result": [
1,
2
]
},
{
"name": "distinct",
"object": "collection",
"arguments": {
"fieldName": "_id",
"session": "session0"
},
"result": [
1,
2
]
},
{
"name": "distinct",
"object": "collection",
"arguments": {
"fieldName": "_id",
"session": "session0"
},
"result": [
1,
2
]
},
{
"name": "distinct",
"object": "collection",
"arguments": {
"fieldName": "_id",
"session": "session0"
},
"result": [
1,
2
]
},
{
"name": "commitTransaction",
"object": "session0"
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
},
{
"_id": 2
}
]
}
}
},
{
"description": "find",
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": [
{
"_id": 2
}
]
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": [
{
"_id": 2
}
]
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": [
{
"_id": 2
}
]
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": [
{
"_id": 2
}
]
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": [
{
"_id": 2
}
]
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": [
{
"_id": 2
}
]
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": [
{
"_id": 2
}
]
},
{
"name": "find",
"object": "collection",
"arguments": {
"filter": {
"_id": 2
},
"session": "session0"
},
"result": [
{
"_id": 2
}
]
},
{
"name": "commitTransaction",
"object": "session0"
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
},
{
"_id": 2
}
]
}
}
},
{
"description": "insertOne",
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 3
},
"session": "session0"
},
"result": {
"insertedId": 3
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 4
},
"session": "session0"
},
"result": {
"insertedId": 4
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 5
},
"session": "session0"
},
"result": {
"insertedId": 5
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 6
},
"session": "session0"
},
"result": {
"insertedId": 6
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 7
},
"session": "session0"
},
"result": {
"insertedId": 7
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 8
},
"session": "session0"
},
"result": {
"insertedId": 8
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 9
},
"session": "session0"
},
"result": {
"insertedId": 9
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 10
},
"session": "session0"
},
"result": {
"insertedId": 10
}
},
{
"name": "commitTransaction",
"object": "session0"
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
},
{
"_id": 5
},
{
"_id": 6
},
{
"_id": 7
},
{
"_id": 8
},
{
"_id": 9
},
{
"_id": 10
}
]
}
}
},
{
"description": "mixed read write operations",
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 3
},
"session": "session0"
},
"result": {
"insertedId": 3
}
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 3
},
"session": "session0"
},
"result": 1
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 3
},
"session": "session0"
},
"result": 1
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 3
},
"session": "session0"
},
"result": 1
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 3
},
"session": "session0"
},
"result": 1
},
{
"name": "countDocuments",
"object": "collection",
"arguments": {
"filter": {
"_id": 3
},
"session": "session0"
},
"result": 1
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 4
},
"session": "session0"
},
"result": {
"insertedId": 4
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 5
},
"session": "session0"
},
"result": {
"insertedId": 5
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 6
},
"session": "session0"
},
"result": {
"insertedId": 6
}
},
{
"name": "insertOne",
"object": "collection",
"arguments": {
"document": {
"_id": 7
},
"session": "session0"
},
"result": {
"insertedId": 7
}
},
{
"name": "commitTransaction",
"object": "session0"
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
},
{
"_id": 5
},
{
"_id": 6
},
{
"_id": 7
}
]
}
}
},
{
"description": "multiple commits",
"operations": [
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertMany",
"object": "collection",
"arguments": {
"documents": [
{
"_id": 3
},
{
"_id": 4
}
],
"session": "session0"
},
"result": {
"insertedIds": {
"0": 3,
"1": 4
}
}
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
},
{
"name": "commitTransaction",
"object": "session0"
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
]
}
}
}
]
}

View File

@ -959,7 +959,7 @@
}
},
{
"description": "readConcern local in startTransaction options",
"description": "readConcern snapshot in startTransaction options",
"sessionOptions": {
"session0": {
"defaultTransactionOptions": {

View File

@ -120,13 +120,13 @@ class HeartbeatEventListener(monitoring.ServerHeartbeatListener):
self.results.append(event)
def _connection_string(h, p, authenticate):
def _connection_string(h, authenticate):
if h.startswith("mongodb://"):
return h
elif client_context.auth_enabled and authenticate:
return "mongodb://%s:%s@%s:%d" % (db_user, db_pwd, str(h), p)
return "mongodb://%s:%s@%s" % (db_user, db_pwd, str(h))
else:
return "mongodb://%s:%d" % (str(h), p)
return "mongodb://%s" % (str(h),)
def _mongo_client(host, port, authenticate=True, direct=False, **kwargs):
@ -138,7 +138,7 @@ def _mongo_client(host, port, authenticate=True, direct=False, **kwargs):
client_options['replicaSet'] = client_context.replica_set_name
client_options.update(kwargs)
client = MongoClient(_connection_string(host, port, authenticate), port,
client = MongoClient(_connection_string(host, authenticate), port,
**client_options)
return client