PYTHON-1781 Raise a client side error when attempting a sharded transaction
This commit is contained in:
parent
ea62ce57d7
commit
e7ddc291b1
@ -33,7 +33,8 @@ Changes in Version 3.8.0.dev0
|
||||
the :class:`~bson.codec_options.TypeCodec` and
|
||||
:class:`~bson.codec_options.TypeRegistry` APIs. For more information, see
|
||||
the :doc:`custom type example <examples/custom_type>`.
|
||||
|
||||
- Attempting a multi-document transaction on a sharded cluster now raises a
|
||||
:exc:`~pymongo.errors.ConfigurationError`.
|
||||
|
||||
Issues Resolved
|
||||
...............
|
||||
|
||||
@ -281,7 +281,8 @@ class _Bulk(object):
|
||||
if retryable and not self.started_retryable_write:
|
||||
session._start_retryable_write()
|
||||
self.started_retryable_write = True
|
||||
session._apply_to(cmd, retryable, ReadPreference.PRIMARY)
|
||||
session._apply_to(cmd, retryable, ReadPreference.PRIMARY,
|
||||
sock_info)
|
||||
sock_info.send_cluster_time(cmd, session, client)
|
||||
check_keys = run.op_type == _INSERT
|
||||
ops = islice(run.ops, run.idx_offset, None)
|
||||
|
||||
@ -90,7 +90,7 @@ from bson.int64 import Int64
|
||||
from bson.py3compat import abc, reraise_instance
|
||||
from bson.timestamp import Timestamp
|
||||
|
||||
from pymongo import monotonic
|
||||
from pymongo import monotonic, __version__
|
||||
from pymongo.errors import (ConfigurationError,
|
||||
ConnectionFailure,
|
||||
InvalidOperation,
|
||||
@ -263,6 +263,10 @@ _UNKNOWN_COMMIT_ERROR_CODES = _RETRYABLE_ERROR_CODES | frozenset([
|
||||
64, # WriteConcernFailed
|
||||
])
|
||||
|
||||
_MONGOS_NOT_SUPPORTED_MSG = (
|
||||
'PyMongo %s does not support running multi-document transactions on '
|
||||
'sharded clusters') % (__version__,)
|
||||
|
||||
|
||||
class ClientSession(object):
|
||||
"""A session for ordering sequential operations."""
|
||||
@ -356,6 +360,9 @@ class ClientSession(object):
|
||||
"""
|
||||
self._check_ended()
|
||||
|
||||
if self._client._is_mongos_non_blocking():
|
||||
raise ConfigurationError(_MONGOS_NOT_SUPPORTED_MSG)
|
||||
|
||||
if self._in_transaction:
|
||||
raise InvalidOperation("Transaction already in progress")
|
||||
|
||||
@ -534,7 +541,7 @@ class ClientSession(object):
|
||||
return self._transaction.opts.read_preference
|
||||
return None
|
||||
|
||||
def _apply_to(self, command, is_retryable, read_preference):
|
||||
def _apply_to(self, command, is_retryable, read_preference, sock_info):
|
||||
self._check_ended()
|
||||
|
||||
self._server_session.last_use = monotonic.time()
|
||||
@ -548,6 +555,9 @@ class ClientSession(object):
|
||||
return
|
||||
|
||||
if self._in_transaction:
|
||||
if sock_info.is_mongos:
|
||||
raise ConfigurationError(_MONGOS_NOT_SUPPORTED_MSG)
|
||||
|
||||
if read_preference != ReadPreference.PRIMARY:
|
||||
raise InvalidOperation(
|
||||
'read preference in a transaction must be primary, not: '
|
||||
|
||||
@ -289,7 +289,7 @@ class _Query(object):
|
||||
cmd = SON([('explain', cmd)])
|
||||
session = self.session
|
||||
if session:
|
||||
session._apply_to(cmd, False, self.read_preference)
|
||||
session._apply_to(cmd, False, self.read_preference, sock_info)
|
||||
# Explain does not support readConcern.
|
||||
if (not explain and session.options.causal_consistency
|
||||
and session.operation_time is not None
|
||||
@ -379,7 +379,7 @@ class _GetMore(object):
|
||||
self.max_await_time_ms)
|
||||
|
||||
if self.session:
|
||||
self.session._apply_to(cmd, False, self.read_preference)
|
||||
self.session._apply_to(cmd, False, self.read_preference, sock_info)
|
||||
sock_info.send_cluster_time(cmd, self.session, self.client)
|
||||
self._as_command = cmd, self.db
|
||||
return self._as_command
|
||||
|
||||
@ -1530,6 +1530,9 @@ class MongoClient(common.BaseObject):
|
||||
except Exception:
|
||||
helpers._handle_exception()
|
||||
|
||||
def _is_mongos_non_blocking(self):
|
||||
return self._topology.is_mongos_non_blocking()
|
||||
|
||||
def __start_session(self, implicit, **kwargs):
|
||||
# Driver Sessions Spec: "If startSession is called when multiple users
|
||||
# are authenticated drivers MUST raise an error with the error message
|
||||
|
||||
@ -560,7 +560,7 @@ class SocketInfo(object):
|
||||
'Must be connected to MongoDB 3.4+ to use a collation.')
|
||||
|
||||
if session:
|
||||
session._apply_to(spec, retryable_write, read_preference)
|
||||
session._apply_to(spec, retryable_write, read_preference, self)
|
||||
self.send_cluster_time(spec, session, client)
|
||||
listeners = self.listeners if publish_events else None
|
||||
unacknowledged = write_concern and not write_concern.acknowledged
|
||||
|
||||
@ -30,6 +30,7 @@ from pymongo import common
|
||||
from pymongo import periodic_executor
|
||||
from pymongo.pool import PoolOptions
|
||||
from pymongo.topology_description import (updated_topology_description,
|
||||
SERVER_TYPE,
|
||||
TOPOLOGY_TYPE,
|
||||
TopologyDescription)
|
||||
from pymongo.errors import ServerSelectionTimeoutError, ConfigurationError
|
||||
@ -451,6 +452,22 @@ class Topology(object):
|
||||
# Called from a __del__ method, can't use a lock.
|
||||
self._session_pool.return_server_session_no_lock(server_session)
|
||||
|
||||
def is_mongos_non_blocking(self):
|
||||
"""Return if we are connected to a Mongos without blocking.
|
||||
|
||||
If the state is unknown, return False.
|
||||
"""
|
||||
with self._lock:
|
||||
if not self._opened:
|
||||
return False
|
||||
if self._description.topology_type == TOPOLOGY_TYPE.Sharded:
|
||||
return True
|
||||
server_descriptions = self._description.apply_selector(
|
||||
writable_server_selector, None)
|
||||
if not server_descriptions:
|
||||
return False
|
||||
return server_descriptions[0].server_type == SERVER_TYPE.Mongos
|
||||
|
||||
def _new_selection(self):
|
||||
"""A Selection object, initially including all known servers.
|
||||
|
||||
|
||||
@ -86,6 +86,23 @@ class TestTransactions(IntegrationTest):
|
||||
cmd.update(command_args)
|
||||
self.client.admin.command(cmd)
|
||||
|
||||
@client_context.require_mongos
|
||||
@client_context.require_version_min(4, 0)
|
||||
def test_transactions_not_supported(self):
|
||||
with self.client.start_session() as s:
|
||||
with self.assertRaisesRegex(
|
||||
ConfigurationError,
|
||||
'does not support running multi-document transactions on '
|
||||
'sharded clusters'):
|
||||
s.start_transaction()
|
||||
self.client.close()
|
||||
with s.start_transaction():
|
||||
with self.assertRaisesRegex(
|
||||
ConfigurationError,
|
||||
'does not support running multi-document transactions '
|
||||
'on sharded clusters'):
|
||||
self.client.test.test.insert_one({}, session=s)
|
||||
|
||||
@client_context.require_transactions
|
||||
def test_transaction_options_validation(self):
|
||||
default_options = TransactionOptions()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user