739 lines
27 KiB
Python
739 lines
27 KiB
Python
# Copyright 2011-present MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Test the replica_set_connection module."""
|
|
|
|
import contextlib
|
|
import copy
|
|
import pickle
|
|
import random
|
|
import sys
|
|
|
|
sys.path[0:0] = [""]
|
|
|
|
from bson.son import SON
|
|
from pymongo.errors import ConfigurationError, OperationFailure
|
|
from pymongo.message import _maybe_add_read_preference
|
|
from pymongo.mongo_client import MongoClient
|
|
from pymongo.read_preferences import (ReadPreference, MovingAverage,
|
|
Primary, PrimaryPreferred,
|
|
Secondary, SecondaryPreferred,
|
|
Nearest)
|
|
from pymongo.server_description import ServerDescription
|
|
from pymongo.server_selectors import readable_server_selector, Selection
|
|
from pymongo.server_type import SERVER_TYPE
|
|
from pymongo.write_concern import WriteConcern
|
|
|
|
from test import (SkipTest,
|
|
client_context,
|
|
IntegrationTest,
|
|
unittest)
|
|
from test.utils import (connected,
|
|
one,
|
|
OvertCommandListener,
|
|
rs_client,
|
|
single_client,
|
|
wait_until)
|
|
from test.version import Version
|
|
|
|
|
|
class TestSelections(IntegrationTest):
    """Checks the truthiness of ``Selection`` objects."""

    @client_context.require_connection
    def test_bool(self):
        """A selection built from the live topology is truthy; the same
        selection narrowed to an empty server list is falsy.
        """
        client = single_client()
        # Close the client once the test finishes so it is not leaked into
        # the rest of the test run.
        self.addCleanup(client.close)

        wait_until(lambda: client.address, "discover primary")
        selection = Selection.from_topology_description(
            client._topology.description)

        self.assertTrue(selection)
        self.assertFalse(selection.with_server_descriptions([]))
|
|
|
|
|
|
class TestReadPreferenceObjects(unittest.TestCase):
    """Read preference instances must compare equal to their pickled and
    copied duplicates.
    """

    # One sample per mode, including one with tag sets and one with
    # max_staleness, shared by every test below.
    prefs = [
        Primary(),
        PrimaryPreferred(),
        Secondary(),
        Nearest(tag_sets=[{'a': 1}, {'b': 2}]),
        SecondaryPreferred(max_staleness=30),
    ]

    def test_pickle(self):
        """A dumps/loads round trip yields an equal object."""
        for original in self.prefs:
            restored = pickle.loads(pickle.dumps(original))
            self.assertEqual(original, restored)

    def test_copy(self):
        """A shallow copy equals the original."""
        for original in self.prefs:
            duplicate = copy.copy(original)
            self.assertEqual(original, duplicate)

    def test_deepcopy(self):
        """A deep copy equals the original."""
        for original in self.prefs:
            duplicate = copy.deepcopy(original)
            self.assertEqual(original, duplicate)
