908 lines
32 KiB
Python
908 lines
32 KiB
Python
# Copyright 2014-present MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Test the topology module."""
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
|
|
from pymongo.operations import _Op
|
|
|
|
sys.path[0:0] = [""]
|
|
|
|
from test import client_knobs, unittest
|
|
from test.pymongo_mocks import DummyMonitor
|
|
from test.utils import MockPool, flaky
|
|
from test.utils_shared import wait_until
|
|
|
|
from bson.objectid import ObjectId
|
|
from pymongo import common
|
|
from pymongo.errors import AutoReconnect, ConfigurationError, ConnectionFailure
|
|
from pymongo.hello import Hello, HelloCompat
|
|
from pymongo.read_preferences import Primary, ReadPreference, Secondary
|
|
from pymongo.server_description import ServerDescription
|
|
from pymongo.server_selectors import any_server_selector, writable_server_selector
|
|
from pymongo.server_type import SERVER_TYPE
|
|
from pymongo.synchronous.monitor import Monitor
|
|
from pymongo.synchronous.pool import PoolOptions
|
|
from pymongo.synchronous.server import Server
|
|
from pymongo.synchronous.settings import TopologySettings
|
|
from pymongo.synchronous.topology import Topology, _ErrorContext
|
|
from pymongo.topology_description import TOPOLOGY_TYPE
|
|
|
|
|
|
class SetNameDiscoverySettings(TopologySettings):
|
|
def get_topology_type(self):
|
|
return TOPOLOGY_TYPE.ReplicaSetNoPrimary
|
|
|
|
|
|
address = ("a", 27017)
|
|
|
|
|
|
def create_mock_topology(
|
|
seeds=None,
|
|
replica_set_name=None,
|
|
monitor_class=DummyMonitor,
|
|
direct_connection=False,
|
|
):
|
|
partitioned_seeds = list(map(common.partition_node, seeds or ["a"]))
|
|
topology_settings = TopologySettings(
|
|
partitioned_seeds,
|
|
replica_set_name=replica_set_name,
|
|
pool_class=MockPool, # type: ignore[arg-type]
|
|
monitor_class=monitor_class,
|
|
direct_connection=direct_connection,
|
|
)
|
|
|
|
t = Topology(topology_settings)
|
|
t.open()
|
|
return t
|
|
|
|
|
|
def got_hello(topology, server_address, hello_response):
|
|
server_description = ServerDescription(server_address, Hello(hello_response), 0)
|
|
|
|
topology.on_change(server_description)
|
|
|
|
|
|
def disconnected(topology, server_address):
|
|
# Create new description of server type Unknown.
|
|
topology.on_change(ServerDescription(server_address))
|
|
|
|
|
|
def get_server(topology, hostname):
|
|
return topology.get_server_by_address((hostname, 27017))
|
|
|
|
|
|
def get_type(topology, hostname):
|
|
return get_server(topology, hostname).description.server_type
|
|
|
|
|
|
def get_monitor(topology, hostname):
|
|
return get_server(topology, hostname)._monitor
|
|
|
|
|
|
class TopologyTest(unittest.TestCase):
|
|
"""Disables periodic monitoring, to make tests deterministic."""
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.client_knobs = client_knobs(heartbeat_frequency=999999)
|
|
self.client_knobs.enable()
|
|
self.addCleanup(self.client_knobs.disable)
|
|
|
|
|
|
class TestTopologyConfiguration(TopologyTest):
|
|
def test_timeout_configuration(self):
|
|
pool_options = PoolOptions(connect_timeout=1, socket_timeout=2)
|
|
topology_settings = TopologySettings(pool_options=pool_options)
|
|
t = Topology(topology_settings=topology_settings)
|
|
t.open()
|
|
|
|
# Get the default server.
|
|
server = t.get_server_by_address(("localhost", 27017))
|
|
|
|
# The pool for application operations obeys our settings.
|
|
self.assertEqual(1, server._pool.opts.connect_timeout)
|
|
self.assertEqual(2, server._pool.opts.socket_timeout)
|
|
|
|
# The pool for monitoring operations uses our connect_timeout as both
|
|
# its connect_timeout and its socket_timeout.
|
|
monitor = server._monitor
|
|
self.assertEqual(1, monitor._pool.opts.connect_timeout)
|
|
self.assertEqual(1, monitor._pool.opts.socket_timeout)
|
|
|
|
# The monitor, not its pool, is responsible for calling hello.
|
|
self.assertTrue(monitor._pool.is_sdam)
|
|
|
|
def test_selector_fast_path(self):
|
|
topology = create_mock_topology(seeds=["a", "b:27018"], replica_set_name="foo")
|
|
description = topology.description
|
|
description._topology_type = TOPOLOGY_TYPE.ReplicaSetWithPrimary
|
|
|
|
# There is no primary yet, so it should give an empty list.
|
|
self.assertEqual(description.apply_selector(Primary()), [])
|
|
|
|
# If we set a primary server, we should get it back.
|
|
sd = list(description._server_descriptions.values())[0]
|
|
sd._server_type = SERVER_TYPE.RSPrimary
|
|
self.assertEqual(description.apply_selector(Primary()), [sd])
|
|
|
|
# If there is a custom selector, it should be applied.
|
|
def custom_selector(servers):
|
|
return []
|
|
|
|
self.assertEqual(description.apply_selector(Primary(), custom_selector=custom_selector), [])
|
|
|
|
|
|
class TestSingleServerTopology(TopologyTest):
|
|
def test_direct_connection(self):
|
|
for server_type, hello_response in [
|
|
(
|
|
SERVER_TYPE.RSPrimary,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: True,
|
|
"hosts": ["a"],
|
|
"setName": "rs",
|
|
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
|
|
},
|
|
),
|
|
(
|
|
SERVER_TYPE.RSSecondary,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"secondary": True,
|
|
"hosts": ["a"],
|
|
"setName": "rs",
|
|
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
|
|
},
|
|
),
|
|
(
|
|
SERVER_TYPE.Mongos,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: True,
|
|
"msg": "isdbgrid",
|
|
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
|
|
},
|
|
),
|
|
(
|
|
SERVER_TYPE.RSArbiter,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"arbiterOnly": True,
|
|
"hosts": ["a"],
|
|
"setName": "rs",
|
|
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
|
|
},
|
|
),
|
|
(
|
|
SERVER_TYPE.Standalone,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: True,
|
|
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
|
|
},
|
|
),
|
|
# A "slave" in a master-slave deployment.
|
|
# This replication type was removed in MongoDB
|
|
# 4.0.
|
|
(
|
|
SERVER_TYPE.Standalone,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
|
|
},
|
|
),
|
|
]:
|
|
t = create_mock_topology(direct_connection=True)
|
|
|
|
# Can't select a server while the only server is of type Unknown.
|
|
with self.assertRaisesRegex(ConnectionFailure, "No servers found yet"):
|
|
t.select_servers(any_server_selector, _Op.TEST, server_selection_timeout=0)
|
|
|
|
got_hello(t, address, hello_response)
|
|
|
|
# Topology type never changes.
|
|
self.assertEqual(TOPOLOGY_TYPE.Single, t.description.topology_type)
|
|
|
|
# No matter whether the server is writable,
|
|
# select_servers() returns it.
|
|
s = t.select_server(writable_server_selector, _Op.TEST)
|
|
self.assertEqual(server_type, s.description.server_type)
|
|
|
|
# Topology type single is always readable and writable regardless
|
|
# of server type or state.
|
|
self.assertEqual(t.description.topology_type_name, "Single")
|
|
self.assertTrue(t.description.has_writable_server())
|
|
self.assertTrue(t.description.has_readable_server())
|
|
self.assertTrue(t.description.has_readable_server(Secondary()))
|
|
self.assertTrue(
|
|
t.description.has_readable_server(Secondary(tag_sets=[{"tag": "does-not-exist"}]))
|
|
)
|
|
|
|
def test_reopen(self):
|
|
t = create_mock_topology()
|
|
|
|
# Additional calls are permitted.
|
|
t.open()
|
|
t.open()
|
|
|
|
def test_unavailable_seed(self):
|
|
t = create_mock_topology()
|
|
disconnected(t, address)
|
|
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, "a"))
|
|
|
|
def test_round_trip_time(self):
|
|
round_trip_time = 125
|
|
available = True
|
|
|
|
class TestMonitor(Monitor):
|
|
def _check_with_socket(self, *args, **kwargs):
|
|
if available:
|
|
return (
|
|
Hello({"ok": 1, "maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION}),
|
|
round_trip_time,
|
|
)
|
|
else:
|
|
raise AutoReconnect("mock monitor error")
|
|
|
|
t = create_mock_topology(monitor_class=TestMonitor)
|
|
self.addCleanup(t.close)
|
|
s = t.select_server(writable_server_selector, _Op.TEST)
|
|
self.assertEqual(125, s.description.round_trip_time)
|
|
|
|
round_trip_time = 25
|
|
t.request_check_all()
|
|
|
|
# Exponential weighted average: .8 * 125 + .2 * 25 = 105.
|
|
self.assertAlmostEqual(105, s.description.round_trip_time)
|
|
|
|
# The server is temporarily down.
|
|
available = False
|
|
t.request_check_all()
|
|
|
|
def raises_err():
|
|
try:
|
|
t.select_server(writable_server_selector, _Op.TEST, server_selection_timeout=0.1)
|
|
except ConnectionFailure:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
wait_until(raises_err, "discover server is down")
|
|
self.assertIsNone(s.description.round_trip_time)
|
|
|
|
# Bring it back, RTT is now 20 milliseconds.
|
|
available = True
|
|
round_trip_time = 20
|
|
|
|
def new_average():
|
|
# We reset the average to the most recent measurement.
|
|
description = s.description
|
|
return (
|
|
description.round_trip_time is not None
|
|
and round(abs(20 - description.round_trip_time), 7) == 0
|
|
)
|
|
|
|
tries = 0
|
|
while not new_average():
|
|
t.request_check_all()
|
|
tries += 1
|
|
if tries > 10:
|
|
self.fail("Didn't ever calculate correct new average")
|
|
|
|
|
|
class TestMultiServerTopology(TopologyTest):
|
|
def test_readable_writable(self):
|
|
t = create_mock_topology(replica_set_name="rs")
|
|
got_hello(
|
|
t,
|
|
("a", 27017),
|
|
{"ok": 1, HelloCompat.LEGACY_CMD: True, "setName": "rs", "hosts": ["a", "b"]},
|
|
)
|
|
|
|
got_hello(
|
|
t,
|
|
("b", 27017),
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"secondary": True,
|
|
"setName": "rs",
|
|
"hosts": ["a", "b"],
|
|
},
|
|
)
|
|
|
|
self.assertEqual(t.description.topology_type_name, "ReplicaSetWithPrimary")
|
|
self.assertTrue(t.description.has_writable_server())
|
|
self.assertTrue(t.description.has_readable_server())
|
|
self.assertTrue(t.description.has_readable_server(Secondary()))
|
|
self.assertFalse(t.description.has_readable_server(Secondary(tag_sets=[{"tag": "exists"}])))
|
|
|
|
t = create_mock_topology(replica_set_name="rs")
|
|
got_hello(
|
|
t,
|
|
("a", 27017),
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"secondary": False,
|
|
"setName": "rs",
|
|
"hosts": ["a", "b"],
|
|
},
|
|
)
|
|
|
|
got_hello(
|
|
t,
|
|
("b", 27017),
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"secondary": True,
|
|
"setName": "rs",
|
|
"hosts": ["a", "b"],
|
|
},
|
|
)
|
|
|
|
self.assertEqual(t.description.topology_type_name, "ReplicaSetNoPrimary")
|
|
self.assertFalse(t.description.has_writable_server())
|
|
self.assertFalse(t.description.has_readable_server())
|
|
self.assertTrue(t.description.has_readable_server(Secondary()))
|
|
self.assertFalse(t.description.has_readable_server(Secondary(tag_sets=[{"tag": "exists"}])))
|
|
|
|
t = create_mock_topology(replica_set_name="rs")
|
|
got_hello(
|
|
t,
|
|
("a", 27017),
|
|
{"ok": 1, HelloCompat.LEGACY_CMD: True, "setName": "rs", "hosts": ["a", "b"]},
|
|
)
|
|
|
|
got_hello(
|
|
t,
|
|
("b", 27017),
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"secondary": True,
|
|
"setName": "rs",
|
|
"hosts": ["a", "b"],
|
|
"tags": {"tag": "exists"},
|
|
},
|
|
)
|
|
|
|
self.assertEqual(t.description.topology_type_name, "ReplicaSetWithPrimary")
|
|
self.assertTrue(t.description.has_writable_server())
|
|
self.assertTrue(t.description.has_readable_server())
|
|
self.assertTrue(t.description.has_readable_server(Secondary()))
|
|
self.assertTrue(t.description.has_readable_server(Secondary(tag_sets=[{"tag": "exists"}])))
|
|
|
|
def test_close(self):
|
|
t = create_mock_topology(replica_set_name="rs")
|
|
got_hello(
|
|
t,
|
|
("a", 27017),
|
|
{"ok": 1, HelloCompat.LEGACY_CMD: True, "setName": "rs", "hosts": ["a", "b"]},
|
|
)
|
|
|
|
got_hello(
|
|
t,
|
|
("b", 27017),
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"secondary": True,
|
|
"setName": "rs",
|
|
"hosts": ["a", "b"],
|
|
},
|
|
)
|
|
|
|
self.assertEqual(SERVER_TYPE.RSPrimary, get_type(t, "a"))
|
|
self.assertEqual(SERVER_TYPE.RSSecondary, get_type(t, "b"))
|
|
self.assertTrue(get_monitor(t, "a").opened)
|
|
self.assertTrue(get_monitor(t, "b").opened)
|
|
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, t.description.topology_type)
|
|
|
|
t.close()
|
|
self.assertEqual(2, len(t.description.server_descriptions()))
|
|
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, "a"))
|
|
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, "b"))
|
|
self.assertFalse(get_monitor(t, "a").opened)
|
|
self.assertFalse(get_monitor(t, "b").opened)
|
|
self.assertEqual("rs", t.description.replica_set_name)
|
|
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, t.description.topology_type)
|
|
|
|
# A closed topology should not be updated when receiving a hello.
|
|
got_hello(
|
|
t,
|
|
("a", 27017),
|
|
{"ok": 1, HelloCompat.LEGACY_CMD: True, "setName": "rs", "hosts": ["a", "b", "c"]},
|
|
)
|
|
|
|
self.assertEqual(2, len(t.description.server_descriptions()))
|
|
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, "a"))
|
|
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, "b"))
|
|
self.assertFalse(get_monitor(t, "a").opened)
|
|
self.assertFalse(get_monitor(t, "b").opened)
|
|
# Server c should not have been added.
|
|
self.assertEqual(None, get_server(t, "c"))
|
|
self.assertEqual("rs", t.description.replica_set_name)
|
|
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, t.description.topology_type)
|
|
|
|
def test_handle_error(self):
|
|
t = create_mock_topology(replica_set_name="rs")
|
|
got_hello(
|
|
t,
|
|
("a", 27017),
|
|
{"ok": 1, HelloCompat.LEGACY_CMD: True, "setName": "rs", "hosts": ["a", "b"]},
|
|
)
|
|
|
|
got_hello(
|
|
t,
|
|
("b", 27017),
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"secondary": True,
|
|
"setName": "rs",
|
|
"hosts": ["a", "b"],
|
|
},
|
|
)
|
|
|
|
errctx = _ErrorContext(AutoReconnect("mock"), 0, 0, True, None)
|
|
t.handle_error(("a", 27017), errctx)
|
|
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, "a"))
|
|
self.assertEqual(SERVER_TYPE.RSSecondary, get_type(t, "b"))
|
|
self.assertEqual("rs", t.description.replica_set_name)
|
|
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, t.description.topology_type)
|
|
|
|
got_hello(
|
|
t,
|
|
("a", 27017),
|
|
{"ok": 1, HelloCompat.LEGACY_CMD: True, "setName": "rs", "hosts": ["a", "b"]},
|
|
)
|
|
|
|
self.assertEqual(SERVER_TYPE.RSPrimary, get_type(t, "a"))
|
|
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, t.description.topology_type)
|
|
|
|
t.handle_error(("b", 27017), errctx)
|
|
self.assertEqual(SERVER_TYPE.RSPrimary, get_type(t, "a"))
|
|
self.assertEqual(SERVER_TYPE.Unknown, get_type(t, "b"))
|
|
self.assertEqual("rs", t.description.replica_set_name)
|
|
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, t.description.topology_type)
|
|
|
|
def test_handle_error_removed_server(self):
|
|
t = create_mock_topology(replica_set_name="rs")
|
|
|
|
# No error resetting a server not in the TopologyDescription.
|
|
errctx = _ErrorContext(AutoReconnect("mock"), 0, 0, True, None)
|
|
t.handle_error(("b", 27017), errctx)
|
|
|
|
# Server was *not* added as type Unknown.
|
|
self.assertFalse(t.has_server(("b", 27017)))
|
|
|
|
def test_discover_set_name_from_primary(self):
|
|
# Discovering a replica set without the setName supplied by the user
|
|
# is not yet supported by MongoClient, but Topology can do it.
|
|
topology_settings = SetNameDiscoverySettings(
|
|
seeds=[address],
|
|
pool_class=MockPool, # type: ignore[arg-type]
|
|
monitor_class=DummyMonitor, # type: ignore[arg-type]
|
|
)
|
|
|
|
t = Topology(topology_settings)
|
|
self.assertEqual(t.description.replica_set_name, None)
|
|
self.assertEqual(t.description.topology_type, TOPOLOGY_TYPE.ReplicaSetNoPrimary)
|
|
t.open()
|
|
got_hello(
|
|
t, address, {"ok": 1, HelloCompat.LEGACY_CMD: True, "setName": "rs", "hosts": ["a"]}
|
|
)
|
|
|
|
self.assertEqual(t.description.replica_set_name, "rs")
|
|
self.assertEqual(t.description.topology_type, TOPOLOGY_TYPE.ReplicaSetWithPrimary)
|
|
|
|
# Another response from the primary. Tests the code that processes
|
|
# primary response when topology type is already ReplicaSetWithPrimary.
|
|
got_hello(
|
|
t, address, {"ok": 1, HelloCompat.LEGACY_CMD: True, "setName": "rs", "hosts": ["a"]}
|
|
)
|
|
|
|
# No change.
|
|
self.assertEqual(t.description.replica_set_name, "rs")
|
|
self.assertEqual(t.description.topology_type, TOPOLOGY_TYPE.ReplicaSetWithPrimary)
|
|
|
|
def test_discover_set_name_from_secondary(self):
|
|
# Discovering a replica set without the setName supplied by the user
|
|
# is not yet supported by MongoClient, but Topology can do it.
|
|
topology_settings = SetNameDiscoverySettings(
|
|
seeds=[address],
|
|
pool_class=MockPool, # type: ignore[arg-type]
|
|
monitor_class=DummyMonitor, # type: ignore[arg-type]
|
|
)
|
|
|
|
t = Topology(topology_settings)
|
|
self.assertEqual(t.description.replica_set_name, None)
|
|
self.assertEqual(t.description.topology_type, TOPOLOGY_TYPE.ReplicaSetNoPrimary)
|
|
t.open()
|
|
got_hello(
|
|
t,
|
|
address,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"secondary": True,
|
|
"setName": "rs",
|
|
"hosts": ["a"],
|
|
},
|
|
)
|
|
|
|
self.assertEqual(t.description.replica_set_name, "rs")
|
|
self.assertEqual(t.description.topology_type, TOPOLOGY_TYPE.ReplicaSetNoPrimary)
|
|
|
|
def test_wire_version(self):
|
|
t = create_mock_topology(replica_set_name="rs")
|
|
t.description.check_compatible() # No error.
|
|
|
|
got_hello(
|
|
t, address, {"ok": 1, HelloCompat.LEGACY_CMD: True, "setName": "rs", "hosts": ["a"]}
|
|
)
|
|
|
|
# Use defaults.
|
|
server = t.get_server_by_address(address)
|
|
self.assertEqual(server.description.min_wire_version, 0)
|
|
self.assertEqual(server.description.max_wire_version, 0)
|
|
|
|
got_hello(
|
|
t,
|
|
address,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: True,
|
|
"setName": "rs",
|
|
"hosts": ["a"],
|
|
"minWireVersion": 1,
|
|
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
|
|
},
|
|
)
|
|
|
|
self.assertEqual(server.description.min_wire_version, 1)
|
|
self.assertEqual(server.description.max_wire_version, 8)
|
|
t.select_servers(any_server_selector, _Op.TEST)
|
|
|
|
# Incompatible.
|
|
got_hello(
|
|
t,
|
|
address,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: True,
|
|
"setName": "rs",
|
|
"hosts": ["a"],
|
|
"minWireVersion": 26,
|
|
"maxWireVersion": 27,
|
|
},
|
|
)
|
|
|
|
try:
|
|
t.select_servers(any_server_selector, _Op.TEST)
|
|
except ConfigurationError as e:
|
|
# Error message should say which server failed and why.
|
|
self.assertEqual(
|
|
str(e),
|
|
"Server at a:27017 requires wire version 26, but this version "
|
|
"of PyMongo only supports up to %d." % (common.MAX_SUPPORTED_WIRE_VERSION,),
|
|
)
|
|
else:
|
|
self.fail("No error with incompatible wire version")
|
|
|
|
# Incompatible.
|
|
got_hello(
|
|
t,
|
|
address,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: True,
|
|
"setName": "rs",
|
|
"hosts": ["a"],
|
|
"minWireVersion": 0,
|
|
"maxWireVersion": 0,
|
|
},
|
|
)
|
|
|
|
try:
|
|
t.select_servers(any_server_selector, _Op.TEST)
|
|
except ConfigurationError as e:
|
|
# Error message should say which server failed and why.
|
|
self.assertEqual(
|
|
str(e),
|
|
"Server at a:27017 reports wire version 0, but this version "
|
|
"of PyMongo requires at least %d (MongoDB %s)."
|
|
% (common.MIN_SUPPORTED_WIRE_VERSION, common.MIN_SUPPORTED_SERVER_VERSION),
|
|
)
|
|
else:
|
|
self.fail("No error with incompatible wire version")
|
|
|
|
def test_max_write_batch_size(self):
|
|
t = create_mock_topology(seeds=["a", "b"], replica_set_name="rs")
|
|
|
|
def write_batch_size():
|
|
s = t.select_server(writable_server_selector, _Op.TEST)
|
|
return s.description.max_write_batch_size
|
|
|
|
got_hello(
|
|
t,
|
|
("a", 27017),
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: True,
|
|
"setName": "rs",
|
|
"hosts": ["a", "b"],
|
|
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
|
|
"maxWriteBatchSize": 1,
|
|
},
|
|
)
|
|
|
|
got_hello(
|
|
t,
|
|
("b", 27017),
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"secondary": True,
|
|
"setName": "rs",
|
|
"hosts": ["a", "b"],
|
|
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
|
|
"maxWriteBatchSize": 2,
|
|
},
|
|
)
|
|
|
|
# Uses primary's max batch size.
|
|
self.assertEqual(1, write_batch_size())
|
|
|
|
# b becomes primary.
|
|
got_hello(
|
|
t,
|
|
("b", 27017),
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: True,
|
|
"setName": "rs",
|
|
"hosts": ["a", "b"],
|
|
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
|
|
"maxWriteBatchSize": 2,
|
|
},
|
|
)
|
|
|
|
self.assertEqual(2, write_batch_size())
|
|
|
|
def test_topology_repr(self):
|
|
t = create_mock_topology(replica_set_name="rs")
|
|
self.addCleanup(t.close)
|
|
got_hello(
|
|
t,
|
|
("a", 27017),
|
|
{"ok": 1, HelloCompat.LEGACY_CMD: True, "setName": "rs", "hosts": ["a", "c", "b"]},
|
|
)
|
|
self.assertEqual(
|
|
repr(t.description),
|
|
f"<TopologyDescription id: {t._topology_id}, "
|
|
"topology_type: ReplicaSetWithPrimary, servers: ["
|
|
"<ServerDescription ('a', 27017) server_type: RSPrimary, rtt: 0>, "
|
|
"<ServerDescription ('b', 27017) server_type: Unknown,"
|
|
" rtt: None>, "
|
|
"<ServerDescription ('c', 27017) server_type: Unknown,"
|
|
" rtt: None>]>",
|
|
)
|
|
|
|
def test_unexpected_load_balancer(self):
|
|
# Note: This behavior should not be reachable in practice but we
|
|
# should handle it gracefully nonetheless. See PYTHON-2791.
|
|
# Load balancers are included in topology with a single seed.
|
|
t = create_mock_topology(seeds=["a"])
|
|
mock_lb_response = {
|
|
"ok": 1,
|
|
"msg": "isdbgrid",
|
|
"serviceId": ObjectId(),
|
|
"maxWireVersion": 13,
|
|
}
|
|
got_hello(t, ("a", 27017), mock_lb_response)
|
|
sds = t.description.server_descriptions()
|
|
self.assertIn(("a", 27017), sds)
|
|
self.assertEqual(sds[("a", 27017)].server_type_name, "LoadBalancer")
|
|
self.assertEqual(t.description.topology_type_name, "Single")
|
|
self.assertTrue(t.description.has_writable_server())
|
|
|
|
# Load balancers are removed from a topology with multiple seeds.
|
|
t = create_mock_topology(seeds=["a", "b"])
|
|
got_hello(t, ("a", 27017), mock_lb_response)
|
|
self.assertNotIn(("a", 27017), t.description.server_descriptions())
|
|
self.assertEqual(t.description.topology_type_name, "Unknown")
|
|
|
|
|
|
def wait_for_primary(topology):
|
|
"""Wait for a Topology to discover a writable server.
|
|
|
|
If the monitor is currently calling hello, a blocking call to
|
|
select_server from this thread can trigger a spurious wake of the monitor
|
|
thread. In applications this is harmless but it would break some tests,
|
|
so we pass server_selection_timeout=0 and poll instead.
|
|
"""
|
|
|
|
def get_primary():
|
|
try:
|
|
return topology.select_server(writable_server_selector, _Op.TEST, 0)
|
|
except ConnectionFailure:
|
|
return None
|
|
|
|
return wait_until(get_primary, "find primary")
|
|
|
|
|
|
class TestTopologyErrors(TopologyTest):
|
|
# Errors when calling hello.
|
|
|
|
@flaky(reason="PYTHON-5366")
|
|
def test_pool_reset(self):
|
|
# hello succeeds at first, then always raises socket error.
|
|
hello_count = [0]
|
|
|
|
class TestMonitor(Monitor):
|
|
def _check_with_socket(self, *args, **kwargs):
|
|
hello_count[0] += 1
|
|
if hello_count[0] == 1:
|
|
return Hello({"ok": 1, "maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION}), 0
|
|
else:
|
|
raise AutoReconnect("mock monitor error")
|
|
|
|
t = create_mock_topology(monitor_class=TestMonitor)
|
|
self.addCleanup(t.close)
|
|
server = wait_for_primary(t)
|
|
self.assertEqual(1, hello_count[0])
|
|
generation = server.pool.gen.get_overall()
|
|
|
|
# Pool is reset by hello failure.
|
|
t.request_check_all()
|
|
self.assertNotEqual(generation, server.pool.gen.get_overall())
|
|
|
|
def test_hello_retry(self):
|
|
# hello succeeds at first, then raises socket error, then succeeds.
|
|
hello_count = [0]
|
|
|
|
class TestMonitor(Monitor):
|
|
def _check_with_socket(self, *args, **kwargs):
|
|
hello_count[0] += 1
|
|
if hello_count[0] in (1, 3):
|
|
return Hello({"ok": 1, "maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION}), 0
|
|
else:
|
|
raise AutoReconnect(f"mock monitor error #{hello_count[0]}")
|
|
|
|
t = create_mock_topology(monitor_class=TestMonitor)
|
|
self.addCleanup(t.close)
|
|
server = wait_for_primary(t)
|
|
self.assertEqual(1, hello_count[0])
|
|
self.assertEqual(SERVER_TYPE.Standalone, server.description.server_type)
|
|
|
|
# Second hello call, server is marked Unknown, then the monitor
|
|
# immediately runs a retry (third hello).
|
|
t.request_check_all()
|
|
# The third hello call (the immediate retry) happens sometime soon
|
|
# after the failed check triggered by request_check_all. Wait until
|
|
# the server becomes known again.
|
|
server = t.select_server(writable_server_selector, _Op.TEST, 0.250)
|
|
self.assertEqual(SERVER_TYPE.Standalone, server.description.server_type)
|
|
self.assertEqual(3, hello_count[0])
|
|
|
|
def test_internal_monitor_error(self):
|
|
exception = AssertionError("internal error")
|
|
|
|
class TestMonitor(Monitor):
|
|
def _check_with_socket(self, *args, **kwargs):
|
|
raise exception
|
|
|
|
t = create_mock_topology(monitor_class=TestMonitor)
|
|
self.addCleanup(t.close)
|
|
with self.assertRaisesRegex(ConnectionFailure, "internal error"):
|
|
t.select_server(any_server_selector, _Op.TEST, server_selection_timeout=0.5)
|
|
|
|
|
|
class TestServerSelectionErrors(TopologyTest):
|
|
def assertMessage(self, message, topology, selector=any_server_selector):
|
|
with self.assertRaises(ConnectionFailure) as context:
|
|
topology.select_server(selector, _Op.TEST, server_selection_timeout=0)
|
|
|
|
self.assertIn(message, str(context.exception))
|
|
|
|
def test_no_primary(self):
|
|
t = create_mock_topology(replica_set_name="rs")
|
|
got_hello(
|
|
t,
|
|
address,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"secondary": True,
|
|
"setName": "rs",
|
|
"hosts": ["a"],
|
|
},
|
|
)
|
|
|
|
self.assertMessage(
|
|
'No replica set members match selector "Primary()"', t, ReadPreference.PRIMARY
|
|
)
|
|
|
|
self.assertMessage("No primary available for writes", t, writable_server_selector)
|
|
|
|
def test_no_secondary(self):
|
|
t = create_mock_topology(replica_set_name="rs")
|
|
got_hello(
|
|
t, address, {"ok": 1, HelloCompat.LEGACY_CMD: True, "setName": "rs", "hosts": ["a"]}
|
|
)
|
|
|
|
self.assertMessage(
|
|
"No replica set members match selector"
|
|
' "Secondary(tag_sets=None, max_staleness=-1, hedge=None)"',
|
|
t,
|
|
ReadPreference.SECONDARY,
|
|
)
|
|
|
|
self.assertMessage(
|
|
"No replica set members match selector"
|
|
" \"Secondary(tag_sets=[{'dc': 'ny'}], max_staleness=-1, "
|
|
'hedge=None)"',
|
|
t,
|
|
Secondary(tag_sets=[{"dc": "ny"}]),
|
|
)
|
|
|
|
def test_bad_replica_set_name(self):
|
|
t = create_mock_topology(replica_set_name="rs")
|
|
got_hello(
|
|
t,
|
|
address,
|
|
{
|
|
"ok": 1,
|
|
HelloCompat.LEGACY_CMD: False,
|
|
"secondary": True,
|
|
"setName": "wrong",
|
|
"hosts": ["a"],
|
|
},
|
|
)
|
|
|
|
self.assertMessage('No replica set members available for replica set name "rs"', t)
|
|
|
|
def test_multiple_standalones(self):
|
|
# Standalones are removed from a topology with multiple seeds.
|
|
t = create_mock_topology(seeds=["a", "b"])
|
|
got_hello(t, ("a", 27017), {"ok": 1})
|
|
got_hello(t, ("b", 27017), {"ok": 1})
|
|
self.assertMessage("No servers available", t)
|
|
|
|
def test_no_mongoses(self):
|
|
# Standalones are removed from a topology with multiple seeds.
|
|
t = create_mock_topology(seeds=["a", "b"])
|
|
|
|
# Discover a mongos and change topology type to Sharded.
|
|
got_hello(t, ("a", 27017), {"ok": 1, "msg": "isdbgrid"})
|
|
|
|
# Oops, both servers are standalone now. Remove them.
|
|
got_hello(t, ("a", 27017), {"ok": 1})
|
|
got_hello(t, ("b", 27017), {"ok": 1})
|
|
self.assertMessage("No mongoses available", t)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|