PYTHON-2672 SDAM, CMAP, and server selection changes for load balancers (#621)

Disable SRV Polling, SDAM compatibility check, logicalSessionTimeoutMinutes check.
server session pool pruning, server selection, and server monitoring.
A ServerType of LoadBalancer MUST be considered a data-bearing server.
"drivers MUST emit the following series of SDAM events" section.
Send loadBalanced:True with handshakes, validate serviceId.
Add topologyVersion fallback when serviceId is missing.
Don't mark load balancers unknown.
This commit is contained in:
Shane Harvey 2021-05-18 14:12:49 -07:00 committed by GitHub
parent 2a74601572
commit 5bf15c8e18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 113 additions and 51 deletions

View File

@ -130,6 +130,7 @@ def _parse_pool_options(options):
options.get('compressors', []),
options.get('zlibcompressionlevel', -1))
ssl_context, ssl_match_hostname = _parse_ssl_options(options)
load_balanced = options.get('loadbalanced')
return PoolOptions(max_pool_size,
min_pool_size,
max_idle_time_seconds,
@ -140,7 +141,8 @@ def _parse_pool_options(options):
appname,
driver,
compression_settings,
server_api=server_api)
server_api=server_api,
load_balanced=load_balanced)
class ClientOptions(object):

View File

@ -26,7 +26,9 @@ def _get_server_type(doc):
if not doc.get('ok'):
return SERVER_TYPE.Unknown
if doc.get('isreplicaset'):
if doc.get('serviceId'):
return SERVER_TYPE.LoadBalancer
elif doc.get('isreplicaset'):
return SERVER_TYPE.RSGhost
elif doc.get('setName'):
if doc.get('hidden'):
@ -58,7 +60,8 @@ class IsMaster(object):
self._is_writable = self._server_type in (
SERVER_TYPE.RSPrimary,
SERVER_TYPE.Standalone,
SERVER_TYPE.Mongos)
SERVER_TYPE.Mongos,
SERVER_TYPE.LoadBalancer)
self._is_readable = (
self.server_type == SERVER_TYPE.RSSecondary
@ -185,3 +188,7 @@ class IsMaster(object):
@property
def awaitable(self):
return self._awaitable
@property
def service_id(self):
return self._doc.get('serviceId')

View File

@ -926,7 +926,8 @@ class MongoClient(common.BaseObject):
'Cannot use "address" property when load balancing among'
' mongoses, use "nodes" instead.')
if topology_type not in (TOPOLOGY_TYPE.ReplicaSetWithPrimary,
TOPOLOGY_TYPE.Single):
TOPOLOGY_TYPE.Single,
TOPOLOGY_TYPE.LoadBalanced):
return None
return self._server_property('address')

View File

@ -262,7 +262,7 @@ class PoolOptions(object):
'__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
'__event_listeners', '__appname', '__driver', '__metadata',
'__compression_settings', '__max_connecting',
'__pause_enabled', '__server_api')
'__pause_enabled', '__server_api', '__load_balanced')
def __init__(self, max_pool_size=MAX_POOL_SIZE,
min_pool_size=MIN_POOL_SIZE,
@ -272,7 +272,7 @@ class PoolOptions(object):
ssl_match_hostname=True, socket_keepalive=True,
event_listeners=None, appname=None, driver=None,
compression_settings=None, max_connecting=MAX_CONNECTING,
pause_enabled=True, server_api=None):
pause_enabled=True, server_api=None, load_balanced=None):
self.__max_pool_size = max_pool_size
self.__min_pool_size = min_pool_size
self.__max_idle_time_seconds = max_idle_time_seconds
@ -290,6 +290,7 @@ class PoolOptions(object):
self.__max_connecting = max_connecting
self.__pause_enabled = pause_enabled
self.__server_api = server_api
self.__load_balanced = load_balanced
self.__metadata = copy.deepcopy(_METADATA)
if appname:
self.__metadata['application'] = {'name': appname}
@ -452,6 +453,12 @@ class PoolOptions(object):
"""
return self.__server_api
@property
def load_balanced(self):
"""True if this Pool is configured in load balanced mode.
"""
return self.__load_balanced
def _negotiate_creds(all_credentials):
"""Return one credential that needs mechanism negotiation, if any.
@ -531,6 +538,8 @@ class SocketInfo(object):
self.cancel_context = _CancellationContext()
self.opts = pool.opts
self.more_to_come = False
# For load balancer support.
self.service_id = None
def hello_cmd(self):
if self.opts.server_api:
@ -551,6 +560,8 @@ class SocketInfo(object):
cmd['client'] = self.opts.metadata
if self.compression_settings:
cmd['compression'] = self.compression_settings.compressors
if self.opts.load_balanced:
cmd['loadBalanced'] = True
elif topology_version is not None:
cmd['topologyVersion'] = topology_version
cmd['maxAwaitTimeMS'] = int(heartbeat_frequency*1000)
@ -574,6 +585,10 @@ class SocketInfo(object):
doc = self.command('admin', cmd, publish_events=False,
exhaust_allowed=awaitable)
# PYTHON-2712 will remove this topologyVersion fallback logic.
if self.opts.load_balanced:
process_id = doc.get('topologyVersion', {}).get('processId')
doc.setdefault('serviceId', process_id)
ismaster = IsMaster(doc, awaitable=awaitable)
self.is_writable = ismaster.is_writable
self.max_wire_version = ismaster.max_wire_version
@ -595,6 +610,12 @@ class SocketInfo(object):
auth_ctx.parse_response(ismaster)
if auth_ctx.speculate_succeeded():
self.auth_ctx[auth_ctx.credentials] = auth_ctx
if self.opts.load_balanced:
if not ismaster.service_id:
raise ConfigurationError(
'Driver attempted to initialize in load balancing mode'
' but the server does not support this mode')
self.service_id = ismaster.service_id
return ismaster
def _next_reply(self):
@ -1113,7 +1134,8 @@ class Pool:
with self.size_cond:
if self.closed:
return
if self.opts.pause_enabled and pause:
if (self.opts.pause_enabled and pause and
not self.opts.load_balanced):
old_state, self.state = self.state, PoolState.PAUSED
self.generation += 1
newpid = os.getpid()