|
|
|
|
|
|
class TestReadPreferencesBase(IntegrationTest):
    """Shared fixture and helpers for the read preference test cases.

    The class is gated by ``client_context.require_secondaries_count(1)``.
    ``setUp`` loads ten documents into ``pymongo_test.test`` so that the
    helpers below have something to read through a cursor.
    """

    @classmethod
    @client_context.require_secondaries_count(1)
    def setUpClass(cls):
        super(TestReadPreferencesBase, cls).setUpClass()

    def setUp(self):
        super(TestReadPreferencesBase, self).setUp()
        # Insert some data so we can use cursors in read_from_which_host
        self.client.pymongo_test.test.drop()
        db = self.client.get_database(
            "pymongo_test",
            write_concern=WriteConcern(w=client_context.w))
        db.test.insert_many([{'_id': i} for i in range(10)])

        self.addCleanup(self.client.pymongo_test.test.drop)

    def read_from_which_host(self, client):
        """Do a find() on the client and return which host was used.

        The cursor is advanced once before its address is read.
        """
        cursor = client.pymongo_test.test.find()
        next(cursor)
        return cursor.address

    def read_from_which_kind(self, client):
        """Do a find() on the client and return 'primary' or 'secondary'
        depending on which the client used.

        Fails the test if the address is neither the client's primary nor
        one of its secondaries.
        """
        address = self.read_from_which_host(client)
        if address == client.primary:
            return 'primary'
        if address in client.secondaries:
            return 'secondary'

        self.fail(
            'Cursor used address %s, expected either primary '
            '%s or secondaries %s' % (
                address, client.primary, client.secondaries))

    def assertReadsFrom(self, expected, **kwargs):
        """Assert that a new replica set client reads from ``expected``.

        :Parameters:
          - `expected`: 'primary' or 'secondary'.
          - `kwargs`: options forwarded to ``rs_client``.
        """
        c = rs_client(**kwargs)
        # Each call creates a new client; close it when the test ends so
        # repeated calls do not accumulate open clients.
        self.addCleanup(c.close)
        wait_until(
            lambda: len(c.nodes - c.arbiters) == client_context.w,
            "discovered all nodes")

        used = self.read_from_which_kind(c)
        self.assertEqual(expected, used, 'Cursor used %s, expected %s' % (
            used, expected))
|
|
|
|
|
|
class TestSingleSecondaryOk(TestReadPreferencesBase):
    """Reads through a direct connection to a secondary."""

    def test_reads_from_secondary(self):
        """Read helpers succeed on a direct secondary connection even though
        the client's read preference is PRIMARY.
        """
        host, port = next(iter(self.client.secondaries))
        # Direct connection to a secondary.
        client = single_client(host, port)
        # Close the extra client when the test ends so it is not leaked.
        self.addCleanup(client.close)
        self.assertFalse(client.is_primary)

        # Regardless of read preference, we should be able to do
        # "reads" with a direct connection to a secondary.
        # See server-selection.rst#topology-type-single.
        self.assertEqual(client.read_preference, ReadPreference.PRIMARY)

        db = client.pymongo_test
        coll = db.test

        # Test find and find_one.
        self.assertIsNotNone(coll.find_one())
        self.assertEqual(10, len(list(coll.find())))

        # Test some database helpers.
        self.assertIsNotNone(db.list_collection_names())
        self.assertIsNotNone(db.validate_collection("test"))
        self.assertIsNotNone(db.command("ping"))

        # Test some collection helpers.
        self.assertEqual(10, coll.count_documents({}))
        self.assertEqual(10, len(coll.distinct("_id")))
        self.assertIsNotNone(coll.aggregate([]))
        self.assertIsNotNone(coll.index_information())
