mongo-python-driver/test/test_read_preferences.py
Noah Stapp ebb94b669e
PYTHON-3457 Easier debugging with standardized logging (#1515)
PYTHON-3458 Add command logging.
PYTHON-3459 Add server selection logging.
PYTHON-3473 Add connection pool logging.
PYTHON-4167 Add documentation and examples.

Co-authored-by: sleepyStick <itsirisho@gmail.com>
2024-02-15 11:35:08 -08:00

690 lines
26 KiB
Python

# Copyright 2011-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test read preferences: validation, server selection and command routing."""
from __future__ import annotations
import contextlib
import copy
import pickle
import random
import sys
from typing import Any
from pymongo.operations import _Op
sys.path[0:0] = [""]
from test import IntegrationTest, SkipTest, client_context, unittest
from test.utils import (
OvertCommandListener,
connected,
one,
rs_client,
single_client,
wait_until,
)
from test.version import Version
from bson.son import SON
from pymongo.errors import ConfigurationError, OperationFailure
from pymongo.message import _maybe_add_read_preference
from pymongo.mongo_client import MongoClient
from pymongo.read_preferences import (
MovingAverage,
Nearest,
Primary,
PrimaryPreferred,
ReadPreference,
Secondary,
SecondaryPreferred,
)
from pymongo.server_description import ServerDescription
from pymongo.server_selectors import Selection, readable_server_selector
from pymongo.server_type import SERVER_TYPE
from pymongo.write_concern import WriteConcern
class TestSelections(IntegrationTest):
    """Truthiness checks for ``Selection`` objects built from a live topology."""

    @client_context.require_connection
    def test_bool(self):
        """A populated selection is truthy; one with no descriptions is falsy."""
        client = single_client()
        # Close the client when the test finishes so it is not leaked.
        self.addCleanup(client.close)

        wait_until(lambda: client.address, "discover primary")
        selection = Selection.from_topology_description(client._topology.description)

        self.assertTrue(selection)
        self.assertFalse(selection.with_server_descriptions([]))
class TestReadPreferenceObjects(unittest.TestCase):
    """Read preference objects must compare equal after pickling or copying."""

    # One sample per read preference class, including tag sets and max staleness.
    prefs = [
        Primary(),
        PrimaryPreferred(),
        Secondary(),
        Nearest(tag_sets=[{"a": 1}, {"b": 2}]),
        SecondaryPreferred(max_staleness=30),
    ]

    def test_pickle(self):
        """A pickle round trip yields an equal read preference."""
        for original in self.prefs:
            restored = pickle.loads(pickle.dumps(original))
            self.assertEqual(original, restored)

    def test_copy(self):
        """A shallow copy compares equal to its source."""
        for original in self.prefs:
            duplicate = copy.copy(original)
            self.assertEqual(original, duplicate)

    def test_deepcopy(self):
        """A deep copy compares equal to its source."""
        for original in self.prefs:
            duplicate = copy.deepcopy(original)
            self.assertEqual(original, duplicate)
class TestReadPreferencesBase(IntegrationTest):
    """Shared fixtures and helpers for read preference tests.

    The class is gated by ``client_context.require_secondaries_count(1)`` and
    every test starts with ten documents in ``pymongo_test.test``.
    """

    @classmethod
    @client_context.require_secondaries_count(1)
    def setUpClass(cls):
        super().setUpClass()

    def setUp(self):
        """Recreate ``pymongo_test.test`` with documents ``_id`` 0..9."""
        super().setUp()
        # Insert some data so we can use cursors in read_from_which_host
        self.client.pymongo_test.test.drop()
        self.client.get_database(
            "pymongo_test", write_concern=WriteConcern(w=client_context.w)
        ).test.insert_many([{"_id": i} for i in range(10)])
        self.addCleanup(self.client.pymongo_test.test.drop)

    def read_from_which_host(self, client):
        """Do a find() on the client and return which host was used"""
        cursor = client.pymongo_test.test.find()
        next(cursor)
        return cursor.address

    def read_from_which_kind(self, client):
        """Do a find() on the client and return 'primary' or 'secondary'
        depending on which the client used.

        Fails the test if the address is neither the primary nor a secondary.
        """
        address = self.read_from_which_host(client)
        if address == client.primary:
            return "primary"
        elif address in client.secondaries:
            return "secondary"
        else:
            self.fail(
                f"Cursor used address {address}, expected either primary "
                f"{client.primary} or secondaries {client.secondaries}"
            )
            # self.fail() raises; the explicit return keeps return paths consistent.
            return None

    def assertReadsFrom(self, expected, **kwargs):
        """Assert that a client built with ``kwargs`` reads from ``expected``.

        ``expected`` is ``"primary"`` or ``"secondary"``; ``kwargs`` are
        forwarded to ``rs_client``.
        """
        c = rs_client(**kwargs)
        # Each call creates a new client; close it when the test finishes so
        # it is not leaked.
        self.addCleanup(c.close)
        wait_until(lambda: len(c.nodes - c.arbiters) == client_context.w, "discovered all nodes")
        used = self.read_from_which_kind(c)
        self.assertEqual(expected, used, f"Cursor used {used}, expected {expected}")
class TestSingleSecondaryOk(TestReadPreferencesBase):
    """Reads through a direct connection to a single secondary."""

    def test_reads_from_secondary(self):
        """Read helpers work on a direct secondary connection in PRIMARY mode."""
        host, port = next(iter(self.client.secondaries))
        # Direct connection to a secondary.
        client = single_client(host, port)
        # Close the client when the test finishes so it is not leaked.
        self.addCleanup(client.close)
        self.assertFalse(client.is_primary)

        # Regardless of read preference, we should be able to do
        # "reads" with a direct connection to a secondary.
        # See server-selection.rst#topology-type-single.
        self.assertEqual(client.read_preference, ReadPreference.PRIMARY)

        db = client.pymongo_test
        coll = db.test

        # Test find and find_one.
        self.assertIsNotNone(coll.find_one())
        self.assertEqual(10, len(list(coll.find())))

        # Test some database helpers.
        self.assertIsNotNone(db.list_collection_names())
        self.assertIsNotNone(db.validate_collection("test"))
        self.assertIsNotNone(db.command("ping"))

        # Test some collection helpers.
        self.assertEqual(10, coll.count_documents({}))
        self.assertEqual(10, len(coll.distinct("_id")))
        self.assertIsNotNone(coll.aggregate([]))
        self.assertIsNotNone(coll.index_information())
class TestReadPreferences(TestReadPreferencesBase):
    """Option validation and server selection for each read preference mode."""

    def _closing_rs_client(self, **kwargs):
        """Return ``rs_client(**kwargs)``, registered to be closed after the test."""
        client = rs_client(**kwargs)
        self.addCleanup(client.close)
        return client

    def test_mode_validation(self):
        """Every ReadPreference mode is accepted; a bare string is rejected."""
        for mode in (
            ReadPreference.PRIMARY,
            ReadPreference.PRIMARY_PREFERRED,
            ReadPreference.SECONDARY,
            ReadPreference.SECONDARY_PREFERRED,
            ReadPreference.NEAREST,
        ):
            client = self._closing_rs_client(read_preference=mode)
            self.assertEqual(mode, client.read_preference)

        self.assertRaises(TypeError, rs_client, read_preference="foo")

    def test_tag_sets_validation(self):
        """tag_sets must be a non-empty list of dicts."""
        pref = Secondary(tag_sets=[{}])
        client = self._closing_rs_client(read_preference=pref)
        self.assertEqual([{}], client.read_preference.tag_sets)

        pref = Secondary(tag_sets=[{"k": "v"}])
        client = self._closing_rs_client(read_preference=pref)
        self.assertEqual([{"k": "v"}], client.read_preference.tag_sets)

        pref = Secondary(tag_sets=[{"k": "v"}, {}])
        client = self._closing_rs_client(read_preference=pref)
        self.assertEqual([{"k": "v"}, {}], client.read_preference.tag_sets)

        self.assertRaises(ValueError, Secondary, tag_sets=[])

        # One dict not ok, must be a list of dicts
        self.assertRaises(TypeError, Secondary, tag_sets={"k": "v"})
        self.assertRaises(TypeError, Secondary, tag_sets="foo")
        self.assertRaises(TypeError, Secondary, tag_sets=["foo"])

    def test_threshold_validation(self):
        """localThresholdMS round-trips through client options; negatives fail."""
        for threshold in (17, 42, 666, 0):
            client = self._closing_rs_client(localThresholdMS=threshold, connect=False)
            self.assertEqual(threshold, client.options.local_threshold_ms)

        self.assertRaises(ValueError, rs_client, localthresholdms=-1)

    def test_zero_latency(self):
        """With localThresholdMS=0, NEAREST keeps reading from the same host."""
        ping_times: set = set()
        # Generate unique ping times.
        while len(ping_times) < len(self.client.nodes):
            ping_times.add(random.random())
        for ping_time, host in zip(ping_times, self.client.nodes):
            ServerDescription._host_to_round_trip_time[host] = ping_time
        try:
            client = connected(rs_client(readPreference="nearest", localThresholdMS=0))
            # Close the client when the test finishes so it is not leaked.
            self.addCleanup(client.close)
            wait_until(lambda: client.nodes == self.client.nodes, "discovered all nodes")
            host = self.read_from_which_host(client)
            for _ in range(5):
                self.assertEqual(host, self.read_from_which_host(client))
        finally:
            # Always undo the round trip time overrides installed above.
            ServerDescription._host_to_round_trip_time.clear()

    def test_primary(self):
        self.assertReadsFrom("primary", read_preference=ReadPreference.PRIMARY)

    def test_primary_with_tags(self):
        # Tags not allowed with PRIMARY
        self.assertRaises(ConfigurationError, rs_client, tag_sets=[{"dc": "ny"}])

    def test_primary_preferred(self):
        self.assertReadsFrom("primary", read_preference=ReadPreference.PRIMARY_PREFERRED)

    def test_secondary(self):
        self.assertReadsFrom("secondary", read_preference=ReadPreference.SECONDARY)

    def test_secondary_preferred(self):
        self.assertReadsFrom("secondary", read_preference=ReadPreference.SECONDARY_PREFERRED)

    def test_nearest(self):
        """NEAREST with a large threshold eventually reads from every data member."""
        # With high localThresholdMS, expect to read from any
        # member
        c = self._closing_rs_client(
            read_preference=ReadPreference.NEAREST, localThresholdMS=10000
        )  # 10 seconds

        data_members = {self.client.primary} | self.client.secondaries

        # This is a probabilistic test; track which members we've read from so
        # far, and keep reading until we've used all the members or give up.
        # Chance of using only 2 of 3 members 10k times if there's no bug =
        # 3 * (2/3)**10000, very low.
        used: set = set()
        i = 0
        while data_members.difference(used) and i < 10000:
            address = self.read_from_which_host(c)
            used.add(address)
            i += 1

        not_used = data_members.difference(used)
        # Gather per-server round trip times for the failure message.
        latencies = ", ".join(
            f"{server.description.address}: {server.description.round_trip_time}ms"
            for server in c._get_topology().select_servers(readable_server_selector, _Op.TEST)
        )

        self.assertFalse(
            not_used,
            "Expected to use primary and all secondaries for mode NEAREST,"
            f" but didn't use {not_used}\nlatencies: {latencies}",
        )
class ReadPrefTester(MongoClient):
    """``MongoClient`` subclass that records which servers served reads.

    Each connection obtained through ``_conn_for_reads`` or
    ``_conn_from_server`` has its server added to ``has_read_from``.
    """

    def __init__(self, *args, **kwargs):
        """Build the client from the test suite's options, overridden by ``kwargs``."""
        self.has_read_from = set()
        merged_options = client_context.client_options
        merged_options.update(kwargs)
        super().__init__(*args, **merged_options)

    @contextlib.contextmanager
    def _conn_for_reads(self, read_preference, session, operation):
        """Wrap the parent context manager, recording the connection's address."""
        with super()._conn_for_reads(read_preference, session, operation) as checked_out:
            conn, effective_pref = checked_out
            self.record_a_read(conn.address)
            yield conn, effective_pref

    @contextlib.contextmanager
    def _conn_from_server(self, read_preference, server, session):
        """Wrap the parent context manager, recording the connection's address."""
        with super()._conn_from_server(read_preference, server, session) as checked_out:
            conn, effective_pref = checked_out
            self.record_a_read(conn.address)
            yield conn, effective_pref

    def record_a_read(self, address):
        """Look up the server at ``address`` and add it to ``has_read_from``."""
        topology = self._get_topology()
        self.has_read_from.add(topology.select_server_by_address(address, _Op.TEST, 0))
# Pairs of (read preference class, server type the tests expect it to use).
# The string "any" marks Nearest, for which the tests expect every data
# member to be used.
_PREF_MAP = [
    (Primary, SERVER_TYPE.RSPrimary),
    (PrimaryPreferred, SERVER_TYPE.RSPrimary),
    (Secondary, SERVER_TYPE.RSSecondary),
    (SecondaryPreferred, SERVER_TYPE.RSSecondary),
    (Nearest, "any"),
]
class TestCommandAndReadPreference(IntegrationTest):
    """Check which server type helpers and commands run on per read preference."""

    # Recording client shared by every test in the class.
    c: ReadPrefTester
    client_version: Version

    @classmethod
    @client_context.require_secondaries_count(1)
    def setUpClass(cls):
        super().setUpClass()
        cls.c = ReadPrefTester(
            # Ignore round trip times, to test ReadPreference modes only.
            localThresholdMS=1000 * 1000,
        )
        cls.client_version = Version.from_client(cls.c)
        # mapReduce fails if the collection does not exist.
        coll = cls.c.pymongo_test.get_collection(
            "test", write_concern=WriteConcern(w=client_context.w)
        )
        coll.insert_one({})

    @classmethod
    def tearDownClass(cls):
        # Close the client even if dropping the database fails, and always
        # give the base class a chance to tear down as well.
        try:
            cls.c.drop_database("pymongo_test")
        finally:
            try:
                cls.c.close()
            finally:
                super().tearDownClass()

    def executed_on_which_server(self, client, fn, *args, **kwargs):
        """Execute fn(*args, **kwargs) and return the Server instance used."""
        client.has_read_from.clear()
        fn(*args, **kwargs)
        self.assertEqual(1, len(client.has_read_from))
        return one(client.has_read_from)

    def assertExecutedOn(self, server_type, client, fn, *args, **kwargs):
        """Assert that ``fn`` ran on a server whose type is ``server_type``."""
        server = self.executed_on_which_server(client, fn, *args, **kwargs)
        # Compare by field name so a failure message shows readable type names.
        self.assertEqual(
            SERVER_TYPE._fields[server_type], SERVER_TYPE._fields[server.description.server_type]
        )

    def _test_fn(self, server_type, fn):
        """Run ``fn`` repeatedly and check the servers it executes on.

        For ``"any"`` every data member must be used within 1000 attempts;
        otherwise each run must land on a server of type ``server_type``.
        """
        for _ in range(10):
            if server_type == "any":
                used = set()
                for _ in range(1000):
                    server = self.executed_on_which_server(self.c, fn)
                    used.add(server.description.address)
                    if len(used) == len(self.c.secondaries) + 1:
                        # Success
                        break

                assert self.c.primary is not None
                unused = self.c.secondaries.union({self.c.primary}).difference(used)
                if unused:
                    self.fail(f"Some members not used for NEAREST: {unused}")
            else:
                self.assertExecutedOn(server_type, self.c, fn)

    def _test_primary_helper(self, func):
        # Helpers that ignore read preference.
        self._test_fn(SERVER_TYPE.RSPrimary, func)

    def _test_coll_helper(self, secondary_ok, coll, meth, *args, **kwargs):
        """Call ``coll.<meth>(*args, **kwargs)`` under every mode in ``_PREF_MAP``.

        When ``secondary_ok`` is false the helper is expected to run on the
        primary whatever the mode.
        """
        for mode, server_type in _PREF_MAP:
            new_coll = coll.with_options(read_preference=mode())

            # Called within this iteration, so the closure sees this new_coll.
            def func():
                return getattr(new_coll, meth)(*args, **kwargs)

            expected_type = server_type if secondary_ok else SERVER_TYPE.RSPrimary
            self._test_fn(expected_type, func)

    def test_command(self):
        # Test that the generic command helper obeys the read preference
        # passed to it.
        for mode, server_type in _PREF_MAP:

            # Called within this iteration, so the closure sees this mode.
            def func():
                return self.c.pymongo_test.command("dbStats", read_preference=mode())

            self._test_fn(server_type, func)

    def test_create_collection(self):
        # create_collection runs listCollections on the primary to check if
        # the collection already exists.
        self._test_primary_helper(
            lambda: self.c.pymongo_test.create_collection(
                f"some_collection{random.randint(0, sys.maxsize)}"
            )
        )

    def test_count_documents(self):
        self._test_coll_helper(True, self.c.pymongo_test.test, "count_documents", {})

    def test_estimated_document_count(self):
        self._test_coll_helper(True, self.c.pymongo_test.test, "estimated_document_count")

    def test_distinct(self):
        self._test_coll_helper(True, self.c.pymongo_test.test, "distinct", "a")

    def test_aggregate(self):
        self._test_coll_helper(
            True, self.c.pymongo_test.test, "aggregate", [{"$project": {"_id": 1}}]
        )

    def test_aggregate_write(self):
        # 5.0 servers support $out on secondaries.
        secondary_ok = client_context.version.at_least(5, 0)
        self._test_coll_helper(
            secondary_ok,
            self.c.pymongo_test.test,
            "aggregate",
            [{"$project": {"_id": 1}}, {"$out": "agg_write_test"}],
        )
class TestMovingAverage(unittest.TestCase):
    """Checks the values reported by ``MovingAverage`` as samples are added."""

    def test_moving_average(self):
        """An empty average is None; each new sample shifts the reported value."""
        moving = MovingAverage()
        self.assertIsNone(moving.get())

        # (sample added, average expected right after adding it)
        for sample, expected in ((10, 10), (20, 12), (30, 15.6)):
            moving.add_sample(sample)
            self.assertAlmostEqual(expected, moving.get())  # type: ignore
class TestMongosAndReadPreference(IntegrationTest):
    """Read preference documents and how they are attached to queries."""

    def _assert_wrapped_with_pref(self, out, pref):
        """Assert ``out`` is an empty ``$query`` plus ``pref``'s ``$readPreference``."""
        self.assertEqual(out, SON([("$query", {}), ("$readPreference", pref.document)]))

    def _assert_invalid_staleness(self, coll):
        """Assert that ``coll.find_one()`` fails with server error code 160."""
        with self.assertRaises(
            OperationFailure, msg="mongos accepted invalid staleness"
        ) as ctx:
            coll.find_one()
        self.assertEqual(160, ctx.exception.code)

    def test_read_preference_document(self):
        """``document`` reflects the mode, tag sets and max staleness."""
        self.assertEqual(Primary().document, {"mode": "primary"})

        modes = (
            (PrimaryPreferred, "primaryPreferred"),
            (Secondary, "secondary"),
            (SecondaryPreferred, "secondaryPreferred"),
            (Nearest, "nearest"),
        )
        for pref_cls, mode in modes:
            pref = pref_cls()
            self.assertEqual(pref.document, {"mode": mode})

            pref = pref_cls(tag_sets=[{"dc": "sf"}])
            self.assertEqual(pref.document, {"mode": mode, "tags": [{"dc": "sf"}]})

            pref = pref_cls(tag_sets=[{"dc": "sf"}], max_staleness=30)
            self.assertEqual(
                pref.document,
                {"mode": mode, "tags": [{"dc": "sf"}], "maxStalenessSeconds": 30},
            )

        with self.assertRaises(TypeError):
            # Float is prohibited.
            Nearest(max_staleness=1.5)  # type: ignore
        with self.assertRaises(ValueError):
            Nearest(max_staleness=0)
        with self.assertRaises(ValueError):
            Nearest(max_staleness=-2)

    def test_read_preference_document_hedge(self):
        """``hedge`` must be a dict; non-empty hedges appear in the document."""
        cases = {
            "primaryPreferred": PrimaryPreferred,
            "secondary": Secondary,
            "secondaryPreferred": SecondaryPreferred,
            "nearest": Nearest,
        }
        for mode, cls in cases.items():
            with self.assertRaises(TypeError):
                cls(hedge=[])  # type: ignore

            pref = cls(hedge={})
            self.assertEqual(pref.document, {"mode": mode})
            out = _maybe_add_read_preference({}, pref)
            if cls is SecondaryPreferred:
                # SecondaryPreferred without hedge doesn't add $readPreference.
                self.assertEqual(out, {})
            else:
                self._assert_wrapped_with_pref(out, pref)

            # Any non-empty hedge document is passed through unchanged.
            hedges: list[dict[str, Any]] = [
                {"enabled": True},
                {"enabled": False},
                {"enabled": False, "extra": "option"},
            ]
            for hedge in hedges:
                pref = cls(hedge=hedge)
                self.assertEqual(pref.document, {"mode": mode, "hedge": hedge})
                out = _maybe_add_read_preference({}, pref)
                self._assert_wrapped_with_pref(out, pref)

    def test_send_hedge(self):
        """A hedged read preference is sent as ``$readPreference`` on find."""
        cases = {
            "primaryPreferred": PrimaryPreferred,
            "secondaryPreferred": SecondaryPreferred,
            "nearest": Nearest,
        }
        if client_context.supports_secondary_read_pref:
            cases["secondary"] = Secondary
        listener = OvertCommandListener()
        client = rs_client(event_listeners=[listener])
        self.addCleanup(client.close)
        client.admin.command("ping")
        for cls in cases.values():
            pref = cls(hedge={"enabled": True})
            coll = client.test.get_collection("test", read_preference=pref)
            listener.reset()
            coll.find_one()
            started = listener.started_events
            self.assertEqual(len(started), 1, started)
            cmd = started[0].command
            if client_context.is_rs or client_context.is_mongos:
                self.assertIn("$readPreference", cmd)
                self.assertEqual(cmd["$readPreference"], pref.document)
            else:
                self.assertNotIn("$readPreference", cmd)

    def test_maybe_add_read_preference(self):
        """``_maybe_add_read_preference`` wraps the spec only when needed."""
        # Primary doesn't add $readPreference
        out = _maybe_add_read_preference({}, Primary())
        self.assertEqual(out, {})

        # SecondaryPreferred without tag_sets or max_staleness doesn't add
        # $readPreference
        out = _maybe_add_read_preference({}, SecondaryPreferred())
        self.assertEqual(out, {})

        wrapping_prefs = (
            PrimaryPreferred(),
            PrimaryPreferred(tag_sets=[{"dc": "nyc"}]),
            Secondary(),
            Secondary(tag_sets=[{"dc": "nyc"}]),
            SecondaryPreferred(tag_sets=[{"dc": "nyc"}]),
            SecondaryPreferred(max_staleness=120),
            Nearest(),
            Nearest(tag_sets=[{"dc": "nyc"}]),
        )
        for pref in wrapping_prefs:
            out = _maybe_add_read_preference({}, pref)
            self._assert_wrapped_with_pref(out, pref)

        # A spec that already has $query keeps its other modifiers; the same
        # criteria object is reused for both calls.
        criteria = SON([("$query", {}), ("$orderby", SON([("_id", 1)]))])
        for pref in (Nearest(), Nearest(tag_sets=[{"dc": "nyc"}])):
            out = _maybe_add_read_preference(criteria, pref)
            self.assertEqual(
                out,
                SON(
                    [
                        ("$query", {}),
                        ("$orderby", SON([("_id", 1)])),
                        ("$readPreference", pref.document),
                    ]
                ),
            )

    @client_context.require_mongos
    def test_mongos(self):
        """Sorted queries through mongos return the same results for every mode."""
        res = client_context.client.config.shards.find_one()
        assert res is not None
        shard = res["host"]
        num_members = shard.count(",") + 1
        if num_members == 1:
            raise SkipTest("Need a replica set shard to test.")
        coll = client_context.client.pymongo_test.get_collection(
            "test", write_concern=WriteConcern(w=num_members)
        )
        coll.drop()
        res = coll.insert_many([{} for _ in range(5)])
        first_id = res.inserted_ids[0]
        last_id = res.inserted_ids[-1]

        # Note - this isn't a perfect test since there's no way to
        # tell what shard member a query ran on.
        for pref in (Primary(), PrimaryPreferred(), Secondary(), SecondaryPreferred(), Nearest()):
            qcoll = coll.with_options(read_preference=pref)
            results = list(qcoll.find().sort([("_id", 1)]))
            self.assertEqual(first_id, results[0]["_id"])
            self.assertEqual(last_id, results[-1]["_id"])
            results = list(qcoll.find().sort([("_id", -1)]))
            self.assertEqual(first_id, results[-1]["_id"])
            self.assertEqual(last_id, results[0]["_id"])

    @client_context.require_mongos
    def test_mongos_max_staleness(self):
        """max staleness 120 is accepted; 10 is rejected with error code 160."""
        # Sanity check that we're sending maxStalenessSeconds
        coll = client_context.client.pymongo_test.get_collection(
            "test", read_preference=SecondaryPreferred(max_staleness=120)
        )
        # No error
        coll.find_one()

        coll = client_context.client.pymongo_test.get_collection(
            "test", read_preference=SecondaryPreferred(max_staleness=10)
        )
        self._assert_invalid_staleness(coll)

        # Repeat with the options given as client keyword arguments. These
        # clients are created here, so close them when the test finishes.
        client = single_client(readPreference="secondaryPreferred", maxStalenessSeconds=120)
        self.addCleanup(client.close)
        # No error
        client.pymongo_test.test.find_one()

        client = single_client(readPreference="secondaryPreferred", maxStalenessSeconds=10)
        self.addCleanup(client.close)
        self._assert_invalid_staleness(client.pymongo_test.test)
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()