diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index e65958d25..9cd74202a 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -48,9 +48,9 @@ from pymongo import (auth, thread_util, uri_parser) from pymongo.client_options import ClientOptions -from pymongo.topology_description import CLUSTER_TYPE +from pymongo.topology_description import TOPOLOGY_TYPE from pymongo.cursor_manager import CursorManager -from pymongo.topology import Cluster +from pymongo.topology import Topology from pymongo.errors import (ConfigurationError, ConnectionFailure, InvalidURI, AutoReconnect, OperationFailure, @@ -62,7 +62,7 @@ from pymongo.server_selectors import (any_server_selector, writable_preferred_server_selector, writable_server_selector) from pymongo.server_type import SERVER_TYPE -from pymongo.settings import ClusterSettings +from pymongo.settings import TopologySettings def _partition_node(node): @@ -300,7 +300,7 @@ class MongoClient(common.BaseObject): if creds: self._cache_credentials(creds.source, creds) - self._cluster_settings = ClusterSettings( + self._topology_settings = TopologySettings( seeds=seeds, set_name=options.replica_set_name, pool_class=pool_class, @@ -308,9 +308,9 @@ class MongoClient(common.BaseObject): monitor_class=monitor_class, condition_class=condition_class) - self._cluster = Cluster(self._cluster_settings) + self._topology = Topology(self._topology_settings) if connect: - self._cluster.open() + self._topology.open() def _cache_credentials(self, source, credentials, connect=False): """Save a set of authentication credentials. @@ -329,7 +329,7 @@ class MongoClient(common.BaseObject): 'to this database. You must logout first.') if connect: - server = self._get_cluster().select_server( + server = self._get_topology().select_server( writable_preferred_server_selector) # get_socket() logs out of the database if logged in with old @@ -405,7 +405,7 @@ class MongoClient(common.BaseObject): ServerDescription first, then use its properties. """ try: - server = self._cluster.select_server( + server = self._topology.select_server( writable_server_selector, server_wait_time=0) return getattr(server.description, attr_name) @@ -452,7 +452,7 @@ class MongoClient(common.BaseObject): MongoClient gained this property in version 3.0 when MongoReplicaSetClient's functionality was merged in. """ - return self._cluster.get_primary() + return self._topology.get_primary() @property def secondaries(self): @@ -464,7 +464,7 @@ class MongoClient(common.BaseObject): MongoClient gained this property in version 3.0 when MongoReplicaSetClient's functionality was merged in. """ - return self._cluster.get_secondaries() + return self._topology.get_secondaries() @property def arbiters(self): @@ -473,7 +473,7 @@ class MongoClient(common.BaseObject): A sequence of (host, port) pairs. Empty if this client is not connected to a replica set. """ - return self._cluster.get_arbiters() + return self._topology.get_arbiters() @property def is_primary(self): @@ -508,7 +508,7 @@ class MongoClient(common.BaseObject): Nodes are either specified when this instance was created, or discovered through the replica set discovery mechanism. """ - description = self._cluster.description + description = self._topology.description return frozenset(s.address for s in description.known_servers) @property @@ -587,16 +587,16 @@ class MongoClient(common.BaseObject): Can raise ConnectionFailure. """ - cluster = self._get_cluster() # Starts monitors if necessary. - server = cluster.select_server(writable_server_selector) + topology = self._get_topology() # Starts monitors if necessary. + server = topology.select_server(writable_server_selector) return server.description.max_wire_version def _is_writable(self): """Attempt to connect to a writable server, or return False. """ - cluster = self._get_cluster() # Starts monitors if necessary. + topology = self._get_topology() # Starts monitors if necessary. try: - s = cluster.select_server(writable_server_selector) + s = topology.select_server(writable_server_selector) # When directly connected to a secondary, arbiter, etc., # select_server returns it, whatever the selector. Check @@ -612,7 +612,7 @@ class MongoClient(common.BaseObject): pools. If this instance is used again it will be automatically re-opened. """ - self._cluster.reset() + self._topology.reset() def close(self): """Alias for :meth:`disconnect` @@ -644,7 +644,7 @@ class MongoClient(common.BaseObject): # reported an error. try: # TODO: Mongos pinning. - server = self._cluster.select_server( + server = self._topology.select_server( writable_server_selector, server_wait_time=0) @@ -678,14 +678,14 @@ class MongoClient(common.BaseObject): self.__cursor_manager = manager - def _get_cluster(self): - """Get the internal :class:`~pymongo.cluster.Cluster` object. + def _get_topology(self): + """Get the internal :class:`~pymongo.topology.Topology` object. - If this client was created with "connect=False", calling _get_cluster + If this client was created with "connect=False", calling _get_topology launches the connection process in the background. """ - self._cluster.open() - return self._cluster + self._topology.open() + return self._topology def __check_gle_response(self, response, is_command): """Check a response to a lastError message for errors. @@ -749,15 +749,15 @@ class MongoClient(common.BaseObject): - `address` (optional): Optional address when sending a getMore or killCursors to a specific server. """ - cluster = self._get_cluster() + topology = self._get_topology() if address: assert not check_primary, "Can't use check_primary with address" - server = cluster.get_server_by_address(address) + server = topology.get_server_by_address(address) if not server: raise AutoReconnect('server %s:%d no longer available' % address) else: - server = cluster.select_server(writable_server_selector) + server = topology.select_server(writable_server_selector) is_writable = server.description.is_writable if check_primary and not with_last_error and not is_writable: @@ -801,9 +801,9 @@ class MongoClient(common.BaseObject): - `exhaust` (optional): If True, the socket used stays checked out. It is returned along with its Pool in the Response. """ - cluster = self._get_cluster() + topology = self._get_topology() if address: - server = cluster.get_server_by_address(address) + server = topology.get_server_by_address(address) if not server: raise AutoReconnect('server %s:%d no longer available' % address) @@ -813,7 +813,7 @@ class MongoClient(common.BaseObject): else: selector = writable_server_selector - server = cluster.select_server(selector) + server = topology.select_server(selector) if self.in_request() and not server.in_request(): server.start_request() @@ -844,14 +844,14 @@ class MongoClient(common.BaseObject): def __reset_server(self, address): """Clear our connection pool for a server and mark it Unknown.""" - self._cluster.reset_server(address) + self._topology.reset_server(address) def _reset_server_and_request_check(self, address): """Clear our pool for a server, mark it Unknown, and check it soon.""" - self._cluster.reset_server(address) - server = self._cluster.get_server_by_address(address) + self._topology.reset_server(address) + server = self._topology.get_server_by_address(address) - # "server" is None if another thread removed it from the cluster. + # "server" is None if another thread removed it from the topology. if server: server.request_check() @@ -883,7 +883,7 @@ class MongoClient(common.BaseObject): # lazily. These greedy calls are to make PyMongo 2.x's request # tests pass. try: - servers = self._cluster.select_servers(any_server_selector, + servers = self._topology.select_servers(any_server_selector, server_wait_time=0) for s in servers: @@ -908,7 +908,7 @@ class MongoClient(common.BaseObject): """ if 0 == self.__request_counter.dec(): try: - servers = self._cluster.select_servers(any_server_selector, + servers = self._topology.select_servers(any_server_selector, server_wait_time=0) for s in servers: @@ -926,7 +926,7 @@ class MongoClient(common.BaseObject): return not self == other def __repr__(self): - server_descriptions = self._cluster.description.server_descriptions() + server_descriptions = self._topology.description.server_descriptions() if len(server_descriptions) == 1: description, = server_descriptions.values() return "MongoClient(%r, %r)" % description.address @@ -976,15 +976,15 @@ class MongoClient(common.BaseObject): # TODO: update this, pass address to cursor_manager.close(). # PyMongo 2.x introduced a configurable CursorManager which sends # OP_KILLCURSORS to the server. The API doesn't handle a multi-server - # cluster, where we must pass the address of the server that receives + # topology, where we must pass the address of the server that receives # the message. Support CursorManager for backwards compatibility, but # only for single servers. if self.__cursor_manager: - cluster_type = self._cluster.description.cluster_type - if cluster_type not in (CLUSTER_TYPE.Single, CLUSTER_TYPE.Sharded): + topology_type = self._topology.description.topology_type + if topology_type not in (TOPOLOGY_TYPE.Single, TOPOLOGY_TYPE.Sharded): raise InvalidOperation( - "Can't use custom CursorManager with cluster type %s" % - CLUSTER_TYPE._fields[cluster_type]) + "Can't use custom CursorManager with topology type %s" % + TOPOLOGY_TYPE._fields[topology_type]) self.__cursor_manager.close(cursor_id) else: @@ -1079,9 +1079,9 @@ class MongoClient(common.BaseObject): if from_host is not None: command["fromhost"] = from_host - # _get_cluster() starts connecting, if we initialized with + # _get_topology() starts connecting, if we initialized with # connect=False. - server = self._get_cluster().select_server( + server = self._get_topology().select_server( writable_server_selector) if self.in_request() and not server.in_request(): diff --git a/pymongo/mongo_replica_set_client.py b/pymongo/mongo_replica_set_client.py index 8539465d1..653f75084 100644 --- a/pymongo/mongo_replica_set_client.py +++ b/pymongo/mongo_replica_set_client.py @@ -45,6 +45,6 @@ class MongoReplicaSetClient(mongo_client.MongoClient): super(MongoReplicaSetClient, self).__init__(*args, **kwargs) def __repr__(self): - sds = self._cluster.description.server_descriptions() + sds = self._topology.description.server_descriptions() return "MongoReplicaSetClient(%r)" % (["%s:%d" % s.address for s in sds.values()],) diff --git a/pymongo/monitor.py b/pymongo/monitor.py index 003cb59ed..cb6ad2261 100644 --- a/pymongo/monitor.py +++ b/pymongo/monitor.py @@ -31,22 +31,22 @@ class Monitor(object): def __init__( self, server_description, - cluster, + topology, pool, - cluster_settings): + topology_settings): """Class to monitor a MongoDB server on a background thread. - Pass an initial ServerDescription, a Cluster, a Pool, and a - ClusterSettings. + Pass an initial ServerDescription, a Topology, a Pool, and + TopologySettings. - The Cluster is weakly referenced. The Pool must be exclusive to this + The Topology is weakly referenced. The Pool must be exclusive to this Monitor. """ super(Monitor, self).__init__() self._server_description = server_description - self._cluster = weakref.proxy(cluster) + self._topology = weakref.proxy(topology) self._pool = pool - self._settings = cluster_settings + self._settings = topology_settings self._stopped = False self._event = thread_util.Event(self._settings.condition_class) self._thread = None @@ -96,9 +96,9 @@ class Monitor(object): while not self._stopped: try: self._server_description = self._check_with_retry() - self._cluster.on_change(self._server_description) + self._topology.on_change(self._server_description) except ReferenceError: - # Cluster was garbage-collected. + # Topology was garbage-collected. self.close() else: start = time.time() # TODO: monotonic. @@ -122,7 +122,7 @@ class Monitor(object): if new_server_description: return new_server_description else: - self._cluster.reset_pool(self._server_description.address) + self._topology.reset_pool(self._server_description.address) if retry: server_description = self._check_once() if server_description: diff --git a/pymongo/server.py b/pymongo/server.py index 91a96ee46..0fdf2e10f 100644 --- a/pymongo/server.py +++ b/pymongo/server.py @@ -12,7 +12,7 @@ # implied. See the License for the specific language governing # permissions and limitations under the License. -"""Communicate with one MongoDB server in a cluster.""" +"""Communicate with one MongoDB server in a topology.""" import socket diff --git a/pymongo/server_description.py b/pymongo/server_description.py index 0f518c62b..b49f560c0 100644 --- a/pymongo/server_description.py +++ b/pymongo/server_description.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Represent one server in the cluster.""" +"""Represent one server in the topology.""" from pymongo.server_type import SERVER_TYPE from pymongo.ismaster import IsMaster diff --git a/pymongo/settings.py b/pymongo/settings.py index 0c147542d..fcc02ffe9 100644 --- a/pymongo/settings.py +++ b/pymongo/settings.py @@ -17,12 +17,12 @@ import threading from pymongo import common, monitor, pool -from pymongo.topology_description import CLUSTER_TYPE +from pymongo.topology_description import TOPOLOGY_TYPE from pymongo.pool import PoolOptions from pymongo.server_description import ServerDescription -class ClusterSettings(object): +class TopologySettings(object): def __init__( self, seeds=None, @@ -77,13 +77,13 @@ class ClusterSettings(object): """ return self._direct - def get_cluster_type(self): + def get_topology_type(self): if self.direct: - return CLUSTER_TYPE.Single + return TOPOLOGY_TYPE.Single elif self.set_name is not None: - return CLUSTER_TYPE.ReplicaSetNoPrimary + return TOPOLOGY_TYPE.ReplicaSetNoPrimary else: - return CLUSTER_TYPE.Unknown + return TOPOLOGY_TYPE.Unknown def get_server_descriptions(self): """Initial dict of (address, ServerDescription) for all seeds.""" diff --git a/pymongo/thread_util.py b/pymongo/thread_util.py index ec005a14a..a1018cbb7 100644 --- a/pymongo/thread_util.py +++ b/pymongo/thread_util.py @@ -229,7 +229,7 @@ class Event(object): """Copy of standard threading.Event, but uses a custom condition class. Allows async frameworks to override monitors' synchronization behavior - with ClusterSettings.condition_class. + with TopologySettings.condition_class. Copied from CPython's threading.py at hash c7960cc9. """ diff --git a/pymongo/topology.py b/pymongo/topology.py index 162952a56..bcfd8dcc2 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -12,7 +12,7 @@ # implied. See the License for the specific language governing # permissions and limitations under the License. -"""Internal classes to monitor clusters of one or more servers.""" +"""Internal class to monitor a topology of one or more servers.""" import random import threading @@ -20,9 +20,9 @@ import time from bson.py3compat import itervalues from pymongo import common -from pymongo.topology_description import (updated_cluster_description, - CLUSTER_TYPE, - ClusterDescription) +from pymongo.topology_description import (updated_topology_description, + TOPOLOGY_TYPE, + TopologyDescription) from pymongo.errors import AutoReconnect from pymongo.server import Server from pymongo.server_selectors import (arbiter_server_selector, @@ -30,16 +30,16 @@ from pymongo.server_selectors import (arbiter_server_selector, writable_server_selector) -class Cluster(object): - """Monitor a cluster of one or more servers.""" - def __init__(self, cluster_settings): - self._settings = cluster_settings - cluster_description = ClusterDescription( - cluster_settings.get_cluster_type(), - cluster_settings.get_server_descriptions(), - cluster_settings.set_name) +class Topology(object): + """Monitor a topology of one or more servers.""" + def __init__(self, topology_settings): + self._settings = topology_settings + topology_description = TopologyDescription( + topology_settings.get_topology_type(), + topology_settings.get_server_descriptions(), + topology_settings.set_name) - self._description = cluster_description + self._description = topology_description self._opened = False self._lock = threading.Lock() self._condition = self._settings.condition_class(self._lock) @@ -82,19 +82,20 @@ class Cluster(object): # No suitable servers. if wait_time == 0 or now > end_time: # TODO: more error diagnostics. E.g., if state is - # ReplicaSet but every server is Unknown, and the host list - # is non-empty, and doesn't intersect with settings.seeds, - # the set is probably configured with internal hostnames or - # IPs and we're connecting from outside. Or if state is - # ReplicaSet and clusterDescription.server_descriptions is - # empty, we have the wrong set_name. Include - # ClusterDescription's stringification in exception msg. + # ReplicaSet but every server is Unknown, and the host + # list is non-empty, and doesn't intersect with + # settings.seeds, the set is probably configured with + # internal hostnames or IPs and we're connecting from + # outside. Or if we're a replica set and + # server_descriptions is empty, we have the wrong + # set_name. Include TopologyDescription's str() in + # exception msg. raise AutoReconnect("No suitable servers available") self._ensure_opened() self._request_check_all() - # Release the lock and wait for the cluster description to + # Release the lock and wait for the topology description to # change, or for a timeout. We won't miss any changes that # came after our most recent selector() call, since we've # held the lock until now. @@ -114,12 +115,12 @@ class Cluster(object): """Process a new ServerDescription after an ismaster call completes.""" # We do no I/O holding the lock. with self._lock: - # Any monitored server was definitely in the cluster description + # Any monitored server was definitely in the topology description # once. Check if it's still in the description or if some state- # change removed it. E.g., we got a host list from the primary # that didn't include this server. if self._description.has_server(server_description.address): - self._description = updated_cluster_description( + self._description = updated_topology_description( self._description, server_description) self._update_servers() @@ -136,10 +137,10 @@ class Cluster(object): def get_primary(self): """Return primary's address or None.""" - # Implemented here in Cluster instead of MongoClient, so it can lock. + # Implemented here in Topology instead of MongoClient, so it can lock. with self._lock: - cluster_type = self._description.cluster_type - if cluster_type != CLUSTER_TYPE.ReplicaSetWithPrimary: + topology_type = self._description.topology_type + if topology_type != TOPOLOGY_TYPE.ReplicaSetWithPrimary: return None description = writable_server_selector( @@ -149,11 +150,11 @@ class Cluster(object): def _get_replica_set_members(self, selector): """Return set of replica set member addresses.""" - # Implemented here in Cluster instead of MongoClient, so it can lock. + # Implemented here in Topology instead of MongoClient, so it can lock. with self._lock: - cluster_type = self._description.cluster_type - if cluster_type not in (CLUSTER_TYPE.ReplicaSetWithPrimary, - CLUSTER_TYPE.ReplicaSetNoPrimary): + topology_type = self._description.topology_type + if topology_type not in (TOPOLOGY_TYPE.ReplicaSetWithPrimary, + TOPOLOGY_TYPE.ReplicaSetNoPrimary): return [] descriptions = selector(self._description.known_servers) @@ -187,7 +188,7 @@ class Cluster(object): with self._lock: server = self._servers.get(address) - # "server" is None if another thread removed it from the cluster. + # "server" is None if another thread removed it from the topology. if server: server.pool.reset() @@ -198,7 +199,7 @@ class Cluster(object): def reset(self): """Reset all pools and disconnect from all servers. - The cluster reconnects on demand, or after common.HEARTBEAT_FREQUENCY + The Topology reconnects on demand, or after common.HEARTBEAT_FREQUENCY seconds. """ with self._lock: @@ -232,14 +233,14 @@ class Cluster(object): server.request_check() def _apply_selector(self, selector): - if self._description.cluster_type == CLUSTER_TYPE.Single: + if self._description.topology_type == TOPOLOGY_TYPE.Single: # Ignore the selector. return self._description.known_servers else: return selector(self._description.known_servers) def _update_servers(self): - """Sync our set of Servers from ClusterDescription.server_descriptions. + """Sync our set of Servers from TopologyDescription.server_descriptions. Hold the lock while calling this. """ diff --git a/pymongo/topology_description.py b/pymongo/topology_description.py index d5ec41b34..d52cb3b62 100644 --- a/pymongo/topology_description.py +++ b/pymongo/topology_description.py @@ -12,7 +12,7 @@ # implied. See the License for the specific language governing # permissions and limitations under the License. -"""Represent the cluster of servers.""" +"""Represent the topology of servers.""" from collections import namedtuple @@ -22,22 +22,22 @@ from pymongo.errors import ConfigurationError from pymongo.server_description import ServerDescription -CLUSTER_TYPE = namedtuple('ClusterType', ['Single', 'ReplicaSetNoPrimary', - 'ReplicaSetWithPrimary', 'Sharded', - 'Unknown'])(*range(5)) +TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary', + 'ReplicaSetWithPrimary', 'Sharded', + 'Unknown'])(*range(5)) -class ClusterDescription(object): - def __init__(self, cluster_type, server_descriptions, set_name): - """Represent a cluster of servers. +class TopologyDescription(object): + def __init__(self, topology_type, server_descriptions, set_name): + """Represent a topology of servers. :Parameters: - - `cluster_type`: initial type + - `topology_type`: initial type - `server_descriptions`: dict of (address, ServerDescription) for all seeds - `set_name`: replica set name or None """ - self._cluster_type = cluster_type + self._topology_type = topology_type self._set_name = set_name self._server_descriptions = server_descriptions @@ -80,33 +80,33 @@ class ClusterDescription(object): # The default ServerDescription's type is Unknown. sds[address] = ServerDescription(address) - if self._cluster_type == CLUSTER_TYPE.ReplicaSetWithPrimary: - cluster_type = _check_has_primary(sds) + if self._topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary: + topology_type = _check_has_primary(sds) else: - cluster_type = self._cluster_type + topology_type = self._topology_type - return ClusterDescription(cluster_type, sds, self._set_name) + return TopologyDescription(topology_type, sds, self._set_name) def reset(self): """A copy of this description, with all servers marked Unknown.""" - if self._cluster_type == CLUSTER_TYPE.ReplicaSetWithPrimary: - cluster_type = CLUSTER_TYPE.ReplicaSetNoPrimary + if self._topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary: + topology_type = TOPOLOGY_TYPE.ReplicaSetNoPrimary else: - cluster_type = self._cluster_type + topology_type = self._topology_type # The default ServerDescription's type is Unknown. sds = dict((address, ServerDescription(address)) for address in self._server_descriptions) - return ClusterDescription(cluster_type, sds, self._set_name) + return TopologyDescription(topology_type, sds, self._set_name) def server_descriptions(self): """Dict of (address, ServerDescription).""" return self._server_descriptions.copy() @property - def cluster_type(self): - return self._cluster_type + def topology_type(self): + return self._topology_type @property def set_name(self): @@ -120,103 +120,103 @@ class ClusterDescription(object): if s.is_server_type_known] -# If cluster type is Unknown and we receive an ismaster response, what should -# the new cluster type be? -_SERVER_TYPE_TO_CLUSTER_TYPE = { - SERVER_TYPE.Mongos: CLUSTER_TYPE.Sharded, - SERVER_TYPE.RSPrimary: CLUSTER_TYPE.ReplicaSetWithPrimary, - SERVER_TYPE.RSSecondary: CLUSTER_TYPE.ReplicaSetNoPrimary, - SERVER_TYPE.RSArbiter: CLUSTER_TYPE.ReplicaSetNoPrimary, - SERVER_TYPE.RSOther: CLUSTER_TYPE.ReplicaSetNoPrimary, +# If topology type is Unknown and we receive an ismaster response, what should +# the new topology type be? +_SERVER_TYPE_TO_TOPOLOGY_TYPE = { + SERVER_TYPE.Mongos: TOPOLOGY_TYPE.Sharded, + SERVER_TYPE.RSPrimary: TOPOLOGY_TYPE.ReplicaSetWithPrimary, + SERVER_TYPE.RSSecondary: TOPOLOGY_TYPE.ReplicaSetNoPrimary, + SERVER_TYPE.RSArbiter: TOPOLOGY_TYPE.ReplicaSetNoPrimary, + SERVER_TYPE.RSOther: TOPOLOGY_TYPE.ReplicaSetNoPrimary, } -def updated_cluster_description(cluster_description, server_description): - """Return an updated copy of a ClusterDescription. +def updated_topology_description(topology_description, server_description): + """Return an updated copy of a TopologyDescription. :Parameters: - - `cluster_description`: the current ClusterDescription + - `topology_description`: the current TopologyDescription - `server_description`: a new ServerDescription that resulted from an ismaster call Called after attempting (successfully or not) to call ismaster on the - server at server_description.address. Does not modify cluster_description. + server at server_description.address. Does not modify topology_description. """ address = server_description.address # These values will be updated, if necessary, to form the new - # ClusterDescription. - cluster_type = cluster_description.cluster_type - set_name = cluster_description.set_name + # TopologyDescription. + topology_type = topology_description.topology_type + set_name = topology_description.set_name server_type = server_description.server_type # Don't mutate the original dict of server descriptions; copy it. - sds = cluster_description.server_descriptions() + sds = topology_description.server_descriptions() # Replace this server's description with the new one. sds[address] = server_description - if cluster_type == CLUSTER_TYPE.Single: + if topology_type == TOPOLOGY_TYPE.Single: # Single type never changes. - return ClusterDescription(CLUSTER_TYPE.Single, sds, set_name) + return TopologyDescription(TOPOLOGY_TYPE.Single, sds, set_name) - if cluster_type == CLUSTER_TYPE.Unknown: + if topology_type == TOPOLOGY_TYPE.Unknown: if server_type == SERVER_TYPE.Standalone: sds.pop(address) elif server_type not in (SERVER_TYPE.Unknown, SERVER_TYPE.RSGhost): - cluster_type = _SERVER_TYPE_TO_CLUSTER_TYPE[server_type] + topology_type = _SERVER_TYPE_TO_TOPOLOGY_TYPE[server_type] - if cluster_type == CLUSTER_TYPE.Sharded: + if topology_type == TOPOLOGY_TYPE.Sharded: if server_type != SERVER_TYPE.Mongos: sds.pop(address) - elif cluster_type == CLUSTER_TYPE.ReplicaSetNoPrimary: + elif topology_type == TOPOLOGY_TYPE.ReplicaSetNoPrimary: if server_type in (SERVER_TYPE.Standalone, SERVER_TYPE.Mongos): sds.pop(address) elif server_type == SERVER_TYPE.RSPrimary: - cluster_type, set_name = _update_rs_from_primary( + topology_type, set_name = _update_rs_from_primary( sds, set_name, server_description) elif server_type in ( SERVER_TYPE.RSSecondary, SERVER_TYPE.RSArbiter, SERVER_TYPE.RSOther): - cluster_type, set_name = _update_rs_no_primary_from_member( + topology_type, set_name = _update_rs_no_primary_from_member( sds, set_name, server_description) - elif cluster_type == CLUSTER_TYPE.ReplicaSetWithPrimary: + elif topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary: if server_type in (SERVER_TYPE.Standalone, SERVER_TYPE.Mongos): sds.pop(address) - cluster_type = _check_has_primary(sds) + topology_type = _check_has_primary(sds) elif server_type == SERVER_TYPE.RSPrimary: - cluster_type, set_name = _update_rs_from_primary( + topology_type, set_name = _update_rs_from_primary( sds, set_name, server_description) elif server_type in ( SERVER_TYPE.RSSecondary, SERVER_TYPE.RSArbiter, SERVER_TYPE.RSOther): - cluster_type = _update_rs_with_primary_from_member( + topology_type = _update_rs_with_primary_from_member( sds, set_name, server_description) else: # Server type is Unknown or RSGhost: did we just lose the primary? - cluster_type = _check_has_primary(sds) + topology_type = _check_has_primary(sds) # Return updated copy. - return ClusterDescription(cluster_type, sds, set_name) + return TopologyDescription(topology_type, sds, set_name) def _update_rs_from_primary(sds, set_name, server_description): - """Update cluster description from a primary's ismaster response. + """Update topology description from a primary's ismaster response. Pass in a dict of ServerDescriptions, current replica set name, and the ServerDescription we are processing. - Returns (new cluster type, new set_name). + Returns (new topology type, new set_name). """ if set_name is None: set_name = server_description.set_name @@ -258,7 +258,7 @@ def _update_rs_with_primary_from_member(sds, set_name, server_description): Pass in a dict of ServerDescriptions, current replica set name, and the ServerDescription we are processing. - Returns new cluster type. + Returns new topology type. """ assert set_name is not None @@ -275,15 +275,15 @@ def _update_rs_no_primary_from_member(sds, set_name, server_description): Pass in a dict of ServerDescriptions, current replica set name, and the ServerDescription we are processing. - Returns (new cluster type, new set_name). + Returns (new topology type, new set_name). """ - cluster_type = CLUSTER_TYPE.ReplicaSetNoPrimary + topology_type = TOPOLOGY_TYPE.ReplicaSetNoPrimary if set_name is None: set_name = server_description.set_name elif set_name != server_description.set_name: sds.pop(server_description.address) - return cluster_type, set_name + return topology_type, set_name # This isn't the primary's response, so don't remove any servers # it doesn't report. Only add new servers. @@ -291,18 +291,18 @@ def _update_rs_no_primary_from_member(sds, set_name, server_description): if address not in sds: sds[address] = ServerDescription(address) - return cluster_type, set_name + return topology_type, set_name def _check_has_primary(sds): - """Current cluster type is ReplicaSetWithPrimary. Is primary still known? + """Current topology type is ReplicaSetWithPrimary. Is primary still known? Pass in a dict of ServerDescriptions. - Returns new cluster type. + Returns new topology type. """ for s in sds.values(): if s.server_type == SERVER_TYPE.RSPrimary: - return CLUSTER_TYPE.ReplicaSetWithPrimary + return TOPOLOGY_TYPE.ReplicaSetWithPrimary else: - return CLUSTER_TYPE.ReplicaSetNoPrimary + return TOPOLOGY_TYPE.ReplicaSetNoPrimary diff --git a/test/high_availability/test_ha.py b/test/high_availability/test_ha.py index 0f2475eba..4e939381d 100644 --- a/test/high_availability/test_ha.py +++ b/test/high_availability/test_ha.py @@ -975,9 +975,9 @@ class TestReplicaSetRequest(HATestCase): self.assertTrue(self.c.in_request()) - cluster = self.c._get_cluster() - primary_pool = cluster.select_server(writable_server_selector).pool - secondary_pool = cluster.select_server(secondary_server_selector).pool + topology = self.c._get_topology() + primary_pool = topology.select_server(writable_server_selector).pool + secondary_pool = topology.select_server(secondary_server_selector).pool # Trigger start_request on primary pool utils.assertReadFrom(self, self.c, primary, PRIMARY) diff --git a/test/pymongo_mocks.py b/test/pymongo_mocks.py index 02981c150..446766c80 100644 --- a/test/pymongo_mocks.py +++ b/test/pymongo_mocks.py @@ -60,9 +60,9 @@ class MockMonitor(Monitor): self, client, server_description, - cluster, + topology, pool, - cluster_settings): + topology_settings): # MockMonitor gets a 'client' arg, regular monitors don't. self.client = client self.mock_address = server_description.address @@ -71,9 +71,9 @@ class MockMonitor(Monitor): Monitor.__init__( self, ServerDescription((default_host, default_port)), - cluster, + topology, pool, - cluster_settings) + topology_settings) def _check_once(self): try: @@ -90,10 +90,10 @@ class MockClient(MongoClient): def __init__( self, standalones, members, mongoses, ismaster_hosts=None, *args, **kwargs): - """A MongoClient connected to the default server, with a mock cluster. + """A MongoClient connected to the default server, with a mock topology. standalones, members, mongoses determine the configuration of the - cluster. They are formatted like ['a:1', 'b:2']. ismaster_hosts + topology. They are formatted like ['a:1', 'b:2']. ismaster_hosts provides an alternative host list for the server's mocked ismaster response; see test_connect_with_internal_ips. """ diff --git a/test/test_client.py b/test/test_client.py index 9136fb7e3..61fd81b21 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -453,7 +453,7 @@ class TestClient(IntegrationTest, TestRequestMixin): def f(pipe): try: - servers = self.client._cluster.select_servers( + servers = self.client._topology.select_servers( any_server_selector) # In child, only the thread that called fork() is alive. @@ -935,13 +935,13 @@ class TestClientProperties(MockClientTest): connect=False) c.set_wire_version_range('a:1', 1, 5) - c._get_cluster().select_servers(writable_server_selector) # Connect. + c._get_topology().select_servers(writable_server_selector) # Connect. self.assertEqual(c.min_wire_version, 1) self.assertEqual(c.max_wire_version, 5) c.set_wire_version_range('a:1', 10, 11) c.disconnect() - c._get_cluster() + c._get_topology() self.assertRaises(ConfigurationError, c.db.collection.find_one) def test_max_wire_version(self): @@ -1063,7 +1063,7 @@ class TestMongoClientFailover(MockClientTest): c.disconnect() self.assertEqual(0, len(c.nodes)) - c._get_cluster().select_servers(writable_server_selector) + c._get_topology().select_servers(writable_server_selector) self.assertEqual('b', c.host) self.assertEqual(2, c.port) @@ -1092,7 +1092,7 @@ class TestMongoClientFailover(MockClientTest): # But it can reconnect. c.revive_host('a:1') - c._get_cluster().select_servers(writable_server_selector) + c._get_topology().select_servers(writable_server_selector) self.assertEqual('a', c.host) self.assertEqual(1, c.port) @@ -1116,7 +1116,7 @@ class TestMongoClientFailover(MockClientTest): connected(c) wait_until(lambda: len(c.nodes) == 2, 'connect') - sd = c._get_cluster().get_server_by_address(('a', 1)).description + sd = c._get_topology().get_server_by_address(('a', 1)).description self.assertEqual(SERVER_TYPE.RSPrimary, sd.server_type) self.assertEqual(0, sd.min_wire_version) self.assertEqual(1, sd.max_wire_version) @@ -1127,13 +1127,13 @@ class TestMongoClientFailover(MockClientTest): self.assertRaises(AutoReconnect, c.db.collection.find_one) # The primary's description is reset. - sd_a = c._get_cluster().get_server_by_address(('a', 1)).description + sd_a = c._get_topology().get_server_by_address(('a', 1)).description self.assertEqual(SERVER_TYPE.Unknown, sd_a.server_type) self.assertEqual(0, sd_a.min_wire_version) self.assertEqual(0, sd_a.max_wire_version) # ...but not the secondary's. - sd_b = c._get_cluster().get_server_by_address(('b', 2)).description + sd_b = c._get_topology().get_server_by_address(('b', 2)).description self.assertEqual(SERVER_TYPE.RSSecondary, sd_b.server_type) self.assertEqual(0, sd_b.min_wire_version) self.assertEqual(2, sd_b.max_wire_version) diff --git a/test/test_cursor_manager.py b/test/test_cursor_manager.py index 8f6e72585..bfd80227d 100644 --- a/test/test_cursor_manager.py +++ b/test/test_cursor_manager.py @@ -77,7 +77,7 @@ class TestCursorManager(IntegrationTest, TestRequestMixin): @client_context.require_replica_set def test_cursor_manager_prohibited_with_rs(self): - # Test that kill_cursors() throws an error while the cluster type + # Test that kill_cursors() throws an error while the topology type # isn't Single or Sharded. client = get_client(connection_string(), replicaSet=client_context.setname) diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index 5bee2a0b2..d66ff7963 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Test the cluster module.""" +"""Test the topology module.""" import json import os @@ -22,12 +22,12 @@ import threading sys.path[0:0] = [""] from pymongo import common -from pymongo.topology import Cluster -from pymongo.topology_description import CLUSTER_TYPE +from pymongo.topology import Topology +from pymongo.topology_description import TOPOLOGY_TYPE from pymongo.ismaster import IsMaster from pymongo.read_preferences import MovingAverage from pymongo.server_description import ServerDescription, SERVER_TYPE -from pymongo.settings import ClusterSettings +from pymongo.settings import TopologySettings from pymongo.uri_parser import parse_uri from test import unittest @@ -65,9 +65,9 @@ class MockPool(object): class MockMonitor(object): - def __init__(self, server_description, cluster, pool, cluster_settings): + def __init__(self, server_description, topology, pool, topology_settings): self._server_description = server_description - self._cluster = cluster + self._topology = topology def open(self): pass @@ -79,7 +79,7 @@ class MockMonitor(object): pass -def create_mock_cluster(uri, monitor_class=MockMonitor): +def create_mock_topology(uri, monitor_class=MockMonitor): # Some tests in the spec include URIs like mongodb://A/?connect=direct, # but PyMongo considers any single-seed URI with no setName to be "direct". parsed_uri = parse_uri(uri.replace('connect=direct', '')) @@ -87,28 +87,28 @@ def create_mock_cluster(uri, monitor_class=MockMonitor): if 'replicaset' in parsed_uri['options']: set_name = parsed_uri['options']['replicaset'] - cluster_settings = ClusterSettings( + topology_settings = TopologySettings( parsed_uri['nodelist'], set_name=set_name, pool_class=MockPool, monitor_class=monitor_class) - c = Cluster(cluster_settings) + c = Topology(topology_settings) c.open() return c -def got_ismaster(cluster, server_address, ismaster_response): +def got_ismaster(topology, server_address, ismaster_response): server_description = ServerDescription( server_address, IsMaster(ismaster_response), MovingAverage([0])) - cluster.on_change(server_description) + topology.on_change(server_description) -def get_type(cluster, hostname): - description = cluster.get_server_by_address((hostname, 27017)).description +def get_type(topology, hostname): + description = topology.get_server_by_address((hostname, 27017)).description return description.server_type @@ -116,28 +116,28 @@ class TestAllScenarios(unittest.TestCase): pass -def cluster_type_name(cluster_type): - return CLUSTER_TYPE._fields[cluster_type] +def topology_type_name(topology_type): + return TOPOLOGY_TYPE._fields[topology_type] def server_type_name(server_type): return SERVER_TYPE._fields[server_type] -def check_outcome(self, cluster, outcome): +def check_outcome(self, topology, outcome): expected_servers = outcome['servers'] # Check weak equality before proceeding. self.assertEqual( - len(cluster.description.server_descriptions()), + len(topology.description.server_descriptions()), len(expected_servers)) # Since lengths are equal, every actual server must have a corresponding # expected server. for expected_server_address, expected_server in expected_servers.items(): node = common.partition_node(expected_server_address) - self.assertTrue(cluster.has_server(node)) - actual_server = cluster.get_server_by_address(node) + self.assertTrue(topology.has_server(node)) + actual_server = topology.get_server_by_address(node) actual_server_description = actual_server.description if expected_server['type'] == 'PossiblePrimary': @@ -157,15 +157,15 @@ def check_outcome(self, cluster, outcome): expected_server['setName'], actual_server_description.set_name) - self.assertEqual(outcome['setName'], cluster.description.set_name) - expected_cluster_type = getattr(CLUSTER_TYPE, outcome['clusterType']) - self.assertEqual(cluster_type_name(expected_cluster_type), - cluster_type_name(cluster.description.cluster_type)) + self.assertEqual(outcome['setName'], topology.description.set_name) + expected_topology_type = getattr(TOPOLOGY_TYPE, outcome['clusterType']) + self.assertEqual(topology_type_name(expected_topology_type), + topology_type_name(topology.description.topology_type)) def create_test(scenario_def): def run_scenario(self): - c = create_mock_cluster(scenario_def['uri']) + c = create_mock_topology(scenario_def['uri']) for phase in scenario_def['phases']: for response in phase['responses']: diff --git a/test/test_pooling.py b/test/test_pooling.py index d88f1feeb..06ca8ea31 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -233,7 +233,7 @@ class _TestPoolingBase(unittest.TestCase): def assert_no_request(self): try: - server = self.c._cluster.select_server( + server = self.c._topology.select_server( writable_server_selector, server_wait_time=0) @@ -253,7 +253,7 @@ class _TestPoolingBase(unittest.TestCase): def assert_pool_size(self, pool_size): if pool_size == 0: try: - server = self.c._cluster.select_server( + server = self.c._topology.select_server( writable_server_selector, server_wait_time=0) diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index 81d0be350..0c6504ca1 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -190,7 +190,7 @@ class TestReadPreferences(TestReadPreferencesBase): latencies = ', '.join( '%s: %dms' % (server.description.address, server.description.round_trip_time) - for server in c._get_cluster().select_servers(any_server_selector)) + for server in c._get_topology().select_servers(any_server_selector)) self.assertFalse(not_used, "Expected to use primary and all secondaries for mode NEAREST," diff --git a/test/test_topology.py b/test/test_topology.py index ff79f06bb..d0602d972 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Test the cluster module.""" +"""Test the topology module.""" import sys @@ -24,8 +24,8 @@ import threading from bson.py3compat import imap from pymongo import common from pymongo.server_type import SERVER_TYPE -from pymongo.topology import Cluster -from pymongo.topology_description import CLUSTER_TYPE +from pymongo.topology import Topology +from pymongo.topology_description import TOPOLOGY_TYPE from pymongo.errors import (ConfigurationError, ConnectionFailure) from pymongo.ismaster import IsMaster @@ -34,7 +34,7 @@ from pymongo.read_preferences import MovingAverage from pymongo.server_description import ServerDescription from pymongo.server_selectors import (any_server_selector, writable_server_selector) -from pymongo.settings import ClusterSettings +from pymongo.settings import TopologySettings from test import unittest, client_knobs @@ -66,9 +66,9 @@ class MockPool(object): class MockMonitor(object): - def __init__(self, server_description, cluster, pool, cluster_settings): + def __init__(self, server_description, topology, pool, topology_settings): self._server_description = server_description - self._cluster = cluster + self._topology = topology def open(self): pass @@ -80,60 +80,60 @@ class MockMonitor(object): pass -class SetNameDiscoverySettings(ClusterSettings): - def get_cluster_type(self): - return CLUSTER_TYPE.ReplicaSetNoPrimary +class SetNameDiscoverySettings(TopologySettings): + def get_topology_type(self): + return TOPOLOGY_TYPE.ReplicaSetNoPrimary address = ('a', 27017) -def create_mock_cluster(seeds=None, set_name=None, monitor_class=MockMonitor): +def create_mock_topology(seeds=None, set_name=None, monitor_class=MockMonitor): partitioned_seeds = list(imap(common.partition_node, seeds or ['a'])) - cluster_settings = ClusterSettings( + topology_settings = TopologySettings( partitioned_seeds, set_name=set_name, pool_class=MockPool, monitor_class=monitor_class) - c = Cluster(cluster_settings) + c = Topology(topology_settings) c.open() return c -def got_ismaster(cluster, server_address, ismaster_response): +def got_ismaster(topology, server_address, ismaster_response): server_description = ServerDescription( server_address, IsMaster(ismaster_response), MovingAverage([0])) - cluster.on_change(server_description) + topology.on_change(server_description) -def disconnected(cluster, server_address): +def disconnected(topology, server_address): # Create new description of server type Unknown. - cluster.on_change(ServerDescription(server_address)) + topology.on_change(ServerDescription(server_address)) -def get_type(cluster, hostname): - description = cluster.get_server_by_address((hostname, 27017)).description +def get_type(topology, hostname): + description = topology.get_server_by_address((hostname, 27017)).description return description.server_type -class ClusterTest(unittest.TestCase): +class TopologyTest(unittest.TestCase): """Disables periodic monitoring, to make tests deterministic.""" def setUp(self): - super(ClusterTest, self).setUp() + super(TopologyTest, self).setUp() self.client_knobs = client_knobs(heartbeat_frequency=999999) self.client_knobs.enable() def tearDown(self): self.client_knobs.disable() - super(ClusterTest, self).tearDown() + super(TopologyTest, self).tearDown() -class TestSingleServerCluster(ClusterTest): +class TestSingleServerTopology(TopologyTest): def test_direct_connection(self): for server_type, ismaster_response in [ (SERVER_TYPE.RSPrimary, { @@ -170,7 +170,7 @@ class TestSingleServerCluster(ClusterTest): 'ok': 1, 'ismaster': False}), ]: - c = create_mock_cluster() + c = create_mock_topology() # Can't select a server while the only server is of type Unknown. self.assertRaises( @@ -179,8 +179,8 @@ class TestSingleServerCluster(ClusterTest): got_ismaster(c, address, ismaster_response) - # Cluster type never changes. - self.assertEqual(CLUSTER_TYPE.Single, c.description.cluster_type) + # Topology type never changes. + self.assertEqual(TOPOLOGY_TYPE.Single, c.description.topology_type) # No matter whether the server is writable, # select_servers() returns it. @@ -188,14 +188,14 @@ class TestSingleServerCluster(ClusterTest): self.assertEqual(server_type, s.description.server_type) def test_reopen(self): - c = create_mock_cluster() + c = create_mock_topology() # Additional calls are permitted. c.open() c.open() def test_unavailable_seed(self): - c = create_mock_cluster() + c = create_mock_topology() disconnected(c, address) self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'a')) @@ -206,7 +206,7 @@ class TestSingleServerCluster(ClusterTest): def _check_with_socket(self, sock_info): return IsMaster({'ok': 1}), round_trip_time - c = create_mock_cluster(monitor_class=TestMonitor) + c = create_mock_topology(monitor_class=TestMonitor) s = c.select_server(writable_server_selector) self.assertEqual(1, s.description.round_trip_time) @@ -217,11 +217,11 @@ class TestSingleServerCluster(ClusterTest): self.assertEqual(2, s.description.round_trip_time) -class TestMultiServerCluster(ClusterTest): +class TestMultiServerTopology(TopologyTest): def test_unexpected_host(self): - # Received ismaster response from host not in cluster. + # Received ismaster response from host not in topology. # E.g., a race where the host is removed before it responds. - c = create_mock_cluster(['a', 'b'], set_name='rs') + c = create_mock_topology(['a', 'b'], set_name='rs') # 'b' is not in the set. got_ismaster(c, ('a', 27017), { @@ -244,17 +244,17 @@ class TestMultiServerCluster(ClusterTest): self.assertFalse(c.has_server(('b', 27017))) def test_ghost_seed(self): - c = create_mock_cluster(['a', 'b']) + c = create_mock_topology(['a', 'b']) got_ismaster(c, address, { 'ok': 1, 'ismaster': False, 'isreplicaset': True}) self.assertEqual(SERVER_TYPE.RSGhost, get_type(c, 'a')) - self.assertEqual(CLUSTER_TYPE.Unknown, c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.Unknown, c.description.topology_type) def test_standalone_removed(self): - c = create_mock_cluster(['a', 'b']) + c = create_mock_topology(['a', 'b']) got_ismaster(c, ('a', 27017), { 'ok': 1, 'ismaster': True}) @@ -267,13 +267,13 @@ class TestMultiServerCluster(ClusterTest): self.assertEqual(0, len(c.description.server_descriptions())) def test_mongos_ha(self): - c = create_mock_cluster(['a', 'b']) + c = create_mock_topology(['a', 'b']) got_ismaster(c, ('a', 27017), { 'ok': 1, 'ismaster': True, 'msg': 'isdbgrid'}) - self.assertEqual(CLUSTER_TYPE.Sharded, c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.Sharded, c.description.topology_type) got_ismaster(c, ('b', 27017), { 'ok': 1, 'ismaster': True, @@ -283,7 +283,7 @@ class TestMultiServerCluster(ClusterTest): self.assertEqual(SERVER_TYPE.Mongos, get_type(c, 'b')) def test_non_mongos_server(self): - c = create_mock_cluster(['a', 'b']) + c = create_mock_topology(['a', 'b']) got_ismaster(c, ('a', 27017), { 'ok': 1, 'ismaster': True, @@ -294,7 +294,7 @@ class TestMultiServerCluster(ClusterTest): self.assertFalse(c.has_server(('b', 27017))) def test_rs_discovery(self): - c = create_mock_cluster(set_name='rs') + c = create_mock_topology(set_name='rs') # At first, A, B, and C are secondaries. got_ismaster(c, ('a', 27017), { @@ -308,8 +308,8 @@ class TestMultiServerCluster(ClusterTest): self.assertEqual(SERVER_TYPE.RSSecondary, get_type(c, 'a')) self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'b')) self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'c')) - self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, + c.description.topology_type) # Admin removes A, adds a high-priority member D which becomes primary. got_ismaster(c, ('b', 27017), { @@ -325,8 +325,8 @@ class TestMultiServerCluster(ClusterTest): self.assertEqual(SERVER_TYPE.RSSecondary, get_type(c, 'b')) self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'c')) self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'd')) - self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, + c.description.topology_type) # Primary responds. got_ismaster(c, ('d', 27017), { @@ -342,8 +342,8 @@ class TestMultiServerCluster(ClusterTest): # E is new. self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'e')) - self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, + c.description.topology_type) # Stale response from C. got_ismaster(c, ('c', 27017), { @@ -363,7 +363,7 @@ class TestMultiServerCluster(ClusterTest): self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'e')) def test_reset(self): - c = create_mock_cluster(set_name='rs') + c = create_mock_topology(set_name='rs') got_ismaster(c, ('a', 27017), { 'ok': 1, 'ismaster': True, @@ -379,16 +379,16 @@ class TestMultiServerCluster(ClusterTest): self.assertEqual(SERVER_TYPE.RSPrimary, get_type(c, 'a')) self.assertEqual(SERVER_TYPE.RSSecondary, get_type(c, 'b')) - self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, + c.description.topology_type) c.reset() self.assertEqual(2, len(c.description.server_descriptions())) self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'a')) self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'b')) self.assertEqual('rs', c.description.set_name) - self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, + c.description.topology_type) got_ismaster(c, ('a', 27017), { 'ok': 1, @@ -398,11 +398,11 @@ class TestMultiServerCluster(ClusterTest): self.assertEqual(SERVER_TYPE.RSPrimary, get_type(c, 'a')) self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'b')) - self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, + c.description.topology_type) def test_reset_server(self): - c = create_mock_cluster(set_name='rs') + c = create_mock_topology(set_name='rs') got_ismaster(c, ('a', 27017), { 'ok': 1, 'ismaster': True, @@ -420,8 +420,8 @@ class TestMultiServerCluster(ClusterTest): self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'a')) self.assertEqual(SERVER_TYPE.RSSecondary, get_type(c, 'b')) self.assertEqual('rs', c.description.set_name) - self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, + c.description.topology_type) got_ismaster(c, ('a', 27017), { 'ok': 1, @@ -430,20 +430,20 @@ class TestMultiServerCluster(ClusterTest): 'hosts': ['a', 'b']}) self.assertEqual(SERVER_TYPE.RSPrimary, get_type(c, 'a')) - self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, + c.description.topology_type) c.reset_server(('b', 27017)) self.assertEqual(SERVER_TYPE.RSPrimary, get_type(c, 'a')) self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'b')) self.assertEqual('rs', c.description.set_name) - self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, + c.description.topology_type) def test_reset_removed_server(self): - c = create_mock_cluster(set_name='rs') + c = create_mock_topology(set_name='rs') - # No error resetting a server not in the ClusterDescription. + # No error resetting a server not in the TopologyDescription. c.reset_server(('b', 27017)) # Server was *not* added as type Unknown. @@ -451,16 +451,16 @@ class TestMultiServerCluster(ClusterTest): def test_discover_set_name_from_primary(self): # Discovering a replica set without the setName supplied by the user - # is not yet supported by MongoClient, but Cluster can do it. - cluster_settings = SetNameDiscoverySettings( + # is not yet supported by MongoClient, but Topology can do it. + topology_settings = SetNameDiscoverySettings( seeds=[address], pool_class=MockPool, monitor_class=MockMonitor) - c = Cluster(cluster_settings) + c = Topology(topology_settings) self.assertEqual(c.description.set_name, None) - self.assertEqual(c.description.cluster_type, - CLUSTER_TYPE.ReplicaSetNoPrimary) + self.assertEqual(c.description.topology_type, + TOPOLOGY_TYPE.ReplicaSetNoPrimary) got_ismaster(c, address, { 'ok': 1, @@ -469,11 +469,11 @@ class TestMultiServerCluster(ClusterTest): 'hosts': ['a']}) self.assertEqual(c.description.set_name, 'rs') - self.assertEqual(c.description.cluster_type, - CLUSTER_TYPE.ReplicaSetWithPrimary) + self.assertEqual(c.description.topology_type, + TOPOLOGY_TYPE.ReplicaSetWithPrimary) # Another response from the primary. Tests the code that processes - # primary response when cluster type is already ReplicaSetWithPrimary. + # primary response when topology type is already ReplicaSetWithPrimary. got_ismaster(c, address, { 'ok': 1, 'ismaster': True, @@ -482,21 +482,21 @@ class TestMultiServerCluster(ClusterTest): # No change. self.assertEqual(c.description.set_name, 'rs') - self.assertEqual(c.description.cluster_type, - CLUSTER_TYPE.ReplicaSetWithPrimary) + self.assertEqual(c.description.topology_type, + TOPOLOGY_TYPE.ReplicaSetWithPrimary) def test_discover_set_name_from_secondary(self): # Discovering a replica set without the setName supplied by the user - # is not yet supported by MongoClient, but Cluster can do it. - cluster_settings = SetNameDiscoverySettings( + # is not yet supported by MongoClient, but Topology can do it. + topology_settings = SetNameDiscoverySettings( seeds=[address], pool_class=MockPool, monitor_class=MockMonitor) - c = Cluster(cluster_settings) + c = Topology(topology_settings) self.assertEqual(c.description.set_name, None) - self.assertEqual(c.description.cluster_type, - CLUSTER_TYPE.ReplicaSetNoPrimary) + self.assertEqual(c.description.topology_type, + TOPOLOGY_TYPE.ReplicaSetNoPrimary) got_ismaster(c, address, { 'ok': 1, @@ -506,44 +506,44 @@ class TestMultiServerCluster(ClusterTest): 'hosts': ['a']}) self.assertEqual(c.description.set_name, 'rs') - self.assertEqual(c.description.cluster_type, - CLUSTER_TYPE.ReplicaSetNoPrimary) + self.assertEqual(c.description.topology_type, + TOPOLOGY_TYPE.ReplicaSetNoPrimary) def test_primary_disconnect(self): - c = create_mock_cluster(set_name='rs') + c = create_mock_topology(set_name='rs') got_ismaster(c, address, { 'ok': 1, 'ismaster': True, 'setName': 'rs', 'hosts': ['a']}) - self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, + c.description.topology_type) disconnected(c, address) self.assertTrue(c.has_server(address)) # Not removed. - self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, + c.description.topology_type) def test_primary_becomes_standalone(self): - c = create_mock_cluster(set_name='rs') + c = create_mock_topology(set_name='rs') got_ismaster(c, address, { 'ok': 1, 'ismaster': True, 'setName': 'rs', 'hosts': ['a']}) - self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, + c.description.topology_type) # An administrator restarts primary as standalone. got_ismaster(c, address, {'ok': 1}) self.assertFalse(c.has_server(address)) - self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, + c.description.topology_type) def test_primary_wrong_set_name(self): - c = create_mock_cluster(set_name='rs') + c = create_mock_topology(set_name='rs') got_ismaster(c, address, { 'ok': 1, 'ismaster': True, @@ -551,11 +551,11 @@ class TestMultiServerCluster(ClusterTest): 'hosts': ['a']}) self.assertFalse(c.has_server(address)) - self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, + c.description.topology_type) def test_secondary_wrong_set_name(self): - c = create_mock_cluster(set_name='rs') + c = create_mock_topology(set_name='rs') got_ismaster(c, address, { 'ok': 1, 'ismaster': False, @@ -564,11 +564,11 @@ class TestMultiServerCluster(ClusterTest): 'hosts': ['a']}) self.assertFalse(c.has_server(address)) - self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary, + c.description.topology_type) def test_secondary_wrong_set_name_with_primary(self): - c = create_mock_cluster(['a', 'b'], set_name='rs') + c = create_mock_topology(['a', 'b'], set_name='rs') # Find the primary normally. got_ismaster(c, address, { @@ -577,8 +577,8 @@ class TestMultiServerCluster(ClusterTest): 'setName': 'rs', 'hosts': ['a', 'b']}) - self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, + c.description.topology_type) self.assertTrue(c.has_server(('b', 27017))) got_ismaster(c, ('b', 27017), { @@ -590,17 +590,17 @@ class TestMultiServerCluster(ClusterTest): # Secondary removed. self.assertFalse(c.has_server(('b', 27017))) - self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary, - c.description.cluster_type) + self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary, + c.description.topology_type) def test_non_rs_member(self): - c = create_mock_cluster(['a', 'b'], set_name='rs') + c = create_mock_topology(['a', 'b'], set_name='rs') self.assertTrue(c.has_server(('b', 27017))) got_ismaster(c, ('b', 27017), {'ok': 1}) # Standalone is removed. self.assertFalse(c.has_server(('b', 27017))) def test_wire_version(self): - c = create_mock_cluster(set_name='rs') + c = create_mock_topology(set_name='rs') c.description.check_compatible() # No error. got_ismaster(c, address, { @@ -644,7 +644,7 @@ class TestMultiServerCluster(ClusterTest): self.fail('No error with incompatible wire version') def test_max_write_batch_size(self): - c = create_mock_cluster(seeds=['a', 'b'], set_name='rs') + c = create_mock_topology(seeds=['a', 'b'], set_name='rs') def write_batch_size(): s = c.select_server(writable_server_selector) @@ -679,7 +679,7 @@ class TestMultiServerCluster(ClusterTest): self.assertEqual(2, write_batch_size()) -class TestClusterErrors(ClusterTest): +class TestTopologyErrors(TopologyTest): # Errors when calling ismaster. def test_pool_reset(self): @@ -694,7 +694,7 @@ class TestClusterErrors(ClusterTest): else: raise socket.error() - c = create_mock_cluster(monitor_class=TestMonitor) + c = create_mock_topology(monitor_class=TestMonitor) # Await first ismaster call. s = c.select_server(writable_server_selector) self.assertEqual(1, ismaster_count[0]) @@ -716,7 +716,7 @@ class TestClusterErrors(ClusterTest): else: raise socket.error() - c = create_mock_cluster(monitor_class=TestMonitor) + c = create_mock_topology(monitor_class=TestMonitor) # Await first ismaster call. s = c.select_server(writable_server_selector) @@ -737,7 +737,7 @@ class TestClusterErrors(ClusterTest): ismaster_count[0] += 1 raise socket.error() - c = create_mock_cluster(monitor_class=TestMonitor) + c = create_mock_topology(monitor_class=TestMonitor) self.assertRaises( ConnectionFailure, diff --git a/test/utils.py b/test/utils.py index bd2d0ec3d..4c9acfcfb 100644 --- a/test/utils.py +++ b/test/utils.py @@ -374,8 +374,8 @@ def assertReadFromAll(testcase, rsc, members, *args, **kwargs): testcase.assertEqual(members, used) def get_pool(client): - cluster = client._get_cluster() - server = cluster.select_server(writable_server_selector) + topology = client._get_topology() + server = topology.select_server(writable_server_selector) return server.pool def pools_from_rs_client(client): @@ -383,7 +383,7 @@ def pools_from_rs_client(client): """ return [ server.pool for server in - client._get_cluster().select_servers(any_server_selector)] + client._get_topology().select_servers(any_server_selector)] class TestRequestMixin(object): """Inherit from this class and from unittest.TestCase to get some