From 6bab444bd728c7b91c588cdff6ca1af36b08e2ab Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Mon, 1 Oct 2018 15:49:34 -0700 Subject: [PATCH] PYTHON-1673 Mongos pinning for sharded transactions In a sharded transaction, a session is pinned to the mongos server selected for the initial command. All subsequent commands in the same transaction are routed to the pinned mongos server. --- pymongo/bulk.py | 2 +- pymongo/change_stream.py | 3 +- pymongo/client_session.py | 22 +- pymongo/collection.py | 21 +- pymongo/database.py | 8 +- pymongo/mongo_client.py | 55 +- pymongo/server_description.py | 4 + test/__init__.py | 35 +- test/test_read_preferences.py | 4 +- test/test_transactions.py | 103 ++- test/transactions/commit.json | 1 + test/transactions/pin-mongos.json | 794 +++++++++++++++++++++ test/transactions/transaction-options.json | 2 +- test/utils.py | 8 +- 14 files changed, 1002 insertions(+), 60 deletions(-) create mode 100644 test/transactions/pin-mongos.json diff --git a/pymongo/bulk.py b/pymongo/bulk.py index ccd58e4ec..bbe8f75d8 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -515,7 +515,7 @@ class _Bulk(object): client = self.collection.database.client if not write_concern.acknowledged: - with client._socket_for_writes() as sock_info: + with client._socket_for_writes(session) as sock_info: self.execute_no_results(sock_info, generator) else: return self.execute_command(generator, write_concern, session) diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index b8b94418a..249633b0b 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -107,7 +107,8 @@ class ChangeStream(object): """ read_preference = self._target._read_preference_for(session) client = self._database.client - with client._socket_for_reads(read_preference) as (sock_info, slave_ok): + with client._socket_for_reads( + read_preference, session) as (sock_info, slave_ok): pipeline = self._full_pipeline() cmd = SON([("aggregate", self._aggregation_target), ("pipeline", pipeline), diff --git a/pymongo/client_session.py b/pymongo/client_session.py index dff0cd70f..e404a41bf 100644 --- a/pymongo/client_session.py +++ b/pymongo/client_session.py @@ -246,6 +246,7 @@ class _Transaction(object): self.opts = opts self.state = _TxnState.NONE self.transaction_id = 0 + self.pinned_address = None def active(self): return self.state in (_TxnState.STARTING, _TxnState.IN_PROGRESS) @@ -369,6 +370,7 @@ class ClientSession(object): self._transaction.state = _TxnState.STARTING self._start_retryable_write() self._transaction.transaction_id = self._server_session.transaction_id + self._transaction.pinned_address = None return _TransactionContext(self) def commit_transaction(self): @@ -388,6 +390,10 @@ class ClientSession(object): elif state is _TxnState.ABORTED: raise InvalidOperation( "Cannot call commitTransaction after calling abortTransaction") + elif state is _TxnState.COMMITTED: + # We're rerunning the commit, move the state back to "in progress" + # so that _in_transaction returns true. + self._transaction.state = _TxnState.IN_PROGRESS try: self._finish_transaction_with_retry("commitTransaction") @@ -441,12 +447,10 @@ class ClientSession(object): self._transaction.state = _TxnState.ABORTED def _finish_transaction(self, command_name): - with self._client._socket_for_writes() as sock_info: + with self._client._socket_for_writes(self) as sock_info: return self._client.admin._command( sock_info, command_name, - txnNumber=self._transaction.transaction_id, - autocommit=False, session=self, write_concern=self._transaction.opts.write_concern, parse_write_concern_error=True) @@ -528,6 +532,17 @@ class ClientSession(object): """True if this session has an active multi-statement transaction.""" return self._transaction.active() + @property + def _pinned_address(self): + """The mongos address this transaction was created on.""" + if self._transaction.active(): + return self._transaction.pinned_address + return None + + def _pin_mongos(self, server): + """Pin this session to the given mongos Server.""" + self._transaction.pinned_address = server.description.address + def _txn_read_preference(self): """Return read preference of this transaction or None.""" if self._in_transaction: @@ -542,6 +557,7 @@ class ClientSession(object): if not self._in_transaction: self._transaction.state = _TxnState.NONE + self._transaction.pinned_address = None if is_retryable: command['txnNumber'] = self._server_session.transaction_id diff --git a/pymongo/collection.py b/pymongo/collection.py index 0059f9117..26a497876 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -185,15 +185,16 @@ class Collection(common.BaseObject): def _socket_for_reads(self, session): return self.__database.client._socket_for_reads( - self._read_preference_for(session)) + self._read_preference_for(session), session) def _socket_for_primary_reads(self, session): read_pref = ((session and session._txn_read_preference()) or ReadPreference.PRIMARY) - return self.__database.client._socket_for_reads(read_pref), read_pref + return self.__database.client._socket_for_reads( + read_pref, session), read_pref - def _socket_for_writes(self): - return self.__database.client._socket_for_writes() + def _socket_for_writes(self, session): + return self.__database.client._socket_for_writes(session) def _command(self, sock_info, command, slave_ok=False, read_preference=None, @@ -251,7 +252,7 @@ class Collection(common.BaseObject): if "size" in options: options["size"] = float(options["size"]) cmd.update(options) - with self._socket_for_writes() as sock_info: + with self._socket_for_writes(session) as sock_info: self._command( sock_info, cmd, read_preference=ReadPreference.PRIMARY, write_concern=self._write_concern_for(session), @@ -1804,7 +1805,7 @@ class Collection(common.BaseObject): """ common.validate_list('indexes', indexes) names = [] - with self._socket_for_writes() as sock_info: + with self._socket_for_writes(session) as sock_info: supports_collations = sock_info.max_wire_version >= 5 def gen_indexes(): for index in indexes: @@ -1844,7 +1845,7 @@ class Collection(common.BaseObject): index_options.pop('collation', None)) index.update(index_options) - with self._socket_for_writes() as sock_info: + with self._socket_for_writes(session) as sock_info: if collation is not None: if sock_info.max_wire_version < 5: raise ConfigurationError( @@ -2070,7 +2071,7 @@ class Collection(common.BaseObject): self.__database.name, self.__name, name) cmd = SON([("dropIndexes", self.__name), ("index", name)]) cmd.update(kwargs) - with self._socket_for_writes() as sock_info: + with self._socket_for_writes(session) as sock_info: self._command(sock_info, cmd, read_preference=ReadPreference.PRIMARY, @@ -2106,7 +2107,7 @@ class Collection(common.BaseObject): """ cmd = SON([("reIndex", self.__name)]) cmd.update(kwargs) - with self._socket_for_writes() as sock_info: + with self._socket_for_writes(session) as sock_info: return self._command( sock_info, cmd, read_preference=ReadPreference.PRIMARY, session=session) @@ -2606,7 +2607,7 @@ class Collection(common.BaseObject): cmd.update(kwargs) write_concern = self._write_concern_for_cmd(cmd, session) - with self._socket_for_writes() as sock_info: + with self._socket_for_writes(session) as sock_info: with self.__database.client._tmp_session(session) as s: return sock_info.command( 'admin', cmd, diff --git a/pymongo/database.py b/pymongo/database.py index a358cfc70..178679b8a 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -608,7 +608,7 @@ class Database(common.BaseObject): read_preference = ((session and session._txn_read_preference()) or ReadPreference.PRIMARY) with self.__client._socket_for_reads( - read_preference) as (sock_info, slave_ok): + read_preference, session) as (sock_info, slave_ok): return self._command(sock_info, command, slave_ok, value, check, allowable_errors, read_preference, codec_options, session=session, **kwargs) @@ -671,7 +671,7 @@ class Database(common.BaseObject): read_pref = ((session and session._txn_read_preference()) or ReadPreference.PRIMARY) with self.__client._socket_for_reads( - read_pref) as (sock_info, slave_okay): + read_pref, session) as (sock_info, slave_okay): return self._list_collections( sock_info, slave_okay, session, read_preference=read_pref, **kwargs) @@ -745,7 +745,7 @@ class Database(common.BaseObject): self.__client._purge_index(self.__name, name) - with self.__client._socket_for_writes() as sock_info: + with self.__client._socket_for_writes(session) as sock_info: return self._command( sock_info, 'drop', value=_unicode(name), allowable_errors=['ns not found'], @@ -826,7 +826,7 @@ class Database(common.BaseObject): Added ``session`` parameter. """ cmd = SON([("currentOp", 1), ("$all", include_all)]) - with self.__client._socket_for_writes() as sock_info: + with self.__client._socket_for_writes(session) as sock_info: if sock_info.max_wire_version >= 4: with self.__client._tmp_session(session) as s: return sock_info.command("admin", cmd, session=s, diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 866e24f72..e79a1517a 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -1000,7 +1000,8 @@ class MongoClient(common.BaseObject): # Use SocketInfo.command directly to avoid implicitly creating # another session. with self._socket_for_reads( - ReadPreference.PRIMARY_PREFERRED) as (sock_info, slave_ok): + ReadPreference.PRIMARY_PREFERRED, + None) as (sock_info, slave_ok): if not sock_info.supports_sessions: return @@ -1105,12 +1106,40 @@ class MongoClient(common.BaseObject): self.__reset_server(server.description.address) raise - def _socket_for_writes(self): - server = self._get_topology().select_server(writable_server_selector) + def _select_server(self, server_selector, session, address=None): + """Select a server to run an operation on this client. + + :Parameters: + - `server_selector`: The server selector to use if the session is + not pinned and no address is given. + - `session`: The ClientSession for the next operation, or None. May + be pinned to a mongos server address. + - `address` (optional): Address when sending a message + to a specific server, used for getMore. + """ + topology = self._get_topology() + address = address or (session and session._pinned_address) + if address: + # We're running a getMore or this session is pinned to a mongos. + server = topology.select_server_by_address(address) + if not server: + raise AutoReconnect('server %s:%d no longer available' + % address) + else: + server = topology.select_server(server_selector) + # Pin this session to the selected server if it's performing a + # sharded transaction. + if server.description.mongos and (session and + session._in_transaction): + session._pin_mongos(server) + return server + + def _socket_for_writes(self, session): + server = self._select_server(writable_server_selector, session) return self._get_socket(server) @contextlib.contextmanager - def _socket_for_reads(self, read_preference): + def _socket_for_reads(self, read_preference, session): assert read_preference is not None, "read_preference must not be None" # Get a socket for a server matching the read preference, and yield # sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to @@ -1120,7 +1149,7 @@ class MongoClient(common.BaseObject): # Thread safe: if the type is single it cannot change. topology = self._get_topology() single = topology.description.topology_type == TOPOLOGY_TYPE.Single - server = topology.select_server(read_preference) + server = self._select_server(read_preference, session) with self._get_socket(server) as sock_info: slave_ok = (single and not sock_info.is_mongos) or ( @@ -1139,14 +1168,9 @@ class MongoClient(common.BaseObject): - `address` (optional): Optional address when sending a message to a specific server, used for getMore. """ + server = self._select_server( + operation.read_preference, operation.session, address=address) topology = self._get_topology() - if address: - server = topology.select_server_by_address(address) - if not server: - raise AutoReconnect('server %s:%d no longer available' - % address) - else: - server = topology.select_server(operation.read_preference) # If this is a direct connection to a mongod, *always* set the slaveOk # bit. See bullet point 2 in server-selection.rst#topology-type-single. @@ -1206,8 +1230,7 @@ class MongoClient(common.BaseObject): while True: try: - server = self._get_topology().select_server( - writable_server_selector) + server = self._select_server(writable_server_selector, session) supports_session = ( session is not None and server.description.retryable_writes_supported) @@ -1736,7 +1759,7 @@ class MongoClient(common.BaseObject): "of %s or a Database" % (string_type.__name__,)) self._purge_index(name) - with self._socket_for_writes() as sock_info: + with self._socket_for_writes(session) as sock_info: self[name]._command( sock_info, "dropDatabase", @@ -1877,7 +1900,7 @@ class MongoClient(common.BaseObject): Added ``session`` parameter. """ cmd = SON([("fsyncUnlock", 1)]) - with self._socket_for_writes() as sock_info: + with self._socket_for_writes(session) as sock_info: if sock_info.max_wire_version >= 4: try: with self._tmp_session(session) as s: diff --git a/pymongo/server_description.py b/pymongo/server_description.py index b77785549..d978b4799 100644 --- a/pymongo/server_description.py +++ b/pymongo/server_description.py @@ -187,6 +187,10 @@ class ServerDescription(object): def is_readable(self): return self._is_readable + @property + def mongos(self): + return self._server_type == SERVER_TYPE.Mongos + @property def is_server_type_known(self): return self.server_type != SERVER_TYPE.Unknown diff --git a/test/__init__.py b/test/__init__.py index 24e28cfce..60cf2898e 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -162,6 +162,7 @@ class ClientContext(object): self.auth_enabled = False self.test_commands_enabled = False self.is_mongos = False + self.mongoses = [] self.is_rs = False self.has_ipv6 = False self.ssl = False @@ -295,6 +296,17 @@ class ClientContext(object): self.is_mongos = (self.ismaster.get('msg') == 'isdbgrid') self.has_ipv6 = self._server_started_with_ipv6() + if self.is_mongos: + # Check for another mongos on the next port. + address = self.client.address + next_address = address[0], address[1] + 1 + self.mongoses.append(address) + mongos_client = self._connect(*next_address, + **self.default_client_options) + if mongos_client: + ismaster = mongos_client.admin.command('ismaster') + if ismaster.get('msg') == 'isdbgrid': + self.mongoses.append(next_address) def init(self): with self.conn_lock: @@ -507,6 +519,13 @@ class ClientContext(object): "Must be connected to a mongos", func=func) + def require_multiple_mongoses(self, func): + """Run a test only if the client is connected to a sharded cluster + that has 2 mongos nodes.""" + return self._require(lambda: len(self.mongoses) > 1, + "Must have multiple mongoses available", + func=func) + def require_standalone(self, func): """Run a test only if the client is connected to a standalone.""" return self._require(lambda: not (self.is_mongos or self.is_rs), @@ -586,13 +605,25 @@ class ClientContext(object): "Sessions not supported", func=func) + def supports_transactions(self): + if self.version.at_least(4, 1, 6): + return self.is_mongos or self.is_rs + + if self.version.at_least(4, 0): + return self.is_rs + return False + def require_transactions(self, func): """Run a test only if the deployment might support transactions. *Might* because this does not test the storage engine or FCV. """ - new_func = self.require_version_min(4, 0, 0, -1)(func) - return self.require_replica_set(new_func) + return self._require(self.supports_transactions, + "Transactions are not supported", + func=func) + + def mongos_seeds(self): + return ','.join('%s:%s' % address for address in self.mongoses) @property def supports_reindex(self): diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index 50c288aae..a0e30e9f0 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -318,9 +318,9 @@ class ReadPrefTester(MongoClient): super(ReadPrefTester, self).__init__(*args, **client_options) @contextlib.contextmanager - def _socket_for_reads(self, read_preference): + def _socket_for_reads(self, read_preference, session): context = super(ReadPrefTester, self)._socket_for_reads( - read_preference) + read_preference, session) with context as (sock_info, slave_ok): self.record_a_read(sock_info.address) yield sock_info, slave_ok diff --git a/test/test_transactions.py b/test/test_transactions.py index e955ecba6..113403e7a 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -38,7 +38,7 @@ from pymongo.read_preferences import ReadPreference from pymongo.results import _WriteResult, BulkWriteResult from test import unittest, client_context, IntegrationTest -from test.utils import OvertCommandListener, rs_client +from test.utils import OvertCommandListener, rs_client, single_client from test.utils_selection_tests import parse_read_preference # Location of JSON test specifications. @@ -47,6 +47,11 @@ _TEST_PATH = os.path.join( _TXN_TESTS_DEBUG = os.environ.get('TRANSACTION_TESTS_DEBUG') +# Max number of operations to perform after a transaction to prove unpinning +# occurs. Chosen so that there's a low false positive rate. With 2 mongoses, +# 20 attempts yields a 1 in 1048576 chance of a false positive (1/(0.5^20)). +UNPIN_TEST_MAX_ATTEMPTS = 20 + # TODO: factor the following functions with test_crud.py. def camel_to_snake(camel): @@ -67,6 +72,15 @@ def camel_to_snake_args(arguments): class TestTransactions(IntegrationTest): + + @classmethod + def setUpClass(cls): + super(TestTransactions, cls).setUpClass() + cls.mongos_clients = [] + if client_context.supports_transactions(): + for address in client_context.mongoses: + cls.mongos_clients.append(single_client('%s:%s' % address)) + def transaction_test_debug(self, msg): if _TXN_TESTS_DEBUG: print(msg) @@ -84,7 +98,18 @@ class TestTransactions(IntegrationTest): def set_fail_point(self, command_args): cmd = SON([('configureFailPoint', 'failCommand')]) cmd.update(command_args) - self.client.admin.command(cmd) + clients = self.mongos_clients if self.mongos_clients else [self.client] + for client in clients: + client.admin.command(cmd) + + def kill_all_sessions(self): + clients = self.mongos_clients if self.mongos_clients else [self.client] + for client in clients: + # Run killAllSessions without an implicit session to work + # around SERVER-38335. + with client._socket_for_writes(None) as sock_info: + spec = SON([('killAllSessions', [])]) + sock_info.command('admin', spec, client=client) @client_context.require_transactions def test_transaction_options_validation(self): @@ -111,6 +136,7 @@ class TestTransactions(IntegrationTest): def test_transaction_write_concern_override(self): """Test txn overrides Client/Database/Collection write_concern.""" client = rs_client(w=0) + self.addCleanup(client.close) db = client.test coll = db.test coll.insert_one({}) @@ -158,6 +184,49 @@ class TestTransactions(IntegrationTest): op(*args, **kwargs) s.abort_transaction() + @client_context.require_transactions + @client_context.require_multiple_mongoses + def test_unpin_for_next_transaction(self): + client = rs_client(client_context.mongos_seeds()) + self.addCleanup(client.close) + with client.start_session() as s: + # Session is pinned to Mongos. + with s.start_transaction(): + client.test.test.insert_one({}, session=s) + + addresses = set() + for _ in range(UNPIN_TEST_MAX_ATTEMPTS): + with s.start_transaction(): + cursor = client.test.test.find({}, session=s) + self.assertTrue(next(cursor)) + addresses.add(cursor.address) + # Break early if we can. + if len(addresses) > 1: + break + + self.assertGreater(len(addresses), 1) + + @client_context.require_transactions + @client_context.require_multiple_mongoses + def test_unpin_for_non_transaction_operation(self): + client = rs_client(client_context.mongos_seeds()) + self.addCleanup(client.close) + with client.start_session() as s: + # Session is pinned to Mongos. + with s.start_transaction(): + client.test.test.insert_one({}, session=s) + + addresses = set() + for _ in range(UNPIN_TEST_MAX_ATTEMPTS): + cursor = client.test.test.find({}, session=s) + self.assertTrue(next(cursor)) + addresses.add(cursor.address) + # Break early if we can. + if len(addresses) > 1: + break + + self.assertGreater(len(addresses), 1) + def check_command_result(self, expected_result, result): # Only compare the keys in the expected result. filtered_result = {} @@ -395,26 +464,23 @@ def create_test(scenario_def, test): raise unittest.SkipTest(test.get('skipReason')) listener = OvertCommandListener() - # New client, to avoid interference from pooled sessions. - # Convert test['clientOptions'] to dict to avoid a Jython bug using "**" - # with ScenarioDict. - client = rs_client(event_listeners=[listener], - **dict(test['clientOptions'])) + # Create a new client, to avoid interference from pooled sessions. + # Convert test['clientOptions'] to dict to avoid a Jython bug using + # "**" with ScenarioDict. + client_options = dict(test['clientOptions']) + if client_context.is_mongos: + client = rs_client(client_context.mongos_seeds(), + event_listeners=[listener], **client_options) + else: + client = rs_client(event_listeners=[listener], **client_options) # Close the client explicitly to avoid having too many threads open. self.addCleanup(client.close) # Kill all sessions before and after each test to prevent an open # transaction (from a test failure) from blocking collection/database # operations during test set up and tear down. - def kill_all_sessions(): - try: - client.admin.command('killAllSessions', []) - except OperationFailure: - # "operation was interrupted" by killing the command's - # own session. - pass - kill_all_sessions() - self.addCleanup(kill_all_sessions) + self.kill_all_sessions() + self.addCleanup(self.kill_all_sessions) database_name = scenario_def['database_name'] collection_name = scenario_def['collection_name'] @@ -509,6 +575,11 @@ def create_test(scenario_def, test): self.check_events(test, listener, session_ids) + # Disable fail points. + if 'failPoint' in test: + self.set_fail_point({ + 'configureFailPoint': 'failCommand', 'mode': 'off'}) + # Assert final state is expected. expected_c = test['outcome'].get('collection') if expected_c is not None: diff --git a/test/transactions/commit.json b/test/transactions/commit.json index 173fae6c4..0a9331132 100644 --- a/test/transactions/commit.json +++ b/test/transactions/commit.json @@ -350,6 +350,7 @@ }, { "description": "write concern error on commit", + "skipReason": "SERVER-37458 Mongos does not yet support writeConcern on commit", "operations": [ { "name": "startTransaction", diff --git a/test/transactions/pin-mongos.json b/test/transactions/pin-mongos.json new file mode 100644 index 000000000..52229ee5b --- /dev/null +++ b/test/transactions/pin-mongos.json @@ -0,0 +1,794 @@ +{ + "database_name": "transaction-tests", + "collection_name": "test", + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ], + "tests": [ + { + "description": "countDocuments", + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "commitTransaction", + "object": "session0" + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + } + }, + { + "description": "distinct", + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "distinct", + "object": "collection", + "arguments": { + "fieldName": "_id", + "session": "session0" + }, + "result": [ + 1, + 2 + ] + }, + { + "name": "distinct", + "object": "collection", + "arguments": { + "fieldName": "_id", + "session": "session0" + }, + "result": [ + 1, + 2 + ] + }, + { + "name": "distinct", + "object": "collection", + "arguments": { + "fieldName": "_id", + "session": "session0" + }, + "result": [ + 1, + 2 + ] + }, + { + "name": "distinct", + "object": "collection", + "arguments": { + "fieldName": "_id", + "session": "session0" + }, + "result": [ + 1, + 2 + ] + }, + { + "name": "distinct", + "object": "collection", + "arguments": { + "fieldName": "_id", + "session": "session0" + }, + "result": [ + 1, + 2 + ] + }, + { + "name": "distinct", + "object": "collection", + "arguments": { + "fieldName": "_id", + "session": "session0" + }, + "result": [ + 1, + 2 + ] + }, + { + "name": "distinct", + "object": "collection", + "arguments": { + "fieldName": "_id", + "session": "session0" + }, + "result": [ + 1, + 2 + ] + }, + { + "name": "distinct", + "object": "collection", + "arguments": { + "fieldName": "_id", + "session": "session0" + }, + "result": [ + 1, + 2 + ] + }, + { + "name": "commitTransaction", + "object": "session0" + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + } + }, + { + "description": "find", + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": [ + { + "_id": 2 + } + ] + }, + { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": [ + { + "_id": 2 + } + ] + }, + { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": [ + { + "_id": 2 + } + ] + }, + { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": [ + { + "_id": 2 + } + ] + }, + { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": [ + { + "_id": 2 + } + ] + }, + { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": [ + { + "_id": 2 + } + ] + }, + { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": [ + { + "_id": 2 + } + ] + }, + { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "_id": 2 + }, + "session": "session0" + }, + "result": [ + { + "_id": 2 + } + ] + }, + { + "name": "commitTransaction", + "object": "session0" + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + } + }, + { + "description": "insertOne", + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 3 + }, + "session": "session0" + }, + "result": { + "insertedId": 3 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 4 + }, + "session": "session0" + }, + "result": { + "insertedId": 4 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 5 + }, + "session": "session0" + }, + "result": { + "insertedId": 5 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 6 + }, + "session": "session0" + }, + "result": { + "insertedId": 6 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 7 + }, + "session": "session0" + }, + "result": { + "insertedId": 7 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 8 + }, + "session": "session0" + }, + "result": { + "insertedId": 8 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 9 + }, + "session": "session0" + }, + "result": { + "insertedId": 9 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 10 + }, + "session": "session0" + }, + "result": { + "insertedId": 10 + } + }, + { + "name": "commitTransaction", + "object": "session0" + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + }, + { + "_id": 5 + }, + { + "_id": 6 + }, + { + "_id": 7 + }, + { + "_id": 8 + }, + { + "_id": 9 + }, + { + "_id": 10 + } + ] + } + } + }, + { + "description": "mixed read write operations", + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 3 + }, + "session": "session0" + }, + "result": { + "insertedId": 3 + } + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 3 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 3 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 3 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 3 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "countDocuments", + "object": "collection", + "arguments": { + "filter": { + "_id": 3 + }, + "session": "session0" + }, + "result": 1 + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 4 + }, + "session": "session0" + }, + "result": { + "insertedId": 4 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 5 + }, + "session": "session0" + }, + "result": { + "insertedId": 5 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 6 + }, + "session": "session0" + }, + "result": { + "insertedId": 6 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 7 + }, + "session": "session0" + }, + "result": { + "insertedId": 7 + } + }, + { + "name": "commitTransaction", + "object": "session0" + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + }, + { + "_id": 5 + }, + { + "_id": 6 + }, + { + "_id": 7 + } + ] + } + } + }, + { + "description": "multiple commits", + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ], + "session": "session0" + }, + "result": { + "insertedIds": { + "0": 3, + "1": 4 + } + } + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "commitTransaction", + "object": "session0" + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + } + } + ] +} diff --git a/test/transactions/transaction-options.json b/test/transactions/transaction-options.json index 0c134e5f6..d811165a9 100644 --- a/test/transactions/transaction-options.json +++ b/test/transactions/transaction-options.json @@ -959,7 +959,7 @@ } }, { - "description": "readConcern local in startTransaction options", + "description": "readConcern snapshot in startTransaction options", "sessionOptions": { "session0": { "defaultTransactionOptions": { diff --git a/test/utils.py b/test/utils.py index 91bedf35a..c6db94a4f 100644 --- a/test/utils.py +++ b/test/utils.py @@ -120,13 +120,13 @@ class HeartbeatEventListener(monitoring.ServerHeartbeatListener): self.results.append(event) -def _connection_string(h, p, authenticate): +def _connection_string(h, authenticate): if h.startswith("mongodb://"): return h elif client_context.auth_enabled and authenticate: - return "mongodb://%s:%s@%s:%d" % (db_user, db_pwd, str(h), p) + return "mongodb://%s:%s@%s" % (db_user, db_pwd, str(h)) else: - return "mongodb://%s:%d" % (str(h), p) + return "mongodb://%s" % (str(h),) def _mongo_client(host, port, authenticate=True, direct=False, **kwargs): @@ -138,7 +138,7 @@ def _mongo_client(host, port, authenticate=True, direct=False, **kwargs): client_options['replicaSet'] = client_context.replica_set_name client_options.update(kwargs) - client = MongoClient(_connection_string(host, port, authenticate), port, + client = MongoClient(_connection_string(host, authenticate), port, **client_options) return client