|
|
|
|
|
|
class TestReadPreferences(TestReadPreferencesBase):
    """Option validation and server selection for each read preference mode.

    Every client created by these tests is registered with ``addCleanup``
    so that it is closed when the test finishes.
    """

    def test_mode_validation(self):
        """Each valid mode is accepted and reported back by the client; a
        string that is not a mode object raises TypeError.
        """
        for mode in (ReadPreference.PRIMARY,
                     ReadPreference.PRIMARY_PREFERRED,
                     ReadPreference.SECONDARY,
                     ReadPreference.SECONDARY_PREFERRED,
                     ReadPreference.NEAREST):
            client = rs_client(read_preference=mode)
            self.addCleanup(client.close)
            self.assertEqual(mode, client.read_preference)

        self.assertRaises(
            TypeError,
            rs_client, read_preference='foo')

    def test_tag_sets_validation(self):
        """Valid tag sets round-trip through the client; malformed ones are
        rejected by the ``Secondary`` constructor.
        """
        for expected in ([{}],
                         [{'k': 'v'}],
                         [{'k': 'v'}, {}]):
            # Hand the preference its own copy so the comparison below is
            # against an independent list, not the very same object.
            pref = Secondary(tag_sets=copy.deepcopy(expected))
            client = rs_client(read_preference=pref)
            self.addCleanup(client.close)
            self.assertEqual(expected, client.read_preference.tag_sets)

        self.assertRaises(ValueError, Secondary, tag_sets=[])

        # One dict not ok, must be a list of dicts
        self.assertRaises(TypeError, Secondary, tag_sets={'k': 'v'})

        self.assertRaises(TypeError, Secondary, tag_sets='foo')

        self.assertRaises(TypeError, Secondary, tag_sets=['foo'])

    def test_threshold_validation(self):
        """Non-negative localThresholdMS values are stored on the client
        options; a negative value raises ValueError.
        """
        for threshold in (17, 42, 666, 0):
            client = rs_client(localThresholdMS=threshold, connect=False)
            self.addCleanup(client.close)
            self.assertEqual(threshold, client.options.local_threshold_ms)

        self.assertRaises(ValueError,
                          rs_client,
                          localthresholdms=-1)

    def test_zero_latency(self):
        """With localThresholdMS=0 and distinct ping times, every read goes
        to the same host.
        """
        ping_times: set = set()
        # Generate unique ping times.
        while len(ping_times) < len(self.client.nodes):
            ping_times.add(random.random())
        for ping_time, host in zip(ping_times, self.client.nodes):
            ServerDescription._host_to_round_trip_time[host] = ping_time
        try:
            client = rs_client(readPreference='nearest', localThresholdMS=0)
            # Register the cleanup before connecting so the client is closed
            # even if connecting or discovery fails.
            self.addCleanup(client.close)
            client = connected(client)
            wait_until(
                lambda: client.nodes == self.client.nodes,
                "discovered all nodes")
            host = self.read_from_which_host(client)
            for _ in range(5):
                self.assertEqual(host, self.read_from_which_host(client))
        finally:
            # Always remove the fake round trip times, pass or fail.
            ServerDescription._host_to_round_trip_time.clear()

    def test_primary(self):
        self.assertReadsFrom(
            'primary', read_preference=ReadPreference.PRIMARY)

    def test_primary_with_tags(self):
        # Tags not allowed with PRIMARY
        self.assertRaises(
            ConfigurationError,
            rs_client, tag_sets=[{'dc': 'ny'}])

    def test_primary_preferred(self):
        self.assertReadsFrom(
            'primary', read_preference=ReadPreference.PRIMARY_PREFERRED)

    def test_secondary(self):
        self.assertReadsFrom(
            'secondary', read_preference=ReadPreference.SECONDARY)

    def test_secondary_preferred(self):
        self.assertReadsFrom(
            'secondary', read_preference=ReadPreference.SECONDARY_PREFERRED)

    def test_nearest(self):
        """With a very high localThresholdMS, NEAREST eventually reads from
        the primary and from every secondary.
        """
        # With high localThresholdMS, expect to read from any
        # member
        c = rs_client(
            read_preference=ReadPreference.NEAREST,
            localThresholdMS=10000)  # 10 seconds
        self.addCleanup(c.close)

        data_members = {self.client.primary} | self.client.secondaries

        # This is a probabilistic test; track which members we've read from so
        # far, and keep reading until we've used all the members or give up.
        # Chance of using only 2 of 3 members 10k times if there's no bug =
        # 3 * (2/3)**10000, very low.
        used: set = set()
        i = 0
        while data_members.difference(used) and i < 10000:
            address = self.read_from_which_host(c)
            used.add(address)
            i += 1

        not_used = data_members.difference(used)
        # PyMongo reports round_trip_time in seconds; convert to
        # milliseconds so the "ms" figure in the failure message is
        # meaningful instead of being truncated to 0.
        latencies = ', '.join(
            '%s: %dms' % (server.description.address,
                          server.description.round_trip_time * 1000)
            for server in c._get_topology().select_servers(
                readable_server_selector))

        self.assertFalse(
            not_used,
            "Expected to use primary and all secondaries for mode NEAREST,"
            " but didn't use %s\nlatencies: %s" % (not_used, latencies))
