Raise ConfigurationError if a server's min wire version is > 2, PYTHON-587.
This commit is contained in:
parent
2f67eff07e
commit
3757086e02
@ -15,6 +15,13 @@
|
||||
|
||||
.. autoclass:: pymongo.read_preferences.ReadPreference
|
||||
.. autofunction:: has_c
|
||||
.. data:: MIN_SUPPORTED_WIRE_VERSION
|
||||
|
||||
The minimum wire protocol version PyMongo supports.
|
||||
|
||||
.. data:: MAX_SUPPORTED_WIRE_VERSION
|
||||
|
||||
The maximum wire protocol version PyMongo supports.
|
||||
|
||||
Sub-modules:
|
||||
|
||||
|
||||
@ -77,6 +77,8 @@ def get_version_string():
|
||||
version = get_version_string()
|
||||
"""Current version of PyMongo."""
|
||||
|
||||
from pymongo.common import (MIN_SUPPORTED_WIRE_VERSION,
|
||||
MAX_SUPPORTED_WIRE_VERSION)
|
||||
from pymongo.connection import Connection
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.mongo_replica_set_client import MongoReplicaSetClient
|
||||
|
||||
@ -42,6 +42,10 @@ MAX_MESSAGE_SIZE = 2 * MAX_BSON_SIZE
|
||||
MIN_WIRE_VERSION = 0
|
||||
MAX_WIRE_VERSION = 0
|
||||
|
||||
# What this version of PyMongo supports.
|
||||
MIN_SUPPORTED_WIRE_VERSION = 0
|
||||
MAX_SUPPORTED_WIRE_VERSION = 2
|
||||
|
||||
|
||||
def raise_config_error(key, dummy):
|
||||
"""Raise ConfigurationError with the given key name."""
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
"""Represent a mongod / mongos instance"""
|
||||
|
||||
from pymongo import common
|
||||
from pymongo.errors import ConfigurationError
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
|
||||
# Member states
|
||||
@ -53,6 +54,7 @@ class Member(object):
|
||||
else:
|
||||
self.state = OTHER
|
||||
|
||||
self.set_name = ismaster_response.get('setName')
|
||||
self.tags = ismaster_response.get('tags', {})
|
||||
self.max_bson_size = ismaster_response.get(
|
||||
'maxBsonObjectSize', common.MAX_BSON_SIZE)
|
||||
@ -63,7 +65,21 @@ class Member(object):
|
||||
self.max_wire_version = ismaster_response.get(
|
||||
'maxWireVersion', common.MAX_WIRE_VERSION)
|
||||
|
||||
self.set_name = ismaster_response.get('setName')
|
||||
# self.min/max_wire_version is the server's wire protocol.
|
||||
# MIN/MAX_SUPPORTED_WIRE_VERSION is what PyMongo supports.
|
||||
if (
|
||||
# Server too new.
|
||||
common.MAX_SUPPORTED_WIRE_VERSION < self.min_wire_version
|
||||
# Server too old.
|
||||
or common.MIN_SUPPORTED_WIRE_VERSION > self.max_wire_version
|
||||
):
|
||||
raise ConfigurationError(
|
||||
"Server at %s:%d uses wire protocol versions %d through %d, "
|
||||
"but PyMongo only supports %d through %d"
|
||||
% (self.host[0], self.host[1],
|
||||
self.min_wire_version, self.max_wire_version,
|
||||
common.MIN_SUPPORTED_WIRE_VERSION,
|
||||
common.MAX_SUPPORTED_WIRE_VERSION))
|
||||
|
||||
def clone_with(self, ismaster_response, ping_time_sample):
|
||||
"""Get a clone updated with ismaster response and a single ping time.
|
||||
|
||||
@ -848,8 +848,9 @@ class MongoClient(common.BaseObject):
|
||||
chosen_member = member
|
||||
discovered_nodes = nodes
|
||||
break
|
||||
except OperationFailure:
|
||||
# The server is available but something failed, probably auth.
|
||||
except (OperationFailure, ConfigurationError):
|
||||
# The server is available but something failed, e.g. auth,
|
||||
# wrong replica set name, or incompatible wire protocol.
|
||||
raise
|
||||
except Exception, why:
|
||||
errors.append(str(why))
|
||||
|
||||
@ -130,7 +130,7 @@ def _partition_node(node):
|
||||
class RSState(object):
|
||||
def __init__(
|
||||
self, threadlocal, hosts=None, host_to_member=None, arbiters=None,
|
||||
writer=None, error_message='No primary available'):
|
||||
writer=None, error_message='No primary available', exc=None):
|
||||
"""An immutable snapshot of the client's view of the replica set state.
|
||||
|
||||
Stores Member instances for all members we're connected to, and a
|
||||
@ -144,6 +144,7 @@ class RSState(object):
|
||||
- `arbiters`: Optional sequence of arbiters as (host, port)
|
||||
- `writer`: Optional (host, port) of primary
|
||||
- `error_message`: Optional error if `writer` is None
|
||||
- `exc`: Optional error if state is unusable
|
||||
"""
|
||||
self._threadlocal = threadlocal # threading.local or gevent local
|
||||
self._arbiters = frozenset(arbiters or []) # set of (host, port)
|
||||
@ -152,6 +153,7 @@ class RSState(object):
|
||||
self._host_to_member = host_to_member or {}
|
||||
self._hosts = frozenset(hosts or [])
|
||||
self._members = frozenset(self._host_to_member.values())
|
||||
self._exc = exc
|
||||
self._primary_member = self.get(writer)
|
||||
|
||||
def clone_with_host_down(self, host, error_message):
|
||||
@ -168,7 +170,8 @@ class RSState(object):
|
||||
members,
|
||||
self._arbiters,
|
||||
None,
|
||||
error_message)
|
||||
error_message,
|
||||
self._exc)
|
||||
else:
|
||||
# Some other host went down. Keep our current primary or, if it's
|
||||
# already down, keep our current error message.
|
||||
@ -178,7 +181,8 @@ class RSState(object):
|
||||
members,
|
||||
self._arbiters,
|
||||
self._writer,
|
||||
self._error_message)
|
||||
self._error_message,
|
||||
self._exc)
|
||||
|
||||
def clone_without_writer(self, threadlocal):
|
||||
"""Get a clone without a primary. Unpins all threads.
|
||||
@ -190,8 +194,17 @@ class RSState(object):
|
||||
threadlocal,
|
||||
self._hosts,
|
||||
self._host_to_member,
|
||||
self._arbiters)
|
||||
|
||||
def clone_with_error(self, exc):
|
||||
return RSState(
|
||||
self._threadlocal,
|
||||
self._hosts,
|
||||
self._host_to_member.copy(),
|
||||
self._arbiters,
|
||||
None)
|
||||
self._writer,
|
||||
self._error_message,
|
||||
exc)
|
||||
|
||||
@property
|
||||
def arbiters(self):
|
||||
@ -232,6 +245,11 @@ class RSState(object):
|
||||
host for host, member in self._host_to_member.items()
|
||||
if member.is_secondary])
|
||||
|
||||
@property
|
||||
def exc(self):
|
||||
"""Reason RSState is unusable, or None."""
|
||||
return self._exc
|
||||
|
||||
def get(self, host):
|
||||
"""Return a Member instance or None for the given (host, port)."""
|
||||
return self._host_to_member.get(host)
|
||||
@ -1054,8 +1072,15 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
"""
|
||||
# Only one thread / greenlet calls refresh() at a time: the one
|
||||
# running __init__() or the monitor. We won't modify the state, only
|
||||
# replace it at the end.
|
||||
# replace it.
|
||||
rs_state = self.__rs_state
|
||||
try:
|
||||
self.__rs_state = self.__create_rs_state(rs_state, initial)
|
||||
except ConfigurationError, e:
|
||||
self.__rs_state = rs_state.clone_with_error(e)
|
||||
raise
|
||||
|
||||
def __create_rs_state(self, rs_state, initial):
|
||||
errors = []
|
||||
if rs_state.hosts:
|
||||
# Try first those hosts we think are up, then the down ones.
|
||||
@ -1198,18 +1223,26 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
+ response.get('passives', []))
|
||||
|
||||
# Replace old state with new.
|
||||
self.__rs_state = RSState(
|
||||
return RSState(
|
||||
threadlocal,
|
||||
[_partition_node(h) for h in final_host_list],
|
||||
members,
|
||||
arbiters,
|
||||
writer)
|
||||
|
||||
def __get_rs_state(self):
|
||||
rs_state = self.__rs_state
|
||||
if rs_state.exc:
|
||||
raise rs_state.exc
|
||||
|
||||
return rs_state
|
||||
|
||||
def __find_primary(self):
|
||||
"""Returns a connection to the primary of this replica set,
|
||||
if one exists, or raises AutoReconnect.
|
||||
"""
|
||||
primary = self.__rs_state.primary_member
|
||||
rs_state = self.__get_rs_state()
|
||||
primary = rs_state.primary_member
|
||||
if primary:
|
||||
return primary
|
||||
|
||||
@ -1218,7 +1251,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
|
||||
# Try again. This time copy the RSState reference so we're guaranteed
|
||||
# primary_member and error_message are from the same state.
|
||||
rs_state = self.__rs_state
|
||||
rs_state = self.__get_rs_state()
|
||||
if rs_state.primary_member:
|
||||
return rs_state.primary_member
|
||||
|
||||
@ -1248,7 +1281,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
|
||||
if sync:
|
||||
rs_state = self.__rs_state
|
||||
if not rs_state.primary_member:
|
||||
if rs_state.exc or not rs_state.primary_member:
|
||||
self.__schedule_refresh(sync)
|
||||
|
||||
def disconnect(self):
|
||||
@ -1425,7 +1458,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
if _connection_to_use in (None, -1):
|
||||
member = self.__find_primary()
|
||||
else:
|
||||
member = self.__rs_state.get(_connection_to_use)
|
||||
member = self.__get_rs_state().get(_connection_to_use)
|
||||
|
||||
sock_info = None
|
||||
try:
|
||||
@ -1524,7 +1557,7 @@ class MongoReplicaSetClient(common.BaseObject):
|
||||
"""
|
||||
self._ensure_connected()
|
||||
|
||||
rs_state = self.__rs_state
|
||||
rs_state = self.__get_rs_state()
|
||||
tag_sets = kwargs.get('tag_sets', [{}])
|
||||
mode = kwargs.get('read_preference', ReadPreference.PRIMARY)
|
||||
if _must_use_master:
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
|
||||
import socket
|
||||
|
||||
from pymongo import common
|
||||
from pymongo import MongoClient, MongoReplicaSetClient
|
||||
from pymongo.pool import Pool
|
||||
|
||||
@ -77,6 +78,9 @@ class MockClientBase(object):
|
||||
# Hosts that should raise socket errors.
|
||||
self.mock_down_hosts = []
|
||||
|
||||
# Hostname -> (min wire version, max wire version)
|
||||
self.mock_wire_versions = {}
|
||||
|
||||
def kill_host(self, host):
|
||||
"""Host is like 'a:1'."""
|
||||
self.mock_down_hosts.append(host)
|
||||
@ -85,13 +89,23 @@ class MockClientBase(object):
|
||||
"""Host is like 'a:1'."""
|
||||
self.mock_down_hosts.remove(host)
|
||||
|
||||
def set_wire_version_range(self, host, min_version, max_version):
|
||||
self.mock_wire_versions[host] = (min_version, max_version)
|
||||
|
||||
def mock_is_master(self, host):
|
||||
min_wire_version, max_wire_version = self.mock_wire_versions.get(
|
||||
host,
|
||||
(common.MIN_WIRE_VERSION, common.MAX_WIRE_VERSION))
|
||||
|
||||
# host is like 'a:1'.
|
||||
if host in self.mock_down_hosts:
|
||||
raise socket.timeout('mock timeout')
|
||||
|
||||
if host in self.mock_standalones:
|
||||
return {'ismaster': True}
|
||||
return {
|
||||
'ismaster': True,
|
||||
'minWireVersion': min_wire_version,
|
||||
'maxWireVersion': max_wire_version}
|
||||
|
||||
if host in self.mock_members:
|
||||
ismaster = (host == self.mock_primary)
|
||||
@ -101,7 +115,9 @@ class MockClientBase(object):
|
||||
'ismaster': ismaster,
|
||||
'secondary': not ismaster,
|
||||
'setName': 'rs',
|
||||
'hosts': self.mock_ismaster_hosts}
|
||||
'hosts': self.mock_ismaster_hosts,
|
||||
'minWireVersion': min_wire_version,
|
||||
'maxWireVersion': max_wire_version}
|
||||
|
||||
if self.mock_primary:
|
||||
response['primary'] = self.mock_primary
|
||||
@ -109,7 +125,11 @@ class MockClientBase(object):
|
||||
return response
|
||||
|
||||
if host in self.mock_mongoses:
|
||||
return {'ismaster': True, 'msg': 'isdbgrid'}
|
||||
return {
|
||||
'ismaster': True,
|
||||
'minWireVersion': min_wire_version,
|
||||
'maxWireVersion': max_wire_version,
|
||||
'msg': 'isdbgrid'}
|
||||
|
||||
# In test_internal_ips(), we try to connect to a host listed
|
||||
# in ismaster['hosts'] but not publicly accessible.
|
||||
|
||||
@ -33,7 +33,7 @@ from bson.tz_util import utc
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.database import Database
|
||||
from pymongo.pool import SocketInfo
|
||||
from pymongo import thread_util
|
||||
from pymongo import thread_util, common
|
||||
from pymongo.errors import (AutoReconnect,
|
||||
ConfigurationError,
|
||||
ConnectionFailure,
|
||||
@ -894,6 +894,53 @@ with client.start_request() as request:
|
||||
client = MongoClient('doesnt exist', _connect=False)
|
||||
self.assertFalse(client.alive())
|
||||
|
||||
def test_wire_version(self):
|
||||
c = MockClient(
|
||||
standalones=[],
|
||||
members=['a:1', 'b:2', 'c:3'],
|
||||
mongoses=[],
|
||||
host='b:2', # Pass a secondary.
|
||||
replicaSet='rs',
|
||||
_connect=False)
|
||||
|
||||
c.set_wire_version_range('a:1', 1, 5)
|
||||
c.db.collection.find_one() # Connect.
|
||||
self.assertEqual(c.min_wire_version, 1)
|
||||
self.assertEqual(c.max_wire_version, 5)
|
||||
|
||||
c.set_wire_version_range('a:1', 10, 11)
|
||||
c.disconnect()
|
||||
self.assertRaises(ConfigurationError, c.db.collection.find_one)
|
||||
|
||||
def test_wire_version_mongos_ha(self):
|
||||
c = MockClient(
|
||||
standalones=[],
|
||||
members=[],
|
||||
mongoses=['a:1', 'b:2', 'c:3'],
|
||||
host='a:1,b:2,c:3',
|
||||
_connect=False)
|
||||
|
||||
c.set_wire_version_range('a:1', 2, 5)
|
||||
c.set_wire_version_range('b:2', 2, 2)
|
||||
c.set_wire_version_range('c:3', 1, 1)
|
||||
c.db.collection.find_one() # Connect.
|
||||
|
||||
# Which member did we use?
|
||||
used_host = '%s:%s' % (c.host, c.port)
|
||||
expected_min, expected_max = c.mock_wire_versions[used_host]
|
||||
self.assertEqual(expected_min, c.min_wire_version)
|
||||
self.assertEqual(expected_max, c.max_wire_version)
|
||||
|
||||
c.set_wire_version_range('a:1', 0, 0)
|
||||
c.set_wire_version_range('b:2', 0, 0)
|
||||
c.set_wire_version_range('c:3', 0, 0)
|
||||
c.disconnect()
|
||||
c.db.collection.find_one()
|
||||
used_host = '%s:%s' % (c.host, c.port)
|
||||
expected_min, expected_max = c.mock_wire_versions[used_host]
|
||||
self.assertEqual(expected_min, c.min_wire_version)
|
||||
self.assertEqual(expected_max, c.max_wire_version)
|
||||
|
||||
def test_replica_set(self):
|
||||
client = MongoClient(host, port)
|
||||
name = client.pymongo_test.command('ismaster').get('setName')
|
||||
@ -903,7 +950,7 @@ with client.start_request() as request:
|
||||
MongoClient(host, port, replicaSet=name) # No error.
|
||||
|
||||
self.assertRaises(
|
||||
ConnectionFailure,
|
||||
ConfigurationError,
|
||||
MongoClient, host, port, replicaSet='bad' + name)
|
||||
|
||||
|
||||
|
||||
@ -1140,6 +1140,37 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
|
||||
self.assertFalse(client.alive())
|
||||
|
||||
|
||||
class TestReplicaSetWireVersion(unittest.TestCase):
|
||||
def test_wire_version(self):
|
||||
c = MockReplicaSetClient(
|
||||
standalones=[],
|
||||
members=['a:1', 'b:2', 'c:3'],
|
||||
mongoses=[],
|
||||
host='a:1',
|
||||
replicaSet='rs',
|
||||
_connect=False)
|
||||
|
||||
c.set_wire_version_range('a:1', 1, 5)
|
||||
c.set_wire_version_range('b:2', 0, 1)
|
||||
c.set_wire_version_range('c:3', 1, 2)
|
||||
c.db.collection.find_one() # Connect.
|
||||
self.assertEqual(c.min_wire_version, 1)
|
||||
self.assertEqual(c.max_wire_version, 5)
|
||||
|
||||
c.set_wire_version_range('a:1', 2, 2)
|
||||
c.refresh()
|
||||
self.assertEqual(c.min_wire_version, 2)
|
||||
self.assertEqual(c.max_wire_version, 2)
|
||||
|
||||
# A secondary doesn't overlap with us.
|
||||
c.set_wire_version_range('b:2', 5, 6)
|
||||
|
||||
# refresh() raises, as do all following operations.
|
||||
self.assertRaises(ConfigurationError, c.refresh)
|
||||
self.assertRaises(ConfigurationError, c.db.collection.find_one)
|
||||
self.assertRaises(ConfigurationError, c.db.collection.insert, {})
|
||||
|
||||
|
||||
# Test concurrent access to a lazily-connecting RS client.
|
||||
class TestReplicaSetClientLazyConnect(
|
||||
TestReplicaSetClientBase,
|
||||
|
||||
@ -19,7 +19,7 @@ import unittest
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
from pymongo.errors import AutoReconnect, ConnectionFailure
|
||||
from pymongo.errors import ConfigurationError, ConnectionFailure
|
||||
from pymongo import ReadPreference
|
||||
from test.pymongo_mocks import MockClient, MockReplicaSetClient
|
||||
|
||||
@ -53,7 +53,7 @@ class TestSecondaryBecomesStandalone(unittest.TestCase):
|
||||
|
||||
try:
|
||||
c.db.collection.find_one()
|
||||
except AutoReconnect, e:
|
||||
except ConfigurationError, e:
|
||||
self.assertTrue('not a member of replica set' in str(e))
|
||||
else:
|
||||
self.fail("MongoClient didn't raise AutoReconnect")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user