Replace "Cluster" term with "Topology".
The Server Discovery And Monitoring Spec has replaced the term "cluster" with "topology", since "cluster" in MongoDB terminology should be reserved for sharded clusters. "Topology" describes a single server, replica set, or a set of mongoses. The previous commit replaced "cluster" with "topology" in file names. Here I replace the term in class and variable names.
This commit is contained in:
parent
d464dc49c6
commit
98afef5871
@ -48,9 +48,9 @@ from pymongo import (auth,
|
||||
thread_util,
|
||||
uri_parser)
|
||||
from pymongo.client_options import ClientOptions
|
||||
from pymongo.topology_description import CLUSTER_TYPE
|
||||
from pymongo.topology_description import TOPOLOGY_TYPE
|
||||
from pymongo.cursor_manager import CursorManager
|
||||
from pymongo.topology import Cluster
|
||||
from pymongo.topology import Topology
|
||||
from pymongo.errors import (ConfigurationError,
|
||||
ConnectionFailure,
|
||||
InvalidURI, AutoReconnect, OperationFailure,
|
||||
@ -62,7 +62,7 @@ from pymongo.server_selectors import (any_server_selector,
|
||||
writable_preferred_server_selector,
|
||||
writable_server_selector)
|
||||
from pymongo.server_type import SERVER_TYPE
|
||||
from pymongo.settings import ClusterSettings
|
||||
from pymongo.settings import TopologySettings
|
||||
|
||||
|
||||
def _partition_node(node):
|
||||
@ -300,7 +300,7 @@ class MongoClient(common.BaseObject):
|
||||
if creds:
|
||||
self._cache_credentials(creds.source, creds)
|
||||
|
||||
self._cluster_settings = ClusterSettings(
|
||||
self._topology_settings = TopologySettings(
|
||||
seeds=seeds,
|
||||
set_name=options.replica_set_name,
|
||||
pool_class=pool_class,
|
||||
@ -308,9 +308,9 @@ class MongoClient(common.BaseObject):
|
||||
monitor_class=monitor_class,
|
||||
condition_class=condition_class)
|
||||
|
||||
self._cluster = Cluster(self._cluster_settings)
|
||||
self._topology = Topology(self._topology_settings)
|
||||
if connect:
|
||||
self._cluster.open()
|
||||
self._topology.open()
|
||||
|
||||
def _cache_credentials(self, source, credentials, connect=False):
|
||||
"""Save a set of authentication credentials.
|
||||
@ -329,7 +329,7 @@ class MongoClient(common.BaseObject):
|
||||
'to this database. You must logout first.')
|
||||
|
||||
if connect:
|
||||
server = self._get_cluster().select_server(
|
||||
server = self._get_topology().select_server(
|
||||
writable_preferred_server_selector)
|
||||
|
||||
# get_socket() logs out of the database if logged in with old
|
||||
@ -405,7 +405,7 @@ class MongoClient(common.BaseObject):
|
||||
ServerDescription first, then use its properties.
|
||||
"""
|
||||
try:
|
||||
server = self._cluster.select_server(
|
||||
server = self._topology.select_server(
|
||||
writable_server_selector, server_wait_time=0)
|
||||
|
||||
return getattr(server.description, attr_name)
|
||||
@ -452,7 +452,7 @@ class MongoClient(common.BaseObject):
|
||||
MongoClient gained this property in version 3.0 when
|
||||
MongoReplicaSetClient's functionality was merged in.
|
||||
"""
|
||||
return self._cluster.get_primary()
|
||||
return self._topology.get_primary()
|
||||
|
||||
@property
|
||||
def secondaries(self):
|
||||
@ -464,7 +464,7 @@ class MongoClient(common.BaseObject):
|
||||
MongoClient gained this property in version 3.0 when
|
||||
MongoReplicaSetClient's functionality was merged in.
|
||||
"""
|
||||
return self._cluster.get_secondaries()
|
||||
return self._topology.get_secondaries()
|
||||
|
||||
@property
|
||||
def arbiters(self):
|
||||
@ -473,7 +473,7 @@ class MongoClient(common.BaseObject):
|
||||
A sequence of (host, port) pairs. Empty if this client is not
|
||||
connected to a replica set.
|
||||
"""
|
||||
return self._cluster.get_arbiters()
|
||||
return self._topology.get_arbiters()
|
||||
|
||||
@property
|
||||
def is_primary(self):
|
||||
@ -508,7 +508,7 @@ class MongoClient(common.BaseObject):
|
||||
Nodes are either specified when this instance was created,
|
||||
or discovered through the replica set discovery mechanism.
|
||||
"""
|
||||
description = self._cluster.description
|
||||
description = self._topology.description
|
||||
return frozenset(s.address for s in description.known_servers)
|
||||
|
||||
@property
|
||||
@ -587,16 +587,16 @@ class MongoClient(common.BaseObject):
|
||||
|
||||
Can raise ConnectionFailure.
|
||||
"""
|
||||
cluster = self._get_cluster() # Starts monitors if necessary.
|
||||
server = cluster.select_server(writable_server_selector)
|
||||
topology = self._get_topology() # Starts monitors if necessary.
|
||||
server = topology.select_server(writable_server_selector)
|
||||
return server.description.max_wire_version
|
||||
|
||||
def _is_writable(self):
|
||||
"""Attempt to connect to a writable server, or return False.
|
||||
"""
|
||||
cluster = self._get_cluster() # Starts monitors if necessary.
|
||||
topology = self._get_topology() # Starts monitors if necessary.
|
||||
try:
|
||||
s = cluster.select_server(writable_server_selector)
|
||||
s = topology.select_server(writable_server_selector)
|
||||
|
||||
# When directly connected to a secondary, arbiter, etc.,
|
||||
# select_server returns it, whatever the selector. Check
|
||||
@ -612,7 +612,7 @@ class MongoClient(common.BaseObject):
|
||||
pools. If this instance is used again it will be automatically
|
||||
re-opened.
|
||||
"""
|
||||
self._cluster.reset()
|
||||
self._topology.reset()
|
||||
|
||||
def close(self):
|
||||
"""Alias for :meth:`disconnect`
|
||||
@ -644,7 +644,7 @@ class MongoClient(common.BaseObject):
|
||||
# reported an error.
|
||||
try:
|
||||
# TODO: Mongos pinning.
|
||||
server = self._cluster.select_server(
|
||||
server = self._topology.select_server(
|
||||
writable_server_selector,
|
||||
server_wait_time=0)
|
||||
|
||||
@ -678,14 +678,14 @@ class MongoClient(common.BaseObject):
|
||||
|
||||
self.__cursor_manager = manager
|
||||
|
||||
def _get_cluster(self):
|
||||
"""Get the internal :class:`~pymongo.cluster.Cluster` object.
|
||||
def _get_topology(self):
|
||||
"""Get the internal :class:`~pymongo.topology.Topology` object.
|
||||
|
||||
If this client was created with "connect=False", calling _get_cluster
|
||||
If this client was created with "connect=False", calling _get_topology
|
||||
launches the connection process in the background.
|
||||
"""
|
||||
self._cluster.open()
|
||||
return self._cluster
|
||||
self._topology.open()
|
||||
return self._topology
|
||||
|
||||
def __check_gle_response(self, response, is_command):
|
||||
"""Check a response to a lastError message for errors.
|
||||
@ -749,15 +749,15 @@ class MongoClient(common.BaseObject):
|
||||
- `address` (optional): Optional address when sending a getMore or
|
||||
killCursors to a specific server.
|
||||
"""
|
||||
cluster = self._get_cluster()
|
||||
topology = self._get_topology()
|
||||
if address:
|
||||
assert not check_primary, "Can't use check_primary with address"
|
||||
server = cluster.get_server_by_address(address)
|
||||
server = topology.get_server_by_address(address)
|
||||
if not server:
|
||||
raise AutoReconnect('server %s:%d no longer available'
|
||||
% address)
|
||||
else:
|
||||
server = cluster.select_server(writable_server_selector)
|
||||
server = topology.select_server(writable_server_selector)
|
||||
|
||||
is_writable = server.description.is_writable
|
||||
if check_primary and not with_last_error and not is_writable:
|
||||
@ -801,9 +801,9 @@ class MongoClient(common.BaseObject):
|
||||
- `exhaust` (optional): If True, the socket used stays checked out.
|
||||
It is returned along with its Pool in the Response.
|
||||
"""
|
||||
cluster = self._get_cluster()
|
||||
topology = self._get_topology()
|
||||
if address:
|
||||
server = cluster.get_server_by_address(address)
|
||||
server = topology.get_server_by_address(address)
|
||||
if not server:
|
||||
raise AutoReconnect('server %s:%d no longer available'
|
||||
% address)
|
||||
@ -813,7 +813,7 @@ class MongoClient(common.BaseObject):
|
||||
else:
|
||||
selector = writable_server_selector
|
||||
|
||||
server = cluster.select_server(selector)
|
||||
server = topology.select_server(selector)
|
||||
|
||||
if self.in_request() and not server.in_request():
|
||||
server.start_request()
|
||||
@ -844,14 +844,14 @@ class MongoClient(common.BaseObject):
|
||||
|
||||
def __reset_server(self, address):
|
||||
"""Clear our connection pool for a server and mark it Unknown."""
|
||||
self._cluster.reset_server(address)
|
||||
self._topology.reset_server(address)
|
||||
|
||||
def _reset_server_and_request_check(self, address):
|
||||
"""Clear our pool for a server, mark it Unknown, and check it soon."""
|
||||
self._cluster.reset_server(address)
|
||||
server = self._cluster.get_server_by_address(address)
|
||||
self._topology.reset_server(address)
|
||||
server = self._topology.get_server_by_address(address)
|
||||
|
||||
# "server" is None if another thread removed it from the cluster.
|
||||
# "server" is None if another thread removed it from the topology.
|
||||
if server:
|
||||
server.request_check()
|
||||
|
||||
@ -883,7 +883,7 @@ class MongoClient(common.BaseObject):
|
||||
# lazily. These greedy calls are to make PyMongo 2.x's request
|
||||
# tests pass.
|
||||
try:
|
||||
servers = self._cluster.select_servers(any_server_selector,
|
||||
servers = self._topology.select_servers(any_server_selector,
|
||||
server_wait_time=0)
|
||||
|
||||
for s in servers:
|
||||
@ -908,7 +908,7 @@ class MongoClient(common.BaseObject):
|
||||
"""
|
||||
if 0 == self.__request_counter.dec():
|
||||
try:
|
||||
servers = self._cluster.select_servers(any_server_selector,
|
||||
servers = self._topology.select_servers(any_server_selector,
|
||||
server_wait_time=0)
|
||||
|
||||
for s in servers:
|
||||
@ -926,7 +926,7 @@ class MongoClient(common.BaseObject):
|
||||
return not self == other
|
||||
|
||||
def __repr__(self):
|
||||
server_descriptions = self._cluster.description.server_descriptions()
|
||||
server_descriptions = self._topology.description.server_descriptions()
|
||||
if len(server_descriptions) == 1:
|
||||
description, = server_descriptions.values()
|
||||
return "MongoClient(%r, %r)" % description.address
|
||||
@ -976,15 +976,15 @@ class MongoClient(common.BaseObject):
|
||||
# TODO: update this, pass address to cursor_manager.close().
|
||||
# PyMongo 2.x introduced a configurable CursorManager which sends
|
||||
# OP_KILLCURSORS to the server. The API doesn't handle a multi-server
|
||||
# cluster, where we must pass the address of the server that receives
|
||||
# topology, where we must pass the address of the server that receives
|
||||
# the message. Support CursorManager for backwards compatibility, but
|
||||
# only for single servers.
|
||||
if self.__cursor_manager:
|
||||
cluster_type = self._cluster.description.cluster_type
|
||||
if cluster_type not in (CLUSTER_TYPE.Single, CLUSTER_TYPE.Sharded):
|
||||
topology_type = self._topology.description.topology_type
|
||||
if topology_type not in (TOPOLOGY_TYPE.Single, TOPOLOGY_TYPE.Sharded):
|
||||
raise InvalidOperation(
|
||||
"Can't use custom CursorManager with cluster type %s" %
|
||||
CLUSTER_TYPE._fields[cluster_type])
|
||||
"Can't use custom CursorManager with topology type %s" %
|
||||
TOPOLOGY_TYPE._fields[topology_type])
|
||||
|
||||
self.__cursor_manager.close(cursor_id)
|
||||
else:
|
||||
@ -1079,9 +1079,9 @@ class MongoClient(common.BaseObject):
|
||||
if from_host is not None:
|
||||
command["fromhost"] = from_host
|
||||
|
||||
# _get_cluster() starts connecting, if we initialized with
|
||||
# _get_topology() starts connecting, if we initialized with
|
||||
# connect=False.
|
||||
server = self._get_cluster().select_server(
|
||||
server = self._get_topology().select_server(
|
||||
writable_server_selector)
|
||||
|
||||
if self.in_request() and not server.in_request():
|
||||
|
||||
@ -45,6 +45,6 @@ class MongoReplicaSetClient(mongo_client.MongoClient):
|
||||
super(MongoReplicaSetClient, self).__init__(*args, **kwargs)
|
||||
|
||||
def __repr__(self):
|
||||
sds = self._cluster.description.server_descriptions()
|
||||
sds = self._topology.description.server_descriptions()
|
||||
return "MongoReplicaSetClient(%r)" % (["%s:%d" % s.address
|
||||
for s in sds.values()],)
|
||||
|
||||
@ -31,22 +31,22 @@ class Monitor(object):
|
||||
def __init__(
|
||||
self,
|
||||
server_description,
|
||||
cluster,
|
||||
topology,
|
||||
pool,
|
||||
cluster_settings):
|
||||
topology_settings):
|
||||
"""Class to monitor a MongoDB server on a background thread.
|
||||
|
||||
Pass an initial ServerDescription, a Cluster, a Pool, and a
|
||||
ClusterSettings.
|
||||
Pass an initial ServerDescription, a Topology, a Pool, and
|
||||
TopologySettings.
|
||||
|
||||
The Cluster is weakly referenced. The Pool must be exclusive to this
|
||||
The Topology is weakly referenced. The Pool must be exclusive to this
|
||||
Monitor.
|
||||
"""
|
||||
super(Monitor, self).__init__()
|
||||
self._server_description = server_description
|
||||
self._cluster = weakref.proxy(cluster)
|
||||
self._topology = weakref.proxy(topology)
|
||||
self._pool = pool
|
||||
self._settings = cluster_settings
|
||||
self._settings = topology_settings
|
||||
self._stopped = False
|
||||
self._event = thread_util.Event(self._settings.condition_class)
|
||||
self._thread = None
|
||||
@ -96,9 +96,9 @@ class Monitor(object):
|
||||
while not self._stopped:
|
||||
try:
|
||||
self._server_description = self._check_with_retry()
|
||||
self._cluster.on_change(self._server_description)
|
||||
self._topology.on_change(self._server_description)
|
||||
except ReferenceError:
|
||||
# Cluster was garbage-collected.
|
||||
# Topology was garbage-collected.
|
||||
self.close()
|
||||
else:
|
||||
start = time.time() # TODO: monotonic.
|
||||
@ -122,7 +122,7 @@ class Monitor(object):
|
||||
if new_server_description:
|
||||
return new_server_description
|
||||
else:
|
||||
self._cluster.reset_pool(self._server_description.address)
|
||||
self._topology.reset_pool(self._server_description.address)
|
||||
if retry:
|
||||
server_description = self._check_once()
|
||||
if server_description:
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
# implied. See the License for the specific language governing
|
||||
# permissions and limitations under the License.
|
||||
|
||||
"""Communicate with one MongoDB server in a cluster."""
|
||||
"""Communicate with one MongoDB server in a topology."""
|
||||
|
||||
import socket
|
||||
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Represent one server in the cluster."""
|
||||
"""Represent one server in the topology."""
|
||||
|
||||
from pymongo.server_type import SERVER_TYPE
|
||||
from pymongo.ismaster import IsMaster
|
||||
|
||||
@ -17,12 +17,12 @@
|
||||
import threading
|
||||
|
||||
from pymongo import common, monitor, pool
|
||||
from pymongo.topology_description import CLUSTER_TYPE
|
||||
from pymongo.topology_description import TOPOLOGY_TYPE
|
||||
from pymongo.pool import PoolOptions
|
||||
from pymongo.server_description import ServerDescription
|
||||
|
||||
|
||||
class ClusterSettings(object):
|
||||
class TopologySettings(object):
|
||||
def __init__(
|
||||
self,
|
||||
seeds=None,
|
||||
@ -77,13 +77,13 @@ class ClusterSettings(object):
|
||||
"""
|
||||
return self._direct
|
||||
|
||||
def get_cluster_type(self):
|
||||
def get_topology_type(self):
|
||||
if self.direct:
|
||||
return CLUSTER_TYPE.Single
|
||||
return TOPOLOGY_TYPE.Single
|
||||
elif self.set_name is not None:
|
||||
return CLUSTER_TYPE.ReplicaSetNoPrimary
|
||||
return TOPOLOGY_TYPE.ReplicaSetNoPrimary
|
||||
else:
|
||||
return CLUSTER_TYPE.Unknown
|
||||
return TOPOLOGY_TYPE.Unknown
|
||||
|
||||
def get_server_descriptions(self):
|
||||
"""Initial dict of (address, ServerDescription) for all seeds."""
|
||||
|
||||
@ -229,7 +229,7 @@ class Event(object):
|
||||
"""Copy of standard threading.Event, but uses a custom condition class.
|
||||
|
||||
Allows async frameworks to override monitors' synchronization behavior
|
||||
with ClusterSettings.condition_class.
|
||||
with TopologySettings.condition_class.
|
||||
|
||||
Copied from CPython's threading.py at hash c7960cc9.
|
||||
"""
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
# implied. See the License for the specific language governing
|
||||
# permissions and limitations under the License.
|
||||
|
||||
"""Internal classes to monitor clusters of one or more servers."""
|
||||
"""Internal class to monitor a topology of one or more servers."""
|
||||
|
||||
import random
|
||||
import threading
|
||||
@ -20,9 +20,9 @@ import time
|
||||
|
||||
from bson.py3compat import itervalues
|
||||
from pymongo import common
|
||||
from pymongo.topology_description import (updated_cluster_description,
|
||||
CLUSTER_TYPE,
|
||||
ClusterDescription)
|
||||
from pymongo.topology_description import (updated_topology_description,
|
||||
TOPOLOGY_TYPE,
|
||||
TopologyDescription)
|
||||
from pymongo.errors import AutoReconnect
|
||||
from pymongo.server import Server
|
||||
from pymongo.server_selectors import (arbiter_server_selector,
|
||||
@ -30,16 +30,16 @@ from pymongo.server_selectors import (arbiter_server_selector,
|
||||
writable_server_selector)
|
||||
|
||||
|
||||
class Cluster(object):
|
||||
"""Monitor a cluster of one or more servers."""
|
||||
def __init__(self, cluster_settings):
|
||||
self._settings = cluster_settings
|
||||
cluster_description = ClusterDescription(
|
||||
cluster_settings.get_cluster_type(),
|
||||
cluster_settings.get_server_descriptions(),
|
||||
cluster_settings.set_name)
|
||||
class Topology(object):
|
||||
"""Monitor a topology of one or more servers."""
|
||||
def __init__(self, topology_settings):
|
||||
self._settings = topology_settings
|
||||
topology_description = TopologyDescription(
|
||||
topology_settings.get_topology_type(),
|
||||
topology_settings.get_server_descriptions(),
|
||||
topology_settings.set_name)
|
||||
|
||||
self._description = cluster_description
|
||||
self._description = topology_description
|
||||
self._opened = False
|
||||
self._lock = threading.Lock()
|
||||
self._condition = self._settings.condition_class(self._lock)
|
||||
@ -82,19 +82,20 @@ class Cluster(object):
|
||||
# No suitable servers.
|
||||
if wait_time == 0 or now > end_time:
|
||||
# TODO: more error diagnostics. E.g., if state is
|
||||
# ReplicaSet but every server is Unknown, and the host list
|
||||
# is non-empty, and doesn't intersect with settings.seeds,
|
||||
# the set is probably configured with internal hostnames or
|
||||
# IPs and we're connecting from outside. Or if state is
|
||||
# ReplicaSet and clusterDescription.server_descriptions is
|
||||
# empty, we have the wrong set_name. Include
|
||||
# ClusterDescription's stringification in exception msg.
|
||||
# ReplicaSet but every server is Unknown, and the host
|
||||
# list is non-empty, and doesn't intersect with
|
||||
# settings.seeds, the set is probably configured with
|
||||
# internal hostnames or IPs and we're connecting from
|
||||
# outside. Or if we're a replica set and
|
||||
# server_descriptions is empty, we have the wrong
|
||||
# set_name. Include TopologyDescription's str() in
|
||||
# exception msg.
|
||||
raise AutoReconnect("No suitable servers available")
|
||||
|
||||
self._ensure_opened()
|
||||
self._request_check_all()
|
||||
|
||||
# Release the lock and wait for the cluster description to
|
||||
# Release the lock and wait for the topology description to
|
||||
# change, or for a timeout. We won't miss any changes that
|
||||
# came after our most recent selector() call, since we've
|
||||
# held the lock until now.
|
||||
@ -114,12 +115,12 @@ class Cluster(object):
|
||||
"""Process a new ServerDescription after an ismaster call completes."""
|
||||
# We do no I/O holding the lock.
|
||||
with self._lock:
|
||||
# Any monitored server was definitely in the cluster description
|
||||
# Any monitored server was definitely in the topology description
|
||||
# once. Check if it's still in the description or if some state-
|
||||
# change removed it. E.g., we got a host list from the primary
|
||||
# that didn't include this server.
|
||||
if self._description.has_server(server_description.address):
|
||||
self._description = updated_cluster_description(
|
||||
self._description = updated_topology_description(
|
||||
self._description, server_description)
|
||||
|
||||
self._update_servers()
|
||||
@ -136,10 +137,10 @@ class Cluster(object):
|
||||
|
||||
def get_primary(self):
|
||||
"""Return primary's address or None."""
|
||||
# Implemented here in Cluster instead of MongoClient, so it can lock.
|
||||
# Implemented here in Topology instead of MongoClient, so it can lock.
|
||||
with self._lock:
|
||||
cluster_type = self._description.cluster_type
|
||||
if cluster_type != CLUSTER_TYPE.ReplicaSetWithPrimary:
|
||||
topology_type = self._description.topology_type
|
||||
if topology_type != TOPOLOGY_TYPE.ReplicaSetWithPrimary:
|
||||
return None
|
||||
|
||||
description = writable_server_selector(
|
||||
@ -149,11 +150,11 @@ class Cluster(object):
|
||||
|
||||
def _get_replica_set_members(self, selector):
|
||||
"""Return set of replica set member addresses."""
|
||||
# Implemented here in Cluster instead of MongoClient, so it can lock.
|
||||
# Implemented here in Topology instead of MongoClient, so it can lock.
|
||||
with self._lock:
|
||||
cluster_type = self._description.cluster_type
|
||||
if cluster_type not in (CLUSTER_TYPE.ReplicaSetWithPrimary,
|
||||
CLUSTER_TYPE.ReplicaSetNoPrimary):
|
||||
topology_type = self._description.topology_type
|
||||
if topology_type not in (TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
TOPOLOGY_TYPE.ReplicaSetNoPrimary):
|
||||
return []
|
||||
|
||||
descriptions = selector(self._description.known_servers)
|
||||
@ -187,7 +188,7 @@ class Cluster(object):
|
||||
with self._lock:
|
||||
server = self._servers.get(address)
|
||||
|
||||
# "server" is None if another thread removed it from the cluster.
|
||||
# "server" is None if another thread removed it from the topology.
|
||||
if server:
|
||||
server.pool.reset()
|
||||
|
||||
@ -198,7 +199,7 @@ class Cluster(object):
|
||||
def reset(self):
|
||||
"""Reset all pools and disconnect from all servers.
|
||||
|
||||
The cluster reconnects on demand, or after common.HEARTBEAT_FREQUENCY
|
||||
The Topology reconnects on demand, or after common.HEARTBEAT_FREQUENCY
|
||||
seconds.
|
||||
"""
|
||||
with self._lock:
|
||||
@ -232,14 +233,14 @@ class Cluster(object):
|
||||
server.request_check()
|
||||
|
||||
def _apply_selector(self, selector):
|
||||
if self._description.cluster_type == CLUSTER_TYPE.Single:
|
||||
if self._description.topology_type == TOPOLOGY_TYPE.Single:
|
||||
# Ignore the selector.
|
||||
return self._description.known_servers
|
||||
else:
|
||||
return selector(self._description.known_servers)
|
||||
|
||||
def _update_servers(self):
|
||||
"""Sync our set of Servers from ClusterDescription.server_descriptions.
|
||||
"""Sync our set of Servers from TopologyDescription.server_descriptions.
|
||||
|
||||
Hold the lock while calling this.
|
||||
"""
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
# implied. See the License for the specific language governing
|
||||
# permissions and limitations under the License.
|
||||
|
||||
"""Represent the cluster of servers."""
|
||||
"""Represent the topology of servers."""
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
@ -22,22 +22,22 @@ from pymongo.errors import ConfigurationError
|
||||
from pymongo.server_description import ServerDescription
|
||||
|
||||
|
||||
CLUSTER_TYPE = namedtuple('ClusterType', ['Single', 'ReplicaSetNoPrimary',
|
||||
'ReplicaSetWithPrimary', 'Sharded',
|
||||
'Unknown'])(*range(5))
|
||||
TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary',
|
||||
'ReplicaSetWithPrimary', 'Sharded',
|
||||
'Unknown'])(*range(5))
|
||||
|
||||
|
||||
class ClusterDescription(object):
|
||||
def __init__(self, cluster_type, server_descriptions, set_name):
|
||||
"""Represent a cluster of servers.
|
||||
class TopologyDescription(object):
|
||||
def __init__(self, topology_type, server_descriptions, set_name):
|
||||
"""Represent a topology of servers.
|
||||
|
||||
:Parameters:
|
||||
- `cluster_type`: initial type
|
||||
- `topology_type`: initial type
|
||||
- `server_descriptions`: dict of (address, ServerDescription) for
|
||||
all seeds
|
||||
- `set_name`: replica set name or None
|
||||
"""
|
||||
self._cluster_type = cluster_type
|
||||
self._topology_type = topology_type
|
||||
self._set_name = set_name
|
||||
self._server_descriptions = server_descriptions
|
||||
|
||||
@ -80,33 +80,33 @@ class ClusterDescription(object):
|
||||
# The default ServerDescription's type is Unknown.
|
||||
sds[address] = ServerDescription(address)
|
||||
|
||||
if self._cluster_type == CLUSTER_TYPE.ReplicaSetWithPrimary:
|
||||
cluster_type = _check_has_primary(sds)
|
||||
if self._topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary:
|
||||
topology_type = _check_has_primary(sds)
|
||||
else:
|
||||
cluster_type = self._cluster_type
|
||||
topology_type = self._topology_type
|
||||
|
||||
return ClusterDescription(cluster_type, sds, self._set_name)
|
||||
return TopologyDescription(topology_type, sds, self._set_name)
|
||||
|
||||
def reset(self):
|
||||
"""A copy of this description, with all servers marked Unknown."""
|
||||
if self._cluster_type == CLUSTER_TYPE.ReplicaSetWithPrimary:
|
||||
cluster_type = CLUSTER_TYPE.ReplicaSetNoPrimary
|
||||
if self._topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary:
|
||||
topology_type = TOPOLOGY_TYPE.ReplicaSetNoPrimary
|
||||
else:
|
||||
cluster_type = self._cluster_type
|
||||
topology_type = self._topology_type
|
||||
|
||||
# The default ServerDescription's type is Unknown.
|
||||
sds = dict((address, ServerDescription(address))
|
||||
for address in self._server_descriptions)
|
||||
|
||||
return ClusterDescription(cluster_type, sds, self._set_name)
|
||||
return TopologyDescription(topology_type, sds, self._set_name)
|
||||
|
||||
def server_descriptions(self):
|
||||
"""Dict of (address, ServerDescription)."""
|
||||
return self._server_descriptions.copy()
|
||||
|
||||
@property
|
||||
def cluster_type(self):
|
||||
return self._cluster_type
|
||||
def topology_type(self):
|
||||
return self._topology_type
|
||||
|
||||
@property
|
||||
def set_name(self):
|
||||
@ -120,103 +120,103 @@ class ClusterDescription(object):
|
||||
if s.is_server_type_known]
|
||||
|
||||
|
||||
# If cluster type is Unknown and we receive an ismaster response, what should
|
||||
# the new cluster type be?
|
||||
_SERVER_TYPE_TO_CLUSTER_TYPE = {
|
||||
SERVER_TYPE.Mongos: CLUSTER_TYPE.Sharded,
|
||||
SERVER_TYPE.RSPrimary: CLUSTER_TYPE.ReplicaSetWithPrimary,
|
||||
SERVER_TYPE.RSSecondary: CLUSTER_TYPE.ReplicaSetNoPrimary,
|
||||
SERVER_TYPE.RSArbiter: CLUSTER_TYPE.ReplicaSetNoPrimary,
|
||||
SERVER_TYPE.RSOther: CLUSTER_TYPE.ReplicaSetNoPrimary,
|
||||
# If topology type is Unknown and we receive an ismaster response, what should
|
||||
# the new topology type be?
|
||||
_SERVER_TYPE_TO_TOPOLOGY_TYPE = {
|
||||
SERVER_TYPE.Mongos: TOPOLOGY_TYPE.Sharded,
|
||||
SERVER_TYPE.RSPrimary: TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
SERVER_TYPE.RSSecondary: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
SERVER_TYPE.RSArbiter: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
SERVER_TYPE.RSOther: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
}
|
||||
|
||||
|
||||
def updated_cluster_description(cluster_description, server_description):
|
||||
"""Return an updated copy of a ClusterDescription.
|
||||
def updated_topology_description(topology_description, server_description):
|
||||
"""Return an updated copy of a TopologyDescription.
|
||||
|
||||
:Parameters:
|
||||
- `cluster_description`: the current ClusterDescription
|
||||
- `topology_description`: the current TopologyDescription
|
||||
- `server_description`: a new ServerDescription that resulted from
|
||||
an ismaster call
|
||||
|
||||
Called after attempting (successfully or not) to call ismaster on the
|
||||
server at server_description.address. Does not modify cluster_description.
|
||||
server at server_description.address. Does not modify topology_description.
|
||||
"""
|
||||
address = server_description.address
|
||||
|
||||
# These values will be updated, if necessary, to form the new
|
||||
# ClusterDescription.
|
||||
cluster_type = cluster_description.cluster_type
|
||||
set_name = cluster_description.set_name
|
||||
# TopologyDescription.
|
||||
topology_type = topology_description.topology_type
|
||||
set_name = topology_description.set_name
|
||||
server_type = server_description.server_type
|
||||
|
||||
# Don't mutate the original dict of server descriptions; copy it.
|
||||
sds = cluster_description.server_descriptions()
|
||||
sds = topology_description.server_descriptions()
|
||||
|
||||
# Replace this server's description with the new one.
|
||||
sds[address] = server_description
|
||||
|
||||
if cluster_type == CLUSTER_TYPE.Single:
|
||||
if topology_type == TOPOLOGY_TYPE.Single:
|
||||
# Single type never changes.
|
||||
return ClusterDescription(CLUSTER_TYPE.Single, sds, set_name)
|
||||
return TopologyDescription(TOPOLOGY_TYPE.Single, sds, set_name)
|
||||
|
||||
if cluster_type == CLUSTER_TYPE.Unknown:
|
||||
if topology_type == TOPOLOGY_TYPE.Unknown:
|
||||
if server_type == SERVER_TYPE.Standalone:
|
||||
sds.pop(address)
|
||||
|
||||
elif server_type not in (SERVER_TYPE.Unknown, SERVER_TYPE.RSGhost):
|
||||
cluster_type = _SERVER_TYPE_TO_CLUSTER_TYPE[server_type]
|
||||
topology_type = _SERVER_TYPE_TO_TOPOLOGY_TYPE[server_type]
|
||||
|
||||
if cluster_type == CLUSTER_TYPE.Sharded:
|
||||
if topology_type == TOPOLOGY_TYPE.Sharded:
|
||||
if server_type != SERVER_TYPE.Mongos:
|
||||
sds.pop(address)
|
||||
|
||||
elif cluster_type == CLUSTER_TYPE.ReplicaSetNoPrimary:
|
||||
elif topology_type == TOPOLOGY_TYPE.ReplicaSetNoPrimary:
|
||||
if server_type in (SERVER_TYPE.Standalone, SERVER_TYPE.Mongos):
|
||||
sds.pop(address)
|
||||
|
||||
elif server_type == SERVER_TYPE.RSPrimary:
|
||||
cluster_type, set_name = _update_rs_from_primary(
|
||||
topology_type, set_name = _update_rs_from_primary(
|
||||
sds, set_name, server_description)
|
||||
|
||||
elif server_type in (
|
||||
SERVER_TYPE.RSSecondary,
|
||||
SERVER_TYPE.RSArbiter,
|
||||
SERVER_TYPE.RSOther):
|
||||
cluster_type, set_name = _update_rs_no_primary_from_member(
|
||||
topology_type, set_name = _update_rs_no_primary_from_member(
|
||||
sds, set_name, server_description)
|
||||
|
||||
elif cluster_type == CLUSTER_TYPE.ReplicaSetWithPrimary:
|
||||
elif topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary:
|
||||
if server_type in (SERVER_TYPE.Standalone, SERVER_TYPE.Mongos):
|
||||
sds.pop(address)
|
||||
cluster_type = _check_has_primary(sds)
|
||||
topology_type = _check_has_primary(sds)
|
||||
|
||||
elif server_type == SERVER_TYPE.RSPrimary:
|
||||
cluster_type, set_name = _update_rs_from_primary(
|
||||
topology_type, set_name = _update_rs_from_primary(
|
||||
sds, set_name, server_description)
|
||||
|
||||
elif server_type in (
|
||||
SERVER_TYPE.RSSecondary,
|
||||
SERVER_TYPE.RSArbiter,
|
||||
SERVER_TYPE.RSOther):
|
||||
cluster_type = _update_rs_with_primary_from_member(
|
||||
topology_type = _update_rs_with_primary_from_member(
|
||||
sds, set_name, server_description)
|
||||
|
||||
else:
|
||||
# Server type is Unknown or RSGhost: did we just lose the primary?
|
||||
cluster_type = _check_has_primary(sds)
|
||||
topology_type = _check_has_primary(sds)
|
||||
|
||||
# Return updated copy.
|
||||
return ClusterDescription(cluster_type, sds, set_name)
|
||||
return TopologyDescription(topology_type, sds, set_name)
|
||||
|
||||
|
||||
def _update_rs_from_primary(sds, set_name, server_description):
|
||||
"""Update cluster description from a primary's ismaster response.
|
||||
"""Update topology description from a primary's ismaster response.
|
||||
|
||||
Pass in a dict of ServerDescriptions, current replica set name, and the
|
||||
ServerDescription we are processing.
|
||||
|
||||
Returns (new cluster type, new set_name).
|
||||
Returns (new topology type, new set_name).
|
||||
"""
|
||||
if set_name is None:
|
||||
set_name = server_description.set_name
|
||||
@ -258,7 +258,7 @@ def _update_rs_with_primary_from_member(sds, set_name, server_description):
|
||||
Pass in a dict of ServerDescriptions, current replica set name, and the
|
||||
ServerDescription we are processing.
|
||||
|
||||
Returns new cluster type.
|
||||
Returns new topology type.
|
||||
"""
|
||||
assert set_name is not None
|
||||
|
||||
@ -275,15 +275,15 @@ def _update_rs_no_primary_from_member(sds, set_name, server_description):
|
||||
Pass in a dict of ServerDescriptions, current replica set name, and the
|
||||
ServerDescription we are processing.
|
||||
|
||||
Returns (new cluster type, new set_name).
|
||||
Returns (new topology type, new set_name).
|
||||
"""
|
||||
cluster_type = CLUSTER_TYPE.ReplicaSetNoPrimary
|
||||
topology_type = TOPOLOGY_TYPE.ReplicaSetNoPrimary
|
||||
if set_name is None:
|
||||
set_name = server_description.set_name
|
||||
|
||||
elif set_name != server_description.set_name:
|
||||
sds.pop(server_description.address)
|
||||
return cluster_type, set_name
|
||||
return topology_type, set_name
|
||||
|
||||
# This isn't the primary's response, so don't remove any servers
|
||||
# it doesn't report. Only add new servers.
|
||||
@ -291,18 +291,18 @@ def _update_rs_no_primary_from_member(sds, set_name, server_description):
|
||||
if address not in sds:
|
||||
sds[address] = ServerDescription(address)
|
||||
|
||||
return cluster_type, set_name
|
||||
return topology_type, set_name
|
||||
|
||||
|
||||
def _check_has_primary(sds):
|
||||
"""Current cluster type is ReplicaSetWithPrimary. Is primary still known?
|
||||
"""Current topology type is ReplicaSetWithPrimary. Is primary still known?
|
||||
|
||||
Pass in a dict of ServerDescriptions.
|
||||
|
||||
Returns new cluster type.
|
||||
Returns new topology type.
|
||||
"""
|
||||
for s in sds.values():
|
||||
if s.server_type == SERVER_TYPE.RSPrimary:
|
||||
return CLUSTER_TYPE.ReplicaSetWithPrimary
|
||||
return TOPOLOGY_TYPE.ReplicaSetWithPrimary
|
||||
else:
|
||||
return CLUSTER_TYPE.ReplicaSetNoPrimary
|
||||
return TOPOLOGY_TYPE.ReplicaSetNoPrimary
|
||||
|
||||
@ -975,9 +975,9 @@ class TestReplicaSetRequest(HATestCase):
|
||||
|
||||
self.assertTrue(self.c.in_request())
|
||||
|
||||
cluster = self.c._get_cluster()
|
||||
primary_pool = cluster.select_server(writable_server_selector).pool
|
||||
secondary_pool = cluster.select_server(secondary_server_selector).pool
|
||||
topology = self.c._get_topology()
|
||||
primary_pool = topology.select_server(writable_server_selector).pool
|
||||
secondary_pool = topology.select_server(secondary_server_selector).pool
|
||||
|
||||
# Trigger start_request on primary pool
|
||||
utils.assertReadFrom(self, self.c, primary, PRIMARY)
|
||||
|
||||
@ -60,9 +60,9 @@ class MockMonitor(Monitor):
|
||||
self,
|
||||
client,
|
||||
server_description,
|
||||
cluster,
|
||||
topology,
|
||||
pool,
|
||||
cluster_settings):
|
||||
topology_settings):
|
||||
# MockMonitor gets a 'client' arg, regular monitors don't.
|
||||
self.client = client
|
||||
self.mock_address = server_description.address
|
||||
@ -71,9 +71,9 @@ class MockMonitor(Monitor):
|
||||
Monitor.__init__(
|
||||
self,
|
||||
ServerDescription((default_host, default_port)),
|
||||
cluster,
|
||||
topology,
|
||||
pool,
|
||||
cluster_settings)
|
||||
topology_settings)
|
||||
|
||||
def _check_once(self):
|
||||
try:
|
||||
@ -90,10 +90,10 @@ class MockClient(MongoClient):
|
||||
def __init__(
|
||||
self, standalones, members, mongoses, ismaster_hosts=None,
|
||||
*args, **kwargs):
|
||||
"""A MongoClient connected to the default server, with a mock cluster.
|
||||
"""A MongoClient connected to the default server, with a mock topology.
|
||||
|
||||
standalones, members, mongoses determine the configuration of the
|
||||
cluster. They are formatted like ['a:1', 'b:2']. ismaster_hosts
|
||||
topology. They are formatted like ['a:1', 'b:2']. ismaster_hosts
|
||||
provides an alternative host list for the server's mocked ismaster
|
||||
response; see test_connect_with_internal_ips.
|
||||
"""
|
||||
|
||||
@ -453,7 +453,7 @@ class TestClient(IntegrationTest, TestRequestMixin):
|
||||
|
||||
def f(pipe):
|
||||
try:
|
||||
servers = self.client._cluster.select_servers(
|
||||
servers = self.client._topology.select_servers(
|
||||
any_server_selector)
|
||||
|
||||
# In child, only the thread that called fork() is alive.
|
||||
@ -935,13 +935,13 @@ class TestClientProperties(MockClientTest):
|
||||
connect=False)
|
||||
|
||||
c.set_wire_version_range('a:1', 1, 5)
|
||||
c._get_cluster().select_servers(writable_server_selector) # Connect.
|
||||
c._get_topology().select_servers(writable_server_selector) # Connect.
|
||||
self.assertEqual(c.min_wire_version, 1)
|
||||
self.assertEqual(c.max_wire_version, 5)
|
||||
|
||||
c.set_wire_version_range('a:1', 10, 11)
|
||||
c.disconnect()
|
||||
c._get_cluster()
|
||||
c._get_topology()
|
||||
self.assertRaises(ConfigurationError, c.db.collection.find_one)
|
||||
|
||||
def test_max_wire_version(self):
|
||||
@ -1063,7 +1063,7 @@ class TestMongoClientFailover(MockClientTest):
|
||||
c.disconnect()
|
||||
self.assertEqual(0, len(c.nodes))
|
||||
|
||||
c._get_cluster().select_servers(writable_server_selector)
|
||||
c._get_topology().select_servers(writable_server_selector)
|
||||
self.assertEqual('b', c.host)
|
||||
self.assertEqual(2, c.port)
|
||||
|
||||
@ -1092,7 +1092,7 @@ class TestMongoClientFailover(MockClientTest):
|
||||
|
||||
# But it can reconnect.
|
||||
c.revive_host('a:1')
|
||||
c._get_cluster().select_servers(writable_server_selector)
|
||||
c._get_topology().select_servers(writable_server_selector)
|
||||
self.assertEqual('a', c.host)
|
||||
self.assertEqual(1, c.port)
|
||||
|
||||
@ -1116,7 +1116,7 @@ class TestMongoClientFailover(MockClientTest):
|
||||
connected(c)
|
||||
wait_until(lambda: len(c.nodes) == 2, 'connect')
|
||||
|
||||
sd = c._get_cluster().get_server_by_address(('a', 1)).description
|
||||
sd = c._get_topology().get_server_by_address(('a', 1)).description
|
||||
self.assertEqual(SERVER_TYPE.RSPrimary, sd.server_type)
|
||||
self.assertEqual(0, sd.min_wire_version)
|
||||
self.assertEqual(1, sd.max_wire_version)
|
||||
@ -1127,13 +1127,13 @@ class TestMongoClientFailover(MockClientTest):
|
||||
self.assertRaises(AutoReconnect, c.db.collection.find_one)
|
||||
|
||||
# The primary's description is reset.
|
||||
sd_a = c._get_cluster().get_server_by_address(('a', 1)).description
|
||||
sd_a = c._get_topology().get_server_by_address(('a', 1)).description
|
||||
self.assertEqual(SERVER_TYPE.Unknown, sd_a.server_type)
|
||||
self.assertEqual(0, sd_a.min_wire_version)
|
||||
self.assertEqual(0, sd_a.max_wire_version)
|
||||
|
||||
# ...but not the secondary's.
|
||||
sd_b = c._get_cluster().get_server_by_address(('b', 2)).description
|
||||
sd_b = c._get_topology().get_server_by_address(('b', 2)).description
|
||||
self.assertEqual(SERVER_TYPE.RSSecondary, sd_b.server_type)
|
||||
self.assertEqual(0, sd_b.min_wire_version)
|
||||
self.assertEqual(2, sd_b.max_wire_version)
|
||||
|
||||
@ -77,7 +77,7 @@ class TestCursorManager(IntegrationTest, TestRequestMixin):
|
||||
|
||||
@client_context.require_replica_set
|
||||
def test_cursor_manager_prohibited_with_rs(self):
|
||||
# Test that kill_cursors() throws an error while the cluster type
|
||||
# Test that kill_cursors() throws an error while the topology type
|
||||
# isn't Single or Sharded.
|
||||
client = get_client(connection_string(),
|
||||
replicaSet=client_context.setname)
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Test the cluster module."""
|
||||
"""Test the topology module."""
|
||||
|
||||
import json
|
||||
import os
|
||||
@ -22,12 +22,12 @@ import threading
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
from pymongo import common
|
||||
from pymongo.topology import Cluster
|
||||
from pymongo.topology_description import CLUSTER_TYPE
|
||||
from pymongo.topology import Topology
|
||||
from pymongo.topology_description import TOPOLOGY_TYPE
|
||||
from pymongo.ismaster import IsMaster
|
||||
from pymongo.read_preferences import MovingAverage
|
||||
from pymongo.server_description import ServerDescription, SERVER_TYPE
|
||||
from pymongo.settings import ClusterSettings
|
||||
from pymongo.settings import TopologySettings
|
||||
from pymongo.uri_parser import parse_uri
|
||||
from test import unittest
|
||||
|
||||
@ -65,9 +65,9 @@ class MockPool(object):
|
||||
|
||||
|
||||
class MockMonitor(object):
|
||||
def __init__(self, server_description, cluster, pool, cluster_settings):
|
||||
def __init__(self, server_description, topology, pool, topology_settings):
|
||||
self._server_description = server_description
|
||||
self._cluster = cluster
|
||||
self._topology = topology
|
||||
|
||||
def open(self):
|
||||
pass
|
||||
@ -79,7 +79,7 @@ class MockMonitor(object):
|
||||
pass
|
||||
|
||||
|
||||
def create_mock_cluster(uri, monitor_class=MockMonitor):
|
||||
def create_mock_topology(uri, monitor_class=MockMonitor):
|
||||
# Some tests in the spec include URIs like mongodb://A/?connect=direct,
|
||||
# but PyMongo considers any single-seed URI with no setName to be "direct".
|
||||
parsed_uri = parse_uri(uri.replace('connect=direct', ''))
|
||||
@ -87,28 +87,28 @@ def create_mock_cluster(uri, monitor_class=MockMonitor):
|
||||
if 'replicaset' in parsed_uri['options']:
|
||||
set_name = parsed_uri['options']['replicaset']
|
||||
|
||||
cluster_settings = ClusterSettings(
|
||||
topology_settings = TopologySettings(
|
||||
parsed_uri['nodelist'],
|
||||
set_name=set_name,
|
||||
pool_class=MockPool,
|
||||
monitor_class=monitor_class)
|
||||
|
||||
c = Cluster(cluster_settings)
|
||||
c = Topology(topology_settings)
|
||||
c.open()
|
||||
return c
|
||||
|
||||
|
||||
def got_ismaster(cluster, server_address, ismaster_response):
|
||||
def got_ismaster(topology, server_address, ismaster_response):
|
||||
server_description = ServerDescription(
|
||||
server_address,
|
||||
IsMaster(ismaster_response),
|
||||
MovingAverage([0]))
|
||||
|
||||
cluster.on_change(server_description)
|
||||
topology.on_change(server_description)
|
||||
|
||||
|
||||
def get_type(cluster, hostname):
|
||||
description = cluster.get_server_by_address((hostname, 27017)).description
|
||||
def get_type(topology, hostname):
|
||||
description = topology.get_server_by_address((hostname, 27017)).description
|
||||
return description.server_type
|
||||
|
||||
|
||||
@ -116,28 +116,28 @@ class TestAllScenarios(unittest.TestCase):
|
||||
pass
|
||||
|
||||
|
||||
def cluster_type_name(cluster_type):
|
||||
return CLUSTER_TYPE._fields[cluster_type]
|
||||
def topology_type_name(topology_type):
|
||||
return TOPOLOGY_TYPE._fields[topology_type]
|
||||
|
||||
|
||||
def server_type_name(server_type):
|
||||
return SERVER_TYPE._fields[server_type]
|
||||
|
||||
|
||||
def check_outcome(self, cluster, outcome):
|
||||
def check_outcome(self, topology, outcome):
|
||||
expected_servers = outcome['servers']
|
||||
|
||||
# Check weak equality before proceeding.
|
||||
self.assertEqual(
|
||||
len(cluster.description.server_descriptions()),
|
||||
len(topology.description.server_descriptions()),
|
||||
len(expected_servers))
|
||||
|
||||
# Since lengths are equal, every actual server must have a corresponding
|
||||
# expected server.
|
||||
for expected_server_address, expected_server in expected_servers.items():
|
||||
node = common.partition_node(expected_server_address)
|
||||
self.assertTrue(cluster.has_server(node))
|
||||
actual_server = cluster.get_server_by_address(node)
|
||||
self.assertTrue(topology.has_server(node))
|
||||
actual_server = topology.get_server_by_address(node)
|
||||
actual_server_description = actual_server.description
|
||||
|
||||
if expected_server['type'] == 'PossiblePrimary':
|
||||
@ -157,15 +157,15 @@ def check_outcome(self, cluster, outcome):
|
||||
expected_server['setName'],
|
||||
actual_server_description.set_name)
|
||||
|
||||
self.assertEqual(outcome['setName'], cluster.description.set_name)
|
||||
expected_cluster_type = getattr(CLUSTER_TYPE, outcome['clusterType'])
|
||||
self.assertEqual(cluster_type_name(expected_cluster_type),
|
||||
cluster_type_name(cluster.description.cluster_type))
|
||||
self.assertEqual(outcome['setName'], topology.description.set_name)
|
||||
expected_topology_type = getattr(TOPOLOGY_TYPE, outcome['clusterType'])
|
||||
self.assertEqual(topology_type_name(expected_topology_type),
|
||||
topology_type_name(topology.description.topology_type))
|
||||
|
||||
|
||||
def create_test(scenario_def):
|
||||
def run_scenario(self):
|
||||
c = create_mock_cluster(scenario_def['uri'])
|
||||
c = create_mock_topology(scenario_def['uri'])
|
||||
|
||||
for phase in scenario_def['phases']:
|
||||
for response in phase['responses']:
|
||||
|
||||
@ -233,7 +233,7 @@ class _TestPoolingBase(unittest.TestCase):
|
||||
|
||||
def assert_no_request(self):
|
||||
try:
|
||||
server = self.c._cluster.select_server(
|
||||
server = self.c._topology.select_server(
|
||||
writable_server_selector,
|
||||
server_wait_time=0)
|
||||
|
||||
@ -253,7 +253,7 @@ class _TestPoolingBase(unittest.TestCase):
|
||||
def assert_pool_size(self, pool_size):
|
||||
if pool_size == 0:
|
||||
try:
|
||||
server = self.c._cluster.select_server(
|
||||
server = self.c._topology.select_server(
|
||||
writable_server_selector,
|
||||
server_wait_time=0)
|
||||
|
||||
|
||||
@ -190,7 +190,7 @@ class TestReadPreferences(TestReadPreferencesBase):
|
||||
latencies = ', '.join(
|
||||
'%s: %dms' % (server.description.address,
|
||||
server.description.round_trip_time)
|
||||
for server in c._get_cluster().select_servers(any_server_selector))
|
||||
for server in c._get_topology().select_servers(any_server_selector))
|
||||
|
||||
self.assertFalse(not_used,
|
||||
"Expected to use primary and all secondaries for mode NEAREST,"
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Test the cluster module."""
|
||||
"""Test the topology module."""
|
||||
|
||||
import sys
|
||||
|
||||
@ -24,8 +24,8 @@ import threading
|
||||
from bson.py3compat import imap
|
||||
from pymongo import common
|
||||
from pymongo.server_type import SERVER_TYPE
|
||||
from pymongo.topology import Cluster
|
||||
from pymongo.topology_description import CLUSTER_TYPE
|
||||
from pymongo.topology import Topology
|
||||
from pymongo.topology_description import TOPOLOGY_TYPE
|
||||
from pymongo.errors import (ConfigurationError,
|
||||
ConnectionFailure)
|
||||
from pymongo.ismaster import IsMaster
|
||||
@ -34,7 +34,7 @@ from pymongo.read_preferences import MovingAverage
|
||||
from pymongo.server_description import ServerDescription
|
||||
from pymongo.server_selectors import (any_server_selector,
|
||||
writable_server_selector)
|
||||
from pymongo.settings import ClusterSettings
|
||||
from pymongo.settings import TopologySettings
|
||||
from test import unittest, client_knobs
|
||||
|
||||
|
||||
@ -66,9 +66,9 @@ class MockPool(object):
|
||||
|
||||
|
||||
class MockMonitor(object):
|
||||
def __init__(self, server_description, cluster, pool, cluster_settings):
|
||||
def __init__(self, server_description, topology, pool, topology_settings):
|
||||
self._server_description = server_description
|
||||
self._cluster = cluster
|
||||
self._topology = topology
|
||||
|
||||
def open(self):
|
||||
pass
|
||||
@ -80,60 +80,60 @@ class MockMonitor(object):
|
||||
pass
|
||||
|
||||
|
||||
class SetNameDiscoverySettings(ClusterSettings):
|
||||
def get_cluster_type(self):
|
||||
return CLUSTER_TYPE.ReplicaSetNoPrimary
|
||||
class SetNameDiscoverySettings(TopologySettings):
|
||||
def get_topology_type(self):
|
||||
return TOPOLOGY_TYPE.ReplicaSetNoPrimary
|
||||
|
||||
|
||||
address = ('a', 27017)
|
||||
|
||||
|
||||
def create_mock_cluster(seeds=None, set_name=None, monitor_class=MockMonitor):
|
||||
def create_mock_topology(seeds=None, set_name=None, monitor_class=MockMonitor):
|
||||
partitioned_seeds = list(imap(common.partition_node, seeds or ['a']))
|
||||
cluster_settings = ClusterSettings(
|
||||
topology_settings = TopologySettings(
|
||||
partitioned_seeds,
|
||||
set_name=set_name,
|
||||
pool_class=MockPool,
|
||||
monitor_class=monitor_class)
|
||||
|
||||
c = Cluster(cluster_settings)
|
||||
c = Topology(topology_settings)
|
||||
c.open()
|
||||
return c
|
||||
|
||||
|
||||
def got_ismaster(cluster, server_address, ismaster_response):
|
||||
def got_ismaster(topology, server_address, ismaster_response):
|
||||
server_description = ServerDescription(
|
||||
server_address,
|
||||
IsMaster(ismaster_response),
|
||||
MovingAverage([0]))
|
||||
|
||||
cluster.on_change(server_description)
|
||||
topology.on_change(server_description)
|
||||
|
||||
|
||||
def disconnected(cluster, server_address):
|
||||
def disconnected(topology, server_address):
|
||||
# Create new description of server type Unknown.
|
||||
cluster.on_change(ServerDescription(server_address))
|
||||
topology.on_change(ServerDescription(server_address))
|
||||
|
||||
|
||||
def get_type(cluster, hostname):
|
||||
description = cluster.get_server_by_address((hostname, 27017)).description
|
||||
def get_type(topology, hostname):
|
||||
description = topology.get_server_by_address((hostname, 27017)).description
|
||||
return description.server_type
|
||||
|
||||
|
||||
class ClusterTest(unittest.TestCase):
|
||||
class TopologyTest(unittest.TestCase):
|
||||
"""Disables periodic monitoring, to make tests deterministic."""
|
||||
|
||||
def setUp(self):
|
||||
super(ClusterTest, self).setUp()
|
||||
super(TopologyTest, self).setUp()
|
||||
self.client_knobs = client_knobs(heartbeat_frequency=999999)
|
||||
self.client_knobs.enable()
|
||||
|
||||
def tearDown(self):
|
||||
self.client_knobs.disable()
|
||||
super(ClusterTest, self).tearDown()
|
||||
super(TopologyTest, self).tearDown()
|
||||
|
||||
|
||||
class TestSingleServerCluster(ClusterTest):
|
||||
class TestSingleServerTopology(TopologyTest):
|
||||
def test_direct_connection(self):
|
||||
for server_type, ismaster_response in [
|
||||
(SERVER_TYPE.RSPrimary, {
|
||||
@ -170,7 +170,7 @@ class TestSingleServerCluster(ClusterTest):
|
||||
'ok': 1,
|
||||
'ismaster': False}),
|
||||
]:
|
||||
c = create_mock_cluster()
|
||||
c = create_mock_topology()
|
||||
|
||||
# Can't select a server while the only server is of type Unknown.
|
||||
self.assertRaises(
|
||||
@ -179,8 +179,8 @@ class TestSingleServerCluster(ClusterTest):
|
||||
|
||||
got_ismaster(c, address, ismaster_response)
|
||||
|
||||
# Cluster type never changes.
|
||||
self.assertEqual(CLUSTER_TYPE.Single, c.description.cluster_type)
|
||||
# Topology type never changes.
|
||||
self.assertEqual(TOPOLOGY_TYPE.Single, c.description.topology_type)
|
||||
|
||||
# No matter whether the server is writable,
|
||||
# select_servers() returns it.
|
||||
@ -188,14 +188,14 @@ class TestSingleServerCluster(ClusterTest):
|
||||
self.assertEqual(server_type, s.description.server_type)
|
||||
|
||||
def test_reopen(self):
|
||||
c = create_mock_cluster()
|
||||
c = create_mock_topology()
|
||||
|
||||
# Additional calls are permitted.
|
||||
c.open()
|
||||
c.open()
|
||||
|
||||
def test_unavailable_seed(self):
|
||||
c = create_mock_cluster()
|
||||
c = create_mock_topology()
|
||||
disconnected(c, address)
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'a'))
|
||||
|
||||
@ -206,7 +206,7 @@ class TestSingleServerCluster(ClusterTest):
|
||||
def _check_with_socket(self, sock_info):
|
||||
return IsMaster({'ok': 1}), round_trip_time
|
||||
|
||||
c = create_mock_cluster(monitor_class=TestMonitor)
|
||||
c = create_mock_topology(monitor_class=TestMonitor)
|
||||
s = c.select_server(writable_server_selector)
|
||||
self.assertEqual(1, s.description.round_trip_time)
|
||||
|
||||
@ -217,11 +217,11 @@ class TestSingleServerCluster(ClusterTest):
|
||||
self.assertEqual(2, s.description.round_trip_time)
|
||||
|
||||
|
||||
class TestMultiServerCluster(ClusterTest):
|
||||
class TestMultiServerTopology(TopologyTest):
|
||||
def test_unexpected_host(self):
|
||||
# Received ismaster response from host not in cluster.
|
||||
# Received ismaster response from host not in topology.
|
||||
# E.g., a race where the host is removed before it responds.
|
||||
c = create_mock_cluster(['a', 'b'], set_name='rs')
|
||||
c = create_mock_topology(['a', 'b'], set_name='rs')
|
||||
|
||||
# 'b' is not in the set.
|
||||
got_ismaster(c, ('a', 27017), {
|
||||
@ -244,17 +244,17 @@ class TestMultiServerCluster(ClusterTest):
|
||||
self.assertFalse(c.has_server(('b', 27017)))
|
||||
|
||||
def test_ghost_seed(self):
|
||||
c = create_mock_cluster(['a', 'b'])
|
||||
c = create_mock_topology(['a', 'b'])
|
||||
got_ismaster(c, address, {
|
||||
'ok': 1,
|
||||
'ismaster': False,
|
||||
'isreplicaset': True})
|
||||
|
||||
self.assertEqual(SERVER_TYPE.RSGhost, get_type(c, 'a'))
|
||||
self.assertEqual(CLUSTER_TYPE.Unknown, c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.Unknown, c.description.topology_type)
|
||||
|
||||
def test_standalone_removed(self):
|
||||
c = create_mock_cluster(['a', 'b'])
|
||||
c = create_mock_topology(['a', 'b'])
|
||||
got_ismaster(c, ('a', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': True})
|
||||
@ -267,13 +267,13 @@ class TestMultiServerCluster(ClusterTest):
|
||||
self.assertEqual(0, len(c.description.server_descriptions()))
|
||||
|
||||
def test_mongos_ha(self):
|
||||
c = create_mock_cluster(['a', 'b'])
|
||||
c = create_mock_topology(['a', 'b'])
|
||||
got_ismaster(c, ('a', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
'msg': 'isdbgrid'})
|
||||
|
||||
self.assertEqual(CLUSTER_TYPE.Sharded, c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.Sharded, c.description.topology_type)
|
||||
got_ismaster(c, ('b', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
@ -283,7 +283,7 @@ class TestMultiServerCluster(ClusterTest):
|
||||
self.assertEqual(SERVER_TYPE.Mongos, get_type(c, 'b'))
|
||||
|
||||
def test_non_mongos_server(self):
|
||||
c = create_mock_cluster(['a', 'b'])
|
||||
c = create_mock_topology(['a', 'b'])
|
||||
got_ismaster(c, ('a', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
@ -294,7 +294,7 @@ class TestMultiServerCluster(ClusterTest):
|
||||
self.assertFalse(c.has_server(('b', 27017)))
|
||||
|
||||
def test_rs_discovery(self):
|
||||
c = create_mock_cluster(set_name='rs')
|
||||
c = create_mock_topology(set_name='rs')
|
||||
|
||||
# At first, A, B, and C are secondaries.
|
||||
got_ismaster(c, ('a', 27017), {
|
||||
@ -308,8 +308,8 @@ class TestMultiServerCluster(ClusterTest):
|
||||
self.assertEqual(SERVER_TYPE.RSSecondary, get_type(c, 'a'))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'b'))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'c'))
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
# Admin removes A, adds a high-priority member D which becomes primary.
|
||||
got_ismaster(c, ('b', 27017), {
|
||||
@ -325,8 +325,8 @@ class TestMultiServerCluster(ClusterTest):
|
||||
self.assertEqual(SERVER_TYPE.RSSecondary, get_type(c, 'b'))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'c'))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'd'))
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
# Primary responds.
|
||||
got_ismaster(c, ('d', 27017), {
|
||||
@ -342,8 +342,8 @@ class TestMultiServerCluster(ClusterTest):
|
||||
|
||||
# E is new.
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'e'))
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
# Stale response from C.
|
||||
got_ismaster(c, ('c', 27017), {
|
||||
@ -363,7 +363,7 @@ class TestMultiServerCluster(ClusterTest):
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'e'))
|
||||
|
||||
def test_reset(self):
|
||||
c = create_mock_cluster(set_name='rs')
|
||||
c = create_mock_topology(set_name='rs')
|
||||
got_ismaster(c, ('a', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
@ -379,16 +379,16 @@ class TestMultiServerCluster(ClusterTest):
|
||||
|
||||
self.assertEqual(SERVER_TYPE.RSPrimary, get_type(c, 'a'))
|
||||
self.assertEqual(SERVER_TYPE.RSSecondary, get_type(c, 'b'))
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
c.reset()
|
||||
self.assertEqual(2, len(c.description.server_descriptions()))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'a'))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'b'))
|
||||
self.assertEqual('rs', c.description.set_name)
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
got_ismaster(c, ('a', 27017), {
|
||||
'ok': 1,
|
||||
@ -398,11 +398,11 @@ class TestMultiServerCluster(ClusterTest):
|
||||
|
||||
self.assertEqual(SERVER_TYPE.RSPrimary, get_type(c, 'a'))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'b'))
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
def test_reset_server(self):
|
||||
c = create_mock_cluster(set_name='rs')
|
||||
c = create_mock_topology(set_name='rs')
|
||||
got_ismaster(c, ('a', 27017), {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
@ -420,8 +420,8 @@ class TestMultiServerCluster(ClusterTest):
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'a'))
|
||||
self.assertEqual(SERVER_TYPE.RSSecondary, get_type(c, 'b'))
|
||||
self.assertEqual('rs', c.description.set_name)
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
got_ismaster(c, ('a', 27017), {
|
||||
'ok': 1,
|
||||
@ -430,20 +430,20 @@ class TestMultiServerCluster(ClusterTest):
|
||||
'hosts': ['a', 'b']})
|
||||
|
||||
self.assertEqual(SERVER_TYPE.RSPrimary, get_type(c, 'a'))
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
c.reset_server(('b', 27017))
|
||||
self.assertEqual(SERVER_TYPE.RSPrimary, get_type(c, 'a'))
|
||||
self.assertEqual(SERVER_TYPE.Unknown, get_type(c, 'b'))
|
||||
self.assertEqual('rs', c.description.set_name)
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
def test_reset_removed_server(self):
|
||||
c = create_mock_cluster(set_name='rs')
|
||||
c = create_mock_topology(set_name='rs')
|
||||
|
||||
# No error resetting a server not in the ClusterDescription.
|
||||
# No error resetting a server not in the TopologyDescription.
|
||||
c.reset_server(('b', 27017))
|
||||
|
||||
# Server was *not* added as type Unknown.
|
||||
@ -451,16 +451,16 @@ class TestMultiServerCluster(ClusterTest):
|
||||
|
||||
def test_discover_set_name_from_primary(self):
|
||||
# Discovering a replica set without the setName supplied by the user
|
||||
# is not yet supported by MongoClient, but Cluster can do it.
|
||||
cluster_settings = SetNameDiscoverySettings(
|
||||
# is not yet supported by MongoClient, but Topology can do it.
|
||||
topology_settings = SetNameDiscoverySettings(
|
||||
seeds=[address],
|
||||
pool_class=MockPool,
|
||||
monitor_class=MockMonitor)
|
||||
|
||||
c = Cluster(cluster_settings)
|
||||
c = Topology(topology_settings)
|
||||
self.assertEqual(c.description.set_name, None)
|
||||
self.assertEqual(c.description.cluster_type,
|
||||
CLUSTER_TYPE.ReplicaSetNoPrimary)
|
||||
self.assertEqual(c.description.topology_type,
|
||||
TOPOLOGY_TYPE.ReplicaSetNoPrimary)
|
||||
|
||||
got_ismaster(c, address, {
|
||||
'ok': 1,
|
||||
@ -469,11 +469,11 @@ class TestMultiServerCluster(ClusterTest):
|
||||
'hosts': ['a']})
|
||||
|
||||
self.assertEqual(c.description.set_name, 'rs')
|
||||
self.assertEqual(c.description.cluster_type,
|
||||
CLUSTER_TYPE.ReplicaSetWithPrimary)
|
||||
self.assertEqual(c.description.topology_type,
|
||||
TOPOLOGY_TYPE.ReplicaSetWithPrimary)
|
||||
|
||||
# Another response from the primary. Tests the code that processes
|
||||
# primary response when cluster type is already ReplicaSetWithPrimary.
|
||||
# primary response when topology type is already ReplicaSetWithPrimary.
|
||||
got_ismaster(c, address, {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
@ -482,21 +482,21 @@ class TestMultiServerCluster(ClusterTest):
|
||||
|
||||
# No change.
|
||||
self.assertEqual(c.description.set_name, 'rs')
|
||||
self.assertEqual(c.description.cluster_type,
|
||||
CLUSTER_TYPE.ReplicaSetWithPrimary)
|
||||
self.assertEqual(c.description.topology_type,
|
||||
TOPOLOGY_TYPE.ReplicaSetWithPrimary)
|
||||
|
||||
def test_discover_set_name_from_secondary(self):
|
||||
# Discovering a replica set without the setName supplied by the user
|
||||
# is not yet supported by MongoClient, but Cluster can do it.
|
||||
cluster_settings = SetNameDiscoverySettings(
|
||||
# is not yet supported by MongoClient, but Topology can do it.
|
||||
topology_settings = SetNameDiscoverySettings(
|
||||
seeds=[address],
|
||||
pool_class=MockPool,
|
||||
monitor_class=MockMonitor)
|
||||
|
||||
c = Cluster(cluster_settings)
|
||||
c = Topology(topology_settings)
|
||||
self.assertEqual(c.description.set_name, None)
|
||||
self.assertEqual(c.description.cluster_type,
|
||||
CLUSTER_TYPE.ReplicaSetNoPrimary)
|
||||
self.assertEqual(c.description.topology_type,
|
||||
TOPOLOGY_TYPE.ReplicaSetNoPrimary)
|
||||
|
||||
got_ismaster(c, address, {
|
||||
'ok': 1,
|
||||
@ -506,44 +506,44 @@ class TestMultiServerCluster(ClusterTest):
|
||||
'hosts': ['a']})
|
||||
|
||||
self.assertEqual(c.description.set_name, 'rs')
|
||||
self.assertEqual(c.description.cluster_type,
|
||||
CLUSTER_TYPE.ReplicaSetNoPrimary)
|
||||
self.assertEqual(c.description.topology_type,
|
||||
TOPOLOGY_TYPE.ReplicaSetNoPrimary)
|
||||
|
||||
def test_primary_disconnect(self):
|
||||
c = create_mock_cluster(set_name='rs')
|
||||
c = create_mock_topology(set_name='rs')
|
||||
got_ismaster(c, address, {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
'setName': 'rs',
|
||||
'hosts': ['a']})
|
||||
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
disconnected(c, address)
|
||||
self.assertTrue(c.has_server(address)) # Not removed.
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
def test_primary_becomes_standalone(self):
|
||||
c = create_mock_cluster(set_name='rs')
|
||||
c = create_mock_topology(set_name='rs')
|
||||
got_ismaster(c, address, {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
'setName': 'rs',
|
||||
'hosts': ['a']})
|
||||
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
# An administrator restarts primary as standalone.
|
||||
got_ismaster(c, address, {'ok': 1})
|
||||
self.assertFalse(c.has_server(address))
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
def test_primary_wrong_set_name(self):
|
||||
c = create_mock_cluster(set_name='rs')
|
||||
c = create_mock_topology(set_name='rs')
|
||||
got_ismaster(c, address, {
|
||||
'ok': 1,
|
||||
'ismaster': True,
|
||||
@ -551,11 +551,11 @@ class TestMultiServerCluster(ClusterTest):
|
||||
'hosts': ['a']})
|
||||
|
||||
self.assertFalse(c.has_server(address))
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
def test_secondary_wrong_set_name(self):
|
||||
c = create_mock_cluster(set_name='rs')
|
||||
c = create_mock_topology(set_name='rs')
|
||||
got_ismaster(c, address, {
|
||||
'ok': 1,
|
||||
'ismaster': False,
|
||||
@ -564,11 +564,11 @@ class TestMultiServerCluster(ClusterTest):
|
||||
'hosts': ['a']})
|
||||
|
||||
self.assertFalse(c.has_server(address))
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetNoPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
def test_secondary_wrong_set_name_with_primary(self):
|
||||
c = create_mock_cluster(['a', 'b'], set_name='rs')
|
||||
c = create_mock_topology(['a', 'b'], set_name='rs')
|
||||
|
||||
# Find the primary normally.
|
||||
got_ismaster(c, address, {
|
||||
@ -577,8 +577,8 @@ class TestMultiServerCluster(ClusterTest):
|
||||
'setName': 'rs',
|
||||
'hosts': ['a', 'b']})
|
||||
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
self.assertTrue(c.has_server(('b', 27017)))
|
||||
got_ismaster(c, ('b', 27017), {
|
||||
@ -590,17 +590,17 @@ class TestMultiServerCluster(ClusterTest):
|
||||
|
||||
# Secondary removed.
|
||||
self.assertFalse(c.has_server(('b', 27017)))
|
||||
self.assertEqual(CLUSTER_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.cluster_type)
|
||||
self.assertEqual(TOPOLOGY_TYPE.ReplicaSetWithPrimary,
|
||||
c.description.topology_type)
|
||||
|
||||
def test_non_rs_member(self):
|
||||
c = create_mock_cluster(['a', 'b'], set_name='rs')
|
||||
c = create_mock_topology(['a', 'b'], set_name='rs')
|
||||
self.assertTrue(c.has_server(('b', 27017)))
|
||||
got_ismaster(c, ('b', 27017), {'ok': 1}) # Standalone is removed.
|
||||
self.assertFalse(c.has_server(('b', 27017)))
|
||||
|
||||
def test_wire_version(self):
|
||||
c = create_mock_cluster(set_name='rs')
|
||||
c = create_mock_topology(set_name='rs')
|
||||
c.description.check_compatible() # No error.
|
||||
|
||||
got_ismaster(c, address, {
|
||||
@ -644,7 +644,7 @@ class TestMultiServerCluster(ClusterTest):
|
||||
self.fail('No error with incompatible wire version')
|
||||
|
||||
def test_max_write_batch_size(self):
|
||||
c = create_mock_cluster(seeds=['a', 'b'], set_name='rs')
|
||||
c = create_mock_topology(seeds=['a', 'b'], set_name='rs')
|
||||
|
||||
def write_batch_size():
|
||||
s = c.select_server(writable_server_selector)
|
||||
@ -679,7 +679,7 @@ class TestMultiServerCluster(ClusterTest):
|
||||
self.assertEqual(2, write_batch_size())
|
||||
|
||||
|
||||
class TestClusterErrors(ClusterTest):
|
||||
class TestTopologyErrors(TopologyTest):
|
||||
# Errors when calling ismaster.
|
||||
|
||||
def test_pool_reset(self):
|
||||
@ -694,7 +694,7 @@ class TestClusterErrors(ClusterTest):
|
||||
else:
|
||||
raise socket.error()
|
||||
|
||||
c = create_mock_cluster(monitor_class=TestMonitor)
|
||||
c = create_mock_topology(monitor_class=TestMonitor)
|
||||
# Await first ismaster call.
|
||||
s = c.select_server(writable_server_selector)
|
||||
self.assertEqual(1, ismaster_count[0])
|
||||
@ -716,7 +716,7 @@ class TestClusterErrors(ClusterTest):
|
||||
else:
|
||||
raise socket.error()
|
||||
|
||||
c = create_mock_cluster(monitor_class=TestMonitor)
|
||||
c = create_mock_topology(monitor_class=TestMonitor)
|
||||
|
||||
# Await first ismaster call.
|
||||
s = c.select_server(writable_server_selector)
|
||||
@ -737,7 +737,7 @@ class TestClusterErrors(ClusterTest):
|
||||
ismaster_count[0] += 1
|
||||
raise socket.error()
|
||||
|
||||
c = create_mock_cluster(monitor_class=TestMonitor)
|
||||
c = create_mock_topology(monitor_class=TestMonitor)
|
||||
|
||||
self.assertRaises(
|
||||
ConnectionFailure,
|
||||
|
||||
@ -374,8 +374,8 @@ def assertReadFromAll(testcase, rsc, members, *args, **kwargs):
|
||||
testcase.assertEqual(members, used)
|
||||
|
||||
def get_pool(client):
|
||||
cluster = client._get_cluster()
|
||||
server = cluster.select_server(writable_server_selector)
|
||||
topology = client._get_topology()
|
||||
server = topology.select_server(writable_server_selector)
|
||||
return server.pool
|
||||
|
||||
def pools_from_rs_client(client):
|
||||
@ -383,7 +383,7 @@ def pools_from_rs_client(client):
|
||||
"""
|
||||
return [
|
||||
server.pool for server in
|
||||
client._get_cluster().select_servers(any_server_selector)]
|
||||
client._get_topology().select_servers(any_server_selector)]
|
||||
|
||||
class TestRequestMixin(object):
|
||||
"""Inherit from this class and from unittest.TestCase to get some
|
||||
|
||||
Loading…
Reference in New Issue
Block a user