693 lines
26 KiB
Python
693 lines
26 KiB
Python
# Copyright 2011-present MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Test the replica_set_connection module."""
|
|
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import copy
|
|
import pickle
|
|
import random
|
|
import sys
|
|
from typing import Any
|
|
|
|
from pymongo.operations import _Op
|
|
|
|
sys.path[0:0] = [""]
|
|
|
|
from test import IntegrationTest, SkipTest, client_context, connected, unittest
|
|
from test.utils import (
|
|
OvertCommandListener,
|
|
one,
|
|
wait_until,
|
|
)
|
|
from test.version import Version
|
|
|
|
from bson.son import SON
|
|
from pymongo.errors import ConfigurationError, OperationFailure
|
|
from pymongo.message import _maybe_add_read_preference
|
|
from pymongo.read_preferences import (
|
|
MovingAverage,
|
|
Nearest,
|
|
Primary,
|
|
PrimaryPreferred,
|
|
ReadPreference,
|
|
Secondary,
|
|
SecondaryPreferred,
|
|
)
|
|
from pymongo.server_description import ServerDescription
|
|
from pymongo.server_selectors import Selection, readable_server_selector
|
|
from pymongo.server_type import SERVER_TYPE
|
|
from pymongo.synchronous.mongo_client import MongoClient
|
|
from pymongo.write_concern import WriteConcern
|
|
|
|
|
|
class TestSelections(IntegrationTest):
|
|
@client_context.require_connection
|
|
def test_bool(self):
|
|
client = self.single_client()
|
|
|
|
wait_until(lambda: client.address, "discover primary")
|
|
selection = Selection.from_topology_description(client._topology.description)
|
|
|
|
self.assertTrue(selection)
|
|
self.assertFalse(selection.with_server_descriptions([]))
|
|
|
|
|
|
class TestReadPreferenceObjects(unittest.TestCase):
|
|
prefs = [
|
|
Primary(),
|
|
PrimaryPreferred(),
|
|
Secondary(),
|
|
Nearest(tag_sets=[{"a": 1}, {"b": 2}]),
|
|
SecondaryPreferred(max_staleness=30),
|
|
]
|
|
|
|
def test_pickle(self):
|
|
for pref in self.prefs:
|
|
self.assertEqual(pref, pickle.loads(pickle.dumps(pref)))
|
|
|
|
def test_copy(self):
|
|
for pref in self.prefs:
|
|
self.assertEqual(pref, copy.copy(pref))
|
|
|
|
def test_deepcopy(self):
|
|
for pref in self.prefs:
|
|
self.assertEqual(pref, copy.deepcopy(pref))
|
|
|
|
|
|
class TestReadPreferencesBase(IntegrationTest):
|
|
@classmethod
|
|
@client_context.require_secondaries_count(1)
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
# Insert some data so we can use cursors in read_from_which_host
|
|
self.client.pymongo_test.test.drop()
|
|
self.client.get_database(
|
|
"pymongo_test", write_concern=WriteConcern(w=client_context.w)
|
|
).test.insert_many([{"_id": i} for i in range(10)])
|
|
|
|
self.addCleanup(self.client.pymongo_test.test.drop)
|
|
|
|
def read_from_which_host(self, client):
|
|
"""Do a find() on the client and return which host was used"""
|
|
cursor = client.pymongo_test.test.find()
|
|
next(cursor)
|
|
return cursor.address
|
|
|
|
def read_from_which_kind(self, client):
|
|
"""Do a find() on the client and return 'primary' or 'secondary'
|
|
depending on which the client used.
|
|
"""
|
|
address = self.read_from_which_host(client)
|
|
if address == client.primary:
|
|
return "primary"
|
|
elif address in client.secondaries:
|
|
return "secondary"
|
|
else:
|
|
self.fail(
|
|
f"Cursor used address {address}, expected either primary "
|
|
f"{client.primary} or secondaries {client.secondaries}"
|
|
)
|
|
return None
|
|
|
|
def assertReadsFrom(self, expected, **kwargs):
|
|
c = self.rs_client(**kwargs)
|
|
wait_until(lambda: len(c.nodes - c.arbiters) == client_context.w, "discovered all nodes")
|
|
|
|
used = self.read_from_which_kind(c)
|
|
self.assertEqual(expected, used, f"Cursor used {used}, expected {expected}")
|
|
|
|
|
|
class TestSingleSecondaryOk(TestReadPreferencesBase):
|
|
def test_reads_from_secondary(self):
|
|
host, port = next(iter(self.client.secondaries))
|
|
# Direct connection to a secondary.
|
|
client = self.single_client(host, port)
|
|
self.assertFalse(client.is_primary)
|
|
|
|
# Regardless of read preference, we should be able to do
|
|
# "reads" with a direct connection to a secondary.
|
|
# See server-selection.rst#topology-type-single.
|
|
self.assertEqual(client.read_preference, ReadPreference.PRIMARY)
|
|
|
|
db = client.pymongo_test
|
|
coll = db.test
|
|
|
|
# Test find and find_one.
|
|
self.assertIsNotNone(coll.find_one())
|
|
self.assertEqual(10, len(list(coll.find())))
|
|
|
|
# Test some database helpers.
|
|
self.assertIsNotNone(db.list_collection_names())
|
|
self.assertIsNotNone(db.validate_collection("test"))
|
|
self.assertIsNotNone(db.command("ping"))
|
|
|
|
# Test some collection helpers.
|
|
self.assertEqual(10, coll.count_documents({}))
|
|
self.assertEqual(10, len(coll.distinct("_id")))
|
|
self.assertIsNotNone(coll.aggregate([]))
|
|
self.assertIsNotNone(coll.index_information())
|
|
|
|
|
|
class TestReadPreferences(TestReadPreferencesBase):
|
|
def test_mode_validation(self):
|
|
for mode in (
|
|
ReadPreference.PRIMARY,
|
|
ReadPreference.PRIMARY_PREFERRED,
|
|
ReadPreference.SECONDARY,
|
|
ReadPreference.SECONDARY_PREFERRED,
|
|
ReadPreference.NEAREST,
|
|
):
|
|
self.assertEqual(mode, self.rs_client(read_preference=mode).read_preference)
|
|
|
|
self.assertRaises(TypeError, self.rs_client, read_preference="foo")
|
|
|
|
def test_tag_sets_validation(self):
|
|
S = Secondary(tag_sets=[{}])
|
|
self.assertEqual([{}], self.rs_client(read_preference=S).read_preference.tag_sets)
|
|
|
|
S = Secondary(tag_sets=[{"k": "v"}])
|
|
self.assertEqual([{"k": "v"}], self.rs_client(read_preference=S).read_preference.tag_sets)
|
|
|
|
S = Secondary(tag_sets=[{"k": "v"}, {}])
|
|
self.assertEqual(
|
|
[{"k": "v"}, {}], self.rs_client(read_preference=S).read_preference.tag_sets
|
|
)
|
|
|
|
self.assertRaises(ValueError, Secondary, tag_sets=[])
|
|
|
|
# One dict not ok, must be a list of dicts
|
|
self.assertRaises(TypeError, Secondary, tag_sets={"k": "v"})
|
|
|
|
self.assertRaises(TypeError, Secondary, tag_sets="foo")
|
|
|
|
self.assertRaises(TypeError, Secondary, tag_sets=["foo"])
|
|
|
|
def test_threshold_validation(self):
|
|
self.assertEqual(
|
|
17, self.rs_client(localThresholdMS=17, connect=False).options.local_threshold_ms
|
|
)
|
|
|
|
self.assertEqual(
|
|
42, self.rs_client(localThresholdMS=42, connect=False).options.local_threshold_ms
|
|
)
|
|
|
|
self.assertEqual(
|
|
666, self.rs_client(localThresholdMS=666, connect=False).options.local_threshold_ms
|
|
)
|
|
|
|
self.assertEqual(
|
|
0, self.rs_client(localThresholdMS=0, connect=False).options.local_threshold_ms
|
|
)
|
|
|
|
self.assertRaises(ValueError, self.rs_client, localthresholdms=-1)
|
|
|
|
def test_zero_latency(self):
|
|
ping_times: set = set()
|
|
# Generate unique ping times.
|
|
while len(ping_times) < len(self.client.nodes):
|
|
ping_times.add(random.random())
|
|
for ping_time, host in zip(ping_times, self.client.nodes):
|
|
ServerDescription._host_to_round_trip_time[host] = ping_time
|
|
try:
|
|
client = connected(self.rs_client(readPreference="nearest", localThresholdMS=0))
|
|
wait_until(lambda: client.nodes == self.client.nodes, "discovered all nodes")
|
|
host = self.read_from_which_host(client)
|
|
for _ in range(5):
|
|
self.assertEqual(host, self.read_from_which_host(client))
|
|
finally:
|
|
ServerDescription._host_to_round_trip_time.clear()
|
|
|
|
def test_primary(self):
|
|
self.assertReadsFrom("primary", read_preference=ReadPreference.PRIMARY)
|
|
|
|
def test_primary_with_tags(self):
|
|
# Tags not allowed with PRIMARY
|
|
self.assertRaises(ConfigurationError, self.rs_client, tag_sets=[{"dc": "ny"}])
|
|
|
|
def test_primary_preferred(self):
|
|
self.assertReadsFrom("primary", read_preference=ReadPreference.PRIMARY_PREFERRED)
|
|
|
|
def test_secondary(self):
|
|
self.assertReadsFrom("secondary", read_preference=ReadPreference.SECONDARY)
|
|
|
|
def test_secondary_preferred(self):
|
|
self.assertReadsFrom("secondary", read_preference=ReadPreference.SECONDARY_PREFERRED)
|
|
|
|
def test_nearest(self):
|
|
# With high localThresholdMS, expect to read from any
|
|
# member
|
|
c = self.rs_client(
|
|
read_preference=ReadPreference.NEAREST, localThresholdMS=10000
|
|
) # 10 seconds
|
|
|
|
data_members = {self.client.primary} | self.client.secondaries
|
|
|
|
# This is a probabilistic test; track which members we've read from so
|
|
# far, and keep reading until we've used all the members or give up.
|
|
# Chance of using only 2 of 3 members 10k times if there's no bug =
|
|
# 3 * (2/3)**10000, very low.
|
|
used: set = set()
|
|
i = 0
|
|
while data_members.difference(used) and i < 10000:
|
|
address = self.read_from_which_host(c)
|
|
used.add(address)
|
|
i += 1
|
|
|
|
not_used = data_members.difference(used)
|
|
latencies = ", ".join(
|
|
"%s: %sms" % (server.description.address, server.description.round_trip_time)
|
|
for server in c._get_topology().select_servers(readable_server_selector, _Op.TEST)
|
|
)
|
|
|
|
self.assertFalse(
|
|
not_used,
|
|
"Expected to use primary and all secondaries for mode NEAREST,"
|
|
f" but didn't use {not_used}\nlatencies: {latencies}",
|
|
)
|
|
|
|
|
|
class ReadPrefTester(MongoClient):
|
|
def __init__(self, *args, **kwargs):
|
|
self.has_read_from = set()
|
|
client_options = client_context.client_options
|
|
client_options.update(kwargs)
|
|
super().__init__(*args, **client_options)
|
|
|
|
@contextlib.contextmanager
|
|
def _conn_for_reads(self, read_preference, session, operation):
|
|
context = super()._conn_for_reads(read_preference, session, operation)
|
|
with context as (conn, read_preference):
|
|
self.record_a_read(conn.address)
|
|
yield conn, read_preference
|
|
|
|
@contextlib.contextmanager
|
|
def _conn_from_server(self, read_preference, server, session):
|
|
context = super()._conn_from_server(read_preference, server, session)
|
|
with context as (conn, read_preference):
|
|
self.record_a_read(conn.address)
|
|
yield conn, read_preference
|
|
|
|
def record_a_read(self, address):
|
|
server = self._get_topology().select_server_by_address(address, _Op.TEST, 0)
|
|
self.has_read_from.add(server)
|
|
|
|
|
|
_PREF_MAP = [
|
|
(Primary, SERVER_TYPE.RSPrimary),
|
|
(PrimaryPreferred, SERVER_TYPE.RSPrimary),
|
|
(Secondary, SERVER_TYPE.RSSecondary),
|
|
(SecondaryPreferred, SERVER_TYPE.RSSecondary),
|
|
(Nearest, "any"),
|
|
]
|
|
|
|
|
|
class TestCommandAndReadPreference(IntegrationTest):
|
|
c: ReadPrefTester
|
|
client_version: Version
|
|
|
|
@classmethod
|
|
@client_context.require_secondaries_count(1)
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
cls.c = ReadPrefTester(
|
|
# Ignore round trip times, to test ReadPreference modes only.
|
|
localThresholdMS=1000 * 1000,
|
|
)
|
|
cls.client_version = Version.from_client(cls.c)
|
|
# mapReduce fails if the collection does not exist.
|
|
coll = cls.c.pymongo_test.get_collection(
|
|
"test", write_concern=WriteConcern(w=client_context.w)
|
|
)
|
|
coll.insert_one({})
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
cls.c.drop_database("pymongo_test")
|
|
cls.c.close()
|
|
|
|
def executed_on_which_server(self, client, fn, *args, **kwargs):
|
|
"""Execute fn(*args, **kwargs) and return the Server instance used."""
|
|
client.has_read_from.clear()
|
|
fn(*args, **kwargs)
|
|
self.assertEqual(1, len(client.has_read_from))
|
|
return one(client.has_read_from)
|
|
|
|
def assertExecutedOn(self, server_type, client, fn, *args, **kwargs):
|
|
server = self.executed_on_which_server(client, fn, *args, **kwargs)
|
|
self.assertEqual(
|
|
SERVER_TYPE._fields[server_type], SERVER_TYPE._fields[server.description.server_type]
|
|
)
|
|
|
|
def _test_fn(self, server_type, fn):
|
|
for _ in range(10):
|
|
if server_type == "any":
|
|
used = set()
|
|
for _ in range(1000):
|
|
server = self.executed_on_which_server(self.c, fn)
|
|
used.add(server.description.address)
|
|
if len(used) == len(self.c.secondaries) + 1:
|
|
# Success
|
|
break
|
|
|
|
assert self.c.primary is not None
|
|
unused = self.c.secondaries.union({self.c.primary}).difference(used)
|
|
if unused:
|
|
self.fail("Some members not used for NEAREST: %s" % (unused))
|
|
else:
|
|
self.assertExecutedOn(server_type, self.c, fn)
|
|
|
|
def _test_primary_helper(self, func):
|
|
# Helpers that ignore read preference.
|
|
self._test_fn(SERVER_TYPE.RSPrimary, func)
|
|
|
|
def _test_coll_helper(self, secondary_ok, coll, meth, *args, **kwargs):
|
|
for mode, server_type in _PREF_MAP:
|
|
new_coll = coll.with_options(read_preference=mode())
|
|
|
|
def func():
|
|
return getattr(new_coll, meth)(*args, **kwargs)
|
|
|
|
if secondary_ok:
|
|
self._test_fn(server_type, func)
|
|
else:
|
|
self._test_fn(SERVER_TYPE.RSPrimary, func)
|
|
|
|
def test_command(self):
|
|
# Test that the generic command helper obeys the read preference
|
|
# passed to it.
|
|
for mode, server_type in _PREF_MAP:
|
|
|
|
def func():
|
|
return self.c.pymongo_test.command("dbStats", read_preference=mode())
|
|
|
|
self._test_fn(server_type, func)
|
|
|
|
def test_create_collection(self):
|
|
# create_collection runs listCollections on the primary to check if
|
|
# the collection already exists.
|
|
self._test_primary_helper(
|
|
lambda: self.c.pymongo_test.create_collection(
|
|
"some_collection%s" % random.randint(0, sys.maxsize)
|
|
)
|
|
)
|
|
|
|
def test_count_documents(self):
|
|
self._test_coll_helper(True, self.c.pymongo_test.test, "count_documents", {})
|
|
|
|
def test_estimated_document_count(self):
|
|
self._test_coll_helper(True, self.c.pymongo_test.test, "estimated_document_count")
|
|
|
|
def test_distinct(self):
|
|
self._test_coll_helper(True, self.c.pymongo_test.test, "distinct", "a")
|
|
|
|
def test_aggregate(self):
|
|
self._test_coll_helper(
|
|
True, self.c.pymongo_test.test, "aggregate", [{"$project": {"_id": 1}}]
|
|
)
|
|
|
|
def test_aggregate_write(self):
|
|
# 5.0 servers support $out on secondaries.
|
|
secondary_ok = client_context.version.at_least(5, 0)
|
|
self._test_coll_helper(
|
|
secondary_ok,
|
|
self.c.pymongo_test.test,
|
|
"aggregate",
|
|
[{"$project": {"_id": 1}}, {"$out": "agg_write_test"}],
|
|
)
|
|
|
|
|
|
class TestMovingAverage(unittest.TestCase):
|
|
def test_moving_average(self):
|
|
avg = MovingAverage()
|
|
self.assertIsNone(avg.get())
|
|
avg.add_sample(10)
|
|
self.assertAlmostEqual(10, avg.get()) # type: ignore
|
|
avg.add_sample(20)
|
|
self.assertAlmostEqual(12, avg.get()) # type: ignore
|
|
avg.add_sample(30)
|
|
self.assertAlmostEqual(15.6, avg.get()) # type: ignore
|
|
|
|
|
|
class TestMongosAndReadPreference(IntegrationTest):
|
|
def test_read_preference_document(self):
|
|
pref = Primary()
|
|
self.assertEqual(pref.document, {"mode": "primary"})
|
|
|
|
pref = PrimaryPreferred()
|
|
self.assertEqual(pref.document, {"mode": "primaryPreferred"})
|
|
pref = PrimaryPreferred(tag_sets=[{"dc": "sf"}])
|
|
self.assertEqual(pref.document, {"mode": "primaryPreferred", "tags": [{"dc": "sf"}]})
|
|
pref = PrimaryPreferred(tag_sets=[{"dc": "sf"}], max_staleness=30)
|
|
self.assertEqual(
|
|
pref.document,
|
|
{"mode": "primaryPreferred", "tags": [{"dc": "sf"}], "maxStalenessSeconds": 30},
|
|
)
|
|
|
|
pref = Secondary()
|
|
self.assertEqual(pref.document, {"mode": "secondary"})
|
|
pref = Secondary(tag_sets=[{"dc": "sf"}])
|
|
self.assertEqual(pref.document, {"mode": "secondary", "tags": [{"dc": "sf"}]})
|
|
pref = Secondary(tag_sets=[{"dc": "sf"}], max_staleness=30)
|
|
self.assertEqual(
|
|
pref.document, {"mode": "secondary", "tags": [{"dc": "sf"}], "maxStalenessSeconds": 30}
|
|
)
|
|
|
|
pref = SecondaryPreferred()
|
|
self.assertEqual(pref.document, {"mode": "secondaryPreferred"})
|
|
pref = SecondaryPreferred(tag_sets=[{"dc": "sf"}])
|
|
self.assertEqual(pref.document, {"mode": "secondaryPreferred", "tags": [{"dc": "sf"}]})
|
|
pref = SecondaryPreferred(tag_sets=[{"dc": "sf"}], max_staleness=30)
|
|
self.assertEqual(
|
|
pref.document,
|
|
{"mode": "secondaryPreferred", "tags": [{"dc": "sf"}], "maxStalenessSeconds": 30},
|
|
)
|
|
|
|
pref = Nearest()
|
|
self.assertEqual(pref.document, {"mode": "nearest"})
|
|
pref = Nearest(tag_sets=[{"dc": "sf"}])
|
|
self.assertEqual(pref.document, {"mode": "nearest", "tags": [{"dc": "sf"}]})
|
|
pref = Nearest(tag_sets=[{"dc": "sf"}], max_staleness=30)
|
|
self.assertEqual(
|
|
pref.document, {"mode": "nearest", "tags": [{"dc": "sf"}], "maxStalenessSeconds": 30}
|
|
)
|
|
|
|
with self.assertRaises(TypeError):
|
|
# Float is prohibited.
|
|
Nearest(max_staleness=1.5) # type: ignore
|
|
|
|
with self.assertRaises(ValueError):
|
|
Nearest(max_staleness=0)
|
|
|
|
with self.assertRaises(ValueError):
|
|
Nearest(max_staleness=-2)
|
|
|
|
def test_read_preference_document_hedge(self):
|
|
cases = {
|
|
"primaryPreferred": PrimaryPreferred,
|
|
"secondary": Secondary,
|
|
"secondaryPreferred": SecondaryPreferred,
|
|
"nearest": Nearest,
|
|
}
|
|
for mode, cls in cases.items():
|
|
with self.assertRaises(TypeError):
|
|
cls(hedge=[]) # type: ignore
|
|
|
|
pref = cls(hedge={})
|
|
self.assertEqual(pref.document, {"mode": mode})
|
|
out = _maybe_add_read_preference({}, pref)
|
|
if cls == SecondaryPreferred:
|
|
# SecondaryPreferred without hedge doesn't add $readPreference.
|
|
self.assertEqual(out, {})
|
|
else:
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
|
|
hedge: dict[str, Any] = {"enabled": True}
|
|
pref = cls(hedge=hedge)
|
|
self.assertEqual(pref.document, {"mode": mode, "hedge": hedge})
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
|
|
hedge = {"enabled": False}
|
|
pref = cls(hedge=hedge)
|
|
self.assertEqual(pref.document, {"mode": mode, "hedge": hedge})
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
|
|
hedge = {"enabled": False, "extra": "option"}
|
|
pref = cls(hedge=hedge)
|
|
self.assertEqual(pref.document, {"mode": mode, "hedge": hedge})
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
|
|
def test_send_hedge(self):
|
|
cases = {
|
|
"primaryPreferred": PrimaryPreferred,
|
|
"secondaryPreferred": SecondaryPreferred,
|
|
"nearest": Nearest,
|
|
}
|
|
if client_context.supports_secondary_read_pref:
|
|
cases["secondary"] = Secondary
|
|
listener = OvertCommandListener()
|
|
client = self.rs_client(event_listeners=[listener])
|
|
self.addCleanup(client.close)
|
|
client.admin.command("ping")
|
|
for _mode, cls in cases.items():
|
|
pref = cls(hedge={"enabled": True})
|
|
coll = client.test.get_collection("test", read_preference=pref)
|
|
listener.reset()
|
|
coll.find_one()
|
|
started = listener.started_events
|
|
self.assertEqual(len(started), 1, started)
|
|
cmd = started[0].command
|
|
if client_context.is_rs or client_context.is_mongos:
|
|
self.assertIn("$readPreference", cmd)
|
|
self.assertEqual(cmd["$readPreference"], pref.document)
|
|
else:
|
|
self.assertNotIn("$readPreference", cmd)
|
|
|
|
def test_maybe_add_read_preference(self):
|
|
# Primary doesn't add $readPreference
|
|
out = _maybe_add_read_preference({}, Primary())
|
|
self.assertEqual(out, {})
|
|
|
|
pref = PrimaryPreferred()
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
pref = PrimaryPreferred(tag_sets=[{"dc": "nyc"}])
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
|
|
pref = Secondary()
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
pref = Secondary(tag_sets=[{"dc": "nyc"}])
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
|
|
# SecondaryPreferred without tag_sets or max_staleness doesn't add
|
|
# $readPreference
|
|
pref = SecondaryPreferred()
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, {})
|
|
pref = SecondaryPreferred(tag_sets=[{"dc": "nyc"}])
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
pref = SecondaryPreferred(max_staleness=120)
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
|
|
pref = Nearest()
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
pref = Nearest(tag_sets=[{"dc": "nyc"}])
|
|
out = _maybe_add_read_preference({}, pref)
|
|
self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))
|
|
|
|
criteria = SON([("$query", {}), ("$orderby", SON([("_id", 1)]))])
|
|
pref = Nearest()
|
|
out = _maybe_add_read_preference(criteria, pref)
|
|
self.assertEqual(
|
|
out,
|
|
SON(
|
|
[
|
|
("$query", {}),
|
|
("$orderby", SON([("_id", 1)])),
|
|
("$readPreference", pref.document),
|
|
]
|
|
),
|
|
)
|
|
pref = Nearest(tag_sets=[{"dc": "nyc"}])
|
|
out = _maybe_add_read_preference(criteria, pref)
|
|
self.assertEqual(
|
|
out,
|
|
SON(
|
|
[
|
|
("$query", {}),
|
|
("$orderby", SON([("_id", 1)])),
|
|
("$readPreference", pref.document),
|
|
]
|
|
),
|
|
)
|
|
|
|
@client_context.require_mongos
|
|
def test_mongos(self):
|
|
res = client_context.client.config.shards.find_one()
|
|
assert res is not None
|
|
shard = res["host"]
|
|
num_members = shard.count(",") + 1
|
|
if num_members == 1:
|
|
raise SkipTest("Need a replica set shard to test.")
|
|
coll = client_context.client.pymongo_test.get_collection(
|
|
"test", write_concern=WriteConcern(w=num_members)
|
|
)
|
|
coll.drop()
|
|
res = coll.insert_many([{} for _ in range(5)])
|
|
first_id = res.inserted_ids[0]
|
|
last_id = res.inserted_ids[-1]
|
|
|
|
# Note - this isn't a perfect test since there's no way to
|
|
# tell what shard member a query ran on.
|
|
for pref in (Primary(), PrimaryPreferred(), Secondary(), SecondaryPreferred(), Nearest()):
|
|
qcoll = coll.with_options(read_preference=pref)
|
|
results = list(qcoll.find().sort([("_id", 1)]))
|
|
self.assertEqual(first_id, results[0]["_id"])
|
|
self.assertEqual(last_id, results[-1]["_id"])
|
|
results = list(qcoll.find().sort([("_id", -1)]))
|
|
self.assertEqual(first_id, results[-1]["_id"])
|
|
self.assertEqual(last_id, results[0]["_id"])
|
|
|
|
@client_context.require_mongos
|
|
def test_mongos_max_staleness(self):
|
|
# Sanity check that we're sending maxStalenessSeconds
|
|
coll = client_context.client.pymongo_test.get_collection(
|
|
"test", read_preference=SecondaryPreferred(max_staleness=120)
|
|
)
|
|
# No error
|
|
coll.find_one()
|
|
|
|
coll = client_context.client.pymongo_test.get_collection(
|
|
"test", read_preference=SecondaryPreferred(max_staleness=10)
|
|
)
|
|
try:
|
|
coll.find_one()
|
|
except OperationFailure as exc:
|
|
self.assertEqual(160, exc.code)
|
|
else:
|
|
self.fail("mongos accepted invalid staleness")
|
|
|
|
coll = self.single_client(
|
|
readPreference="secondaryPreferred", maxStalenessSeconds=120
|
|
).pymongo_test.test
|
|
# No error
|
|
coll.find_one()
|
|
|
|
coll = self.single_client(
|
|
readPreference="secondaryPreferred", maxStalenessSeconds=10
|
|
).pymongo_test.test
|
|
try:
|
|
coll.find_one()
|
|
except OperationFailure as exc:
|
|
self.assertEqual(160, exc.code)
|
|
else:
|
|
self.fail("mongos accepted invalid staleness")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|