PYTHON-2803 Eliminate the use of 'slave'

This commit is contained in:
Bernie Hackett 2021-09-07 15:17:25 -07:00
parent 992761568d
commit 146179db53
14 changed files with 87 additions and 105 deletions

View File

@ -93,17 +93,17 @@ class _AggregationCommand(object):
"""Check whether the server version in-use supports aggregation."""
pass
def _process_result(self, result, session, server, sock_info, slave_ok):
def _process_result(self, result, session, server, sock_info, secondary_ok):
if self._result_processor:
self._result_processor(
result, session, server, sock_info, slave_ok)
result, session, server, sock_info, secondary_ok)
def get_read_preference(self, session):
if self._performs_write:
return ReadPreference.PRIMARY
return self._target._read_preference_for(session)
def get_cursor(self, session, server, sock_info, slave_ok):
def get_cursor(self, session, server, sock_info, secondary_ok):
# Ensure command compatibility.
self._check_compat(sock_info)
@ -136,7 +136,7 @@ class _AggregationCommand(object):
result = sock_info.command(
self._database.name,
cmd,
slave_ok,
secondary_ok,
self.get_read_preference(session),
self._target.codec_options,
parse_write_concern_error=True,
@ -147,7 +147,7 @@ class _AggregationCommand(object):
client=self._database.client,
user_fields=self._user_fields)
self._process_result(result, session, server, sock_info, slave_ok)
self._process_result(result, session, server, sock_info, secondary_ok)
# Extract cursor from result or mock/fake one if necessary.
if 'cursor' in result:

View File