View File

@ -46,7 +46,8 @@ class Server(object):
Multiple calls have no effect.
"""
self._monitor.open()
if not self._pool.opts.load_balanced:
self._monitor.open()
def reset(self):
"""Clear the connection pool."""

View File

@ -206,7 +206,8 @@ class ServerDescription(object):
"""Checks if this server supports retryable writes."""
return (
self._ls_timeout_minutes is not None and
self._server_type in (SERVER_TYPE.Mongos, SERVER_TYPE.RSPrimary))
self._server_type in (SERVER_TYPE.Mongos, SERVER_TYPE.RSPrimary,
SERVER_TYPE.LoadBalancer))
@property
def retryable_reads_supported(self):

View File

@ -20,4 +20,4 @@ from collections import namedtuple
SERVER_TYPE = namedtuple('ServerType',
['Unknown', 'Mongos', 'RSPrimary', 'RSSecondary',
'RSArbiter', 'RSOther', 'RSGhost',
'Standalone'])(*range(8))
'Standalone', 'LoadBalancer'])(*range(9))

View File

@ -132,7 +132,9 @@ class TopologySettings(object):
return self._load_balanced
def get_topology_type(self):
if self.direct:
if self.load_balanced:
return TOPOLOGY_TYPE.LoadBalanced
elif self.direct:
return TOPOLOGY_TYPE.Single
elif self.replica_set_name is not None:
return TOPOLOGY_TYPE.ReplicaSetNoPrimary

View File

@ -34,6 +34,7 @@ from pymongo.errors import (ConnectionFailure,
PyMongoError,
ServerSelectionTimeoutError,
WriteError)
from pymongo.ismaster import IsMaster
from pymongo.monitor import SrvMonitor
from pymongo.pool import PoolOptions
from pymongo.server import Server
@ -136,7 +137,8 @@ class Topology(object):
executor.open()
self._srv_monitor = None
if self._settings.fqdn is not None:
if (self._settings.fqdn is not None and
not self._settings.load_balanced):
self._srv_monitor = SrvMonitor(self, self._settings)
def open(self):
@ -489,29 +491,38 @@ class Topology(object):
with self._lock:
return self._session_pool.pop_all()
def get_server_session(self):
"""Start or resume a server session, or raise ConfigurationError."""
with self._lock:
session_timeout = self._description.logical_session_timeout_minutes
if session_timeout is None:
# Maybe we need an initial scan? Can raise ServerSelectionError.
if self._description.topology_type == TOPOLOGY_TYPE.Single:
if not self._description.has_known_servers:
self._select_servers_loop(
any_server_selector,
self._settings.server_selection_timeout,
None)
elif not self._description.readable_servers:
def _check_session_support(self):
"""Internal check for session support on non-load balanced clusters."""
session_timeout = self._description.logical_session_timeout_minutes
if session_timeout is None:
# Maybe we need an initial scan? Can raise ServerSelectionError.
if self._description.topology_type == TOPOLOGY_TYPE.Single:
if not self._description.has_known_servers:
self._select_servers_loop(
readable_server_selector,
any_server_selector,
self._settings.server_selection_timeout,
None)
elif not self._description.readable_servers:
self._select_servers_loop(
readable_server_selector,
self._settings.server_selection_timeout,
None)
session_timeout = self._description.logical_session_timeout_minutes
if session_timeout is None:
raise ConfigurationError(
"Sessions are not supported by this MongoDB deployment")
return session_timeout
def get_server_session(self):
"""Start or resume a server session, or raise ConfigurationError."""
with self._lock:
# Sessions are always supported in load balanced mode.
if not self._settings.load_balanced:
session_timeout = self._check_session_support()
else:
# Sessions never time out in load balanced mode.
session_timeout = float('inf')
return self._session_pool.get_server_session(session_timeout)
def return_server_session(self, server_session, lock):
@ -551,6 +562,12 @@ class Topology(object):
SRV_POLLING_TOPOLOGIES):
self._srv_monitor.open()
if self._settings.load_balanced:
# Emit initial SDAM events for load balancer mode.
self._process_change(ServerDescription(
self._seed_addresses[0],
IsMaster({'ok': 1, 'serviceId': self._topology_id})))
# Ensure that the monitors are open.
for server in self._servers.values():
server.open()
@ -608,20 +625,23 @@ class Topology(object):
if err_code in helpers._NOT_MASTER_CODES:
is_shutting_down = err_code in helpers._SHUTDOWN_CODES
# Mark server Unknown, clear the pool, and request check.
self._process_change(ServerDescription(address, error=error))
if not self._settings.load_balanced:
self._process_change(ServerDescription(address, error=error))
if is_shutting_down or (err_ctx.max_wire_version <= 7):
# Clear the pool.
server.reset()
server.request_check()
elif not err_ctx.completed_handshake:
# Unknown command error during the connection handshake.
self._process_change(ServerDescription(address, error=error))
if not self._settings.load_balanced:
self._process_change(ServerDescription(address, error=error))
# Clear the pool.
server.reset()
elif issubclass(exc_type, ConnectionFailure):
# "Client MUST replace the server's description with type Unknown
# ... MUST NOT request an immediate check of the server."
self._process_change(ServerDescription(address, error=error))
if not self._settings.load_balanced:
self._process_change(ServerDescription(address, error=error))
# Clear the pool.
server.reset()
# "When a client marks a server Unknown from `Network error when

View File

@ -25,9 +25,9 @@ from pymongo.server_type import SERVER_TYPE
# Enumeration for various kinds of MongoDB cluster topologies.
TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary',
'ReplicaSetWithPrimary', 'Sharded',
'Unknown'])(*range(5))
TOPOLOGY_TYPE = namedtuple('TopologyType', [
'Single', 'ReplicaSetNoPrimary', 'ReplicaSetWithPrimary', 'Sharded',
'Unknown', 'LoadBalanced'])(*range(6))
# Topologies compatible with SRV record polling.
SRV_POLLING_TOPOLOGIES = (TOPOLOGY_TYPE.Unknown, TOPOLOGY_TYPE.Sharded)
@ -63,7 +63,28 @@ class TopologyDescription(object):
# Is PyMongo compatible with all servers' wire protocols?
self._incompatible_err = None
if self._topology_type != TOPOLOGY_TYPE.LoadBalanced:
self._init_incompatible_err()
# Server Discovery And Monitoring Spec: Whenever a client updates the
# TopologyDescription from an ismaster response, it MUST set
# TopologyDescription.logicalSessionTimeoutMinutes to the smallest
# logicalSessionTimeoutMinutes value among ServerDescriptions of all
# data-bearing server types. If any have a null
# logicalSessionTimeoutMinutes, then
# TopologyDescription.logicalSessionTimeoutMinutes MUST be set to null.
readable_servers = self.readable_servers
if not readable_servers:
self._ls_timeout_minutes = None
elif any(s.logical_session_timeout_minutes is None
for s in readable_servers):
self._ls_timeout_minutes = None
else:
self._ls_timeout_minutes = min(s.logical_session_timeout_minutes
for s in readable_servers)
def _init_incompatible_err(self):
"""Internal compatibility check for non-load balanced topologies."""
for s in self._server_descriptions.values():
if not s.is_server_type_known:
continue
@ -98,23 +119,6 @@ class TopologyDescription(object):
break
# Server Discovery And Monitoring Spec: Whenever a client updates the
# TopologyDescription from an ismaster response, it MUST set
# TopologyDescription.logicalSessionTimeoutMinutes to the smallest
# logicalSessionTimeoutMinutes value among ServerDescriptions of all
# data-bearing server types. If any have a null
# logicalSessionTimeoutMinutes, then
# TopologyDescription.logicalSessionTimeoutMinutes MUST be set to null.
readable_servers = self.readable_servers
if not readable_servers:
self._ls_timeout_minutes = None
elif any(s.logical_session_timeout_minutes is None
for s in readable_servers):
self._ls_timeout_minutes = None
else:
self._ls_timeout_minutes = min(s.logical_session_timeout_minutes
for s in readable_servers)
def check_compatible(self):
"""Raise ConfigurationError if any server is incompatible.
@ -243,8 +247,9 @@ class TopologyDescription(object):
selector.min_wire_version,
common_wv))
if self.topology_type == TOPOLOGY_TYPE.Single:
# Ignore selectors for standalone.
if self.topology_type in (TOPOLOGY_TYPE.Single,
TOPOLOGY_TYPE.LoadBalanced):
# Ignore selectors for standalone and load balancer mode.
return self.known_servers
elif address:
# Ignore selectors when explicit address is requested.
@ -306,6 +311,7 @@ _SERVER_TYPE_TO_TOPOLOGY_TYPE = {
SERVER_TYPE.RSSecondary: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
SERVER_TYPE.RSArbiter: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
SERVER_TYPE.RSOther: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
# Note: SERVER_TYPE.LoadBalancer and Unknown are intentionally left out.
}