|
|
|
|
|
|
class ReadPrefTester(MongoClient):
    """MongoClient that remembers which servers its reads went to.

    Each time one of the overridden socket checkout hooks hands out a
    socket, the server at that socket's address is added to
    ``has_read_from``.
    """

    def __init__(self, *args, **kwargs):
        self.has_read_from = set()
        # Caller-supplied keyword options override the test context's
        # client options.
        options = client_context.client_options
        options.update(kwargs)
        super().__init__(*args, **options)

    @contextlib.contextmanager
    def _socket_for_reads(self, read_preference, session):
        """Wrap the parent hook and record the address that was used."""
        checkout = super()._socket_for_reads(read_preference, session)
        with checkout as (sock_info, effective_pref):
            self.record_a_read(sock_info.address)
            yield sock_info, effective_pref

    @contextlib.contextmanager
    def _socket_from_server(self, read_preference, server, session):
        """Wrap the parent hook and record the address that was used."""
        checkout = super()._socket_from_server(
            read_preference, server, session)
        with checkout as (sock_info, effective_pref):
            self.record_a_read(sock_info.address)
            yield sock_info, effective_pref

    def record_a_read(self, address):
        """Look up the server at ``address`` and add it to has_read_from."""
        topology = self._get_topology()
        self.has_read_from.add(topology.select_server_by_address(address, 0))