@ -58,7 +58,7 @@ _RESUMABLE_GETMORE_ERRORS = frozenset([
class ChangeStream(object):
"""The internal abstract base class for change stream cursors.
Should not be called directly by application developers. Use
Should not be called directly by application developers. Use
:meth:`pymongo.collection.Collection.watch`,
:meth:`pymongo.database.Database.watch`, or
:meth:`pymongo.mongo_client.MongoClient.watch` instead.
@ -148,7 +148,7 @@ class ChangeStream(object):
full_pipeline.extend(self._pipeline)
return full_pipeline
def _process_result(self, result, session, server, sock_info, slave_ok):
def _process_result(self, result, session, server, sock_info, secondary_ok):
"""Callback that caches the postBatchResumeToken or
startAtOperationTime from a changeStream aggregate command response
containing an empty batch of change documents.

View File

@ -180,7 +180,7 @@ class Collection(common.BaseObject):
def _socket_for_writes(self, session):
return self.__database.client._socket_for_writes(session)
def _command(self, sock_info, command, slave_ok=False,
def _command(self, sock_info, command, secondary_ok=False,
read_preference=None,
codec_options=None, check=True, allowable_errors=None,
read_concern=None,
@ -194,7 +194,7 @@ class Collection(common.BaseObject):
:Parameters:
- `sock_info` - A SocketInfo instance.
- `command` - The command itself, as a SON instance.
- `slave_ok`: whether to set the SlaveOkay wire protocol bit.
- `secondary_ok`: whether to set the secondaryOkay wire protocol bit.
- `codec_options` (optional) - An instance of
:class:`~bson.codec_options.CodecOptions`.
- `check`: raise OperationFailure if there are errors
@ -221,7 +221,7 @@ class Collection(common.BaseObject):
return sock_info.command(
self.__database.name,
command,
slave_ok,
secondary_ok,
read_preference or self._read_preference_for(session),
codec_options or self.codec_options,
check,
@ -1413,14 +1413,14 @@ class Collection(common.BaseObject):
return RawBatchCursor(self, *args, **kwargs)
def _count_cmd(self, session, sock_info, slave_ok, cmd, collation):
def _count_cmd(self, session, sock_info, secondary_ok, cmd, collation):
"""Internal count command helper."""
# XXX: "ns missing" checks can be removed when we drop support for
# MongoDB 3.0, see SERVER-17051.
res = self._command(
sock_info,
cmd,
slave_ok,
secondary_ok,
allowable_errors=["ns missing"],
codec_options=self.__write_response_codec_options,
read_concern=self.read_concern,
@ -1431,12 +1431,12 @@ class Collection(common.BaseObject):
return int(res["n"])
def _aggregate_one_result(
self, sock_info, slave_ok, cmd, collation, session):
self, sock_info, secondary_ok, cmd, collation, session):
"""Internal helper to run an aggregate that returns a single result."""
result = self._command(
sock_info,
cmd,
slave_ok,
secondary_ok,
allowable_errors=[26], # Ignore NamespaceNotFound.
codec_options=self.__write_response_codec_options,
read_concern=self.read_concern,
@ -1470,7 +1470,7 @@ class Collection(common.BaseObject):
raise ConfigurationError(
'estimated_document_count does not support sessions')
def _cmd(session, server, sock_info, slave_ok):
def _cmd(session, server, sock_info, secondary_ok):
if sock_info.max_wire_version >= 12:
# MongoDB 4.9+
pipeline = [
@ -1482,7 +1482,7 @@ class Collection(common.BaseObject):
('cursor', {})])
cmd.update(kwargs)
result = self._aggregate_one_result(
sock_info, slave_ok, cmd, collation=None, session=session)
sock_info, secondary_ok, cmd, collation=None, session=session)
if not result:
return 0
return int(result['n'])
@ -1490,7 +1490,7 @@ class Collection(common.BaseObject):
# MongoDB < 4.9
cmd = SON([('count', self.__name)])
cmd.update(kwargs)
return self._count_cmd(None, sock_info, slave_ok, cmd, None)
return self._count_cmd(None, sock_info, secondary_ok, cmd, None)
return self.__database.client._retryable_read(
_cmd, self.read_preference, None)
@ -1567,9 +1567,9 @@ class Collection(common.BaseObject):
collation = validate_collation_or_none(kwargs.pop('collation', None))
cmd.update(kwargs)
def _cmd(session, server, sock_info, slave_ok):
def _cmd(session, server, sock_info, secondary_ok):
result = self._aggregate_one_result(
sock_info, slave_ok, cmd, collation, session)
sock_info, secondary_ok, cmd, collation, session)
if not result:
return 0
return result['n']
@ -1873,12 +1873,12 @@ class Collection(common.BaseObject):
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
def _cmd(session, server, sock_info, slave_ok):
def _cmd(session, server, sock_info, secondary_ok):
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
if sock_info.max_wire_version > 2:
with self.__database.client._tmp_session(session, False) as s:
try:
cursor = self._command(sock_info, cmd, slave_ok,
cursor = self._command(sock_info, cmd, secondary_ok,
read_pref,
codec_options,
session=s)["cursor"]
@ -1894,7 +1894,7 @@ class Collection(common.BaseObject):
else:
res = message._first_batch(
sock_info, self.__database.name, "system.indexes",
{"ns": self.__full_name}, 0, slave_ok, codec_options,
{"ns": self.__full_name}, 0, secondary_ok, codec_options,
read_pref, cmd,
self.database.client._event_listeners)
cursor = res["cursor"]
@ -2304,9 +2304,9 @@ class Collection(common.BaseObject):
kwargs["query"] = filter
collation = validate_collation_or_none(kwargs.pop('collation', None))
cmd.update(kwargs)
def _cmd(session, server, sock_info, slave_ok):
def _cmd(session, server, sock_info, secondary_ok):
return self._command(
sock_info, cmd, slave_ok, read_concern=self.read_concern,
sock_info, cmd, secondary_ok, read_concern=self.read_concern,
collation=collation, session=session,
user_fields={"values": 1})["values"]

View File

@ -68,7 +68,7 @@ _CURSOR_CLOSED_ERRORS = frozenset([
_QUERY_OPTIONS = {
"tailable_cursor": 2,
"slave_okay": 4,
"secondary_okay": 4,
"oplog_replay": 8,
"no_timeout": 16,
"await_data": 32,

View File

@ -478,7 +478,7 @@ class Database(common.BaseObject):
batch_size, collation, start_at_operation_time, session,
start_after)
def _command(self, sock_info, command, slave_ok=False, value=1, check=True,
def _command(self, sock_info, command, secondary_ok=False, value=1, check=True,
allowable_errors=None, read_preference=ReadPreference.PRIMARY,
codec_options=DEFAULT_CODEC_OPTIONS,
write_concern=None,
@ -492,7 +492,7 @@ class Database(common.BaseObject):
return sock_info.command(
self.__name,
command,
slave_ok,
secondary_ok,
read_preference,
codec_options,
check,
@ -591,8 +591,8 @@ class Database(common.BaseObject):
read_preference = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
with self.__client._socket_for_reads(
read_preference, session) as (sock_info, slave_ok):
return self._command(sock_info, command, slave_ok, value,
read_preference, session) as (sock_info, secondary_ok):
return self._command(sock_info, command, secondary_ok, value,
check, allowable_errors, read_preference,
codec_options, session=session, **kwargs)
@ -604,15 +604,15 @@ class Database(common.BaseObject):
read_preference = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
def _cmd(session, server, sock_info, slave_ok):
return self._command(sock_info, command, slave_ok, value,
def _cmd(session, server, sock_info, secondary_ok):
return self._command(sock_info, command, secondary_ok, value,
check, allowable_errors, read_preference,
codec_options, session=session, **kwargs)
return self.__client._retryable_read(
_cmd, read_preference, session)
def _list_collections(self, sock_info, slave_okay, session,
def _list_collections(self, sock_info, secondary_okay, session,
read_preference, **kwargs):
"""Internal listCollections helper."""
@ -625,7 +625,7 @@ class Database(common.BaseObject):
with self.__client._tmp_session(
session, close=False) as tmp_session:
cursor = self._command(
sock_info, cmd, slave_okay,
sock_info, cmd, secondary_okay,
read_preference=read_preference,
session=tmp_session)["cursor"]
cmd_cursor = CommandCursor(
@ -647,7 +647,7 @@ class Database(common.BaseObject):
cmd = SON([("aggregate", "system.namespaces"),
("pipeline", pipeline),
("cursor", kwargs.get("cursor", {}))])
cursor = self._command(sock_info, cmd, slave_okay)["cursor"]
cursor = self._command(sock_info, cmd, secondary_okay)["cursor"]
cmd_cursor = CommandCursor(coll, cursor, sock_info.address)
cmd_cursor._maybe_pin_connection(sock_info)
return cmd_cursor
@ -676,9 +676,9 @@ class Database(common.BaseObject):
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
def _cmd(session, server, sock_info, slave_okay):
def _cmd(session, server, sock_info, secondary_okay):
return self._list_collections(
sock_info, slave_okay, session, read_preference=read_pref,
sock_info, secondary_okay, session, read_preference=read_pref,
**kwargs)
return self.__client._retryable_read(

View File

@ -101,7 +101,7 @@ def _maybe_add_read_preference(spec, read_preference):
# problems with mongos versions that don't support read preferences. Also,
# for maximum backwards compatibility, don't add $readPreference for
# secondaryPreferred unless tags or maxStalenessSeconds are in use (setting
# the slaveOkay bit has the same effect).
# the secondaryOkay bit has the same effect).
if mode and (
mode != ReadPreference.SECONDARY_PREFERRED.mode or
len(document) > 1):
@ -328,10 +328,10 @@ class _Query(object):
self._as_command = cmd, self.db
return self._as_command
def get_message(self, set_slave_ok, sock_info, use_cmd=False):
"""Get a query message, possibly setting the slaveOk bit."""
if set_slave_ok:
# Set the slaveOk bit.
def get_message(self, set_secondary_ok, sock_info, use_cmd=False):
"""Get a query message, possibly setting the secondaryOk bit."""
if set_secondary_ok:
# Set the secondaryOk bit.
flags = self.flags | 4
else:
flags = self.flags
@ -344,7 +344,7 @@ class _Query(object):
if sock_info.op_msg_enabled:
request_id, msg, size, _ = _op_msg(
0, spec, self.db, self.read_preference,
set_slave_ok, False, self.codec_options,
set_secondary_ok, False, self.codec_options,
ctx=sock_info.compression_context)
return request_id, msg, size
ns = "%s.%s" % (self.db, "$cmd")
@ -699,13 +699,13 @@ if _use_c:
_op_msg_uncompressed = _cmessage._op_msg
def _op_msg(flags, command, dbname, read_preference, slave_ok, check_keys,
def _op_msg(flags, command, dbname, read_preference, secondary_ok, check_keys,
opts, ctx=None):
"""Get a OP_MSG message."""
command['$db'] = dbname
# getMore commands do not send $readPreference.
if read_preference is not None and "$readPreference" not in command:
if slave_ok and not read_preference.mode:
if secondary_ok and not read_preference.mode:
command["$readPreference"] = (
ReadPreference.PRIMARY_PREFERRED.document)
else:
@ -1675,7 +1675,7 @@ _UNPACK_REPLY = {
def _first_batch(sock_info, db, coll, query, ntoreturn,
slave_ok, codec_options, read_preference, cmd, listeners):
secondary_ok, codec_options, read_preference, cmd, listeners):
"""Simple query helper for retrieving a first (and possibly only) batch."""
query = _Query(
0, db, coll, 0, query, None, codec_options,
@ -1687,7 +1687,7 @@ def _first_batch(sock_info, db, coll, query, ntoreturn,
if publish:
start = datetime.datetime.now()
request_id, msg, max_doc_size = query.get_message(slave_ok, sock_info)
request_id, msg, max_doc_size = query.get_message(secondary_ok, sock_info)
if publish:
encoding_duration = datetime.datetime.now() - start

View File

@ -1105,7 +1105,7 @@ class MongoClient(common.BaseObject):
# another session.
with self._socket_for_reads(
ReadPreference.PRIMARY_PREFERRED,
None) as (sock_info, slave_ok):
None) as (sock_info, secondary_ok):
if not sock_info.supports_sessions:
return
@ -1113,7 +1113,7 @@ class MongoClient(common.BaseObject):
spec = SON([('endSessions',
session_ids[i:i + common._MAX_END_SESSIONS])])
sock_info.command(
'admin', spec, slave_ok=slave_ok, client=self)
'admin', spec, secondary_ok=secondary_ok, client=self)
except PyMongoError:
# Drivers MUST ignore any errors returned by the endSessions
# command.
@ -1217,39 +1217,39 @@ class MongoClient(common.BaseObject):
return self._get_socket(server, session)
@contextlib.contextmanager
def _slaveok_for_server(self, read_preference, server, session):
def _secondaryok_for_server(self, read_preference, server, session):
assert read_preference is not None, "read_preference must not be None"
# Get a socket for a server matching the read preference, and yield
# sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to
# mongods with topology type Single. If the server type is Mongos,
# follow the rules for passing read preference to mongos, even for
# topology type Single."
# sock_info, secondary_ok. Server Selection Spec: "SecondaryOK must
# be sent to mongods with topology type Single. If the server type is
# Mongos, follow the rules for passing read preference to mongos, even
# for topology type Single."
# Thread safe: if the type is single it cannot change.
topology = self._get_topology()
single = topology.description.topology_type == TOPOLOGY_TYPE.Single
with self._get_socket(server, session) as sock_info:
slave_ok = (single and not sock_info.is_mongos) or (
secondary_ok = (single and not sock_info.is_mongos) or (
read_preference != ReadPreference.PRIMARY)
yield sock_info, slave_ok
yield sock_info, secondary_ok
@contextlib.contextmanager
def _socket_for_reads(self, read_preference, session):
assert read_preference is not None, "read_preference must not be None"
# Get a socket for a server matching the read preference, and yield
# sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to
# mongods with topology type Single. If the server type is Mongos,
# follow the rules for passing read preference to mongos, even for
# topology type Single."
# sock_info, secondary_ok. Server Selection Spec: "SecondaryOK must be
# sent to mongods with topology type Single. If the server type is
# Mongos, follow the rules for passing read preference to mongos, even
# for topology type Single."
# Thread safe: if the type is single it cannot change.
topology = self._get_topology()
single = topology.description.topology_type == TOPOLOGY_TYPE.Single
server = self._select_server(read_preference, session)
with self._get_socket(server, session) as sock_info:
slave_ok = (single and not sock_info.is_mongos) or (
secondary_ok = (single and not sock_info.is_mongos) or (
read_preference != ReadPreference.PRIMARY)
yield sock_info, slave_ok
yield sock_info, secondary_ok
def _should_pin_cursor(self, session):
return (self.__options.load_balanced and
@ -1276,9 +1276,9 @@ class MongoClient(common.BaseObject):
operation.sock_mgr.sock, operation, True,
self._event_listeners, unpack_res)
def _cmd(session, server, sock_info, slave_ok):
def _cmd(session, server, sock_info, secondary_ok):
return server.run_operation(
sock_info, operation, slave_ok, self._event_listeners,
sock_info, operation, secondary_ok, self._event_listeners,
unpack_res)
return self._retryable_read(
@ -1375,13 +1375,13 @@ class MongoClient(common.BaseObject):
read_pref, session, address=address)
if not server.description.retryable_reads_supported:
retryable = False
with self._slaveok_for_server(read_pref, server, session) as (
sock_info, slave_ok):
with self._secondaryok_for_server(read_pref, server, session) as (
sock_info, secondary_ok):
if retrying and not retryable:
# A retry is not possible because this server does
# not support retryable reads, raise the last error.
raise last_error
return func(session, server, sock_info, slave_ok)
return func(session, server, sock_info, secondary_ok)
except ServerSelectionTimeoutError:
if retrying:
# The application may think the write was never attempted

View File

@ -38,7 +38,7 @@ from pymongo.socket_checker import _errno_from_exception
_UNPACK_HEADER = struct.Struct("<iiii").unpack
def command(sock_info, dbname, spec, slave_ok, is_mongos,
def command(sock_info, dbname, spec, secondary_ok, is_mongos,
read_preference, codec_options, session, client, check=True,
allowable_errors=None, address=None,
check_keys=False, listeners=None, max_bson_size=None,
@ -56,7 +56,7 @@ def command(sock_info, dbname, spec, slave_ok, is_mongos,
- `sock`: a raw socket instance
- `dbname`: name of the database on which to run the command
- `spec`: a command document as an ordered dict type, eg SON.
- `slave_ok`: whether to set the SlaveOkay wire protocol bit
- `secondary_ok`: whether to set the secondaryOkay wire protocol bit
- `is_mongos`: are we connected to a mongos?
- `read_preference`: a read preference
- `codec_options`: a CodecOptions instance
@ -82,7 +82,7 @@ def command(sock_info, dbname, spec, slave_ok, is_mongos,
"""
name = next(iter(spec))
ns = dbname + '.$cmd'
flags = 4 if slave_ok else 0
flags = 4 if secondary_ok else 0
speculative_hello = False
# Publish the original command document, perhaps with lsid and $clusterTime.
@ -116,7 +116,7 @@ def command(sock_info, dbname, spec, slave_ok, is_mongos,
flags = _OpMsg.MORE_TO_COME if unacknowledged else 0
flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0
request_id, msg, size, max_doc_size = message._op_msg(
flags, spec, dbname, read_preference, slave_ok, check_keys,
flags, spec, dbname, read_preference, secondary_ok, check_keys,
codec_options, ctx=compression_ctx)
# If this is an unacknowledged write then make sure the encoded doc(s)
# are small enough, otherwise rely on the server to return an error.

View File

@ -642,7 +642,7 @@ class SocketInfo(object):
response_doc.pop('serviceId', None)
return response_doc
def command(self, dbname, spec, slave_ok=False,
def command(self, dbname, spec, secondary_ok=False,
read_preference=ReadPreference.PRIMARY,
codec_options=DEFAULT_CODEC_OPTIONS, check=True,
allowable_errors=None, check_keys=False,
@ -661,7 +661,7 @@ class SocketInfo(object):
:Parameters:
- `dbname`: name of the database on which to run the command
- `spec`: a command document as a dict, SON, or mapping object
- `slave_ok`: whether to set the SlaveOkay wire protocol bit
- `secondary_ok`: whether to set the secondaryOkay wire protocol bit
- `read_preference`: a read preference
- `codec_options`: a CodecOptions instance
- `check`: raise OperationFailure if there are errors
@ -715,7 +715,7 @@ class SocketInfo(object):
if self.op_msg_enabled:
self._raise_if_not_writable(unacknowledged)
try:
return command(self, dbname, spec, slave_ok,
return command(self, dbname, spec, secondary_ok,
self.is_mongos, read_preference, codec_options,
session, client, check, allowable_errors,
self.address, check_keys, listeners,

View File

@ -68,7 +68,7 @@ class Server(object):
"""Check the server's state soon."""
self._monitor.request_check()
def run_operation(self, sock_info, operation, set_slave_okay, listeners,
def run_operation(self, sock_info, operation, set_secondary_okay, listeners,
unpack_res):
"""Run a _Query or _GetMore operation and return a Response object.
@ -78,7 +78,7 @@ class Server(object):
:Parameters:
- `operation`: A _Query or _GetMore object.
- `set_slave_okay`: Pass to operation.get_message.
- `set_secondary_okay`: Pass to operation.get_message.
- `all_credentials`: dict, maps auth source to MongoCredential.
- `listeners`: Instance of _EventListeners or None.
- `unpack_res`: A callable that decodes the wire protocol response.
@ -95,7 +95,7 @@ class Server(object):
request_id = 0
else:
message = operation.get_message(
set_slave_okay, sock_info, use_cmd)
set_secondary_okay, sock_info, use_cmd)
request_id, data, max_doc_size = self._split_message(message)
if publish:

View File

@ -133,7 +133,7 @@ class TestReadPreferencesBase(IntegrationTest):
used, expected))
class TestSingleSlaveOk(TestReadPreferencesBase):
class TestSingleSecondaryOk(TestReadPreferencesBase):
def test_reads_from_secondary(self):
@ -313,17 +313,17 @@ class ReadPrefTester(MongoClient):
def _socket_for_reads(self, read_preference, session):
context = super(ReadPrefTester, self)._socket_for_reads(
read_preference, session)
with context as (sock_info, slave_ok):
with context as (sock_info, secondary_ok):
self.record_a_read(sock_info.address)
yield sock_info, slave_ok
yield sock_info, secondary_ok
@contextlib.contextmanager
def _slaveok_for_server(self, read_preference, server, session):
context = super(ReadPrefTester, self)._slaveok_for_server(
def _secondaryok_for_server(self, read_preference, server, session):
context = super(ReadPrefTester, self)._secondaryok_for_server(
read_preference, server, session)
with context as (sock_info, slave_ok):
with context as (sock_info, secondary_ok):
self.record_a_read(sock_info.address)
yield sock_info, slave_ok
yield sock_info, secondary_ok
def record_a_read(self, address):
server = self._get_topology().select_server_by_address(address, 0)

View File

@ -138,6 +138,7 @@ class TestServerDescription(unittest.TestCase):
self.assertEqual(SERVER_TYPE.Standalone, s.server_type)
# Mongod started with --slave.
# master-slave replication was removed in MongoDB 4.0.
s = parse_ismaster_response({'ok': 1, 'ismaster': False})
self.assertEqual(SERVER_TYPE.Standalone, s.server_type)
self.assertTrue(s.is_writable)

View File

@ -160,7 +160,9 @@ class TestSingleServerTopology(TopologyTest):
'ismaster': True,
'maxWireVersion': 6}),
# Slave.
# A "slave" in a master-slave deployment.
# This replication type was removed in MongoDB
# 4.0.
(SERVER_TYPE.Standalone, {
'ok': 1,
'ismaster': False,

View File

@ -693,27 +693,6 @@ def server_started_with_auth(client):
return '--auth' in argv or '--keyFile' in argv
def server_started_with_nojournal(client):
command_line = get_command_line(client)
# MongoDB 2.6.
if 'parsed' in command_line:
parsed = command_line['parsed']
if 'storage' in parsed:
storage = parsed['storage']
if 'journal' in storage:
return not storage['journal']['enabled']
return server_started_with_option(client, '--nojournal', 'nojournal')
def server_is_master_with_slave(client):
command_line = get_command_line(client)
if 'parsed' in command_line:
return command_line['parsed'].get('master', False)
return '--master' in command_line['argv']
def drop_collections(db):
# Drop all non-system collections in this database.
for coll in db.list_collection_names(