diff --git a/doc/api/pymongo/index.rst b/doc/api/pymongo/index.rst index 9de4aef38..052d0a310 100644 --- a/doc/api/pymongo/index.rst +++ b/doc/api/pymongo/index.rst @@ -15,6 +15,13 @@ .. autoclass:: pymongo.read_preferences.ReadPreference .. autofunction:: has_c + .. data:: MIN_SUPPORTED_WIRE_VERSION + + The minimum wire protocol version PyMongo supports. + + .. data:: MAX_SUPPORTED_WIRE_VERSION + + The maximum wire protocol version PyMongo supports. Sub-modules: diff --git a/pymongo/__init__.py b/pymongo/__init__.py index 79f15d71a..d39480923 100644 --- a/pymongo/__init__.py +++ b/pymongo/__init__.py @@ -77,6 +77,8 @@ def get_version_string(): version = get_version_string() """Current version of PyMongo.""" +from pymongo.common import (MIN_SUPPORTED_WIRE_VERSION, + MAX_SUPPORTED_WIRE_VERSION) from pymongo.connection import Connection from pymongo.mongo_client import MongoClient from pymongo.mongo_replica_set_client import MongoReplicaSetClient diff --git a/pymongo/common.py b/pymongo/common.py index 46978492e..109eae704 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -42,6 +42,10 @@ MAX_MESSAGE_SIZE = 2 * MAX_BSON_SIZE MIN_WIRE_VERSION = 0 MAX_WIRE_VERSION = 0 +# What this version of PyMongo supports. +MIN_SUPPORTED_WIRE_VERSION = 0 +MAX_SUPPORTED_WIRE_VERSION = 2 + def raise_config_error(key, dummy): """Raise ConfigurationError with the given key name.""" diff --git a/pymongo/member.py b/pymongo/member.py index a76702225..3377066d7 100644 --- a/pymongo/member.py +++ b/pymongo/member.py @@ -15,6 +15,7 @@ """Represent a mongod / mongos instance""" from pymongo import common +from pymongo.errors import ConfigurationError from pymongo.read_preferences import ReadPreference # Member states @@ -53,6 +54,7 @@ class Member(object): else: self.state = OTHER + self.set_name = ismaster_response.get('setName') self.tags = ismaster_response.get('tags', {}) self.max_bson_size = ismaster_response.get( 'maxBsonObjectSize', common.MAX_BSON_SIZE) @@ -63,7 +65,21 @@ class Member(object): self.max_wire_version = ismaster_response.get( 'maxWireVersion', common.MAX_WIRE_VERSION) - self.set_name = ismaster_response.get('setName') + # self.min/max_wire_version is the server's wire protocol. + # MIN/MAX_SUPPORTED_WIRE_VERSION is what PyMongo supports. + if ( + # Server too new. + common.MAX_SUPPORTED_WIRE_VERSION < self.min_wire_version + # Server too old. + or common.MIN_SUPPORTED_WIRE_VERSION > self.max_wire_version + ): + raise ConfigurationError( + "Server at %s:%d uses wire protocol versions %d through %d, " + "but PyMongo only supports %d through %d" + % (self.host[0], self.host[1], + self.min_wire_version, self.max_wire_version, + common.MIN_SUPPORTED_WIRE_VERSION, + common.MAX_SUPPORTED_WIRE_VERSION)) def clone_with(self, ismaster_response, ping_time_sample): """Get a clone updated with ismaster response and a single ping time. diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 9a0b00794..9e2e373a2 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -848,8 +848,9 @@ class MongoClient(common.BaseObject): chosen_member = member discovered_nodes = nodes break - except OperationFailure: - # The server is available but something failed, probably auth. + except (OperationFailure, ConfigurationError): + # The server is available but something failed, e.g. auth, + # wrong replica set name, or incompatible wire protocol. raise except Exception, why: errors.append(str(why)) diff --git a/pymongo/mongo_replica_set_client.py b/pymongo/mongo_replica_set_client.py index 715aca9a6..1f85fecc3 100644 --- a/pymongo/mongo_replica_set_client.py +++ b/pymongo/mongo_replica_set_client.py @@ -130,7 +130,7 @@ def _partition_node(node): class RSState(object): def __init__( self, threadlocal, hosts=None, host_to_member=None, arbiters=None, - writer=None, error_message='No primary available'): + writer=None, error_message='No primary available', exc=None): """An immutable snapshot of the client's view of the replica set state. Stores Member instances for all members we're connected to, and a @@ -144,6 +144,7 @@ class RSState(object): - `arbiters`: Optional sequence of arbiters as (host, port) - `writer`: Optional (host, port) of primary - `error_message`: Optional error if `writer` is None + - `exc`: Optional error if state is unusable """ self._threadlocal = threadlocal # threading.local or gevent local self._arbiters = frozenset(arbiters or []) # set of (host, port) @@ -152,6 +153,7 @@ class RSState(object): self._host_to_member = host_to_member or {} self._hosts = frozenset(hosts or []) self._members = frozenset(self._host_to_member.values()) + self._exc = exc self._primary_member = self.get(writer) def clone_with_host_down(self, host, error_message): @@ -168,7 +170,8 @@ class RSState(object): members, self._arbiters, None, - error_message) + error_message, + self._exc) else: # Some other host went down. Keep our current primary or, if it's # already down, keep our current error message. @@ -178,7 +181,8 @@ class RSState(object): members, self._arbiters, self._writer, - self._error_message) + self._error_message, + self._exc) def clone_without_writer(self, threadlocal): """Get a clone without a primary. Unpins all threads. @@ -190,8 +194,17 @@ class RSState(object): threadlocal, self._hosts, self._host_to_member, + self._arbiters) + + def clone_with_error(self, exc): + return RSState( + self._threadlocal, + self._hosts, + self._host_to_member.copy(), self._arbiters, - None) + self._writer, + self._error_message, + exc) @property def arbiters(self): @@ -232,6 +245,11 @@ class RSState(object): host for host, member in self._host_to_member.items() if member.is_secondary]) + @property + def exc(self): + """Reason RSState is unusable, or None.""" + return self._exc + def get(self, host): """Return a Member instance or None for the given (host, port).""" return self._host_to_member.get(host) @@ -1054,8 +1072,15 @@ class MongoReplicaSetClient(common.BaseObject): """ # Only one thread / greenlet calls refresh() at a time: the one # running __init__() or the monitor. We won't modify the state, only - # replace it at the end. + # replace it. rs_state = self.__rs_state + try: + self.__rs_state = self.__create_rs_state(rs_state, initial) + except ConfigurationError, e: + self.__rs_state = rs_state.clone_with_error(e) + raise + + def __create_rs_state(self, rs_state, initial): errors = [] if rs_state.hosts: # Try first those hosts we think are up, then the down ones. @@ -1198,18 +1223,26 @@ class MongoReplicaSetClient(common.BaseObject): + response.get('passives', [])) # Replace old state with new. - self.__rs_state = RSState( + return RSState( threadlocal, [_partition_node(h) for h in final_host_list], members, arbiters, writer) + def __get_rs_state(self): + rs_state = self.__rs_state + if rs_state.exc: + raise rs_state.exc + + return rs_state + def __find_primary(self): """Returns a connection to the primary of this replica set, if one exists, or raises AutoReconnect. """ - primary = self.__rs_state.primary_member + rs_state = self.__get_rs_state() + primary = rs_state.primary_member if primary: return primary @@ -1218,7 +1251,7 @@ class MongoReplicaSetClient(common.BaseObject): # Try again. This time copy the RSState reference so we're guaranteed # primary_member and error_message are from the same state. - rs_state = self.__rs_state + rs_state = self.__get_rs_state() if rs_state.primary_member: return rs_state.primary_member @@ -1248,7 +1281,7 @@ class MongoReplicaSetClient(common.BaseObject): if sync: rs_state = self.__rs_state - if not rs_state.primary_member: + if rs_state.exc or not rs_state.primary_member: self.__schedule_refresh(sync) def disconnect(self): @@ -1425,7 +1458,7 @@ class MongoReplicaSetClient(common.BaseObject): if _connection_to_use in (None, -1): member = self.__find_primary() else: - member = self.__rs_state.get(_connection_to_use) + member = self.__get_rs_state().get(_connection_to_use) sock_info = None try: @@ -1524,7 +1557,7 @@ class MongoReplicaSetClient(common.BaseObject): """ self._ensure_connected() - rs_state = self.__rs_state + rs_state = self.__get_rs_state() tag_sets = kwargs.get('tag_sets', [{}]) mode = kwargs.get('read_preference', ReadPreference.PRIMARY) if _must_use_master: diff --git a/test/pymongo_mocks.py b/test/pymongo_mocks.py index 384d77bdb..e6f21a142 100644 --- a/test/pymongo_mocks.py +++ b/test/pymongo_mocks.py @@ -16,6 +16,7 @@ import socket +from pymongo import common from pymongo import MongoClient, MongoReplicaSetClient from pymongo.pool import Pool @@ -77,6 +78,9 @@ class MockClientBase(object): # Hosts that should raise socket errors. self.mock_down_hosts = [] + # Hostname -> (min wire version, max wire version) + self.mock_wire_versions = {} + def kill_host(self, host): """Host is like 'a:1'.""" self.mock_down_hosts.append(host) @@ -85,13 +89,23 @@ class MockClientBase(object): """Host is like 'a:1'.""" self.mock_down_hosts.remove(host) + def set_wire_version_range(self, host, min_version, max_version): + self.mock_wire_versions[host] = (min_version, max_version) + def mock_is_master(self, host): + min_wire_version, max_wire_version = self.mock_wire_versions.get( + host, + (common.MIN_WIRE_VERSION, common.MAX_WIRE_VERSION)) + # host is like 'a:1'. if host in self.mock_down_hosts: raise socket.timeout('mock timeout') if host in self.mock_standalones: - return {'ismaster': True} + return { + 'ismaster': True, + 'minWireVersion': min_wire_version, + 'maxWireVersion': max_wire_version} if host in self.mock_members: ismaster = (host == self.mock_primary) @@ -101,7 +115,9 @@ class MockClientBase(object): 'ismaster': ismaster, 'secondary': not ismaster, 'setName': 'rs', - 'hosts': self.mock_ismaster_hosts} + 'hosts': self.mock_ismaster_hosts, + 'minWireVersion': min_wire_version, + 'maxWireVersion': max_wire_version} if self.mock_primary: response['primary'] = self.mock_primary @@ -109,7 +125,11 @@ class MockClientBase(object): return response if host in self.mock_mongoses: - return {'ismaster': True, 'msg': 'isdbgrid'} + return { + 'ismaster': True, + 'minWireVersion': min_wire_version, + 'maxWireVersion': max_wire_version, + 'msg': 'isdbgrid'} # In test_internal_ips(), we try to connect to a host listed # in ismaster['hosts'] but not publicly accessible. diff --git a/test/test_client.py b/test/test_client.py index 47e9d4cb9..7e5f07bda 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -33,7 +33,7 @@ from bson.tz_util import utc from pymongo.mongo_client import MongoClient from pymongo.database import Database from pymongo.pool import SocketInfo -from pymongo import thread_util +from pymongo import thread_util, common from pymongo.errors import (AutoReconnect, ConfigurationError, ConnectionFailure, @@ -894,6 +894,53 @@ with client.start_request() as request: client = MongoClient('doesnt exist', _connect=False) self.assertFalse(client.alive()) + def test_wire_version(self): + c = MockClient( + standalones=[], + members=['a:1', 'b:2', 'c:3'], + mongoses=[], + host='b:2', # Pass a secondary. + replicaSet='rs', + _connect=False) + + c.set_wire_version_range('a:1', 1, 5) + c.db.collection.find_one() # Connect. + self.assertEqual(c.min_wire_version, 1) + self.assertEqual(c.max_wire_version, 5) + + c.set_wire_version_range('a:1', 10, 11) + c.disconnect() + self.assertRaises(ConfigurationError, c.db.collection.find_one) + + def test_wire_version_mongos_ha(self): + c = MockClient( + standalones=[], + members=[], + mongoses=['a:1', 'b:2', 'c:3'], + host='a:1,b:2,c:3', + _connect=False) + + c.set_wire_version_range('a:1', 2, 5) + c.set_wire_version_range('b:2', 2, 2) + c.set_wire_version_range('c:3', 1, 1) + c.db.collection.find_one() # Connect. + + # Which member did we use? + used_host = '%s:%s' % (c.host, c.port) + expected_min, expected_max = c.mock_wire_versions[used_host] + self.assertEqual(expected_min, c.min_wire_version) + self.assertEqual(expected_max, c.max_wire_version) + + c.set_wire_version_range('a:1', 0, 0) + c.set_wire_version_range('b:2', 0, 0) + c.set_wire_version_range('c:3', 0, 0) + c.disconnect() + c.db.collection.find_one() + used_host = '%s:%s' % (c.host, c.port) + expected_min, expected_max = c.mock_wire_versions[used_host] + self.assertEqual(expected_min, c.min_wire_version) + self.assertEqual(expected_max, c.max_wire_version) + def test_replica_set(self): client = MongoClient(host, port) name = client.pymongo_test.command('ismaster').get('setName') @@ -903,7 +950,7 @@ with client.start_request() as request: MongoClient(host, port, replicaSet=name) # No error. self.assertRaises( - ConnectionFailure, + ConfigurationError, MongoClient, host, port, replicaSet='bad' + name) diff --git a/test/test_replica_set_client.py b/test/test_replica_set_client.py index 69746dc79..135588ed1 100644 --- a/test/test_replica_set_client.py +++ b/test/test_replica_set_client.py @@ -1140,6 +1140,37 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): self.assertFalse(client.alive()) +class TestReplicaSetWireVersion(unittest.TestCase): + def test_wire_version(self): + c = MockReplicaSetClient( + standalones=[], + members=['a:1', 'b:2', 'c:3'], + mongoses=[], + host='a:1', + replicaSet='rs', + _connect=False) + + c.set_wire_version_range('a:1', 1, 5) + c.set_wire_version_range('b:2', 0, 1) + c.set_wire_version_range('c:3', 1, 2) + c.db.collection.find_one() # Connect. + self.assertEqual(c.min_wire_version, 1) + self.assertEqual(c.max_wire_version, 5) + + c.set_wire_version_range('a:1', 2, 2) + c.refresh() + self.assertEqual(c.min_wire_version, 2) + self.assertEqual(c.max_wire_version, 2) + + # A secondary doesn't overlap with us. + c.set_wire_version_range('b:2', 5, 6) + + # refresh() raises, as do all following operations. + self.assertRaises(ConfigurationError, c.refresh) + self.assertRaises(ConfigurationError, c.db.collection.find_one) + self.assertRaises(ConfigurationError, c.db.collection.insert, {}) + + # Test concurrent access to a lazily-connecting RS client. class TestReplicaSetClientLazyConnect( TestReplicaSetClientBase, diff --git a/test/test_replica_set_reconfig.py b/test/test_replica_set_reconfig.py index 7e4cbefc1..93e9bed7d 100644 --- a/test/test_replica_set_reconfig.py +++ b/test/test_replica_set_reconfig.py @@ -19,7 +19,7 @@ import unittest sys.path[0:0] = [""] -from pymongo.errors import AutoReconnect, ConnectionFailure +from pymongo.errors import ConfigurationError, ConnectionFailure from pymongo import ReadPreference from test.pymongo_mocks import MockClient, MockReplicaSetClient @@ -53,7 +53,7 @@ class TestSecondaryBecomesStandalone(unittest.TestCase): try: c.db.collection.find_one() - except AutoReconnect, e: + except ConfigurationError, e: self.assertTrue('not a member of replica set' in str(e)) else: self.fail("MongoClient didn't raise AutoReconnect")