diff --git a/pymongo/client_options.py b/pymongo/client_options.py index 346f7ad6e..f53a9642e 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -130,6 +130,7 @@ def _parse_pool_options(options): options.get('compressors', []), options.get('zlibcompressionlevel', -1)) ssl_context, ssl_match_hostname = _parse_ssl_options(options) + load_balanced = options.get('loadbalanced') return PoolOptions(max_pool_size, min_pool_size, max_idle_time_seconds, @@ -140,7 +141,8 @@ def _parse_pool_options(options): appname, driver, compression_settings, - server_api=server_api) + server_api=server_api, + load_balanced=load_balanced) class ClientOptions(object): diff --git a/pymongo/ismaster.py b/pymongo/ismaster.py index 0c38fa6b1..59eedec22 100644 --- a/pymongo/ismaster.py +++ b/pymongo/ismaster.py @@ -26,7 +26,9 @@ def _get_server_type(doc): if not doc.get('ok'): return SERVER_TYPE.Unknown - if doc.get('isreplicaset'): + if doc.get('serviceId'): + return SERVER_TYPE.LoadBalancer + elif doc.get('isreplicaset'): return SERVER_TYPE.RSGhost elif doc.get('setName'): if doc.get('hidden'): @@ -58,7 +60,8 @@ class IsMaster(object): self._is_writable = self._server_type in ( SERVER_TYPE.RSPrimary, SERVER_TYPE.Standalone, - SERVER_TYPE.Mongos) + SERVER_TYPE.Mongos, + SERVER_TYPE.LoadBalancer) self._is_readable = ( self.server_type == SERVER_TYPE.RSSecondary @@ -185,3 +188,7 @@ class IsMaster(object): @property def awaitable(self): return self._awaitable + + @property + def service_id(self): + return self._doc.get('serviceId') diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index c3098cea7..ebd11970a 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -926,7 +926,8 @@ class MongoClient(common.BaseObject): 'Cannot use "address" property when load balancing among' ' mongoses, use "nodes" instead.') if topology_type not in (TOPOLOGY_TYPE.ReplicaSetWithPrimary, - TOPOLOGY_TYPE.Single): + TOPOLOGY_TYPE.Single, + TOPOLOGY_TYPE.LoadBalanced): return None return self._server_property('address') diff --git a/pymongo/pool.py b/pymongo/pool.py index a0dda5d9f..728fec0f6 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -262,7 +262,7 @@ class PoolOptions(object): '__ssl_context', '__ssl_match_hostname', '__socket_keepalive', '__event_listeners', '__appname', '__driver', '__metadata', '__compression_settings', '__max_connecting', - '__pause_enabled', '__server_api') + '__pause_enabled', '__server_api', '__load_balanced') def __init__(self, max_pool_size=MAX_POOL_SIZE, min_pool_size=MIN_POOL_SIZE, @@ -272,7 +272,7 @@ class PoolOptions(object): ssl_match_hostname=True, socket_keepalive=True, event_listeners=None, appname=None, driver=None, compression_settings=None, max_connecting=MAX_CONNECTING, - pause_enabled=True, server_api=None): + pause_enabled=True, server_api=None, load_balanced=None): self.__max_pool_size = max_pool_size self.__min_pool_size = min_pool_size self.__max_idle_time_seconds = max_idle_time_seconds @@ -290,6 +290,7 @@ class PoolOptions(object): self.__max_connecting = max_connecting self.__pause_enabled = pause_enabled self.__server_api = server_api + self.__load_balanced = load_balanced self.__metadata = copy.deepcopy(_METADATA) if appname: self.__metadata['application'] = {'name': appname} @@ -452,6 +453,12 @@ class PoolOptions(object): """ return self.__server_api + @property + def load_balanced(self): + """True if this Pool is configured in load balanced mode. + """ + return self.__load_balanced + def _negotiate_creds(all_credentials): """Return one credential that needs mechanism negotiation, if any. @@ -531,6 +538,8 @@ class SocketInfo(object): self.cancel_context = _CancellationContext() self.opts = pool.opts self.more_to_come = False + # For load balancer support. + self.service_id = None def hello_cmd(self): if self.opts.server_api: @@ -551,6 +560,8 @@ class SocketInfo(object): cmd['client'] = self.opts.metadata if self.compression_settings: cmd['compression'] = self.compression_settings.compressors + if self.opts.load_balanced: + cmd['loadBalanced'] = True elif topology_version is not None: cmd['topologyVersion'] = topology_version cmd['maxAwaitTimeMS'] = int(heartbeat_frequency*1000) @@ -574,6 +585,10 @@ class SocketInfo(object): doc = self.command('admin', cmd, publish_events=False, exhaust_allowed=awaitable) + # PYTHON-2712 will remove this topologyVersion fallback logic. + if self.opts.load_balanced: + process_id = doc.get('topologyVersion', {}).get('processId') + doc.setdefault('serviceId', process_id) ismaster = IsMaster(doc, awaitable=awaitable) self.is_writable = ismaster.is_writable self.max_wire_version = ismaster.max_wire_version @@ -595,6 +610,12 @@ class SocketInfo(object): auth_ctx.parse_response(ismaster) if auth_ctx.speculate_succeeded(): self.auth_ctx[auth_ctx.credentials] = auth_ctx + if self.opts.load_balanced: + if not ismaster.service_id: + raise ConfigurationError( + 'Driver attempted to initialize in load balancing mode' + ' but the server does not support this mode') + self.service_id = ismaster.service_id return ismaster def _next_reply(self): @@ -1113,7 +1134,8 @@ class Pool: with self.size_cond: if self.closed: return - if self.opts.pause_enabled and pause: + if (self.opts.pause_enabled and pause and + not self.opts.load_balanced): old_state, self.state = self.state, PoolState.PAUSED self.generation += 1 newpid = os.getpid() diff --git a/pymongo/server.py b/pymongo/server.py index c5ddd9bea..fbfddae2e 100644 --- a/pymongo/server.py +++ b/pymongo/server.py @@ -46,7 +46,8 @@ class Server(object): Multiple calls have no effect. """ - self._monitor.open() + if not self._pool.opts.load_balanced: + self._monitor.open() def reset(self): """Clear the connection pool.""" diff --git a/pymongo/server_description.py b/pymongo/server_description.py index 897faa3d3..5dc8222fe 100644 --- a/pymongo/server_description.py +++ b/pymongo/server_description.py @@ -206,7 +206,8 @@ class ServerDescription(object): """Checks if this server supports retryable writes.""" return ( self._ls_timeout_minutes is not None and - self._server_type in (SERVER_TYPE.Mongos, SERVER_TYPE.RSPrimary)) + self._server_type in (SERVER_TYPE.Mongos, SERVER_TYPE.RSPrimary, + SERVER_TYPE.LoadBalancer)) @property def retryable_reads_supported(self): diff --git a/pymongo/server_type.py b/pymongo/server_type.py index c231aa04c..101f9dba4 100644 --- a/pymongo/server_type.py +++ b/pymongo/server_type.py @@ -20,4 +20,4 @@ from collections import namedtuple SERVER_TYPE = namedtuple('ServerType', ['Unknown', 'Mongos', 'RSPrimary', 'RSSecondary', 'RSArbiter', 'RSOther', 'RSGhost', - 'Standalone'])(*range(8)) + 'Standalone', 'LoadBalancer'])(*range(9)) diff --git a/pymongo/settings.py b/pymongo/settings.py index 91807ffc0..c866d1671 100644 --- a/pymongo/settings.py +++ b/pymongo/settings.py @@ -132,7 +132,9 @@ class TopologySettings(object): return self._load_balanced def get_topology_type(self): - if self.direct: + if self.load_balanced: + return TOPOLOGY_TYPE.LoadBalanced + elif self.direct: return TOPOLOGY_TYPE.Single elif self.replica_set_name is not None: return TOPOLOGY_TYPE.ReplicaSetNoPrimary diff --git a/pymongo/topology.py b/pymongo/topology.py index a101ec9c1..446bb9353 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -34,6 +34,7 @@ from pymongo.errors import (ConnectionFailure, PyMongoError, ServerSelectionTimeoutError, WriteError) +from pymongo.ismaster import IsMaster from pymongo.monitor import SrvMonitor from pymongo.pool import PoolOptions from pymongo.server import Server @@ -136,7 +137,8 @@ class Topology(object): executor.open() self._srv_monitor = None - if self._settings.fqdn is not None: + if (self._settings.fqdn is not None and + not self._settings.load_balanced): self._srv_monitor = SrvMonitor(self, self._settings) def open(self): @@ -489,29 +491,38 @@ class Topology(object): with self._lock: return self._session_pool.pop_all() - def get_server_session(self): - """Start or resume a server session, or raise ConfigurationError.""" - with self._lock: - session_timeout = self._description.logical_session_timeout_minutes - if session_timeout is None: - # Maybe we need an initial scan? Can raise ServerSelectionError. - if self._description.topology_type == TOPOLOGY_TYPE.Single: - if not self._description.has_known_servers: - self._select_servers_loop( - any_server_selector, - self._settings.server_selection_timeout, - None) - elif not self._description.readable_servers: + def _check_session_support(self): + """Internal check for session support on non-load balanced clusters.""" + session_timeout = self._description.logical_session_timeout_minutes + if session_timeout is None: + # Maybe we need an initial scan? Can raise ServerSelectionError. + if self._description.topology_type == TOPOLOGY_TYPE.Single: + if not self._description.has_known_servers: self._select_servers_loop( - readable_server_selector, + any_server_selector, self._settings.server_selection_timeout, None) + elif not self._description.readable_servers: + self._select_servers_loop( + readable_server_selector, + self._settings.server_selection_timeout, + None) session_timeout = self._description.logical_session_timeout_minutes if session_timeout is None: raise ConfigurationError( "Sessions are not supported by this MongoDB deployment") + return session_timeout + def get_server_session(self): + """Start or resume a server session, or raise ConfigurationError.""" + with self._lock: + # Sessions are always supported in load balanced mode. + if not self._settings.load_balanced: + session_timeout = self._check_session_support() + else: + # Sessions never time out in load balanced mode. + session_timeout = float('inf') return self._session_pool.get_server_session(session_timeout) def return_server_session(self, server_session, lock): @@ -551,6 +562,12 @@ class Topology(object): SRV_POLLING_TOPOLOGIES): self._srv_monitor.open() + if self._settings.load_balanced: + # Emit initial SDAM events for load balancer mode. + self._process_change(ServerDescription( + self._seed_addresses[0], + IsMaster({'ok': 1, 'serviceId': self._topology_id}))) + # Ensure that the monitors are open. for server in self._servers.values(): server.open() @@ -608,20 +625,23 @@ class Topology(object): if err_code in helpers._NOT_MASTER_CODES: is_shutting_down = err_code in helpers._SHUTDOWN_CODES # Mark server Unknown, clear the pool, and request check. - self._process_change(ServerDescription(address, error=error)) + if not self._settings.load_balanced: + self._process_change(ServerDescription(address, error=error)) if is_shutting_down or (err_ctx.max_wire_version <= 7): # Clear the pool. server.reset() server.request_check() elif not err_ctx.completed_handshake: # Unknown command error during the connection handshake. - self._process_change(ServerDescription(address, error=error)) + if not self._settings.load_balanced: + self._process_change(ServerDescription(address, error=error)) # Clear the pool. server.reset() elif issubclass(exc_type, ConnectionFailure): # "Client MUST replace the server's description with type Unknown # ... MUST NOT request an immediate check of the server." - self._process_change(ServerDescription(address, error=error)) + if not self._settings.load_balanced: + self._process_change(ServerDescription(address, error=error)) # Clear the pool. server.reset() # "When a client marks a server Unknown from `Network error when diff --git a/pymongo/topology_description.py b/pymongo/topology_description.py index 0b72b65f3..1c5a1f456 100644 --- a/pymongo/topology_description.py +++ b/pymongo/topology_description.py @@ -25,9 +25,9 @@ from pymongo.server_type import SERVER_TYPE # Enumeration for various kinds of MongoDB cluster topologies. -TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary', - 'ReplicaSetWithPrimary', 'Sharded', - 'Unknown'])(*range(5)) +TOPOLOGY_TYPE = namedtuple('TopologyType', [ + 'Single', 'ReplicaSetNoPrimary', 'ReplicaSetWithPrimary', 'Sharded', + 'Unknown', 'LoadBalanced'])(*range(6)) # Topologies compatible with SRV record polling. SRV_POLLING_TOPOLOGIES = (TOPOLOGY_TYPE.Unknown, TOPOLOGY_TYPE.Sharded) @@ -63,7 +63,28 @@ class TopologyDescription(object): # Is PyMongo compatible with all servers' wire protocols? self._incompatible_err = None + if self._topology_type != TOPOLOGY_TYPE.LoadBalanced: + self._init_incompatible_err() + # Server Discovery And Monitoring Spec: Whenever a client updates the + # TopologyDescription from an ismaster response, it MUST set + # TopologyDescription.logicalSessionTimeoutMinutes to the smallest + # logicalSessionTimeoutMinutes value among ServerDescriptions of all + # data-bearing server types. If any have a null + # logicalSessionTimeoutMinutes, then + # TopologyDescription.logicalSessionTimeoutMinutes MUST be set to null. + readable_servers = self.readable_servers + if not readable_servers: + self._ls_timeout_minutes = None + elif any(s.logical_session_timeout_minutes is None + for s in readable_servers): + self._ls_timeout_minutes = None + else: + self._ls_timeout_minutes = min(s.logical_session_timeout_minutes + for s in readable_servers) + + def _init_incompatible_err(self): + """Internal compatibility check for non-load balanced topologies.""" for s in self._server_descriptions.values(): if not s.is_server_type_known: continue @@ -98,23 +119,6 @@ class TopologyDescription(object): break - # Server Discovery And Monitoring Spec: Whenever a client updates the - # TopologyDescription from an ismaster response, it MUST set - # TopologyDescription.logicalSessionTimeoutMinutes to the smallest - # logicalSessionTimeoutMinutes value among ServerDescriptions of all - # data-bearing server types. If any have a null - # logicalSessionTimeoutMinutes, then - # TopologyDescription.logicalSessionTimeoutMinutes MUST be set to null. - readable_servers = self.readable_servers - if not readable_servers: - self._ls_timeout_minutes = None - elif any(s.logical_session_timeout_minutes is None - for s in readable_servers): - self._ls_timeout_minutes = None - else: - self._ls_timeout_minutes = min(s.logical_session_timeout_minutes - for s in readable_servers) - def check_compatible(self): """Raise ConfigurationError if any server is incompatible. @@ -243,8 +247,9 @@ class TopologyDescription(object): selector.min_wire_version, common_wv)) - if self.topology_type == TOPOLOGY_TYPE.Single: - # Ignore selectors for standalone. + if self.topology_type in (TOPOLOGY_TYPE.Single, + TOPOLOGY_TYPE.LoadBalanced): + # Ignore selectors for standalone and load balancer mode. return self.known_servers elif address: # Ignore selectors when explicit address is requested. @@ -306,6 +311,7 @@ _SERVER_TYPE_TO_TOPOLOGY_TYPE = { SERVER_TYPE.RSSecondary: TOPOLOGY_TYPE.ReplicaSetNoPrimary, SERVER_TYPE.RSArbiter: TOPOLOGY_TYPE.ReplicaSetNoPrimary, SERVER_TYPE.RSOther: TOPOLOGY_TYPE.ReplicaSetNoPrimary, + # Note: SERVER_TYPE.LoadBalancer and Unknown are intentionally left out. }