PYTHON-4780 Implement fast path for server selection with Primary (#2416)
This commit is contained in:
parent
5a640daf92
commit
06872f7f03
2
.github/workflows/zizmor.yml
vendored
2
.github/workflows/zizmor.yml
vendored
@ -18,4 +18,4 @@ jobs:
|
||||
with:
|
||||
persist-credentials: false
|
||||
- name: Run zizmor 🌈
|
||||
uses: zizmorcore/zizmor-action@0f0557ab4a0b31211d42435e42df31cbd63fdd59
|
||||
uses: zizmorcore/zizmor-action@1c7106082dbc1753372e3924b7da1b9417011a21
|
||||
|
||||
@ -10,6 +10,7 @@ PyMongo 4.14 brings a number of changes including:
|
||||
- Added :meth:`pymongo.asynchronous.mongo_client.AsyncMongoClient.append_metadata` and
|
||||
:meth:`pymongo.mongo_client.MongoClient.append_metadata` to allow instantiated MongoClients to send client metadata
|
||||
on-demand
|
||||
- Improved performance of selecting a server with the Primary selector.
|
||||
|
||||
- Introduces a minor breaking change. When encoding :class:`bson.binary.BinaryVector`, a ``ValueError`` will be raised
|
||||
if the 'padding' metadata field is < 0 or > 7, or non-zero for any type other than PACKED_BIT.
|
||||
|
||||
@ -34,7 +34,7 @@ from bson.min_key import MinKey
|
||||
from bson.objectid import ObjectId
|
||||
from pymongo import common
|
||||
from pymongo.errors import ConfigurationError, PyMongoError
|
||||
from pymongo.read_preferences import ReadPreference, _AggWritePref, _ServerMode
|
||||
from pymongo.read_preferences import Primary, ReadPreference, _AggWritePref, _ServerMode
|
||||
from pymongo.server_description import ServerDescription
|
||||
from pymongo.server_selectors import Selection
|
||||
from pymongo.server_type import SERVER_TYPE
|
||||
@ -324,6 +324,17 @@ class TopologyDescription:
|
||||
description = self.server_descriptions().get(address)
|
||||
return [description] if description else []
|
||||
|
||||
# Primary selection fast path.
|
||||
if self.topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary and type(selector) is Primary:
|
||||
for sd in self._server_descriptions.values():
|
||||
if sd.server_type == SERVER_TYPE.RSPrimary:
|
||||
sds = [sd]
|
||||
if custom_selector:
|
||||
sds = custom_selector(sds)
|
||||
return sds
|
||||
# No primary found, return an empty list.
|
||||
return []
|
||||
|
||||
selection = Selection.from_topology_description(self)
|
||||
# Ignore read preference for sharded clusters.
|
||||
if self.topology_type != TOPOLOGY_TYPE.Sharded:
|
||||
|
||||
@ -130,12 +130,12 @@ class TestCustomServerSelectorFunction(AsyncIntegrationTest):
|
||||
test_collection = mongo_client.testdb.test_collection
|
||||
self.addAsyncCleanup(mongo_client.drop_database, "testdb")
|
||||
|
||||
# Do N operations and test selector is called at least N times.
|
||||
# Do N operations and test selector is called at least N-1 times due to fast path.
|
||||
await test_collection.insert_one({"age": 20, "name": "John"})
|
||||
await test_collection.insert_one({"age": 31, "name": "Jane"})
|
||||
await test_collection.update_one({"name": "Jane"}, {"$set": {"age": 21}})
|
||||
await test_collection.find_one({"name": "Roe"})
|
||||
self.assertGreaterEqual(selector.call_count, 4)
|
||||
self.assertGreaterEqual(selector.call_count, 3)
|
||||
|
||||
@async_client_context.require_replica_set
|
||||
async def test_latency_threshold_application(self):
|
||||
|
||||
@ -130,12 +130,12 @@ class TestCustomServerSelectorFunction(IntegrationTest):
|
||||
test_collection = mongo_client.testdb.test_collection
|
||||
self.addCleanup(mongo_client.drop_database, "testdb")
|
||||
|
||||
# Do N operations and test selector is called at least N times.
|
||||
# Do N operations and test selector is called at least N-1 times due to fast path.
|
||||
test_collection.insert_one({"age": 20, "name": "John"})
|
||||
test_collection.insert_one({"age": 31, "name": "Jane"})
|
||||
test_collection.update_one({"name": "Jane"}, {"$set": {"age": 21}})
|
||||
test_collection.find_one({"name": "Roe"})
|
||||
self.assertGreaterEqual(selector.call_count, 4)
|
||||
self.assertGreaterEqual(selector.call_count, 3)
|
||||
|
||||
@client_context.require_replica_set
|
||||
def test_latency_threshold_application(self):
|
||||
|
||||
@ -30,7 +30,7 @@ from bson.objectid import ObjectId
|
||||
from pymongo import common
|
||||
from pymongo.errors import AutoReconnect, ConfigurationError, ConnectionFailure
|
||||
from pymongo.hello import Hello, HelloCompat
|
||||
from pymongo.read_preferences import ReadPreference, Secondary
|
||||
from pymongo.read_preferences import Primary, ReadPreference, Secondary
|
||||
from pymongo.server_description import ServerDescription
|
||||
from pymongo.server_selectors import any_server_selector, writable_server_selector
|
||||
from pymongo.server_type import SERVER_TYPE
|
||||
@ -51,7 +51,10 @@ address = ("a", 27017)
|
||||
|
||||
|
||||
def create_mock_topology(
|
||||
seeds=None, replica_set_name=None, monitor_class=DummyMonitor, direct_connection=False
|
||||
seeds=None,
|
||||
replica_set_name=None,
|
||||
monitor_class=DummyMonitor,
|
||||
direct_connection=False,
|
||||
):
|
||||
partitioned_seeds = list(map(common.partition_node, seeds or ["a"]))
|
||||
topology_settings = TopologySettings(
|
||||
@ -123,6 +126,25 @@ class TestTopologyConfiguration(TopologyTest):
|
||||
# The monitor, not its pool, is responsible for calling hello.
|
||||
self.assertTrue(monitor._pool.is_sdam)
|
||||
|
||||
def test_selector_fast_path(self):
|
||||
topology = create_mock_topology(seeds=["a", "b:27018"], replica_set_name="foo")
|
||||
description = topology.description
|
||||
description._topology_type = TOPOLOGY_TYPE.ReplicaSetWithPrimary
|
||||
|
||||
# There is no primary yet, so it should give an empty list.
|
||||
self.assertEqual(description.apply_selector(Primary()), [])
|
||||
|
||||
# If we set a primary server, we should get it back.
|
||||
sd = list(description._server_descriptions.values())[0]
|
||||
sd._server_type = SERVER_TYPE.RSPrimary
|
||||
self.assertEqual(description.apply_selector(Primary()), [sd])
|
||||
|
||||
# If there is a custom selector, it should be applied.
|
||||
def custom_selector(servers):
|
||||
return []
|
||||
|
||||
self.assertEqual(description.apply_selector(Primary(), custom_selector=custom_selector), [])
|
||||
|
||||
|
||||
class TestSingleServerTopology(TopologyTest):
|
||||
def test_direct_connection(self):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user