PYTHON-3866 add types to topology_description.py (#1339)

This commit is contained in:
Iris 2023-08-03 16:08:44 -07:00 committed by GitHub
parent 359e924719
commit 54840752d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -15,7 +15,18 @@
"""Represent a deployment of MongoDB servers."""
from random import sample
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple
from typing import (
Any,
Callable,
Dict,
List,
Mapping,
MutableMapping,
NamedTuple,
Optional,
Tuple,
cast,
)
from bson.min_key import MinKey
from bson.objectid import ObjectId
@ -99,7 +110,7 @@ class TopologyDescription:
s.logical_session_timeout_minutes for s in readable_servers
)
def _init_incompatible_err(self):
def _init_incompatible_err(self) -> None:
"""Internal compatibility check for non-load balanced topologies."""
for s in self._server_descriptions.values():
if not s.is_server_type_known:
@ -253,14 +264,16 @@ class TopologyDescription:
def srv_max_hosts(self) -> int:
return self._topology_settings._srv_max_hosts
def _apply_local_threshold(self, selection):
def _apply_local_threshold(self, selection: Optional[Selection]) -> List[ServerDescription]:
if not selection:
return []
# Round trip time in seconds.
fastest = min(s.round_trip_time for s in selection.server_descriptions)
fastest = min(cast(float, s.round_trip_time) for s in selection.server_descriptions)
threshold = self._topology_settings.local_threshold_ms / 1000.0
return [
s for s in selection.server_descriptions if (s.round_trip_time - fastest) <= threshold
s
for s in selection.server_descriptions
if (cast(float, s.round_trip_time) - fastest) <= threshold
]
def apply_selector(
@ -344,7 +357,7 @@ class TopologyDescription:
"""
return self.has_readable_server(ReadPreference.PRIMARY)
def __repr__(self):
def __repr__(self) -> str:
# Sort the servers by address.
servers = sorted(self._server_descriptions.values(), key=lambda sd: sd.address)
return "<{} id: {}, topology_type: {}, servers: {!r}>".format(
@ -472,7 +485,9 @@ def updated_topology_description(
)
def _updated_topology_description_srv_polling(topology_description, seedlist):
def _updated_topology_description_srv_polling(
topology_description: TopologyDescription, seedlist: List[Tuple[str, Any]]
) -> TopologyDescription:
"""Return an updated copy of a TopologyDescription.
:Parameters:
@ -515,8 +530,12 @@ def _updated_topology_description_srv_polling(topology_description, seedlist):
def _update_rs_from_primary(
sds, replica_set_name, server_description, max_set_version, max_election_id
):
sds: MutableMapping[_Address, ServerDescription],
replica_set_name: Optional[str],
server_description: ServerDescription,
max_set_version: Optional[int],
max_election_id: Optional[ObjectId],
) -> Tuple[int, Optional[str], Optional[int], Optional[ObjectId]]:
"""Update topology description from a primary's hello response.
Pass in a dict of ServerDescriptions, current replica set name, the
@ -536,8 +555,8 @@ def _update_rs_from_primary(
return _check_has_primary(sds), replica_set_name, max_set_version, max_election_id
if server_description.max_wire_version is None or server_description.max_wire_version < 17:
new_election_tuple = server_description.set_version, server_description.election_id
max_election_tuple = max_set_version, max_election_id
new_election_tuple: Tuple = (server_description.set_version, server_description.election_id)
max_election_tuple: Tuple = (max_set_version, max_election_id)
if None not in new_election_tuple:
if None not in max_election_tuple and new_election_tuple < max_election_tuple:
# Stale primary, set to type Unknown.
@ -589,7 +608,11 @@ def _update_rs_from_primary(
return (_check_has_primary(sds), replica_set_name, max_set_version, max_election_id)
def _update_rs_with_primary_from_member(sds, replica_set_name, server_description):
def _update_rs_with_primary_from_member(
sds: MutableMapping[_Address, ServerDescription],
replica_set_name: Optional[str],
server_description: ServerDescription,
) -> int:
"""RS with known primary. Process a response from a non-primary.
Pass in a dict of ServerDescriptions, current replica set name, and the
@ -608,7 +631,11 @@ def _update_rs_with_primary_from_member(sds, replica_set_name, server_descriptio
return _check_has_primary(sds)
def _update_rs_no_primary_from_member(sds, replica_set_name, server_description):
def _update_rs_no_primary_from_member(
sds: MutableMapping[_Address, ServerDescription],
replica_set_name: Optional[str],
server_description: ServerDescription,
) -> Tuple[int, Optional[str]]:
"""RS without known primary. Update from a non-primary's response.
Pass in a dict of ServerDescriptions, current replica set name, and the
@ -636,7 +663,7 @@ def _update_rs_no_primary_from_member(sds, replica_set_name, server_description)
return topology_type, replica_set_name
def _check_has_primary(sds):
def _check_has_primary(sds: Mapping[_Address, ServerDescription]) -> int:
"""Current topology type is ReplicaSetWithPrimary. Is primary still known?
Pass in a dict of ServerDescriptions.