From 146179db53a8cf020830127c405b8dff1199b63b Mon Sep 17 00:00:00 2001 From: Bernie Hackett Date: Tue, 7 Sep 2021 15:17:25 -0700 Subject: [PATCH] PYTHON-2803 Eliminate the use of 'slave' --- pymongo/aggregation.py | 10 ++++----- pymongo/change_stream.py | 4 ++-- pymongo/collection.py | 34 ++++++++++++++-------------- pymongo/cursor.py | 2 +- pymongo/database.py | 22 +++++++++--------- pymongo/message.py | 20 ++++++++--------- pymongo/mongo_client.py | 40 ++++++++++++++++----------------- pymongo/network.py | 8 +++---- pymongo/pool.py | 6 ++--- pymongo/server.py | 6 ++--- test/test_read_preferences.py | 14 ++++++------ test/test_server_description.py | 1 + test/test_topology.py | 4 +++- test/utils.py | 21 ----------------- 14 files changed, 87 insertions(+), 105 deletions(-) diff --git a/pymongo/aggregation.py b/pymongo/aggregation.py index ae1b9d9eb..812ca23b7 100644 --- a/pymongo/aggregation.py +++ b/pymongo/aggregation.py @@ -93,17 +93,17 @@ class _AggregationCommand(object): """Check whether the server version in-use supports aggregation.""" pass - def _process_result(self, result, session, server, sock_info, slave_ok): + def _process_result(self, result, session, server, sock_info, secondary_ok): if self._result_processor: self._result_processor( - result, session, server, sock_info, slave_ok) + result, session, server, sock_info, secondary_ok) def get_read_preference(self, session): if self._performs_write: return ReadPreference.PRIMARY return self._target._read_preference_for(session) - def get_cursor(self, session, server, sock_info, slave_ok): + def get_cursor(self, session, server, sock_info, secondary_ok): # Ensure command compatibility. self._check_compat(sock_info) @@ -136,7 +136,7 @@ class _AggregationCommand(object): result = sock_info.command( self._database.name, cmd, - slave_ok, + secondary_ok, self.get_read_preference(session), self._target.codec_options, parse_write_concern_error=True, @@ -147,7 +147,7 @@ class _AggregationCommand(object): client=self._database.client, user_fields=self._user_fields) - self._process_result(result, session, server, sock_info, slave_ok) + self._process_result(result, session, server, sock_info, secondary_ok) # Extract cursor from result or mock/fake one if necessary. if 'cursor' in result: diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index 936059e14..00d049a83 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -58,7 +58,7 @@ _RESUMABLE_GETMORE_ERRORS = frozenset([ class ChangeStream(object): """The internal abstract base class for change stream cursors. - Should not be called directly by application developers. Use + Should not be called directly by application developers. Use :meth:`pymongo.collection.Collection.watch`, :meth:`pymongo.database.Database.watch`, or :meth:`pymongo.mongo_client.MongoClient.watch` instead. @@ -148,7 +148,7 @@ class ChangeStream(object): full_pipeline.extend(self._pipeline) return full_pipeline - def _process_result(self, result, session, server, sock_info, slave_ok): + def _process_result(self, result, session, server, sock_info, secondary_ok): """Callback that caches the postBatchResumeToken or startAtOperationTime from a changeStream aggregate command response containing an empty batch of change documents. diff --git a/pymongo/collection.py b/pymongo/collection.py index 68471c06c..7994c4d8a 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -180,7 +180,7 @@ class Collection(common.BaseObject): def _socket_for_writes(self, session): return self.__database.client._socket_for_writes(session) - def _command(self, sock_info, command, slave_ok=False, + def _command(self, sock_info, command, secondary_ok=False, read_preference=None, codec_options=None, check=True, allowable_errors=None, read_concern=None, @@ -194,7 +194,7 @@ class Collection(common.BaseObject): :Parameters: - `sock_info` - A SocketInfo instance. - `command` - The command itself, as a SON instance. - - `slave_ok`: whether to set the SlaveOkay wire protocol bit. + - `secondary_ok`: whether to set the secondaryOkay wire protocol bit. - `codec_options` (optional) - An instance of :class:`~bson.codec_options.CodecOptions`. - `check`: raise OperationFailure if there are errors @@ -221,7 +221,7 @@ class Collection(common.BaseObject): return sock_info.command( self.__database.name, command, - slave_ok, + secondary_ok, read_preference or self._read_preference_for(session), codec_options or self.codec_options, check, @@ -1413,14 +1413,14 @@ class Collection(common.BaseObject): return RawBatchCursor(self, *args, **kwargs) - def _count_cmd(self, session, sock_info, slave_ok, cmd, collation): + def _count_cmd(self, session, sock_info, secondary_ok, cmd, collation): """Internal count command helper.""" # XXX: "ns missing" checks can be removed when we drop support for # MongoDB 3.0, see SERVER-17051. res = self._command( sock_info, cmd, - slave_ok, + secondary_ok, allowable_errors=["ns missing"], codec_options=self.__write_response_codec_options, read_concern=self.read_concern, @@ -1431,12 +1431,12 @@ class Collection(common.BaseObject): return int(res["n"]) def _aggregate_one_result( - self, sock_info, slave_ok, cmd, collation, session): + self, sock_info, secondary_ok, cmd, collation, session): """Internal helper to run an aggregate that returns a single result.""" result = self._command( sock_info, cmd, - slave_ok, + secondary_ok, allowable_errors=[26], # Ignore NamespaceNotFound. codec_options=self.__write_response_codec_options, read_concern=self.read_concern, @@ -1470,7 +1470,7 @@ class Collection(common.BaseObject): raise ConfigurationError( 'estimated_document_count does not support sessions') - def _cmd(session, server, sock_info, slave_ok): + def _cmd(session, server, sock_info, secondary_ok): if sock_info.max_wire_version >= 12: # MongoDB 4.9+ pipeline = [ @@ -1482,7 +1482,7 @@ class Collection(common.BaseObject): ('cursor', {})]) cmd.update(kwargs) result = self._aggregate_one_result( - sock_info, slave_ok, cmd, collation=None, session=session) + sock_info, secondary_ok, cmd, collation=None, session=session) if not result: return 0 return int(result['n']) @@ -1490,7 +1490,7 @@ class Collection(common.BaseObject): # MongoDB < 4.9 cmd = SON([('count', self.__name)]) cmd.update(kwargs) - return self._count_cmd(None, sock_info, slave_ok, cmd, None) + return self._count_cmd(None, sock_info, secondary_ok, cmd, None) return self.__database.client._retryable_read( _cmd, self.read_preference, None) @@ -1567,9 +1567,9 @@ class Collection(common.BaseObject): collation = validate_collation_or_none(kwargs.pop('collation', None)) cmd.update(kwargs) - def _cmd(session, server, sock_info, slave_ok): + def _cmd(session, server, sock_info, secondary_ok): result = self._aggregate_one_result( - sock_info, slave_ok, cmd, collation, session) + sock_info, secondary_ok, cmd, collation, session) if not result: return 0 return result['n'] @@ -1873,12 +1873,12 @@ class Collection(common.BaseObject): read_pref = ((session and session._txn_read_preference()) or ReadPreference.PRIMARY) - def _cmd(session, server, sock_info, slave_ok): + def _cmd(session, server, sock_info, secondary_ok): cmd = SON([("listIndexes", self.__name), ("cursor", {})]) if sock_info.max_wire_version > 2: with self.__database.client._tmp_session(session, False) as s: try: - cursor = self._command(sock_info, cmd, slave_ok, + cursor = self._command(sock_info, cmd, secondary_ok, read_pref, codec_options, session=s)["cursor"] @@ -1894,7 +1894,7 @@ class Collection(common.BaseObject): else: res = message._first_batch( sock_info, self.__database.name, "system.indexes", - {"ns": self.__full_name}, 0, slave_ok, codec_options, + {"ns": self.__full_name}, 0, secondary_ok, codec_options, read_pref, cmd, self.database.client._event_listeners) cursor = res["cursor"] @@ -2304,9 +2304,9 @@ class Collection(common.BaseObject): kwargs["query"] = filter collation = validate_collation_or_none(kwargs.pop('collation', None)) cmd.update(kwargs) - def _cmd(session, server, sock_info, slave_ok): + def _cmd(session, server, sock_info, secondary_ok): return self._command( - sock_info, cmd, slave_ok, read_concern=self.read_concern, + sock_info, cmd, secondary_ok, read_concern=self.read_concern, collation=collation, session=session, user_fields={"values": 1})["values"] diff --git a/pymongo/cursor.py b/pymongo/cursor.py index 5f3419b7c..65f270512 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -68,7 +68,7 @@ _CURSOR_CLOSED_ERRORS = frozenset([ _QUERY_OPTIONS = { "tailable_cursor": 2, - "slave_okay": 4, + "secondary_okay": 4, "oplog_replay": 8, "no_timeout": 16, "await_data": 32, diff --git a/pymongo/database.py b/pymongo/database.py index 89a38a15a..df8d730fb 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -478,7 +478,7 @@ class Database(common.BaseObject): batch_size, collation, start_at_operation_time, session, start_after) - def _command(self, sock_info, command, slave_ok=False, value=1, check=True, + def _command(self, sock_info, command, secondary_ok=False, value=1, check=True, allowable_errors=None, read_preference=ReadPreference.PRIMARY, codec_options=DEFAULT_CODEC_OPTIONS, write_concern=None, @@ -492,7 +492,7 @@ class Database(common.BaseObject): return sock_info.command( self.__name, command, - slave_ok, + secondary_ok, read_preference, codec_options, check, @@ -591,8 +591,8 @@ class Database(common.BaseObject): read_preference = ((session and session._txn_read_preference()) or ReadPreference.PRIMARY) with self.__client._socket_for_reads( - read_preference, session) as (sock_info, slave_ok): - return self._command(sock_info, command, slave_ok, value, + read_preference, session) as (sock_info, secondary_ok): + return self._command(sock_info, command, secondary_ok, value, check, allowable_errors, read_preference, codec_options, session=session, **kwargs) @@ -604,15 +604,15 @@ class Database(common.BaseObject): read_preference = ((session and session._txn_read_preference()) or ReadPreference.PRIMARY) - def _cmd(session, server, sock_info, slave_ok): - return self._command(sock_info, command, slave_ok, value, + def _cmd(session, server, sock_info, secondary_ok): + return self._command(sock_info, command, secondary_ok, value, check, allowable_errors, read_preference, codec_options, session=session, **kwargs) return self.__client._retryable_read( _cmd, read_preference, session) - def _list_collections(self, sock_info, slave_okay, session, + def _list_collections(self, sock_info, secondary_okay, session, read_preference, **kwargs): """Internal listCollections helper.""" @@ -625,7 +625,7 @@ class Database(common.BaseObject): with self.__client._tmp_session( session, close=False) as tmp_session: cursor = self._command( - sock_info, cmd, slave_okay, + sock_info, cmd, secondary_okay, read_preference=read_preference, session=tmp_session)["cursor"] cmd_cursor = CommandCursor( @@ -647,7 +647,7 @@ class Database(common.BaseObject): cmd = SON([("aggregate", "system.namespaces"), ("pipeline", pipeline), ("cursor", kwargs.get("cursor", {}))]) - cursor = self._command(sock_info, cmd, slave_okay)["cursor"] + cursor = self._command(sock_info, cmd, secondary_okay)["cursor"] cmd_cursor = CommandCursor(coll, cursor, sock_info.address) cmd_cursor._maybe_pin_connection(sock_info) return cmd_cursor @@ -676,9 +676,9 @@ class Database(common.BaseObject): read_pref = ((session and session._txn_read_preference()) or ReadPreference.PRIMARY) - def _cmd(session, server, sock_info, slave_okay): + def _cmd(session, server, sock_info, secondary_okay): return self._list_collections( - sock_info, slave_okay, session, read_preference=read_pref, + sock_info, secondary_okay, session, read_preference=read_pref, **kwargs) return self.__client._retryable_read( diff --git a/pymongo/message.py b/pymongo/message.py index 12ca524ff..22effb1b2 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -101,7 +101,7 @@ def _maybe_add_read_preference(spec, read_preference): # problems with mongos versions that don't support read preferences. Also, # for maximum backwards compatibility, don't add $readPreference for # secondaryPreferred unless tags or maxStalenessSeconds are in use (setting - # the slaveOkay bit has the same effect). + # the secondaryOkay bit has the same effect). if mode and ( mode != ReadPreference.SECONDARY_PREFERRED.mode or len(document) > 1): @@ -328,10 +328,10 @@ class _Query(object): self._as_command = cmd, self.db return self._as_command - def get_message(self, set_slave_ok, sock_info, use_cmd=False): - """Get a query message, possibly setting the slaveOk bit.""" - if set_slave_ok: - # Set the slaveOk bit. + def get_message(self, set_secondary_ok, sock_info, use_cmd=False): + """Get a query message, possibly setting the secondaryOk bit.""" + if set_secondary_ok: + # Set the secondaryOk bit. flags = self.flags | 4 else: flags = self.flags @@ -344,7 +344,7 @@ class _Query(object): if sock_info.op_msg_enabled: request_id, msg, size, _ = _op_msg( 0, spec, self.db, self.read_preference, - set_slave_ok, False, self.codec_options, + set_secondary_ok, False, self.codec_options, ctx=sock_info.compression_context) return request_id, msg, size ns = "%s.%s" % (self.db, "$cmd") @@ -699,13 +699,13 @@ if _use_c: _op_msg_uncompressed = _cmessage._op_msg -def _op_msg(flags, command, dbname, read_preference, slave_ok, check_keys, +def _op_msg(flags, command, dbname, read_preference, secondary_ok, check_keys, opts, ctx=None): """Get a OP_MSG message.""" command['$db'] = dbname # getMore commands do not send $readPreference. if read_preference is not None and "$readPreference" not in command: - if slave_ok and not read_preference.mode: + if secondary_ok and not read_preference.mode: command["$readPreference"] = ( ReadPreference.PRIMARY_PREFERRED.document) else: @@ -1675,7 +1675,7 @@ _UNPACK_REPLY = { def _first_batch(sock_info, db, coll, query, ntoreturn, - slave_ok, codec_options, read_preference, cmd, listeners): + secondary_ok, codec_options, read_preference, cmd, listeners): """Simple query helper for retrieving a first (and possibly only) batch.""" query = _Query( 0, db, coll, 0, query, None, codec_options, @@ -1687,7 +1687,7 @@ def _first_batch(sock_info, db, coll, query, ntoreturn, if publish: start = datetime.datetime.now() - request_id, msg, max_doc_size = query.get_message(slave_ok, sock_info) + request_id, msg, max_doc_size = query.get_message(secondary_ok, sock_info) if publish: encoding_duration = datetime.datetime.now() - start diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index cf7b4d3a6..549389f84 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -1105,7 +1105,7 @@ class MongoClient(common.BaseObject): # another session. with self._socket_for_reads( ReadPreference.PRIMARY_PREFERRED, - None) as (sock_info, slave_ok): + None) as (sock_info, secondary_ok): if not sock_info.supports_sessions: return @@ -1113,7 +1113,7 @@ class MongoClient(common.BaseObject): spec = SON([('endSessions', session_ids[i:i + common._MAX_END_SESSIONS])]) sock_info.command( - 'admin', spec, slave_ok=slave_ok, client=self) + 'admin', spec, secondary_ok=secondary_ok, client=self) except PyMongoError: # Drivers MUST ignore any errors returned by the endSessions # command. @@ -1217,39 +1217,39 @@ class MongoClient(common.BaseObject): return self._get_socket(server, session) @contextlib.contextmanager - def _slaveok_for_server(self, read_preference, server, session): + def _secondaryok_for_server(self, read_preference, server, session): assert read_preference is not None, "read_preference must not be None" # Get a socket for a server matching the read preference, and yield - # sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to - # mongods with topology type Single. If the server type is Mongos, - # follow the rules for passing read preference to mongos, even for - # topology type Single." + # sock_info, secondary_ok. Server Selection Spec: "SecondaryOK must + # be sent to mongods with topology type Single. If the server type is + # Mongos, follow the rules for passing read preference to mongos, even + # for topology type Single." # Thread safe: if the type is single it cannot change. topology = self._get_topology() single = topology.description.topology_type == TOPOLOGY_TYPE.Single with self._get_socket(server, session) as sock_info: - slave_ok = (single and not sock_info.is_mongos) or ( + secondary_ok = (single and not sock_info.is_mongos) or ( read_preference != ReadPreference.PRIMARY) - yield sock_info, slave_ok + yield sock_info, secondary_ok @contextlib.contextmanager def _socket_for_reads(self, read_preference, session): assert read_preference is not None, "read_preference must not be None" # Get a socket for a server matching the read preference, and yield - # sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to - # mongods with topology type Single. If the server type is Mongos, - # follow the rules for passing read preference to mongos, even for - # topology type Single." + # sock_info, secondary_ok. Server Selection Spec: "SecondaryOK must be + # sent to mongods with topology type Single. If the server type is + # Mongos, follow the rules for passing read preference to mongos, even + # for topology type Single." # Thread safe: if the type is single it cannot change. topology = self._get_topology() single = topology.description.topology_type == TOPOLOGY_TYPE.Single server = self._select_server(read_preference, session) with self._get_socket(server, session) as sock_info: - slave_ok = (single and not sock_info.is_mongos) or ( + secondary_ok = (single and not sock_info.is_mongos) or ( read_preference != ReadPreference.PRIMARY) - yield sock_info, slave_ok + yield sock_info, secondary_ok def _should_pin_cursor(self, session): return (self.__options.load_balanced and @@ -1276,9 +1276,9 @@ class MongoClient(common.BaseObject): operation.sock_mgr.sock, operation, True, self._event_listeners, unpack_res) - def _cmd(session, server, sock_info, slave_ok): + def _cmd(session, server, sock_info, secondary_ok): return server.run_operation( - sock_info, operation, slave_ok, self._event_listeners, + sock_info, operation, secondary_ok, self._event_listeners, unpack_res) return self._retryable_read( @@ -1375,13 +1375,13 @@ class MongoClient(common.BaseObject): read_pref, session, address=address) if not server.description.retryable_reads_supported: retryable = False - with self._slaveok_for_server(read_pref, server, session) as ( - sock_info, slave_ok): + with self._secondaryok_for_server(read_pref, server, session) as ( + sock_info, secondary_ok): if retrying and not retryable: # A retry is not possible because this server does # not support retryable reads, raise the last error. raise last_error - return func(session, server, sock_info, slave_ok) + return func(session, server, sock_info, secondary_ok) except ServerSelectionTimeoutError: if retrying: # The application may think the write was never attempted diff --git a/pymongo/network.py b/pymongo/network.py index c6146566c..cc83d9057 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -38,7 +38,7 @@ from pymongo.socket_checker import _errno_from_exception _UNPACK_HEADER = struct.Struct("