From e7ddc291b10b34ffd8c0dc5e1abe70fc15c6fbb1 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Thu, 21 Mar 2019 14:57:53 -0700 Subject: [PATCH] PYTHON-1781 Raise a client side error when attempting a sharded transaction --- doc/changelog.rst | 3 ++- pymongo/bulk.py | 3 ++- pymongo/client_session.py | 14 ++++++++++++-- pymongo/message.py | 4 ++-- pymongo/mongo_client.py | 3 +++ pymongo/pool.py | 2 +- pymongo/topology.py | 17 +++++++++++++++++ test/test_transactions.py | 17 +++++++++++++++++ 8 files changed, 56 insertions(+), 7 deletions(-) diff --git a/doc/changelog.rst b/doc/changelog.rst index 771b53f95..56f75616c 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -33,7 +33,8 @@ Changes in Version 3.8.0.dev0 the :class:`~bson.codec_options.TypeCodec` and :class:`~bson.codec_options.TypeRegistry` APIs. For more information, see the :doc:`custom type example `. - +- Attempting a multi-document transaction on a sharded cluster now raises a + :exc:`~pymongo.errors.ConfigurationError`. Issues Resolved ............... diff --git a/pymongo/bulk.py b/pymongo/bulk.py index ccd58e4ec..bc8c31c50 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -281,7 +281,8 @@ class _Bulk(object): if retryable and not self.started_retryable_write: session._start_retryable_write() self.started_retryable_write = True - session._apply_to(cmd, retryable, ReadPreference.PRIMARY) + session._apply_to(cmd, retryable, ReadPreference.PRIMARY, + sock_info) sock_info.send_cluster_time(cmd, session, client) check_keys = run.op_type == _INSERT ops = islice(run.ops, run.idx_offset, None) diff --git a/pymongo/client_session.py b/pymongo/client_session.py index dff0cd70f..d610f61d8 100644 --- a/pymongo/client_session.py +++ b/pymongo/client_session.py @@ -90,7 +90,7 @@ from bson.int64 import Int64 from bson.py3compat import abc, reraise_instance from bson.timestamp import Timestamp -from pymongo import monotonic +from pymongo import monotonic, __version__ from pymongo.errors import (ConfigurationError, ConnectionFailure, InvalidOperation, @@ -263,6 +263,10 @@ _UNKNOWN_COMMIT_ERROR_CODES = _RETRYABLE_ERROR_CODES | frozenset([ 64, # WriteConcernFailed ]) +_MONGOS_NOT_SUPPORTED_MSG = ( + 'PyMongo %s does not support running multi-document transactions on ' + 'sharded clusters') % (__version__,) + class ClientSession(object): """A session for ordering sequential operations.""" @@ -356,6 +360,9 @@ class ClientSession(object): """ self._check_ended() + if self._client._is_mongos_non_blocking(): + raise ConfigurationError(_MONGOS_NOT_SUPPORTED_MSG) + if self._in_transaction: raise InvalidOperation("Transaction already in progress") @@ -534,7 +541,7 @@ class ClientSession(object): return self._transaction.opts.read_preference return None - def _apply_to(self, command, is_retryable, read_preference): + def _apply_to(self, command, is_retryable, read_preference, sock_info): self._check_ended() self._server_session.last_use = monotonic.time() @@ -548,6 +555,9 @@ class ClientSession(object): return if self._in_transaction: + if sock_info.is_mongos: + raise ConfigurationError(_MONGOS_NOT_SUPPORTED_MSG) + if read_preference != ReadPreference.PRIMARY: raise InvalidOperation( 'read preference in a transaction must be primary, not: ' diff --git a/pymongo/message.py b/pymongo/message.py index 05f9e4601..2d4002c3f 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -289,7 +289,7 @@ class _Query(object): cmd = SON([('explain', cmd)]) session = self.session if session: - session._apply_to(cmd, False, self.read_preference) + session._apply_to(cmd, False, self.read_preference, sock_info) # Explain does not support readConcern. if (not explain and session.options.causal_consistency and session.operation_time is not None @@ -379,7 +379,7 @@ class _GetMore(object): self.max_await_time_ms) if self.session: - self.session._apply_to(cmd, False, self.read_preference) + self.session._apply_to(cmd, False, self.read_preference, sock_info) sock_info.send_cluster_time(cmd, self.session, self.client) self._as_command = cmd, self.db return self._as_command diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 9d631a814..11f8389dc 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -1530,6 +1530,9 @@ class MongoClient(common.BaseObject): except Exception: helpers._handle_exception() + def _is_mongos_non_blocking(self): + return self._topology.is_mongos_non_blocking() + def __start_session(self, implicit, **kwargs): # Driver Sessions Spec: "If startSession is called when multiple users # are authenticated drivers MUST raise an error with the error message diff --git a/pymongo/pool.py b/pymongo/pool.py index 78c74034b..70f0826e3 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -560,7 +560,7 @@ class SocketInfo(object): 'Must be connected to MongoDB 3.4+ to use a collation.') if session: - session._apply_to(spec, retryable_write, read_preference) + session._apply_to(spec, retryable_write, read_preference, self) self.send_cluster_time(spec, session, client) listeners = self.listeners if publish_events else None unacknowledged = write_concern and not write_concern.acknowledged diff --git a/pymongo/topology.py b/pymongo/topology.py index c69c127ff..60f4fa7c8 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -30,6 +30,7 @@ from pymongo import common from pymongo import periodic_executor from pymongo.pool import PoolOptions from pymongo.topology_description import (updated_topology_description, + SERVER_TYPE, TOPOLOGY_TYPE, TopologyDescription) from pymongo.errors import ServerSelectionTimeoutError, ConfigurationError @@ -451,6 +452,22 @@ class Topology(object): # Called from a __del__ method, can't use a lock. self._session_pool.return_server_session_no_lock(server_session) + def is_mongos_non_blocking(self): + """Return if we are connected to a Mongos without blocking. + + If the state is unknown, return False. + """ + with self._lock: + if not self._opened: + return False + if self._description.topology_type == TOPOLOGY_TYPE.Sharded: + return True + server_descriptions = self._description.apply_selector( + writable_server_selector, None) + if not server_descriptions: + return False + return server_descriptions[0].server_type == SERVER_TYPE.Mongos + def _new_selection(self): """A Selection object, initially including all known servers. diff --git a/test/test_transactions.py b/test/test_transactions.py index 1cf15f123..e121049c8 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -86,6 +86,23 @@ class TestTransactions(IntegrationTest): cmd.update(command_args) self.client.admin.command(cmd) + @client_context.require_mongos + @client_context.require_version_min(4, 0) + def test_transactions_not_supported(self): + with self.client.start_session() as s: + with self.assertRaisesRegex( + ConfigurationError, + 'does not support running multi-document transactions on ' + 'sharded clusters'): + s.start_transaction() + self.client.close() + with s.start_transaction(): + with self.assertRaisesRegex( + ConfigurationError, + 'does not support running multi-document transactions ' + 'on sharded clusters'): + self.client.test.test.insert_one({}, session=s) + @client_context.require_transactions def test_transaction_options_validation(self): default_options = TransactionOptions()