|
|
|
|
# Pairs of (read preference class, server type a read is expected to use).
# The string 'any' is a sentinel handled by
# TestCommandAndReadPreference._test_fn, which then expects reads to reach
# every data-bearing member.
_PREF_MAP = [
    (Primary, SERVER_TYPE.RSPrimary),
    (PrimaryPreferred, SERVER_TYPE.RSPrimary),
    (Secondary, SERVER_TYPE.RSSecondary),
    (SecondaryPreferred, SERVER_TYPE.RSSecondary),
    (Nearest, 'any'),
]
|
|
|
|
|
|
class TestCommandAndReadPreference(IntegrationTest):
    """Checks which server type commands and collection helpers run on for
    each read preference mode, using a shared ``ReadPrefTester`` client.
    """

    c: ReadPrefTester
    client_version: Version

    @classmethod
    @client_context.require_secondaries_count(1)
    def setUpClass(cls):
        super(TestCommandAndReadPreference, cls).setUpClass()
        cls.c = ReadPrefTester(
            client_context.pair,
            # Ignore round trip times, to test ReadPreference modes only.
            localThresholdMS=1000*1000)
        cls.client_version = Version.from_client(cls.c)
        # mapReduce fails if the collection does not exist.
        coll = cls.c.pymongo_test.get_collection(
            'test', write_concern=WriteConcern(w=client_context.w))
        coll.insert_one({})

    @classmethod
    def tearDownClass(cls):
        try:
            cls.c.drop_database('pymongo_test')
        finally:
            # Close the client even if dropping the database fails, then
            # let the base classes run their own class-level teardown.
            cls.c.close()
            super(TestCommandAndReadPreference, cls).tearDownClass()

    def executed_on_which_server(self, client, fn, *args, **kwargs):
        """Execute fn(*args, **kwargs) and return the Server instance used.

        Asserts that exactly one server was recorded by the client.
        """
        client.has_read_from.clear()
        fn(*args, **kwargs)
        self.assertEqual(1, len(client.has_read_from))
        return one(client.has_read_from)

    def assertExecutedOn(self, server_type, client, fn, *args, **kwargs):
        """Assert that fn(*args, **kwargs) ran on a server of server_type.

        The comparison is made on the SERVER_TYPE field names.
        """
        server = self.executed_on_which_server(client, fn, *args, **kwargs)
        self.assertEqual(SERVER_TYPE._fields[server_type],
                         SERVER_TYPE._fields[server.description.server_type])

    def _test_fn(self, server_type, fn):
        """Run ``fn`` repeatedly and check where it executed.

        For the sentinel 'any', ``fn`` is called up to 1000 times per round
        until the primary and every secondary have been used; otherwise
        every call must land on ``server_type``.
        """
        for _ in range(10):
            if server_type == 'any':
                used = set()
                for _ in range(1000):
                    server = self.executed_on_which_server(self.c, fn)
                    used.add(server.description.address)
                    if len(used) == len(self.c.secondaries) + 1:
                        # Success
                        break

                assert self.c.primary is not None
                unused = self.c.secondaries.union(
                    {self.c.primary}
                ).difference(used)
                if unused:
                    self.fail(
                        "Some members not used for NEAREST: %s" % (
                            unused))
            else:
                self.assertExecutedOn(server_type, self.c, fn)

    def _test_primary_helper(self, func):
        # Helpers that ignore read preference.
        self._test_fn(SERVER_TYPE.RSPrimary, func)

    def _test_coll_helper(self, secondary_ok, coll, meth, *args, **kwargs):
        """Call ``coll.<meth>(*args, **kwargs)`` under every mode in
        _PREF_MAP.

        When ``secondary_ok`` is false the helper is expected to run on the
        primary regardless of mode.
        """
        for mode, server_type in _PREF_MAP:
            new_coll = coll.with_options(read_preference=mode())
            # The lambda is only invoked inside this iteration, so the
            # loop variable it closes over is still the current one.
            func = lambda: getattr(new_coll, meth)(*args, **kwargs)
            if secondary_ok:
                self._test_fn(server_type, func)
            else:
                self._test_fn(SERVER_TYPE.RSPrimary, func)

    def test_command(self):
        # Test that the generic command helper obeys the read preference
        # passed to it.
        for mode, server_type in _PREF_MAP:
            func = lambda: self.c.pymongo_test.command('dbStats',
                                                       read_preference=mode())
            self._test_fn(server_type, func)

    def test_create_collection(self):
        # create_collection runs listCollections on the primary to check if
        # the collection already exists.
        self._test_primary_helper(
            lambda: self.c.pymongo_test.create_collection(
                'some_collection%s' % random.randint(0, sys.maxsize)))

    def test_count_documents(self):
        self._test_coll_helper(
            True, self.c.pymongo_test.test, 'count_documents', {})

    def test_estimated_document_count(self):
        self._test_coll_helper(
            True, self.c.pymongo_test.test, 'estimated_document_count')

    def test_distinct(self):
        self._test_coll_helper(True, self.c.pymongo_test.test, 'distinct', 'a')

    def test_aggregate(self):
        self._test_coll_helper(True, self.c.pymongo_test.test,
                               'aggregate',
                               [{'$project': {'_id': 1}}])

    def test_aggregate_write(self):
        # 5.0 servers support $out on secondaries.
        secondary_ok = client_context.version.at_least(5, 0)
        self._test_coll_helper(secondary_ok, self.c.pymongo_test.test,
                               'aggregate',
                               [{'$project': {'_id': 1}}, {'$out': "agg_write_test"}])
|
|
|
|
|
|
class TestMovingAverage(unittest.TestCase):
    """Checks the values reported by ``MovingAverage`` as samples arrive."""

    def test_moving_average(self):
        """The average is None before any sample, then follows the expected
        sequence as each sample is added.
        """
        avg = MovingAverage()
        self.assertIsNone(avg.get())

        # (sample fed in, average expected right afterwards)
        steps = ((10, 10), (20, 12), (30, 15.6))
        for sample, expected in steps:
            avg.add_sample(sample)
            self.assertAlmostEqual(expected, avg.get())  # type: ignore
