From a2937aa8ebbfa25ae4c8ab484af16ba2d05e85d2 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Thu, 12 Apr 2018 15:49:01 -0700 Subject: [PATCH] Revert "Pin transactions to a single server address" This reverts commit 25bc0858ff566abfabadeca4a3fea177e9c2697c. --- pymongo/bulk.py | 2 +- pymongo/client_session.py | 7 ------ pymongo/collection.py | 44 ++++++++++++++++---------------- pymongo/database.py | 9 +++---- pymongo/mongo_client.py | 47 +++++++++++------------------------ test/test_read_preferences.py | 5 ++-- 6 files changed, 42 insertions(+), 72 deletions(-) diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 0835955c0..0a7a4ed6e 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -427,7 +427,7 @@ class _Bulk(object): client = self.collection.database.client if not write_concern.acknowledged: - with client._socket_for_writes(session) as sock_info: + with client._socket_for_writes() as sock_info: self.execute_no_results(sock_info, generator) else: return self.execute_command(generator, write_concern, session) diff --git a/pymongo/client_session.py b/pymongo/client_session.py index 4c4359d1b..751893680 100644 --- a/pymongo/client_session.py +++ b/pymongo/client_session.py @@ -319,13 +319,6 @@ class ClientSession(object): """True if this session has an active multi-statement transaction.""" return self._transaction is not None - def _pin_server_address(self, address): - assert self._transaction.address is None, "Transaction already pinned" - self._transaction.address = address - - def _pinned_server_address(self): - return self._transaction.address - def _apply_to(self, command, is_retryable, read_preference): self._check_ended() diff --git a/pymongo/collection.py b/pymongo/collection.py index f728921a6..13c4042b8 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -184,16 +184,14 @@ class Collection(common.BaseObject): unicode_decode_error_handler='replace', document_class=dict) - def _socket_for_reads(self, session): - return self.__database.client._socket_for_reads( - self.read_preference, session) + def _socket_for_reads(self): + return self.__database.client._socket_for_reads(self.read_preference) - def _socket_for_primary_reads(self, session): - return self.__database.client._socket_for_reads( - ReadPreference.PRIMARY, session) + def _socket_for_primary_reads(self): + return self.__database.client._socket_for_reads(ReadPreference.PRIMARY) - def _socket_for_writes(self, session): - return self.__database.client._socket_for_writes(session) + def _socket_for_writes(self): + return self.__database.client._socket_for_writes() def _command(self, sock_info, command, slave_ok=False, read_preference=None, @@ -254,7 +252,7 @@ class Collection(common.BaseObject): if "size" in options: options["size"] = float(options["size"]) cmd.update(options) - with self._socket_for_writes(session) as sock_info: + with self._socket_for_writes() as sock_info: self._command( sock_info, cmd, read_preference=ReadPreference.PRIMARY, write_concern=self.write_concern, @@ -581,7 +579,7 @@ class Collection(common.BaseObject): True, _insert_command, session) _check_write_command_response(result) else: - with self._socket_for_writes(session=None) as sock_info: + with self._socket_for_writes() as sock_info: # Legacy OP_INSERT. self._legacy_write( sock_info, 'insert', command, op_id, @@ -1495,7 +1493,7 @@ class Collection(common.BaseObject): ('numCursors', num_cursors)]) cmd.update(kwargs) - with self._socket_for_reads(session) as (sock_info, slave_ok): + with self._socket_for_reads() as (sock_info, slave_ok): result = self._command(sock_info, cmd, slave_ok, read_concern=self.read_concern, session=session) @@ -1511,7 +1509,7 @@ class Collection(common.BaseObject): def _count(self, cmd, collation=None, session=None): """Internal count helper.""" - with self._socket_for_reads(session) as (sock_info, slave_ok): + with self._socket_for_reads() as (sock_info, slave_ok): res = self._command( sock_info, cmd, slave_ok, allowable_errors=["ns missing"], @@ -1608,7 +1606,7 @@ class Collection(common.BaseObject): """ common.validate_list('indexes', indexes) names = [] - with self._socket_for_writes(session) as sock_info: + with self._socket_for_writes() as sock_info: supports_collations = sock_info.max_wire_version >= 5 def gen_indexes(): for index in indexes: @@ -1649,7 +1647,7 @@ class Collection(common.BaseObject): index_options.pop('collation', None)) index.update(index_options) - with self._socket_for_writes(session) as sock_info: + with self._socket_for_writes() as sock_info: if collation is not None: if sock_info.max_wire_version < 5: raise ConfigurationError( @@ -1876,7 +1874,7 @@ class Collection(common.BaseObject): self.__database.name, self.__name, name) cmd = SON([("dropIndexes", self.__name), ("index", name)]) cmd.update(kwargs) - with self._socket_for_writes(session) as sock_info: + with self._socket_for_writes() as sock_info: self._command(sock_info, cmd, read_preference=ReadPreference.PRIMARY, @@ -1913,7 +1911,7 @@ class Collection(common.BaseObject): """ cmd = SON([("reIndex", self.__name)]) cmd.update(kwargs) - with self._socket_for_writes(session) as sock_info: + with self._socket_for_writes() as sock_info: return self._command( sock_info, cmd, read_preference=ReadPreference.PRIMARY, parse_write_concern_error=True, session=session) @@ -1942,7 +1940,7 @@ class Collection(common.BaseObject): codec_options = CodecOptions(SON) coll = self.with_options(codec_options=codec_options, read_preference=ReadPreference.PRIMARY) - with self._socket_for_primary_reads(session) as (sock_info, slave_ok): + with self._socket_for_primary_reads() as (sock_info, slave_ok): cmd = SON([("listIndexes", self.__name), ("cursor", {})]) if sock_info.max_wire_version > 2: with self.__database.client._tmp_session(session, False) as s: @@ -2063,7 +2061,7 @@ class Collection(common.BaseObject): "batchSize", kwargs.pop("batchSize", None)) # If the server does not support the "cursor" option we # ignore useCursor and batchSize. - with self._socket_for_reads(session) as (sock_info, slave_ok): + with self._socket_for_reads() as (sock_info, slave_ok): dollar_out = pipeline and '$out' in pipeline[-1] if use_cursor: if "cursor" not in kwargs: @@ -2352,7 +2350,7 @@ class Collection(common.BaseObject): collation = validate_collation_or_none(kwargs.pop('collation', None)) cmd.update(kwargs) - with self._socket_for_reads(session=None) as (sock_info, slave_ok): + with self._socket_for_reads() as (sock_info, slave_ok): return self._command(sock_info, cmd, slave_ok, collation=collation)["retval"] @@ -2398,7 +2396,7 @@ class Collection(common.BaseObject): new_name = "%s.%s" % (self.__database.name, new_name) cmd = SON([("renameCollection", self.__full_name), ("to", new_name)]) - with self._socket_for_writes(session) as sock_info: + with self._socket_for_writes() as sock_info: with self.__database.client._tmp_session(session) as s: if sock_info.max_wire_version >= 5 and self.write_concern: cmd['writeConcern'] = self.write_concern.document @@ -2453,7 +2451,7 @@ class Collection(common.BaseObject): kwargs["query"] = filter collation = validate_collation_or_none(kwargs.pop('collation', None)) cmd.update(kwargs) - with self._socket_for_reads(session) as (sock_info, slave_ok): + with self._socket_for_reads() as (sock_info, slave_ok): return self._command(sock_info, cmd, slave_ok, read_concern=self.read_concern, collation=collation, session=session)["values"] @@ -2525,7 +2523,7 @@ class Collection(common.BaseObject): cmd.update(kwargs) inline = 'inline' in cmd['out'] - with self._socket_for_primary_reads(session) as (sock_info, slave_ok): + with self._socket_for_primary_reads() as (sock_info, slave_ok): if (sock_info.max_wire_version >= 5 and self.write_concern and not inline): cmd['writeConcern'] = self.write_concern.document @@ -2594,7 +2592,7 @@ class Collection(common.BaseObject): ("out", {"inline": 1})]) collation = validate_collation_or_none(kwargs.pop('collation', None)) cmd.update(kwargs) - with self._socket_for_reads(session) as (sock_info, slave_ok): + with self._socket_for_reads() as (sock_info, slave_ok): if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd: res = self._command(sock_info, cmd, slave_ok, read_concern=self.read_concern, diff --git a/pymongo/database.py b/pymongo/database.py index d18bf045c..08ff9f5ad 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -526,8 +526,7 @@ class Database(common.BaseObject): .. mongodoc:: commands """ client = self.__client - with client._socket_for_reads( - read_preference, session) as (sock_info, slave_ok): + with client._socket_for_reads(read_preference) as (sock_info, slave_ok): return self._command(sock_info, command, slave_ok, value, check, allowable_errors, read_preference, codec_options, session=session, **kwargs) @@ -585,7 +584,7 @@ class Database(common.BaseObject): .. versionadded:: 3.6 """ with self.__client._socket_for_reads( - ReadPreference.PRIMARY, session) as (sock_info, slave_okay): + ReadPreference.PRIMARY) as (sock_info, slave_okay): return self._list_collections( sock_info, slave_okay, session=session, **kwargs) @@ -650,7 +649,7 @@ class Database(common.BaseObject): self.__client._purge_index(self.__name, name) with self.__client._socket_for_reads( - ReadPreference.PRIMARY, session) as (sock_info, slave_ok): + ReadPreference.PRIMARY) as (sock_info, slave_ok): return self._command( sock_info, 'drop', slave_ok, _unicode(name), allowable_errors=['ns not found'], @@ -731,7 +730,7 @@ class Database(common.BaseObject): Added ``session`` parameter. """ cmd = SON([("currentOp", 1), ("$all", include_all)]) - with self.__client._socket_for_writes(session) as sock_info: + with self.__client._socket_for_writes() as sock_info: if sock_info.max_wire_version >= 4: with self.__client._tmp_session(session) as s: return sock_info.command("admin", cmd, session=s, diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 0415ed331..74f03c078 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -871,8 +871,7 @@ class MongoClient(common.BaseObject): # Use SocketInfo.command directly to avoid implicitly creating # another session. with self._socket_for_reads( - ReadPreference.PRIMARY_PREFERRED, - None) as (sock_info, slave_ok): + ReadPreference.PRIMARY_PREFERRED) as (sock_info, slave_ok): if not sock_info.supports_sessions: return @@ -968,31 +967,13 @@ class MongoClient(common.BaseObject): self.__reset_server(server.description.address) raise - def _select_server(self, read_preference, session): - topology = self._get_topology() - if session and session.in_transaction: - address = session._pinned_server_address() - if address: - server = topology.select_server_by_address(address) - if not server: - raise AutoReconnect( - 'Pinned server %s:%d for transaction no longer' - 'available' % address) - return server - - server = topology.select_server(read_preference) - session._pin_server_address(server.description.address) - return server - else: - return topology.select_server(read_preference) - - def _socket_for_writes(self, session): - return self._get_socket(self._select_server( - ReadPreference.PRIMARY, session)) + def _socket_for_writes(self): + server = self._get_topology().select_server(writable_server_selector) + return self._get_socket(server) @contextlib.contextmanager - def _socket_for_reads(self, read_preference, session): - assert read_preference is not None, "read_preference must not be None" + def _socket_for_reads(self, read_preference): + preference = read_preference or ReadPreference.PRIMARY # Get a socket for a server matching the read preference, and yield # sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to # mongods with topology type Single. If the server type is Mongos, @@ -1001,11 +982,10 @@ class MongoClient(common.BaseObject): # Thread safe: if the type is single it cannot change. topology = self._get_topology() single = topology.description.topology_type == TOPOLOGY_TYPE.Single - server = self._select_server(read_preference, session) - + server = topology.select_server(read_preference) with self._get_socket(server) as sock_info: slave_ok = (single and not sock_info.is_mongos) or ( - read_preference != ReadPreference.PRIMARY) + preference != ReadPreference.PRIMARY) yield sock_info, slave_ok def _send_message_with_response(self, operation, read_preference=None, @@ -1025,14 +1005,14 @@ class MongoClient(common.BaseObject): self._kill_cursors_executor.open() topology = self._get_topology() - session = operation.session if address: server = topology.select_server_by_address(address) if not server: raise AutoReconnect('server %s:%d no longer available' % address) else: - server = self._select_server(read_preference, session) + selector = read_preference or writable_server_selector + server = topology.select_server(selector) # A _Query's slaveOk bit is already set for queries with non-primary # read preference. If this is a direct connection to a mongod, override @@ -1085,7 +1065,8 @@ class MongoClient(common.BaseObject): return bulk.retrying if bulk else retrying while True: try: - server = self._select_server(ReadPreference.PRIMARY, session) + server = self._get_topology().select_server( + writable_server_selector) supports_session = ( session is not None and server.description.retryable_writes_supported) @@ -1559,7 +1540,7 @@ class MongoClient(common.BaseObject): self._purge_index(name) with self._socket_for_reads( - ReadPreference.PRIMARY, None) as (sock_info, slave_ok): + ReadPreference.PRIMARY) as (sock_info, slave_ok): self[name]._command( sock_info, "dropDatabase", @@ -1701,7 +1682,7 @@ class MongoClient(common.BaseObject): Added ``session`` parameter. """ cmd = SON([("fsyncUnlock", 1)]) - with self._socket_for_writes(session=None) as sock_info: + with self._socket_for_writes() as sock_info: if sock_info.max_wire_version >= 4: try: with self._tmp_session(session) as s: diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index f6c8ac7b2..d6616afad 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -315,9 +315,8 @@ class ReadPrefTester(MongoClient): super(ReadPrefTester, self).__init__(*args, **client_options) @contextlib.contextmanager - def _socket_for_reads(self, read_preference, session): - context = super(ReadPrefTester, self)._socket_for_reads( - read_preference, session) + def _socket_for_reads(self, read_preference): + context = super(ReadPrefTester, self)._socket_for_reads(read_preference) with context as (sock_info, slave_ok): self.record_a_read(sock_info.address) yield sock_info, slave_ok