PYTHON-1508 Add read_preference to TransactionOptions

All read operations use the transaction's read preference.
Add transaction read preference spec tests.
Remove write test in test_read_preferences.
This commit is contained in:
Shane Harvey 2018-04-30 09:04:05 -07:00
parent 04693ae33a
commit 5a652be993
16 changed files with 1012 additions and 353 deletions

View File

@ -95,7 +95,7 @@ from pymongo.errors import (ConfigurationError,
InvalidOperation,
OperationFailure)
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
from pymongo.read_preferences import ReadPreference, _ServerMode
from pymongo.write_concern import WriteConcern
@ -159,9 +159,11 @@ class TransactionOptions(object):
.. versionadded:: 3.7
"""
def __init__(self, read_concern=None, write_concern=None):
def __init__(self, read_concern=None, write_concern=None,
read_preference=None):
self._read_concern = read_concern
self._write_concern = write_concern
self._read_preference = read_preference
if read_concern is not None:
if not isinstance(read_concern, ReadConcern):
raise TypeError("read_concern must be an instance of "
@ -176,6 +178,11 @@ class TransactionOptions(object):
raise ConfigurationError(
"transactions must use an acknowledged write concern, "
"not: %r" % (write_concern,))
if read_preference is not None:
if not isinstance(read_preference, _ServerMode):
raise TypeError("%r is not valid for read_preference. See "
"pymongo.read_preferences for valid "
"options." % (read_preference,))
@property
def read_concern(self):
@ -187,6 +194,11 @@ class TransactionOptions(object):
"""This transaction's :class:`~write_concern.WriteConcern`."""
return self._write_concern
@property
def read_preference(self):
"""This transaction's :class:`~read_preference.ReadPreference`."""
return self._read_preference
class _TransactionContext(object):
"""Internal transaction context manager for start_transaction."""
@ -294,7 +306,8 @@ class ClientSession(object):
return val
return getattr(self.client, name)
def start_transaction(self, read_concern=None, write_concern=None):
def start_transaction(self, read_concern=None, write_concern=None,
read_preference=None):
"""Start a multi-statement transaction.
Takes the same arguments as :class:`TransactionOptions`.
@ -308,9 +321,11 @@ class ClientSession(object):
read_concern = self._inherit_option("read_concern", read_concern)
write_concern = self._inherit_option("write_concern", write_concern)
read_preference = self._inherit_option(
"read_preference", read_preference)
self._transaction = _Transaction(TransactionOptions(
read_concern=read_concern, write_concern=write_concern))
read_concern, write_concern, read_preference))
self._server_session._transaction_id += 1
return _TransactionContext(self)
@ -342,13 +357,16 @@ class ClientSession(object):
# Not really started.
return
# TODO: retryable. And it's weird to pass parse_write_concern_error
# from outside database.py.
self._client.admin.command(
command_name,
session=self,
write_concern=self._transaction.opts.write_concern,
parse_write_concern_error=True)
# TODO: commitTransaction should be a retryable write.
# Use _command directly because commit/abort are writes and must
# always go to the primary.
with self._client._socket_for_writes() as sock_info:
return self._client.admin._command(
sock_info,
command_name,
session=self,
write_concern=self._transaction.opts.write_concern,
parse_write_concern_error=True)
finally:
self._transaction = None
@ -415,6 +433,12 @@ class ClientSession(object):
return True
return False
def _txn_read_preference(self):
"""Return read preference of this transaction or None."""
if self._in_transaction_or_auto_start():
return self._transaction.opts.read_preference
return None
def _apply_to(self, command, is_retryable, read_preference):
self._check_ended()
self._in_transaction_or_auto_start()

View File

@ -14,7 +14,6 @@
"""Collection level utilities for Mongo."""
import collections
import datetime
import warnings
@ -184,11 +183,14 @@ class Collection(common.BaseObject):
unicode_decode_error_handler='replace',
document_class=dict)
def _socket_for_reads(self):
return self.__database.client._socket_for_reads(self.read_preference)
def _socket_for_reads(self, session):
return self.__database.client._socket_for_reads(
self._read_preference_for(session))
def _socket_for_primary_reads(self):
return self.__database.client._socket_for_reads(ReadPreference.PRIMARY)
def _socket_for_primary_reads(self, session):
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
return self.__database.client._socket_for_reads(read_pref)
def _socket_for_writes(self):
return self.__database.client._socket_for_writes()
@ -198,7 +200,6 @@ class Collection(common.BaseObject):
codec_options=None, check=True, allowable_errors=None,
read_concern=None,
write_concern=None,
parse_write_concern_error=False,
collation=None,
session=None,
retryable_write=False):
@ -217,8 +218,6 @@ class Collection(common.BaseObject):
- `write_concern`: An instance of
:class:`~pymongo.write_concern.WriteConcern`. This option is only
valid for MongoDB 3.4 and above.
- `parse_write_concern_error` (optional): Whether to parse a
``writeConcernError`` field in the command response.
- `collation` (optional) - An instance of
:class:`~pymongo.collation.Collation`.
- `session` (optional): a
@ -232,13 +231,13 @@ class Collection(common.BaseObject):
self.__database.name,
command,
slave_ok,
read_preference or self.read_preference,
read_preference or self._read_preference_for(session),
codec_options or self.codec_options,
check,
allowable_errors,
read_concern=read_concern,
write_concern=write_concern,
parse_write_concern_error=parse_write_concern_error,
parse_write_concern_error=True,
collation=collation,
session=s,
client=self.__database.client,
@ -256,7 +255,6 @@ class Collection(common.BaseObject):
self._command(
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
write_concern=self.write_concern,
parse_write_concern_error=True,
collation=collation, session=session)
def __getattr__(self, name):
@ -1493,7 +1491,7 @@ class Collection(common.BaseObject):
('numCursors', num_cursors)])
cmd.update(kwargs)
with self._socket_for_reads() as (sock_info, slave_ok):
with self._socket_for_reads(session) as (sock_info, slave_ok):
result = self._command(sock_info, cmd, slave_ok,
read_concern=self.read_concern,
session=session)
@ -1509,7 +1507,7 @@ class Collection(common.BaseObject):
def _count(self, cmd, collation=None, session=None):
"""Internal count helper."""
with self._socket_for_reads() as (sock_info, slave_ok):
with self._socket_for_reads(session) as (sock_info, slave_ok):
res = self._command(
sock_info, cmd, slave_ok,
allowable_errors=["ns missing"],
@ -1628,7 +1626,6 @@ class Collection(common.BaseObject):
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
write_concern=self.write_concern,
parse_write_concern_error=True,
session=session)
return names
@ -1660,7 +1657,6 @@ class Collection(common.BaseObject):
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
write_concern=self.write_concern,
parse_write_concern_error=True,
session=session)
def create_index(self, keys, session=None, **kwargs):
@ -1880,7 +1876,6 @@ class Collection(common.BaseObject):
read_preference=ReadPreference.PRIMARY,
allowable_errors=["ns not found"],
write_concern=self.write_concern,
parse_write_concern_error=True,
session=session)
def reindex(self, session=None, **kwargs):
@ -1914,7 +1909,7 @@ class Collection(common.BaseObject):
with self._socket_for_writes() as sock_info:
return self._command(
sock_info, cmd, read_preference=ReadPreference.PRIMARY,
parse_write_concern_error=True, session=session)
session=session)
def list_indexes(self, session=None):
"""Get a cursor over the index documents for this collection.
@ -1940,13 +1935,15 @@ class Collection(common.BaseObject):
codec_options = CodecOptions(SON)
coll = self.with_options(codec_options=codec_options,
read_preference=ReadPreference.PRIMARY)
with self._socket_for_primary_reads() as (sock_info, slave_ok):
with self._socket_for_primary_reads(session) as (sock_info, slave_ok):
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
if sock_info.max_wire_version > 2:
with self.__database.client._tmp_session(session, False) as s:
try:
cursor = self._command(sock_info, cmd, slave_ok,
ReadPreference.PRIMARY,
read_pref,
codec_options,
session=s)["cursor"]
except OperationFailure as exc:
@ -1962,7 +1959,7 @@ class Collection(common.BaseObject):
res = message._first_batch(
sock_info, self.__database.name, "system.indexes",
{"ns": self.__full_name}, 0, slave_ok, codec_options,
ReadPreference.PRIMARY, cmd,
read_pref, cmd,
self.database.client._event_listeners)
cursor = res["cursor"]
# Note that a collection can only have 64 indexes, so there
@ -2061,7 +2058,7 @@ class Collection(common.BaseObject):
"batchSize", kwargs.pop("batchSize", None))
# If the server does not support the "cursor" option we
# ignore useCursor and batchSize.
with self._socket_for_reads() as (sock_info, slave_ok):
with self._socket_for_reads(session) as (sock_info, slave_ok):
dollar_out = pipeline and '$out' in pipeline[-1]
if use_cursor:
if "cursor" not in kwargs:
@ -2092,9 +2089,9 @@ class Collection(common.BaseObject):
self.__database.name,
cmd,
slave_ok,
self.read_preference,
self._read_preference_for(session),
self.codec_options,
parse_write_concern_error=dollar_out,
parse_write_concern_error=True,
read_concern=read_concern,
collation=collation,
session=session,
@ -2350,7 +2347,7 @@ class Collection(common.BaseObject):
collation = validate_collation_or_none(kwargs.pop('collation', None))
cmd.update(kwargs)
with self._socket_for_reads() as (sock_info, slave_ok):
with self._socket_for_reads(session=None) as (sock_info, slave_ok):
return self._command(sock_info, cmd, slave_ok,
collation=collation)["retval"]
@ -2451,7 +2448,7 @@ class Collection(common.BaseObject):
kwargs["query"] = filter
collation = validate_collation_or_none(kwargs.pop('collation', None))
cmd.update(kwargs)
with self._socket_for_reads() as (sock_info, slave_ok):
with self._socket_for_reads(session) as (sock_info, slave_ok):
return self._command(sock_info, cmd, slave_ok,
read_concern=self.read_concern,
collation=collation, session=session)["values"]
@ -2523,24 +2520,21 @@ class Collection(common.BaseObject):
cmd.update(kwargs)
inline = 'inline' in cmd['out']
with self._socket_for_primary_reads() as (sock_info, slave_ok):
with self._socket_for_primary_reads(session) as (sock_info, slave_ok):
if (sock_info.max_wire_version >= 5 and self.write_concern and
not inline):
cmd['writeConcern'] = self.write_concern.document
cmd.update(kwargs)
if (sock_info.max_wire_version >= 4 and 'readConcern' not in cmd and
inline):
# No need to parse 'writeConcernError' here, since the command
# is an inline map reduce.
response = self._command(
sock_info, cmd, slave_ok, ReadPreference.PRIMARY,
read_concern=self.read_concern,
collation=collation, session=session)
read_concern = self.read_concern
else:
response = self._command(
sock_info, cmd, slave_ok, ReadPreference.PRIMARY,
parse_write_concern_error=not inline,
collation=collation, session=session)
read_concern = None
response = self._command(
sock_info, cmd, slave_ok, self._read_preference_for(session),
read_concern=read_concern,
collation=collation, session=session)
if full_response or not response.get('result'):
return response
@ -2592,7 +2586,7 @@ class Collection(common.BaseObject):
("out", {"inline": 1})])
collation = validate_collation_or_none(kwargs.pop('collation', None))
cmd.update(kwargs)
with self._socket_for_reads() as (sock_info, slave_ok):
with self._socket_for_reads(session) as (sock_info, slave_ok):
if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd:
res = self._command(sock_info, cmd, slave_ok,
read_concern=self.read_concern,

View File

@ -223,13 +223,14 @@ class CommandCursor(object):
if self.__id: # Get More
dbname, collname = self.__ns.split('.', 1)
read_pref = self.__collection._read_preference_for(self.session)
self.__send_message(
self._getmore_class(dbname,
collname,
self.__batch_size,
self.__id,
self.__collection.codec_options,
self.__collection.read_preference,
read_pref,
self.__session,
self.__collection.database.client,
self.__max_await_time_ms))

View File

@ -674,6 +674,14 @@ class BaseObject(object):
"""
return self.__read_preference
def _read_preference_for(self, session):
"""Read only access to the read preference of this instance or session.
"""
# Override this operation's read preference with the transaction's.
if session:
return session._txn_read_preference() or self.__read_preference
return self.__read_preference
@property
def read_concern(self):
"""Read only access to the :class:`~pymongo.read_concern.ReadConcern`

View File

@ -211,12 +211,11 @@ class Cursor(object):
self.__retrieved = 0
self.__codec_options = collection.codec_options
self.__read_preference = collection.read_preference
# Read preference is set when the initial find is sent.
self.__read_preference = None
self.__read_concern = collection.read_concern
self.__query_flags = cursor_type
if self.__read_preference != ReadPreference.PRIMARY:
self.__query_flags |= _QUERY_OPTIONS["slave_okay"]
if no_cursor_timeout:
self.__query_flags |= _QUERY_OPTIONS["no_timeout"]
if allow_partial_results:
@ -918,16 +917,9 @@ class Cursor(object):
def duration(): return datetime.datetime.now() - start
if operation:
kwargs = {
"read_preference": self.__read_preference,
"exhaust": self.__exhaust,
}
if self.__address is not None:
kwargs["address"] = self.__address
try:
response = client._send_message_with_response(operation,
**kwargs)
response = client._send_message_with_response(
operation, exhaust=self.__exhaust, address=self.__address)
self.__address = response.address
if self.__exhaust:
# 'response' is an ExhaustResponse.
@ -1060,6 +1052,13 @@ class Cursor(object):
def _unpack_response(self, response, cursor_id, codec_options):
return response.unpack_response(cursor_id, codec_options)
def _read_preference(self):
if self.__read_preference is None:
# Save the read preference for getMore commands.
self.__read_preference = self.__collection._read_preference_for(
self.session)
return self.__read_preference
def _refresh(self):
"""Refreshes the cursor with more data from Mongo.
@ -1081,7 +1080,7 @@ class Cursor(object):
self.__query_spec(),
self.__projection,
self.__codec_options,
self.__read_preference,
self._read_preference(),
self.__limit,
self.__batch_size,
self.__read_concern,
@ -1106,7 +1105,7 @@ class Cursor(object):
limit,
self.__id,
self.__codec_options,
self.__read_preference,
self._read_preference(),
self.__session,
self.__collection.database.client,
self.__max_await_time_ms)

View File

@ -300,15 +300,6 @@ class Database(common.BaseObject):
self, name, False, codec_options, read_preference,
write_concern, read_concern)
def _collection_default_options(self, name, **kargs):
"""Get a Collection instance with the default settings."""
wc = (self.write_concern
if self.write_concern.acknowledged else WriteConcern())
return self.get_collection(
name, codec_options=DEFAULT_CODEC_OPTIONS,
read_preference=ReadPreference.PRIMARY,
write_concern=wc)
def create_collection(self, name, codec_options=None,
read_preference=None, write_concern=None,
read_concern=None, session=None, **kwargs):
@ -525,17 +516,20 @@ class Database(common.BaseObject):
.. mongodoc:: commands
"""
client = self.__client
with client._socket_for_reads(read_preference) as (sock_info, slave_ok):
read_preference = ((session and session._txn_read_preference())
or read_preference)
with self.__client._socket_for_reads(
read_preference) as (sock_info, slave_ok):
return self._command(sock_info, command, slave_ok, value,
check, allowable_errors, read_preference,
codec_options, session=session, **kwargs)
def _list_collections(self, sock_info, slave_okay, session=None, **kwargs):
def _list_collections(self, sock_info, slave_okay, session,
read_preference, **kwargs):
"""Internal listCollections helper."""
coll = self.get_collection(
"$cmd", read_preference=ReadPreference.PRIMARY)
"$cmd", read_preference=read_preference)
if sock_info.max_wire_version > 2:
cmd = SON([("listCollections", 1),
("cursor", {})])
@ -543,7 +537,9 @@ class Database(common.BaseObject):
with self.__client._tmp_session(
session, close=False) as tmp_session:
cursor = self._command(
sock_info, cmd, slave_okay, session=tmp_session)["cursor"]
sock_info, cmd, slave_okay,
read_preference=read_preference,
session=tmp_session)["cursor"]
return CommandCursor(
coll,
cursor,
@ -583,10 +579,13 @@ class Database(common.BaseObject):
.. versionadded:: 3.6
"""
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
with self.__client._socket_for_reads(
ReadPreference.PRIMARY) as (sock_info, slave_okay):
read_pref) as (sock_info, slave_okay):
return self._list_collections(
sock_info, slave_okay, session=session, **kwargs)
sock_info, slave_okay, session, read_preference=read_pref,
**kwargs)
def list_collection_names(self, session=None):
"""Get a list of all the collection names in this database.
@ -648,10 +647,9 @@ class Database(common.BaseObject):
self.__client._purge_index(self.__name, name)
with self.__client._socket_for_reads(
ReadPreference.PRIMARY) as (sock_info, slave_ok):
with self.__client._socket_for_writes() as sock_info:
return self._command(
sock_info, 'drop', slave_ok, _unicode(name),
sock_info, 'drop', value=_unicode(name),
allowable_errors=['ns not found'],
write_concern=self.write_concern,
parse_write_concern_error=True,

View File

@ -973,7 +973,7 @@ class MongoClient(common.BaseObject):
@contextlib.contextmanager
def _socket_for_reads(self, read_preference):
preference = read_preference or ReadPreference.PRIMARY
assert read_preference is not None, "read_preference must not be None"
# Get a socket for a server matching the read preference, and yield
# sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to
# mongods with topology type Single. If the server type is Mongos,
@ -983,13 +983,14 @@ class MongoClient(common.BaseObject):
topology = self._get_topology()
single = topology.description.topology_type == TOPOLOGY_TYPE.Single
server = topology.select_server(read_preference)
with self._get_socket(server) as sock_info:
slave_ok = (single and not sock_info.is_mongos) or (
preference != ReadPreference.PRIMARY)
read_preference != ReadPreference.PRIMARY)
yield sock_info, slave_ok
def _send_message_with_response(self, operation, read_preference=None,
exhaust=False, address=None):
def _send_message_with_response(self, operation, exhaust=False,
address=None):
"""Send a message to MongoDB and return a Response.
:Parameters:
@ -1011,16 +1012,14 @@ class MongoClient(common.BaseObject):
raise AutoReconnect('server %s:%d no longer available'
% address)
else:
selector = read_preference or writable_server_selector
server = topology.select_server(selector)
server = topology.select_server(operation.read_preference)
# A _Query's slaveOk bit is already set for queries with non-primary
# read preference. If this is a direct connection to a mongod, override
# and *always* set the slaveOk bit. See bullet point 2 in
# server-selection.rst#topology-type-single.
# If this is a direct connection to a mongod, *always* set the slaveOk
# bit. See bullet point 2 in server-selection.rst#topology-type-single.
set_slave_ok = (
topology.description.topology_type == TOPOLOGY_TYPE.Single
and server.description.server_type != SERVER_TYPE.Mongos)
and server.description.server_type != SERVER_TYPE.Mongos) or (
operation.read_preference != ReadPreference.PRIMARY)
return self._reset_on_error(
server,
@ -1542,12 +1541,10 @@ class MongoClient(common.BaseObject):
"of %s or a Database" % (string_type.__name__,))
self._purge_index(name)
with self._socket_for_reads(
ReadPreference.PRIMARY) as (sock_info, slave_ok):
with self._socket_for_writes() as sock_info:
self[name]._command(
sock_info,
"dropDatabase",
slave_ok=slave_ok,
read_preference=ReadPreference.PRIMARY,
write_concern=self.write_concern,
parse_write_concern_error=True,

View File

@ -25,11 +25,10 @@ import pymongo
from bson import json_util
from pymongo import monitoring
from pymongo.errors import OperationFailure
from pymongo.read_preferences import (make_read_preference,
read_pref_mode_from_name)
from pymongo.write_concern import WriteConcern
from test import unittest, client_context
from test.utils import single_client, wait_until, EventListener
from test.utils_selection_tests import parse_read_preference
# Location of JSON test specifications.
_TEST_PATH = os.path.join(
@ -82,10 +81,8 @@ def create_test(scenario_def, test):
self.listener.results.clear()
name = camel_to_snake(test['operation']['name'])
if 'read_preference' in test['operation']:
mode = read_pref_mode_from_name(
test['operation']['read_preference']['mode'])
coll = coll.with_options(
read_preference=make_read_preference(mode, None))
coll = coll.with_options(read_preference=parse_read_preference(
test['operation']['read_preference']))
test_args = test['operation']['arguments']
if 'writeConcern' in test_args:

View File

@ -316,7 +316,8 @@ class ReadPrefTester(MongoClient):
@contextlib.contextmanager
def _socket_for_reads(self, read_preference):
context = super(ReadPrefTester, self)._socket_for_reads(read_preference)
context = super(ReadPrefTester, self)._socket_for_reads(
read_preference)
with context as (sock_info, slave_ok):
self.record_a_read(sock_info.address)
yield sock_info, slave_ok
@ -412,18 +413,12 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase):
self._test_fn(server_type, func)
def test_create_collection(self):
# Collections should be created on primary, obviously
# create_collection runs listCollections on the primary to check if
# the collection already exists.
self._test_primary_helper(
lambda: self.c.pymongo_test.create_collection(
'some_collection%s' % random.randint(0, MAXSIZE)))
def test_drop_collection(self):
self._test_primary_helper(
lambda: self.c.pymongo_test.drop_collection('some_collection'))
self._test_primary_helper(
lambda: self.c.pymongo_test.some_collection.drop())
def test_group(self):
with warnings.catch_warnings():
warnings.simplefilter("ignore")

View File

@ -134,7 +134,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase):
cursor = c.pymongo_test.test.find()
self.assertEqual(
ReadPreference.PRIMARY, cursor._Cursor__read_preference)
ReadPreference.PRIMARY, cursor._read_preference())
tag_sets = [{'dc': 'la', 'rack': '2'}, {'foo': 'bar'}]
secondary = Secondary(tag_sets=tag_sets)
@ -155,13 +155,13 @@ class TestReplicaSetClient(TestReplicaSetClientBase):
cursor = c.pymongo_test.test.find()
self.assertEqual(
secondary, cursor._Cursor__read_preference)
secondary, cursor._read_preference())
nearest = Nearest(tag_sets=[{'dc': 'ny'}, {}])
cursor = c.pymongo_test.get_collection(
"test", read_preference=nearest).find()
self.assertEqual(nearest, cursor._Cursor__read_preference)
self.assertEqual(nearest, cursor._read_preference())
self.assertEqual(c.max_bson_size, 16777216)
c.close()

View File

@ -27,7 +27,6 @@ from bson.son import SON
from pymongo.errors import (ConnectionFailure,
ServerSelectionTimeoutError)
from pymongo.monitoring import _SENSITIVE_COMMANDS
from pymongo.mongo_client import MongoClient
from pymongo.operations import (InsertOne,
DeleteMany,
@ -38,7 +37,9 @@ from pymongo.operations import (InsertOne,
from pymongo.write_concern import WriteConcern
from test import unittest, client_context, IntegrationTest, SkipTest
from test.utils import rs_or_single_client, EventListener, DeprecationFilter
from test.utils import (rs_or_single_client,
DeprecationFilter,
OvertCommandListener)
from test.test_crud import check_result, run_operation
# Location of JSON test specifications.
@ -46,23 +47,6 @@ _TEST_PATH = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'retryable_writes')
class CommandListener(EventListener):
def started(self, event):
if event.command_name.lower() in _SENSITIVE_COMMANDS:
return
super(CommandListener, self).started(event)
def succeeded(self, event):
if event.command_name.lower() in _SENSITIVE_COMMANDS:
return
super(CommandListener, self).succeeded(event)
def failed(self, event):
if event.command_name.lower() in _SENSITIVE_COMMANDS:
return
super(CommandListener, self).failed(event)
class TestAllScenarios(IntegrationTest):
@classmethod
@ -240,7 +224,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
@classmethod
def setUpClass(cls):
super(TestRetryableWrites, cls).setUpClass()
cls.listener = CommandListener()
cls.listener = OvertCommandListener()
cls.client = rs_or_single_client(
retryWrites=True, event_listeners=[cls.listener])
cls.db = cls.client.pymongo_test
@ -260,7 +244,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
('mode', 'off')]))
def test_supported_single_statement_no_retry(self):
listener = CommandListener()
listener = OvertCommandListener()
client = rs_or_single_client(
retryWrites=False, event_listeners=[listener])
self.addCleanup(client.close)
@ -350,7 +334,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
def test_server_selection_timeout_not_retried(self):
"""A ServerSelectionTimeoutError is not retried."""
listener = CommandListener()
listener = OvertCommandListener()
client = MongoClient(
'somedomainthatdoesntexist.org',
serverSelectionTimeoutMS=1,
@ -370,7 +354,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
"""A ServerSelectionTimeoutError on the retry attempt raises the
original error.
"""
listener = CommandListener()
listener = OvertCommandListener()
client = rs_or_single_client(
retryWrites=True, event_listeners=[listener])
self.addCleanup(client.close)

View File

@ -32,12 +32,12 @@ from pymongo.errors import (ConfigurationError,
OperationFailure,
PyMongoError)
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import (make_read_preference,
read_pref_mode_from_name)
from pymongo.read_preferences import ReadPreference
from pymongo.results import _WriteResult, BulkWriteResult
from test import unittest, client_context, IntegrationTest
from test.utils import EventListener, rs_client
from test.utils import OvertCommandListener, rs_client
from test.utils_selection_tests import parse_read_preference
# Location of JSON test specifications.
_TEST_PATH = os.path.join(
@ -74,8 +74,10 @@ class TestTransactions(IntegrationTest):
default_options = TransactionOptions()
self.assertIsNone(default_options.read_concern)
self.assertIsNone(default_options.write_concern)
self.assertIsNone(default_options.read_preference)
TransactionOptions(read_concern=ReadConcern(),
write_concern=WriteConcern())
write_concern=WriteConcern(),
read_preference=ReadPreference.PRIMARY)
with self.assertRaisesRegex(TypeError, "read_concern must be "):
TransactionOptions(read_concern={})
with self.assertRaisesRegex(TypeError, "write_concern must be "):
@ -84,6 +86,9 @@ class TestTransactions(IntegrationTest):
ConfigurationError,
"transactions must use an acknowledged write concern"):
TransactionOptions(write_concern=WriteConcern(w=0))
with self.assertRaisesRegex(
TypeError, "is not valid for read_preference"):
TransactionOptions(read_preference={})
# TODO: factor the following function with test_crud.py.
def check_result(self, expected_result, result):
@ -138,8 +143,7 @@ class TestTransactions(IntegrationTest):
arguments.update(arguments.pop("options", {}))
pref = write_c = read_c = None
if 'readPreference' in arguments:
pref = make_read_preference(read_pref_mode_from_name(
arguments.pop('readPreference')['mode']), tag_sets=None)
pref = parse_read_preference(arguments.pop('readPreference'))
if 'writeConcern' in arguments:
write_c = WriteConcern(**dict(arguments.pop('writeConcern')))
@ -150,7 +154,8 @@ class TestTransactions(IntegrationTest):
if name == 'start_transaction':
cmd = partial(session.start_transaction,
write_concern=write_c,
read_concern=read_c)
read_concern=read_c,
read_preference=pref)
elif name in ('commit_transaction', 'abort_transaction'):
cmd = getattr(session, name)
else:
@ -195,7 +200,10 @@ class TestTransactions(IntegrationTest):
if name == "aggregate":
if arguments["pipeline"] and "$out" in arguments["pipeline"][-1]:
out = collection.database[arguments["pipeline"][-1]["$out"]]
# Read from the primary to ensure causal consistency.
out = collection.database.get_collection(
arguments["pipeline"][-1]["$out"],
read_preference=ReadPreference.PRIMARY)
return out.find()
if isinstance(result, Cursor) or isinstance(result, CommandCursor):
@ -279,7 +287,7 @@ def end_sessions(sessions):
def create_test(scenario_def, test):
def run_scenario(self):
listener = EventListener()
listener = OvertCommandListener()
# New client, to avoid interference from pooled sessions.
# Convert test['clientOptions'] to dict to avoid a Jython bug using "**"
# with ScenarioDict.
@ -319,9 +327,16 @@ def create_test(scenario_def, test):
else:
write_concern = None
if 'readPreference' in txn_opts:
read_pref = parse_read_preference(
txn_opts['readPreference'])
else:
read_pref = None
txn_opts = client_session.TransactionOptions(
read_concern=read_concern,
write_concern=write_concern
write_concern=write_concern,
read_preference=read_pref,
)
opts['default_transaction_options'] = txn_opts
@ -339,7 +354,8 @@ def create_test(scenario_def, test):
for op in test['operations']:
expected_result = op.get('result')
if expect_error_message(expected_result):
with self.assertRaises(PyMongoError) as context:
with self.assertRaises(PyMongoError,
msg=op.get('name')) as context:
self.run_operation(sessions, collection, op.copy())
self.assertIn(expected_result['errorContains'].lower(),
@ -363,7 +379,10 @@ def create_test(scenario_def, test):
# Assert final state is expected.
expected_c = test['outcome'].get('collection')
if expected_c is not None:
self.assertEqual(list(collection.find()), expected_c['data'])
# Read from the primary to ensure causal consistency.
primary_coll = collection.with_options(
read_preference=ReadPreference.PRIMARY)
self.assertEqual(list(primary_coll.find()), expected_c['data'])
return run_scenario

View File

@ -2,7 +2,7 @@
"data": [],
"tests": [
{
"description": "primary",
"description": "default readPreference",
"operations": [
{
"name": "startTransaction",
@ -11,26 +11,42 @@
}
},
{
"name": "insertOne",
"name": "insertMany",
"arguments": {
"document": {
"_id": 1
},
"documents": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
],
"session": "session0"
},
"result": {
"insertedId": 1
"insertedIds": {
"0": 1,
"1": 2,
"2": 3,
"3": 4
}
}
},
{
"name": "count",
"arguments": {
"readPreference": {
"mode": "Secondary"
},
"filter": {
"_id": 1
},
"readPreference": {
"mode": "primary"
},
"session": "session0"
},
"result": 1
@ -38,17 +54,33 @@
{
"name": "find",
"arguments": {
"readPreference": {
"mode": "Secondary"
},
"batchSize": 3,
"session": "session0"
},
"result": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
]
},
{
"name": "aggregate",
"arguments": {
"readPreference": {
"mode": "Secondary"
},
"pipeline": [
{
"$project": {
@ -62,6 +94,15 @@
"result": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
]
},
@ -77,18 +118,477 @@
"data": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
]
}
}
},
{
"description": "write and read",
"description": "primary readPreference",
"operations": [
{
"name": "startTransaction",
"arguments": {
"session": "session0",
"options": {
"readPreference": {
"mode": "Primary"
}
}
}
},
{
"name": "insertMany",
"arguments": {
"documents": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
],
"session": "session0"
},
"result": {
"insertedIds": {
"0": 1,
"1": 2,
"2": 3,
"3": 4
}
}
},
{
"name": "count",
"arguments": {
"readPreference": {
"mode": "Secondary"
},
"filter": {
"_id": 1
},
"session": "session0"
},
"result": 1
},
{
"name": "find",
"arguments": {
"readPreference": {
"mode": "Secondary"
},
"batchSize": 3,
"session": "session0"
},
"result": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
]
},
{
"name": "aggregate",
"arguments": {
"readPreference": {
"mode": "Secondary"
},
"pipeline": [
{
"$project": {
"_id": 1
}
}
],
"batchSize": 3,
"session": "session0"
},
"result": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
]
},
{
"name": "commitTransaction",
"arguments": {
"session": "session0"
}
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
]
}
}
},
{
"description": "secondary readPreference",
"operations": [
{
"name": "startTransaction",
"arguments": {
"session": "session0",
"options": {
"readPreference": {
"mode": "Secondary"
}
}
}
},
{
"name": "insertMany",
"arguments": {
"documents": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
],
"session": "session0"
},
"result": {
"insertedIds": {
"0": 1,
"1": 2,
"2": 3,
"3": 4
}
}
},
{
"name": "count",
"arguments": {
"readPreference": {
"mode": "Primary"
},
"filter": {
"_id": 1
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "find",
"arguments": {
"readPreference": {
"mode": "Primary"
},
"batchSize": 3,
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "aggregate",
"arguments": {
"readPreference": {
"mode": "Primary"
},
"pipeline": [
{
"$project": {
"_id": 1
}
}
],
"batchSize": 3,
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "abortTransaction",
"arguments": {
"session": "session0"
}
}
],
"outcome": {
"collection": {
"data": []
}
}
},
{
"description": "primaryPreferred readPreference",
"operations": [
{
"name": "startTransaction",
"arguments": {
"session": "session0",
"options": {
"readPreference": {
"mode": "PrimaryPreferred"
}
}
}
},
{
"name": "insertMany",
"arguments": {
"documents": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
],
"session": "session0"
},
"result": {
"insertedIds": {
"0": 1,
"1": 2,
"2": 3,
"3": 4
}
}
},
{
"name": "count",
"arguments": {
"readPreference": {
"mode": "Primary"
},
"filter": {
"_id": 1
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "find",
"arguments": {
"readPreference": {
"mode": "Primary"
},
"batchSize": 3,
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "aggregate",
"arguments": {
"readPreference": {
"mode": "Primary"
},
"pipeline": [
{
"$project": {
"_id": 1
}
}
],
"batchSize": 3,
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "abortTransaction",
"arguments": {
"session": "session0"
}
}
],
"outcome": {
"collection": {
"data": []
}
}
},
{
"description": "nearest readPreference",
"operations": [
{
"name": "startTransaction",
"arguments": {
"session": "session0",
"options": {
"readPreference": {
"mode": "Nearest"
}
}
}
},
{
"name": "insertMany",
"arguments": {
"documents": [
{
"_id": 1
},
{
"_id": 2
},
{
"_id": 3
},
{
"_id": 4
}
],
"session": "session0"
},
"result": {
"insertedIds": {
"0": 1,
"1": 2,
"2": 3,
"3": 4
}
}
},
{
"name": "count",
"arguments": {
"readPreference": {
"mode": "Primary"
},
"filter": {
"_id": 1
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "find",
"arguments": {
"readPreference": {
"mode": "Primary"
},
"batchSize": 3,
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "aggregate",
"arguments": {
"readPreference": {
"mode": "Primary"
},
"pipeline": [
{
"$project": {
"_id": 1
}
}
],
"batchSize": 3,
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "abortTransaction",
"arguments": {
"session": "session0"
}
}
],
"outcome": {
"collection": {
"data": []
}
}
},
{
"description": "secondary write only",
"operations": [
{
"name": "startTransaction",
"arguments": {
"session": "session0",
"options": {
"readPreference": {
"mode": "Secondary"
}
}
}
},
{
@ -103,49 +603,6 @@
"insertedId": 1
}
},
{
"name": "count",
"arguments": {
"readPreference": {
"mode": "secondary"
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "find",
"arguments": {
"readPreference": {
"mode": "secondary"
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "aggregate",
"arguments": {
"pipeline": [
{
"$project": {
"_id": 1
}
}
],
"readPreference": {
"mode": "secondary"
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "commitTransaction",
"arguments": {
@ -162,146 +619,6 @@
]
}
}
},
{
"description": "non-primary readPreference",
"operations": [
{
"name": "startTransaction",
"arguments": {
"session": "session0"
}
},
{
"name": "count",
"arguments": {
"readPreference": {
"mode": "secondary"
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "find",
"arguments": {
"readPreference": {
"mode": "secondary"
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "aggregate",
"arguments": {
"pipeline": [
{
"$project": {
"_id": 1
}
}
],
"readPreference": {
"mode": "secondary"
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "commitTransaction",
"arguments": {
"session": "session0"
}
}
],
"outcome": {
"collection": {
"data": []
}
}
},
{
"description": "conflict",
"operations": [
{
"name": "startTransaction",
"arguments": {
"session": "session0"
}
},
{
"name": "count",
"arguments": {
"readPreference": {
"mode": "primary"
},
"session": "session0"
},
"result": 0
},
{
"name": "count",
"arguments": {
"readPreference": {
"mode": "secondary"
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "find",
"arguments": {
"session": "session0",
"readPreference": {
"mode": "secondary"
}
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "aggregate",
"arguments": {
"pipeline": [
{
"$project": {
"_id": 1
}
}
],
"readPreference": {
"mode": "secondary"
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "commitTransaction",
"arguments": {
"session": "session0"
}
}
],
"outcome": {
"collection": {
"data": []
}
}
}
]
}

View File

@ -1258,6 +1258,314 @@
]
}
}
},
{
"description": "readPreference inherited from client",
"clientOptions": {
"readPreference": "secondary"
},
"operations": [
{
"name": "startTransaction",
"arguments": {
"session": "session0"
}
},
{
"name": "insertOne",
"arguments": {
"document": {
"_id": 1
},
"session": "session0"
},
"result": {
"insertedId": 1
}
},
{
"name": "count",
"arguments": {
"filter": {
"_id": 1
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "commitTransaction",
"arguments": {
"session": "session0"
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"insert": "test",
"documents": [
{
"_id": 1
}
],
"ordered": true,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": true,
"autocommit": false,
"readConcern": null,
"writeConcern": null
},
"command_name": "insert",
"database_name": "transaction-tests"
}
},
{
"command_started_event": {
"command": {
"commitTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"readConcern": null,
"writeConcern": null
},
"command_name": "commitTransaction",
"database_name": "admin"
}
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
}
]
}
}
},
{
"description": "readPreference inherited from defaultTransactionOptions",
"clientOptions": {
"readPreference": "primary"
},
"sessionOptions": {
"session0": {
"defaultTransactionOptions": {
"readPreference": {
"mode": "Secondary"
}
}
}
},
"operations": [
{
"name": "startTransaction",
"arguments": {
"session": "session0"
}
},
{
"name": "insertOne",
"arguments": {
"document": {
"_id": 1
},
"session": "session0"
},
"result": {
"insertedId": 1
}
},
{
"name": "count",
"arguments": {
"filter": {
"_id": 1
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "commitTransaction",
"arguments": {
"session": "session0"
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"insert": "test",
"documents": [
{
"_id": 1
}
],
"ordered": true,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": true,
"autocommit": false,
"readConcern": null,
"writeConcern": null
},
"command_name": "insert",
"database_name": "transaction-tests"
}
},
{
"command_started_event": {
"command": {
"commitTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"readConcern": null,
"writeConcern": null
},
"command_name": "commitTransaction",
"database_name": "admin"
}
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
}
]
}
}
},
{
"description": "startTransaction overrides readPreference",
"clientOptions": {
"readPreference": "primary"
},
"sessionOptions": {
"session0": {
"defaultTransactionOptions": {
"readPreference": {
"mode": "Primary"
}
}
}
},
"operations": [
{
"name": "startTransaction",
"arguments": {
"session": "session0",
"options": {
"readPreference": {
"mode": "Secondary"
}
}
}
},
{
"name": "insertOne",
"arguments": {
"document": {
"_id": 1
},
"session": "session0"
},
"result": {
"insertedId": 1
}
},
{
"name": "count",
"arguments": {
"filter": {
"_id": 1
},
"session": "session0"
},
"result": {
"errorContains": "read preference in a transaction must be primary"
}
},
{
"name": "commitTransaction",
"arguments": {
"session": "session0"
}
}
],
"expectations": [
{
"command_started_event": {
"command": {
"insert": "test",
"documents": [
{
"_id": 1
}
],
"ordered": true,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": true,
"autocommit": false,
"readConcern": null,
"writeConcern": null
},
"command_name": "insert",
"database_name": "transaction-tests"
}
},
{
"command_started_event": {
"command": {
"commitTransaction": 1,
"lsid": "session0",
"txnNumber": {
"$numberLong": "1"
},
"startTransaction": null,
"autocommit": false,
"readConcern": null,
"writeConcern": null
},
"command_name": "commitTransaction",
"database_name": "admin"
}
}
],
"outcome": {
"collection": {
"data": [
{
"_id": 1
}
]
}
}
}
]
}

View File

@ -29,6 +29,7 @@ from functools import partial
from pymongo import MongoClient, monitoring
from pymongo.errors import AutoReconnect, OperationFailure
from pymongo.monitoring import _SENSITIVE_COMMANDS
from pymongo.server_selectors import (any_server_selector,
writable_server_selector)
from pymongo.write_concern import WriteConcern
@ -74,6 +75,21 @@ class EventListener(monitoring.CommandListener):
self.results['failed'].append(event)
class OvertCommandListener(EventListener):
"""A CommandListener that ignores sensitive commands."""
def started(self, event):
if event.command_name.lower() not in _SENSITIVE_COMMANDS:
super(OvertCommandListener, self).started(event)
def succeeded(self, event):
if event.command_name.lower() not in _SENSITIVE_COMMANDS:
super(OvertCommandListener, self).succeeded(event)
def failed(self, event):
if event.command_name.lower() not in _SENSITIVE_COMMANDS:
super(OvertCommandListener, self).failed(event)
class ServerAndTopologyEventListener(monitoring.ServerListener,
monitoring.TopologyListener):
"""Listens to all events."""

View File

@ -180,23 +180,14 @@ def create_test(scenario_def):
else:
# Make first letter lowercase to match read_pref's modes.
pref_def = scenario_def['read_preference']
mode_string = pref_def.get('mode', 'primary')
mode_string = mode_string[:1].lower() + mode_string[1:]
mode = read_preferences.read_pref_mode_from_name(mode_string)
max_staleness = pref_def.get('maxStalenessSeconds', -1)
tag_sets = pref_def.get('tag_sets')
if scenario_def.get('error'):
with self.assertRaises((ConfigurationError, ValueError)):
# Error can be raised when making Read Pref or selecting.
pref = read_preferences.make_read_preference(
mode, tag_sets=tag_sets, max_staleness=max_staleness)
pref = parse_read_preference(pref_def)
top_latency.select_server(pref)
return
pref = read_preferences.make_read_preference(
mode, tag_sets=tag_sets, max_staleness=max_staleness)
pref = parse_read_preference(pref_def)
# Select servers.
if not scenario_def.get('suitable_servers'):
@ -281,3 +272,14 @@ def create_selection_tests(test_dir):
setattr(TestAllScenarios, new_test.__name__, new_test)
return TestAllScenarios
def parse_read_preference(pref):
# Make first letter lowercase to match read_pref's modes.
mode_string = pref.get('mode', 'primary')
mode_string = mode_string[:1].lower() + mode_string[1:]
mode = read_preferences.read_pref_mode_from_name(mode_string)
max_staleness = pref.get('maxStalenessSeconds', -1)
tag_sets = pref.get('tag_sets')
return read_preferences.make_read_preference(
mode, tag_sets=tag_sets, max_staleness=max_staleness)