382 lines
13 KiB
Python
382 lines
13 KiB
Python
# Copyright 2011-present MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Test the mongo_replica_set_client module."""
|
|
|
|
import sys
|
|
import warnings
|
|
import time
|
|
|
|
sys.path[0:0] = [""]
|
|
|
|
from bson.codec_options import CodecOptions
|
|
from bson.son import SON
|
|
from pymongo.common import MAX_SUPPORTED_WIRE_VERSION, partition_node
|
|
from pymongo.errors import (AutoReconnect,
|
|
ConfigurationError,
|
|
ConnectionFailure,
|
|
NetworkTimeout,
|
|
NotMasterError,
|
|
OperationFailure)
|
|
from pymongo.mongo_client import MongoClient
|
|
from pymongo.mongo_replica_set_client import MongoReplicaSetClient
|
|
from pymongo.read_preferences import ReadPreference, Secondary, Nearest
|
|
from pymongo.write_concern import WriteConcern
|
|
from test import (client_context,
|
|
client_knobs,
|
|
IntegrationTest,
|
|
unittest,
|
|
SkipTest,
|
|
db_pwd,
|
|
db_user,
|
|
MockClientTest,
|
|
HAVE_IPADDRESS)
|
|
from test.pymongo_mocks import MockClient
|
|
from test.utils import (connected,
|
|
delay,
|
|
ignore_deprecations,
|
|
one,
|
|
rs_client,
|
|
single_client,
|
|
wait_until)
|
|
|
|
|
|
class TestReplicaSetClientBase(IntegrationTest):
|
|
|
|
@classmethod
|
|
@client_context.require_replica_set
|
|
def setUpClass(cls):
|
|
super(TestReplicaSetClientBase, cls).setUpClass()
|
|
cls.name = client_context.replica_set_name
|
|
cls.w = client_context.w
|
|
|
|
ismaster = client_context.ismaster
|
|
cls.hosts = set(partition_node(h.lower()) for h in ismaster['hosts'])
|
|
cls.arbiters = set(partition_node(h)
|
|
for h in ismaster.get("arbiters", []))
|
|
|
|
repl_set_status = client_context.client.admin.command(
|
|
'replSetGetStatus')
|
|
primary_info = [
|
|
m for m in repl_set_status['members']
|
|
if m['stateStr'] == 'PRIMARY'
|
|
][0]
|
|
|
|
cls.primary = partition_node(primary_info['name'].lower())
|
|
cls.secondaries = set(
|
|
partition_node(m['name'].lower())
|
|
for m in repl_set_status['members']
|
|
if m['stateStr'] == 'SECONDARY')
|
|
|
|
|
|
class TestReplicaSetClient(TestReplicaSetClientBase):
|
|
def test_deprecated(self):
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("error", DeprecationWarning)
|
|
with self.assertRaises(DeprecationWarning):
|
|
MongoReplicaSetClient()
|
|
|
|
def test_connect(self):
|
|
client = MongoClient(
|
|
client_context.pair,
|
|
replicaSet='fdlksjfdslkjfd',
|
|
serverSelectionTimeoutMS=100)
|
|
|
|
with self.assertRaises(ConnectionFailure):
|
|
client.test.test.find_one()
|
|
|
|
def test_repr(self):
|
|
with ignore_deprecations():
|
|
client = MongoReplicaSetClient(
|
|
client_context.host,
|
|
client_context.port,
|
|
replicaSet=self.name)
|
|
|
|
self.assertIn("MongoReplicaSetClient(host=[", repr(client))
|
|
self.assertIn(client_context.pair, repr(client))
|
|
|
|
def test_properties(self):
|
|
c = client_context.client
|
|
c.admin.command('ping')
|
|
|
|
wait_until(lambda: c.primary == self.primary, "discover primary")
|
|
wait_until(lambda: c.secondaries == self.secondaries,
|
|
"discover secondaries")
|
|
|
|
# SERVER-32845
|
|
if not (client_context.version >= (3, 7, 2)
|
|
and client_context.auth_enabled and client_context.is_rs):
|
|
wait_until(lambda: c.arbiters == self.arbiters,
|
|
"discover arbiters")
|
|
self.assertEqual(c.arbiters, self.arbiters)
|
|
|
|
self.assertEqual(c.primary, self.primary)
|
|
self.assertEqual(c.secondaries, self.secondaries)
|
|
self.assertEqual(c.max_pool_size, 100)
|
|
|
|
# Make sure MongoClient's properties are copied to Database and
|
|
# Collection.
|
|
for obj in c, c.pymongo_test, c.pymongo_test.test:
|
|
self.assertEqual(obj.codec_options, CodecOptions())
|
|
self.assertEqual(obj.read_preference, ReadPreference.PRIMARY)
|
|
self.assertEqual(obj.write_concern, WriteConcern())
|
|
|
|
cursor = c.pymongo_test.test.find()
|
|
self.assertEqual(
|
|
ReadPreference.PRIMARY, cursor._Cursor__read_preference)
|
|
|
|
tag_sets = [{'dc': 'la', 'rack': '2'}, {'foo': 'bar'}]
|
|
secondary = Secondary(tag_sets=tag_sets)
|
|
c = rs_client(
|
|
maxPoolSize=25,
|
|
document_class=SON,
|
|
tz_aware=True,
|
|
read_preference=secondary,
|
|
localThresholdMS=77,
|
|
j=True)
|
|
|
|
self.assertEqual(c.max_pool_size, 25)
|
|
|
|
for obj in c, c.pymongo_test, c.pymongo_test.test:
|
|
self.assertEqual(obj.codec_options, CodecOptions(SON, True))
|
|
self.assertEqual(obj.read_preference, secondary)
|
|
self.assertEqual(obj.write_concern, WriteConcern(j=True))
|
|
|
|
cursor = c.pymongo_test.test.find()
|
|
self.assertEqual(
|
|
secondary, cursor._Cursor__read_preference)
|
|
|
|
nearest = Nearest(tag_sets=[{'dc': 'ny'}, {}])
|
|
cursor = c.pymongo_test.get_collection(
|
|
"test", read_preference=nearest).find()
|
|
|
|
self.assertEqual(nearest, cursor._Cursor__read_preference)
|
|
self.assertEqual(c.max_bson_size, 16777216)
|
|
c.close()
|
|
|
|
@client_context.require_secondaries_count(1)
|
|
def test_timeout_does_not_mark_member_down(self):
|
|
# If a query times out, the client shouldn't mark the member "down".
|
|
|
|
# Disable background refresh.
|
|
with client_knobs(heartbeat_frequency=999999):
|
|
c = rs_client(socketTimeoutMS=3000, w=self.w)
|
|
collection = c.pymongo_test.test
|
|
collection.insert_one({})
|
|
|
|
# Query the primary.
|
|
self.assertRaises(
|
|
NetworkTimeout,
|
|
collection.find_one,
|
|
{'$where': delay(5)})
|
|
|
|
self.assertTrue(c.primary)
|
|
collection.find_one() # No error.
|
|
|
|
coll = collection.with_options(
|
|
read_preference=ReadPreference.SECONDARY)
|
|
|
|
# Query the secondary.
|
|
self.assertRaises(
|
|
NetworkTimeout,
|
|
coll.find_one,
|
|
{'$where': delay(5)})
|
|
|
|
self.assertTrue(c.secondaries)
|
|
|
|
# No error.
|
|
coll.find_one()
|
|
|
|
@client_context.require_ipv6
|
|
def test_ipv6(self):
|
|
if client_context.ssl:
|
|
# http://bugs.python.org/issue13034
|
|
if sys.version_info[:2] == (2, 6):
|
|
raise SkipTest("Python 2.6 can't parse SANs")
|
|
if not HAVE_IPADDRESS:
|
|
raise SkipTest("Need the ipaddress module to test with SSL")
|
|
|
|
port = client_context.port
|
|
c = rs_client("mongodb://[::1]:%d" % (port,))
|
|
|
|
# Client switches to IPv4 once it has first ismaster response.
|
|
msg = 'discovered primary with IPv4 address "%r"' % (self.primary,)
|
|
wait_until(lambda: c.primary == self.primary, msg)
|
|
|
|
# Same outcome with both IPv4 and IPv6 seeds.
|
|
c = rs_client("mongodb://[::1]:%d,localhost:%d" % (port, port))
|
|
|
|
wait_until(lambda: c.primary == self.primary, msg)
|
|
|
|
if client_context.auth_enabled:
|
|
auth_str = "%s:%s@" % (db_user, db_pwd)
|
|
else:
|
|
auth_str = ""
|
|
|
|
uri = "mongodb://%slocalhost:%d,[::1]:%d" % (auth_str, port, port)
|
|
client = rs_client(uri)
|
|
client.pymongo_test.test.insert_one({"dummy": u"object"})
|
|
client.pymongo_test_bernie.test.insert_one({"dummy": u"object"})
|
|
|
|
dbs = client.database_names()
|
|
self.assertTrue("pymongo_test" in dbs)
|
|
self.assertTrue("pymongo_test_bernie" in dbs)
|
|
client.close()
|
|
|
|
def _test_kill_cursor_explicit(self, read_pref):
|
|
with client_knobs(kill_cursor_frequency=0.01):
|
|
c = rs_client(read_preference=read_pref, w=self.w)
|
|
db = c.pymongo_test
|
|
db.drop_collection("test")
|
|
|
|
test = db.test
|
|
test.insert_many([{"i": i} for i in range(20)])
|
|
|
|
# Partially evaluate cursor so it's left alive, then kill it
|
|
cursor = test.find().batch_size(10)
|
|
next(cursor)
|
|
self.assertNotEqual(0, cursor.cursor_id)
|
|
|
|
if read_pref == ReadPreference.PRIMARY:
|
|
msg = "Expected cursor's address to be %s, got %s" % (
|
|
c.primary, cursor.address)
|
|
|
|
self.assertEqual(cursor.address, c.primary, msg)
|
|
else:
|
|
self.assertNotEqual(
|
|
cursor.address, c.primary,
|
|
"Expected cursor's address not to be primary")
|
|
|
|
cursor_id = cursor.cursor_id
|
|
|
|
# Cursor dead on server - trigger a getMore on the same cursor_id
|
|
# and check that the server returns an error.
|
|
cursor2 = cursor.clone()
|
|
cursor2._Cursor__id = cursor_id
|
|
|
|
if sys.platform.startswith('java') or 'PyPy' in sys.version:
|
|
# Explicitly kill cursor.
|
|
cursor.close()
|
|
else:
|
|
# Implicitly kill it in CPython.
|
|
del cursor
|
|
|
|
time.sleep(5)
|
|
self.assertRaises(OperationFailure, lambda: list(cursor2))
|
|
|
|
def test_kill_cursor_explicit_primary(self):
|
|
self._test_kill_cursor_explicit(ReadPreference.PRIMARY)
|
|
|
|
@client_context.require_secondaries_count(1)
|
|
def test_kill_cursor_explicit_secondary(self):
|
|
self._test_kill_cursor_explicit(ReadPreference.SECONDARY)
|
|
|
|
@client_context.require_secondaries_count(1)
|
|
def test_not_master_error(self):
|
|
secondary_address = one(self.secondaries)
|
|
direct_client = single_client(*secondary_address)
|
|
|
|
with self.assertRaises(NotMasterError):
|
|
direct_client.pymongo_test.collection.insert_one({})
|
|
|
|
db = direct_client.get_database(
|
|
"pymongo_test", write_concern=WriteConcern(w=0))
|
|
with self.assertRaises(NotMasterError):
|
|
db.collection.insert_one({})
|
|
|
|
|
|
class TestReplicaSetWireVersion(MockClientTest):
|
|
|
|
@client_context.require_connection
|
|
@client_context.require_no_auth
|
|
def test_wire_version(self):
|
|
c = MockClient(
|
|
standalones=[],
|
|
members=['a:1', 'b:2', 'c:3'],
|
|
mongoses=[],
|
|
host='a:1',
|
|
replicaSet='rs',
|
|
connect=False)
|
|
|
|
c.set_wire_version_range('a:1', 3, 7)
|
|
c.set_wire_version_range('b:2', 2, 3)
|
|
c.set_wire_version_range('c:3', 3, 4)
|
|
c.db.command('ismaster') # Connect.
|
|
|
|
# A secondary doesn't overlap with us.
|
|
c.set_wire_version_range('b:2',
|
|
MAX_SUPPORTED_WIRE_VERSION + 1,
|
|
MAX_SUPPORTED_WIRE_VERSION + 2)
|
|
|
|
def raises_configuration_error():
|
|
try:
|
|
c.db.collection.find_one()
|
|
return False
|
|
except ConfigurationError:
|
|
return True
|
|
|
|
wait_until(raises_configuration_error,
|
|
'notice we are incompatible with server')
|
|
|
|
self.assertRaises(ConfigurationError, c.db.collection.insert_one, {})
|
|
|
|
|
|
class TestReplicaSetClientInternalIPs(MockClientTest):
|
|
|
|
@client_context.require_connection
|
|
def test_connect_with_internal_ips(self):
|
|
# Client is passed an IP it can reach, 'a:1', but the RS config
|
|
# only contains unreachable IPs like 'internal-ip'. PYTHON-608.
|
|
with self.assertRaises(AutoReconnect) as context:
|
|
connected(MockClient(
|
|
standalones=[],
|
|
members=['a:1'],
|
|
mongoses=[],
|
|
ismaster_hosts=['internal-ip:27017'],
|
|
host='a:1',
|
|
replicaSet='rs',
|
|
serverSelectionTimeoutMS=100))
|
|
|
|
self.assertEqual(
|
|
"Could not reach any servers in [('internal-ip', 27017)]."
|
|
" Replica set is configured with internal hostnames or IPs?",
|
|
str(context.exception))
|
|
|
|
class TestReplicaSetClientMaxWriteBatchSize(MockClientTest):
|
|
|
|
@client_context.require_connection
|
|
def test_max_write_batch_size(self):
|
|
c = MockClient(
|
|
standalones=[],
|
|
members=['a:1', 'b:2'],
|
|
mongoses=[],
|
|
host='a:1',
|
|
replicaSet='rs',
|
|
connect=False)
|
|
|
|
c.set_max_write_batch_size('a:1', 1)
|
|
c.set_max_write_batch_size('b:2', 2)
|
|
|
|
# Uses primary's max batch size.
|
|
self.assertEqual(c.max_write_batch_size, 1)
|
|
|
|
# b becomes primary.
|
|
c.mock_primary = 'b:2'
|
|
wait_until(lambda: c.max_write_batch_size == 2,
|
|
'update max_write_batch_size')
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|