|
|
|
|
|
|
class TestMongosAndReadPreference(IntegrationTest):
    """Checks the ``$readPreference`` document each mode produces and how it
    is attached to queries, plus sanity checks against a live mongos.
    """

    def _assert_staleness_rejected(self, coll):
        """Assert that find_one() on ``coll`` fails with error code 160."""
        with self.assertRaises(
                OperationFailure,
                msg="mongos accepted invalid staleness") as ctx:
            coll.find_one()
        self.assertEqual(160, ctx.exception.code)

    def test_read_preference_document(self):
        """``document`` reflects the mode, tag sets and max staleness."""
        self.assertEqual(Primary().document, {'mode': 'primary'})

        modes = (
            ('primaryPreferred', PrimaryPreferred),
            ('secondary', Secondary),
            ('secondaryPreferred', SecondaryPreferred),
            ('nearest', Nearest),
        )
        for mode, cls in modes:
            with self.subTest(mode=mode):
                self.assertEqual(cls().document, {'mode': mode})

                pref = cls(tag_sets=[{'dc': 'sf'}])
                self.assertEqual(
                    pref.document,
                    {'mode': mode, 'tags': [{'dc': 'sf'}]})

                pref = cls(tag_sets=[{'dc': 'sf'}], max_staleness=30)
                self.assertEqual(
                    pref.document,
                    {'mode': mode,
                     'tags': [{'dc': 'sf'}],
                     'maxStalenessSeconds': 30})

        with self.assertRaises(TypeError):
            # Float is prohibited.
            Nearest(max_staleness=1.5)  # type: ignore

        with self.assertRaises(ValueError):
            Nearest(max_staleness=0)

        with self.assertRaises(ValueError):
            Nearest(max_staleness=-2)

    def test_read_preference_document_hedge(self):
        """The hedge option is validated, included in ``document`` when
        non-empty, and forces ``$readPreference`` to be added to the query.
        """
        cases = {
            'primaryPreferred': PrimaryPreferred,
            'secondary': Secondary,
            'secondaryPreferred': SecondaryPreferred,
            'nearest': Nearest,
        }
        for mode, cls in cases.items():
            with self.assertRaises(TypeError):
                cls(hedge=[])  # type: ignore

            # An empty hedge document is left out of the preference document.
            pref = cls(hedge={})
            self.assertEqual(pref.document, {'mode': mode})
            out = _maybe_add_read_preference({}, pref)
            if cls == SecondaryPreferred:
                # SecondaryPreferred without hedge doesn't add $readPreference.
                self.assertEqual(out, {})
            else:
                self.assertEqual(
                    out,
                    SON([("$query", {}), ("$readPreference", pref.document)]))

            # Any non-empty hedge document is passed through unchanged,
            # including options beyond 'enabled'.
            for hedge in ({'enabled': True},
                          {'enabled': False},
                          {'enabled': False, 'extra': 'option'}):
                pref = cls(hedge=hedge)
                self.assertEqual(
                    pref.document, {'mode': mode, 'hedge': hedge})
                out = _maybe_add_read_preference({}, pref)
                self.assertEqual(
                    out,
                    SON([("$query", {}), ("$readPreference", pref.document)]))

    def test_send_hedge(self):
        """A find with a hedged read preference issues exactly one command,
        carrying ``$readPreference`` on replica sets and mongos only.
        """
        cases = {
            'primaryPreferred': PrimaryPreferred,
            'secondaryPreferred': SecondaryPreferred,
            'nearest': Nearest,
        }
        if client_context.supports_secondary_read_pref:
            cases['secondary'] = Secondary
        listener = OvertCommandListener()
        client = rs_client(event_listeners=[listener])
        self.addCleanup(client.close)
        client.admin.command('ping')
        for mode, cls in cases.items():
            pref = cls(hedge={'enabled': True})
            coll = client.test.get_collection('test', read_preference=pref)
            listener.reset()
            coll.find_one()
            started = listener.results['started']
            self.assertEqual(len(started), 1, started)
            cmd = started[0].command
            if client_context.is_rs or client_context.is_mongos:
                self.assertIn('$readPreference', cmd)
                self.assertEqual(cmd['$readPreference'], pref.document)
            else:
                self.assertNotIn('$readPreference', cmd)

    def test_maybe_add_read_preference(self):
        """``_maybe_add_read_preference`` wraps the query and attaches
        ``$readPreference`` for every preference except Primary and a bare
        SecondaryPreferred.
        """
        # Primary doesn't add $readPreference
        out = _maybe_add_read_preference({}, Primary())
        self.assertEqual(out, {})

        # SecondaryPreferred without tag_sets or max_staleness doesn't add
        # $readPreference
        out = _maybe_add_read_preference({}, SecondaryPreferred())
        self.assertEqual(out, {})

        adding_prefs = (
            PrimaryPreferred(),
            PrimaryPreferred(tag_sets=[{'dc': 'nyc'}]),
            Secondary(),
            Secondary(tag_sets=[{'dc': 'nyc'}]),
            SecondaryPreferred(tag_sets=[{'dc': 'nyc'}]),
            SecondaryPreferred(max_staleness=120),
            Nearest(),
            Nearest(tag_sets=[{'dc': 'nyc'}]),
        )
        for pref in adding_prefs:
            with self.subTest(pref=pref):
                out = _maybe_add_read_preference({}, pref)
                self.assertEqual(
                    out,
                    SON([("$query", {}), ("$readPreference", pref.document)]))

        # A spec that already has $query keeps its other modifiers and
        # gains $readPreference after them.
        for pref in (Nearest(), Nearest(tag_sets=[{'dc': 'nyc'}])):
            with self.subTest(pref=pref):
                # Build a fresh spec for each preference so the two checks
                # do not depend on each other.
                criteria = SON(
                    [("$query", {}), ("$orderby", SON([("_id", 1)]))])
                out = _maybe_add_read_preference(criteria, pref)
                self.assertEqual(
                    out,
                    SON([("$query", {}),
                         ("$orderby", SON([("_id", 1)])),
                         ("$readPreference", pref.document)]))

    @client_context.require_mongos
    def test_mongos(self):
        """Sorted finds through mongos return the same documents in the
        expected order under every read preference.
        """
        res = client_context.client.config.shards.find_one()
        assert res is not None
        shard = res['host']
        num_members = shard.count(',') + 1
        if num_members == 1:
            raise SkipTest("Need a replica set shard to test.")
        coll = client_context.client.pymongo_test.get_collection(
            "test",
            write_concern=WriteConcern(w=num_members))
        coll.drop()
        # Remove the documents inserted below once the test is done.
        self.addCleanup(coll.drop)
        res = coll.insert_many([{} for _ in range(5)])
        first_id = res.inserted_ids[0]
        last_id = res.inserted_ids[-1]

        # Note - this isn't a perfect test since there's no way to
        # tell what shard member a query ran on.
        for pref in (Primary(),
                     PrimaryPreferred(),
                     Secondary(),
                     SecondaryPreferred(),
                     Nearest()):
            qcoll = coll.with_options(read_preference=pref)
            results = list(qcoll.find().sort([("_id", 1)]))
            self.assertEqual(first_id, results[0]["_id"])
            self.assertEqual(last_id, results[-1]["_id"])
            results = list(qcoll.find().sort([("_id", -1)]))
            self.assertEqual(first_id, results[-1]["_id"])
            self.assertEqual(last_id, results[0]["_id"])

    @client_context.require_mongos
    def test_mongos_max_staleness(self):
        """A max staleness of 120 is accepted and 10 is rejected with code
        160, whether set on the collection or on the client.
        """
        # Sanity check that we're sending maxStalenessSeconds
        coll = client_context.client.pymongo_test.get_collection(
            "test", read_preference=SecondaryPreferred(max_staleness=120))
        # No error
        coll.find_one()

        coll = client_context.client.pymongo_test.get_collection(
            "test", read_preference=SecondaryPreferred(max_staleness=10))
        self._assert_staleness_rejected(coll)

        client = single_client(
            readPreference='secondaryPreferred',
            maxStalenessSeconds=120)
        # Close each extra client when the test ends so it is not leaked.
        self.addCleanup(client.close)
        # No error
        client.pymongo_test.test.find_one()

        client = single_client(
            readPreference='secondaryPreferred',
            maxStalenessSeconds=10)
        self.addCleanup(client.close)
        self._assert_staleness_rejected(client.pymongo_test.test)
|
|
|
|
# Run this module's tests when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|