PYTHON-1864 PYTHON-2931 Spec complaint $readPreference (#809)

Stop sending $readPreference to standalone servers.
Stop sending $readPreference primary because it's the server default.
Remove outdated secondary_ok flag.
This commit is contained in:
Shane Harvey 2021-12-13 15:47:34 -08:00 committed by GitHub
parent b2f3c66575
commit ff3a8b44dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 123 additions and 121 deletions

View File

@ -92,11 +92,6 @@ class _AggregationCommand(object):
"""The database against which the aggregation command is run."""
raise NotImplementedError
def _process_result(self, result, session, server, sock_info, secondary_ok):
if self._result_processor:
self._result_processor(
result, session, server, sock_info, secondary_ok)
def get_read_preference(self, session):
if self._write_preference:
return self._write_preference
@ -105,7 +100,7 @@ class _AggregationCommand(object):
self._write_preference = pref = _AggWritePref(pref)
return pref
def get_cursor(self, session, server, sock_info, secondary_ok):
def get_cursor(self, session, server, sock_info, read_preference):
# Serialize command.
cmd = SON([("aggregate", self._aggregation_target),
("pipeline", self._pipeline)])
@ -134,8 +129,7 @@ class _AggregationCommand(object):
result = sock_info.command(
self._database.name,
cmd,
secondary_ok,
self.get_read_preference(session),
read_preference,
self._target.codec_options,
parse_write_concern_error=True,
read_concern=read_concern,
@ -145,7 +139,8 @@ class _AggregationCommand(object):
client=self._database.client,
user_fields=self._user_fields)
self._process_result(result, session, server, sock_info, secondary_ok)
if self._result_processor:
self._result_processor(result, sock_info)
# Extract cursor from result or mock/fake one if necessary.
if 'cursor' in result:

View File

@ -148,7 +148,7 @@ class ChangeStream(object):
full_pipeline.extend(self._pipeline)
return full_pipeline
def _process_result(self, result, session, server, sock_info, secondary_ok):
def _process_result(self, result, sock_info):
"""Callback that caches the postBatchResumeToken or
startAtOperationTime from a changeStream aggregate command response
containing an empty batch of change documents.

View File

@ -186,7 +186,7 @@ class Collection(common.BaseObject):
def _socket_for_writes(self, session):
return self.__database.client._socket_for_writes(session)
def _command(self, sock_info, command, secondary_ok=False,
def _command(self, sock_info, command,
read_preference=None,
codec_options=None, check=True, allowable_errors=None,
read_concern=None,
@ -200,7 +200,6 @@ class Collection(common.BaseObject):
:Parameters:
- `sock_info` - A SocketInfo instance.
- `command` - The command itself, as a SON instance.
- `secondary_ok`: whether to set the secondaryOkay wire protocol bit.
- `codec_options` (optional) - An instance of
:class:`~bson.codec_options.CodecOptions`.
- `check`: raise OperationFailure if there are errors
@ -226,7 +225,6 @@ class Collection(common.BaseObject):
return sock_info.command(
self.__database.name,
command,
secondary_ok,
read_preference or self._read_preference_for(session),
codec_options or self.codec_options,
check,
@ -1356,14 +1354,14 @@ class Collection(common.BaseObject):
return RawBatchCursor(self, *args, **kwargs)
def _count_cmd(self, session, sock_info, secondary_ok, cmd, collation):
def _count_cmd(self, session, sock_info, read_preference, cmd, collation):
"""Internal count command helper."""
# XXX: "ns missing" checks can be removed when we drop support for
# MongoDB 3.0, see SERVER-17051.
res = self._command(
sock_info,
cmd,
secondary_ok,
read_preference=read_preference,
allowable_errors=["ns missing"],
codec_options=self.__write_response_codec_options,
read_concern=self.read_concern,
@ -1374,12 +1372,12 @@ class Collection(common.BaseObject):
return int(res["n"])
def _aggregate_one_result(
self, sock_info, secondary_ok, cmd, collation, session):
self, sock_info, read_preference, cmd, collation, session):
"""Internal helper to run an aggregate that returns a single result."""
result = self._command(
sock_info,
cmd,
secondary_ok,
read_preference,
allowable_errors=[26], # Ignore NamespaceNotFound.
codec_options=self.__write_response_codec_options,
read_concern=self.read_concern,
@ -1413,7 +1411,7 @@ class Collection(common.BaseObject):
raise ConfigurationError(
'estimated_document_count does not support sessions')
def _cmd(session, server, sock_info, secondary_ok):
def _cmd(session, server, sock_info, read_preference):
if sock_info.max_wire_version >= 12:
# MongoDB 4.9+
pipeline = [
@ -1425,7 +1423,8 @@ class Collection(common.BaseObject):
('cursor', {})])
cmd.update(kwargs)
result = self._aggregate_one_result(
sock_info, secondary_ok, cmd, collation=None, session=session)
sock_info, read_preference, cmd, collation=None,
session=session)
if not result:
return 0
return int(result['n'])
@ -1433,7 +1432,8 @@ class Collection(common.BaseObject):
# MongoDB < 4.9
cmd = SON([('count', self.__name)])
cmd.update(kwargs)
return self._count_cmd(None, sock_info, secondary_ok, cmd, None)
return self._count_cmd(
None, sock_info, read_preference, cmd, collation=None)
return self.__database.client._retryable_read(
_cmd, self.read_preference, None)
@ -1506,9 +1506,9 @@ class Collection(common.BaseObject):
collation = validate_collation_or_none(kwargs.pop('collation', None))
cmd.update(kwargs)
def _cmd(session, server, sock_info, secondary_ok):
def _cmd(session, server, sock_info, read_preference):
result = self._aggregate_one_result(
sock_info, secondary_ok, cmd, collation, session)
sock_info, read_preference, cmd, collation, session)
if not result:
return 0
return result['n']
@ -1799,12 +1799,12 @@ class Collection(common.BaseObject):
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
def _cmd(session, server, sock_info, secondary_ok):
def _cmd(session, server, sock_info, read_preference):
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
with self.__database.client._tmp_session(session, False) as s:
try:
cursor = self._command(sock_info, cmd, secondary_ok,
read_pref,
cursor = self._command(sock_info, cmd,
read_preference,
codec_options,
session=s)["cursor"]
except OperationFailure as exc:
@ -2220,9 +2220,10 @@ class Collection(common.BaseObject):
kwargs["query"] = filter
collation = validate_collation_or_none(kwargs.pop('collation', None))
cmd.update(kwargs)
def _cmd(session, server, sock_info, secondary_ok):
def _cmd(session, server, sock_info, read_preference):
return self._command(
sock_info, cmd, secondary_ok, read_concern=self.read_concern,
sock_info, cmd, read_preference=read_preference,
read_concern=self.read_concern,
collation=collation, session=session,
user_fields={"values": 1})["values"]

View File

@ -492,7 +492,7 @@ class Database(common.BaseObject):
batch_size, collation, start_at_operation_time, session,
start_after)
def _command(self, sock_info, command, secondary_ok=False, value=1, check=True,
def _command(self, sock_info, command, value=1, check=True,
allowable_errors=None, read_preference=ReadPreference.PRIMARY,
codec_options=DEFAULT_CODEC_OPTIONS,
write_concern=None,
@ -506,7 +506,6 @@ class Database(common.BaseObject):
return sock_info.command(
self.__name,
command,
secondary_ok,
read_preference,
codec_options,
check,
@ -605,8 +604,8 @@ class Database(common.BaseObject):
read_preference = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
with self.__client._socket_for_reads(
read_preference, session) as (sock_info, secondary_ok):
return self._command(sock_info, command, secondary_ok, value,
read_preference, session) as (sock_info, read_preference):
return self._command(sock_info, command, value,
check, allowable_errors, read_preference,
codec_options, session=session, **kwargs)
@ -618,16 +617,15 @@ class Database(common.BaseObject):
read_preference = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
def _cmd(session, server, sock_info, secondary_ok):
return self._command(sock_info, command, secondary_ok, value,
def _cmd(session, server, sock_info, read_preference):
return self._command(sock_info, command, value,
check, allowable_errors, read_preference,
codec_options, session=session, **kwargs)
return self.__client._retryable_read(
_cmd, read_preference, session)
def _list_collections(self, sock_info, secondary_okay, session,
read_preference, **kwargs):
def _list_collections(self, sock_info, session, read_preference, **kwargs):
"""Internal listCollections helper."""
coll = self.get_collection(
@ -638,7 +636,7 @@ class Database(common.BaseObject):
with self.__client._tmp_session(
session, close=False) as tmp_session:
cursor = self._command(
sock_info, cmd, secondary_okay,
sock_info, cmd,
read_preference=read_preference,
session=tmp_session)["cursor"]
cmd_cursor = CommandCursor(
@ -674,9 +672,9 @@ class Database(common.BaseObject):
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
def _cmd(session, server, sock_info, secondary_okay):
def _cmd(session, server, sock_info, read_preference):
return self._list_collections(
sock_info, secondary_okay, session, read_preference=read_pref,
sock_info, session, read_preference=read_preference,
**kwargs)
return self.__client._retryable_read(

View File

@ -316,9 +316,9 @@ class _Query(object):
self._as_command = cmd, self.db
return self._as_command
def get_message(self, set_secondary_ok, sock_info, use_cmd=False):
def get_message(self, read_preference, sock_info, use_cmd=False):
"""Get a query message, possibly setting the secondaryOk bit."""
if set_secondary_ok:
if read_preference.mode:
# Set the secondaryOk bit.
flags = self.flags | 4
else:
@ -330,8 +330,7 @@ class _Query(object):
if use_cmd:
spec = self.as_command(sock_info)[0]
request_id, msg, size, _ = _op_msg(
0, spec, self.db, self.read_preference,
set_secondary_ok, self.codec_options,
0, spec, self.db, read_preference, self.codec_options,
ctx=sock_info.compression_context)
return request_id, msg, size
@ -346,8 +345,7 @@ class _Query(object):
ntoreturn = self.limit
if sock_info.is_mongos:
spec = _maybe_add_read_preference(spec,
self.read_preference)
spec = _maybe_add_read_preference(spec, read_preference)
return _query(flags, ns, self.ntoskip, ntoreturn,
spec, None if use_cmd else self.fields,
@ -429,8 +427,7 @@ class _GetMore(object):
else:
flags = 0
request_id, msg, size, _ = _op_msg(
flags, spec, self.db, None,
False, self.codec_options,
flags, spec, self.db, None, self.codec_options,
ctx=sock_info.compression_context)
return request_id, msg, size
@ -572,16 +569,13 @@ if _use_c:
_op_msg_uncompressed = _cmessage._op_msg
def _op_msg(flags, command, dbname, read_preference, secondary_ok,
opts, ctx=None):
def _op_msg(flags, command, dbname, read_preference, opts, ctx=None):
"""Get a OP_MSG message."""
command['$db'] = dbname
# getMore commands do not send $readPreference.
if read_preference is not None and "$readPreference" not in command:
if secondary_ok and not read_preference.mode:
command["$readPreference"] = (
ReadPreference.PRIMARY_PREFERRED.document)
else:
# Only send $readPreference if it's not primary (the default).
if read_preference.mode:
command["$readPreference"] = read_preference.document
name = next(iter(command))
try:

View File

@ -1025,7 +1025,7 @@ class MongoClient(common.BaseObject):
# another session.
with self._socket_for_reads(
ReadPreference.PRIMARY_PREFERRED,
None) as (sock_info, secondary_ok):
None) as (sock_info, read_pref):
if not sock_info.supports_sessions:
return
@ -1033,7 +1033,7 @@ class MongoClient(common.BaseObject):
spec = SON([('endSessions',
session_ids[i:i + common._MAX_END_SESSIONS])])
sock_info.command(
'admin', spec, secondary_ok=secondary_ok, client=self)
'admin', spec, read_preference=read_pref, client=self)
except PyMongoError:
# Drivers MUST ignore any errors returned by the endSessions
# command.
@ -1136,39 +1136,33 @@ class MongoClient(common.BaseObject):
return self._get_socket(server, session)
@contextlib.contextmanager
def _secondaryok_for_server(self, read_preference, server, session):
def _socket_from_server(self, read_preference, server, session):
assert read_preference is not None, "read_preference must not be None"
# Get a socket for a server matching the read preference, and yield
# sock_info, secondary_ok. Server Selection Spec: "SecondaryOK must
# be sent to mongods with topology type Single. If the server type is
# Mongos, follow the rules for passing read preference to mongos, even
# for topology type Single."
# sock_info with the effective read preference. The Server Selection
# Spec says not to send any $readPreference to standalones and to
# always send primaryPreferred when directly connected to a repl set
# member.
# Thread safe: if the type is single it cannot change.
topology = self._get_topology()
single = topology.description.topology_type == TOPOLOGY_TYPE.Single
with self._get_socket(server, session) as sock_info:
secondary_ok = (single and not sock_info.is_mongos) or (
read_preference.mode != ReadPreference.PRIMARY.mode)
yield sock_info, secondary_ok
if single:
if sock_info.is_repl:
# Use primary preferred to ensure any repl set member
# can handle the request.
read_preference = ReadPreference.PRIMARY_PREFERRED
elif sock_info.is_standalone:
# Don't send read preference to standalones.
read_preference = ReadPreference.PRIMARY
yield sock_info, read_preference
@contextlib.contextmanager
def _socket_for_reads(self, read_preference, session):
assert read_preference is not None, "read_preference must not be None"
# Get a socket for a server matching the read preference, and yield
# sock_info, secondary_ok. Server Selection Spec: "SecondaryOK must be
# sent to mongods with topology type Single. If the server type is
# Mongos, follow the rules for passing read preference to mongos, even
# for topology type Single."
# Thread safe: if the type is single it cannot change.
topology = self._get_topology()
server = self._select_server(read_preference, session)
single = topology.description.topology_type == TOPOLOGY_TYPE.Single
with self._get_socket(server, session) as sock_info:
secondary_ok = (single and not sock_info.is_mongos) or (
read_preference != ReadPreference.PRIMARY)
yield sock_info, secondary_ok
return self._socket_from_server(read_preference, server, session)
def _should_pin_cursor(self, session):
return (self.__options.load_balanced and
@ -1195,9 +1189,9 @@ class MongoClient(common.BaseObject):
operation.sock_mgr.sock, operation, True,
self._event_listeners, unpack_res)
def _cmd(session, server, sock_info, secondary_ok):
def _cmd(session, server, sock_info, read_preference):
return server.run_operation(
sock_info, operation, secondary_ok, self._event_listeners,
sock_info, operation, read_preference, self._event_listeners,
unpack_res)
return self._retryable_read(
@ -1292,13 +1286,13 @@ class MongoClient(common.BaseObject):
try:
server = self._select_server(
read_pref, session, address=address)
with self._secondaryok_for_server(read_pref, server, session) as (
sock_info, secondary_ok):
with self._socket_from_server(read_pref, server, session) as (
sock_info, read_pref):
if retrying and not retryable:
# A retry is not possible because this server does
# not support retryable reads, raise the last error.
raise last_error
return func(session, server, sock_info, secondary_ok)
return func(session, server, sock_info, read_pref)
except ServerSelectionTimeoutError:
if retrying:
# The application may think the write was never attempted

View File

@ -38,7 +38,7 @@ from pymongo.socket_checker import _errno_from_exception
_UNPACK_HEADER = struct.Struct("<iiii").unpack
def command(sock_info, dbname, spec, secondary_ok, is_mongos,
def command(sock_info, dbname, spec, is_mongos,
read_preference, codec_options, session, client, check=True,
allowable_errors=None, address=None,
listeners=None, max_bson_size=None,
@ -56,7 +56,6 @@ def command(sock_info, dbname, spec, secondary_ok, is_mongos,
- `sock`: a raw socket instance
- `dbname`: name of the database on which to run the command
- `spec`: a command document as an ordered dict type, eg SON.
- `secondary_ok`: whether to set the secondaryOkay wire protocol bit
- `is_mongos`: are we connected to a mongos?
- `read_preference`: a read preference
- `codec_options`: a CodecOptions instance
@ -81,7 +80,6 @@ def command(sock_info, dbname, spec, secondary_ok, is_mongos,
"""
name = next(iter(spec))
ns = dbname + '.$cmd'
flags = 4 if secondary_ok else 0
speculative_hello = False
# Publish the original command document, perhaps with lsid and $clusterTime.
@ -112,7 +110,7 @@ def command(sock_info, dbname, spec, secondary_ok, is_mongos,
flags = _OpMsg.MORE_TO_COME if unacknowledged else 0
flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0
request_id, msg, size, max_doc_size = message._op_msg(
flags, spec, dbname, read_preference, secondary_ok,
flags, spec, dbname, read_preference,
codec_options, ctx=compression_ctx)
# If this is an unacknowledged write then make sure the encoded doc(s)
# are small enough, otherwise rely on the server to return an error.
@ -121,8 +119,7 @@ def command(sock_info, dbname, spec, secondary_ok, is_mongos,
message._raise_document_too_large(name, size, max_bson_size)
else:
request_id, msg, size = message._query(
flags, ns, 0, -1, spec, None, codec_options,
compression_ctx)
0, ns, 0, -1, spec, None, codec_options, compression_ctx)
if (max_bson_size is not None
and size > max_bson_size + message._COMMAND_OVERHEAD):

View File

@ -608,6 +608,10 @@ class SocketInfo(object):
self.supports_sessions = (
hello.logical_session_timeout_minutes is not None)
self.hello_ok = hello.hello_ok
self.is_repl = hello.server_type in (
SERVER_TYPE.RSPrimary, SERVER_TYPE.RSSecondary,
SERVER_TYPE.RSArbiter, SERVER_TYPE.RSOther, SERVER_TYPE.RSGhost)
self.is_standalone = hello.server_type == SERVER_TYPE.Standalone
self.is_mongos = hello.server_type == SERVER_TYPE.Mongos
if performing_handshake and self.compression_settings:
ctx = self.compression_settings.get_compression_context(
@ -641,7 +645,7 @@ class SocketInfo(object):
response_doc.pop('serviceId', None)
return response_doc
def command(self, dbname, spec, secondary_ok=False,
def command(self, dbname, spec,
read_preference=ReadPreference.PRIMARY,
codec_options=DEFAULT_CODEC_OPTIONS, check=True,
allowable_errors=None,
@ -660,7 +664,6 @@ class SocketInfo(object):
:Parameters:
- `dbname`: name of the database on which to run the command
- `spec`: a command document as a dict, SON, or mapping object
- `secondary_ok`: whether to set the secondaryOkay wire protocol bit
- `read_preference`: a read preference
- `codec_options`: a CodecOptions instance
- `check`: raise OperationFailure if there are errors
@ -703,7 +706,7 @@ class SocketInfo(object):
if self.op_msg_enabled:
self._raise_if_not_writable(unacknowledged)
try:
return command(self, dbname, spec, secondary_ok,
return command(self, dbname, spec,
self.is_mongos, read_preference, codec_options,
session, client, check, allowable_errors,
self.address, listeners,

View File

@ -68,7 +68,7 @@ class Server(object):
"""Check the server's state soon."""
self._monitor.request_check()
def run_operation(self, sock_info, operation, set_secondary_okay, listeners,
def run_operation(self, sock_info, operation, read_preference, listeners,
unpack_res):
"""Run a _Query or _GetMore operation and return a Response object.
@ -95,7 +95,7 @@ class Server(object):
request_id = 0
else:
message = operation.get_message(
set_secondary_okay, sock_info, use_cmd)
read_preference, sock_info, use_cmd)
request_id, data, max_doc_size = self._split_message(message)
if publish:

View File

@ -237,7 +237,7 @@
}
],
"$readPreference": {
"mode": "primary"
"$$exists": false
},
"readConcern": {
"level": "local"
@ -425,7 +425,7 @@
}
],
"$readPreference": {
"mode": "primary"
"$$exists": false
},
"readConcern": {
"level": "local"

View File

@ -222,7 +222,7 @@
}
],
"$readPreference": {
"mode": "primary"
"$$exists": false
},
"readConcern": {
"level": "local"
@ -416,7 +416,7 @@
}
],
"$readPreference": {
"mode": "primary"
"$$exists": false
},
"readConcern": {
"level": "local"

View File

@ -79,8 +79,11 @@ def create_mongos_read_mode_test(mode, operation):
slave_ok = False
elif operation.op_type == 'may-use-secondary':
slave_ok = mode != 'primary'
self.assertEqual(pref.document,
request.doc.get('$readPreference'))
actual_pref = request.doc.get('$readPreference')
if mode == 'primary':
self.assertIsNone(actual_pref)
else:
self.assertEqual(pref.document, actual_pref)
else:
self.fail('unrecognized op_type %r' % operation.op_type)

View File

@ -148,24 +148,31 @@ def create_op_msg_read_mode_test(mode, operation):
expected_pref = ReadPreference.SECONDARY
elif operation.op_type == 'must-use-primary':
expected_server = self.primary
expected_pref = ReadPreference.PRIMARY
expected_pref = None
elif operation.op_type == 'may-use-secondary':
if mode in ('primary', 'primaryPreferred'):
if mode == 'primary':
expected_server = self.primary
expected_pref = None
elif mode == 'primaryPreferred':
expected_server = self.primary
expected_pref = pref
else:
expected_server = self.secondary
expected_pref = pref
expected_pref = pref
else:
self.fail('unrecognized op_type %r' % operation.op_type)
# For single mongod we send primaryPreferred instead of primary.
if expected_pref == ReadPreference.PRIMARY and self.single_mongod:
expected_pref = ReadPreference.PRIMARY_PREFERRED
# For single mongod we omit the read preference.
if self.single_mongod:
expected_pref = None
with going(operation.function, client):
request = expected_server.receive()
request.reply(operation.reply)
self.assertEqual(expected_pref.document,
request.doc.get('$readPreference'))
actual_pref = request.doc.get('$readPreference')
if expected_pref:
self.assertEqual(expected_pref.document, actual_pref)
else:
self.assertIsNone(actual_pref)
self.assertNotIn('$query', request.doc)
return test

View File

@ -47,17 +47,18 @@ class TestQueryAndReadModeSharded(unittest.TestCase):
SecondaryPreferred([{'tag': 'value'}]),)
for query in ({'a': 1}, {'$query': {'a': 1}},):
for mode in read_prefs:
for pref in read_prefs:
collection = client.db.get_collection('test',
read_preference=mode)
read_preference=pref)
cursor = collection.find(query.copy())
with going(next, cursor):
request = server.receives()
# Command is not nested in $query.
request.assert_matches(OpMsg(
SON([('find', 'test'),
('filter', {'a': 1}),
('$readPreference', mode.document)])))
expected_cmd = SON([('find', 'test'),
('filter', {'a': 1})])
if pref.mode:
expected_cmd['$readPreference'] = pref.document
request.assert_matches(OpMsg(expected_cmd))
request.replies({'cursor': {'id': 0, 'firstBatch': [{}]}})

View File

@ -36,6 +36,7 @@ from pymongo.errors import (ConfigurationError,
InvalidOperation,
OperationFailure)
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
from pymongo.write_concern import WriteConcern
from test import (client_context,
unittest,
@ -1257,7 +1258,9 @@ class TestCursor(IntegrationTest):
client = rs_or_single_client(
event_listeners=[listener])
self.addCleanup(client.close)
coll = client[self.db.name].test
# We never send primary read preference so override the default.
coll = client[self.db.name].get_collection(
'test', read_preference=ReadPreference.PRIMARY_PREFERRED)
coll.delete_many({})
coll.insert_many([{} for _ in range(5)])
@ -1267,7 +1270,10 @@ class TestCursor(IntegrationTest):
started = listener.results['started']
self.assertEqual(2, len(started))
self.assertEqual('find', started[0].command_name)
self.assertIn('$readPreference', started[0].command)
if client_context.is_rs or client_context.is_mongos:
self.assertIn('$readPreference', started[0].command)
else:
self.assertNotIn('$readPreference', started[0].command)
self.assertEqual('getMore', started[1].command_name)
self.assertNotIn('$readPreference', started[1].command)

View File

@ -309,17 +309,17 @@ class ReadPrefTester(MongoClient):
def _socket_for_reads(self, read_preference, session):
context = super(ReadPrefTester, self)._socket_for_reads(
read_preference, session)
with context as (sock_info, secondary_ok):
with context as (sock_info, read_preference):
self.record_a_read(sock_info.address)
yield sock_info, secondary_ok
yield sock_info, read_preference
@contextlib.contextmanager
def _secondaryok_for_server(self, read_preference, server, session):
context = super(ReadPrefTester, self)._secondaryok_for_server(
def _socket_from_server(self, read_preference, server, session):
context = super(ReadPrefTester, self)._socket_from_server(
read_preference, server, session)
with context as (sock_info, secondary_ok):
with context as (sock_info, read_preference):
self.record_a_read(sock_info.address)
yield sock_info, secondary_ok
yield sock_info, read_preference
def record_a_read(self, address):
server = self._get_topology().select_server_by_address(address, 0)
@ -597,8 +597,11 @@ class TestMongosAndReadPreference(IntegrationTest):
started = listener.results['started']
self.assertEqual(len(started), 1, started)
cmd = started[0].command
self.assertIn('$readPreference', cmd)
self.assertEqual(cmd['$readPreference'], pref.document)
if client_context.is_rs or client_context.is_mongos:
self.assertIn('$readPreference', cmd)
self.assertEqual(cmd['$readPreference'], pref.document)
else:
self.assertNotIn('$readPreference', cmd)
def test_maybe_add_read_preference(self):