Fix MongoReplicaSetClient race conditions PYTHON-467

* RSState, Member, and MovingAverage are now immutable
* In refresh(), try up members before down ones
* A test_ha fixup (clear process-list after killing them in each test, so we don't re-kill previous tests' processes)
This commit is contained in:
A. Jesse Jiryu Davis 2013-03-16 16:13:37 -07:00
parent 55f1df7577
commit 6c2e009f45
8 changed files with 562 additions and 355 deletions

View File

@ -106,13 +106,171 @@ def _partition_node(node):
return host, port
# Concurrency notes: A MongoReplicaSetClient keeps its view of the replica-set
# state in an RSState instance. RSStates are immutable, except for
# host-pinning. Pools, which are internally thread / greenlet safe, can be
# copied from old to new RSStates safely. The client updates its view of the
# set's state not by modifying its RSState but by replacing it with an updated
# copy.
# In __init__, MongoReplicaSetClient gets a list of potential members called
# 'seeds' from its initial parameters, and calls refresh(). refresh() iterates
# over the the seeds in arbitrary order looking for a member it can connect to.
# Once it finds one, it calls 'ismaster' and sets self.__hosts to the list of
# members in the response, and connects to the rest of the members. refresh()
# sets the MongoReplicaSetClient's RSState. Finally, __init__ launches the
# replica-set monitor.
# The monitor calls refresh() every 30 seconds, or whenever the client has
# encountered an error that prompts it to wake the monitor.
# Every method that accesses the RSState multiple times within the method makes
# a local reference first and uses that throughout, so it's isolated from a
# concurrent method replacing the RSState with an updated copy. This technique
# avoids the need to lock around accesses to the RSState.
class RSState(object):
def __init__(
self, threadlocal, host_to_member=None, arbiters=None, writer=None,
error_message='No primary available'):
"""An immutable snapshot of the client's view of the replica set state.
:Parameters:
- `threadlocal`: Thread- or greenlet-local storage
- `host_to_member`: Optional dict: (host, port) -> Member instance
- `arbiters`: Optional sequence of arbiters as (host, port)
- `writer`: Optional (host, port) of primary
- `error_message`: Optional error if `writer` is None
"""
self._threadlocal = threadlocal # threading.local or gevent local
self._arbiters = frozenset(arbiters or []) # set of (host, port)
self._writer = writer # (host, port) of the primary, or None
self._error_message = error_message
self._host_to_member = host_to_member or {}
self._hosts = frozenset(self._host_to_member)
self._members = frozenset(self._host_to_member.values())
if writer and self._host_to_member[writer].up:
self._primary_member = self._host_to_member[writer]
else:
self._primary_member = None
def clone_with_host_down(self, host, error_message):
"""Get a clone, marking as "down" the member with the given (host, port)
"""
members = self._host_to_member.copy()
down_member = members.pop(host, None)
if down_member:
members[host] = down_member.clone_down()
if host == self.writer:
# The primary went down; record the error message.
return RSState(
self._threadlocal, members, self._arbiters,
None, error_message)
else:
# Some other host went down. Keep our current primary or, if it's
# already down, keep our current error message.
return RSState(
self._threadlocal, members, self._arbiters,
self._writer, self._error_message)
def clone_without_writer(self, threadlocal):
"""Get a clone without a primary. Unpins all threads.
:Parameters:
- `threadlocal`: Thread- or greenlet-local storage
"""
return RSState(
threadlocal, self._host_to_member.copy(), self._arbiters, None)
@property
def arbiters(self):
"""Set of (host, port) pairs."""
return self._arbiters
@property
def writer(self):
"""(host, port) of primary, or None."""
return self._writer
@property
def primary_member(self):
return self._primary_member
@property
def hosts(self):
"""Set of (host, port) tuples of data members of the replica set."""
return self._hosts
@property
def members(self):
"""Set of Member instances."""
return self._members
@property
def error_message(self):
"""The error, if any, raised when trying to connect to the primary"""
return self._error_message
@property
def secondaries(self):
"""Set of (host, port) pairs."""
# Unlike the other properties, this isn't cached because it isn't used
# in regular operations.
return set([
host for host, member in self._host_to_member.items()
if member.is_secondary])
def get(self, host):
"""Return a Member instance or None for the given (host, port)."""
return self._host_to_member.get(host)
def pin_host(self, host, mode, tag_sets, latency):
"""Pin this thread / greenlet to a member.
`host` is a (host, port) pair. The remaining parameters are a read
preference.
"""
# Fun fact: Unlike in thread_util.ThreadIdent, we needn't lock around
# assignment here. Assignment to a threadlocal is only unsafe if it
# can cause other Python code to run implicitly.
self._threadlocal.host = host
self._threadlocal.read_preference = (mode, tag_sets, latency)
def keep_pinned_host(self, mode, tag_sets, latency):
"""Does a read pref match the last used by this thread / greenlet?"""
return self._threadlocal.read_preference == (mode, tag_sets, latency)
@property
def pinned_host(self):
"""The (host, port) last used by this thread / greenlet, or None."""
return getattr(self._threadlocal, 'host', None)
def unpin_host(self):
"""Forget this thread / greenlet's last used member."""
self._threadlocal.host = self._threadlocal.read_preference = None
@property
def threadlocal(self):
return self._threadlocal
def __str__(self):
return '<RSState [%s] writer="%s">' % (
', '.join(str(member) for member in self._host_to_member.itervalues()),
self.writer and '%s:%s' % self.writer or None)
class Monitor(object):
"""Base class for replica set monitors.
"""
_refresh_interval = 30
def __init__(self, rsc, event_class):
self.rsc = weakref.proxy(rsc, self.shutdown)
self.event = event_class()
self.refreshed = event_class()
self.stopped = False
def shutdown(self, dummy=None):
@ -124,8 +282,14 @@ class Monitor(object):
def schedule_refresh(self):
"""Refresh immediately
"""
self.refreshed.clear()
self.event.set()
def wait_for_refresh(self, timeout_seconds):
"""Block until a scheduled refresh completes
"""
self.refreshed.wait(timeout_seconds)
def monitor(self):
"""Run until the RSC is collected or an
unexpected error occurs.
@ -135,10 +299,15 @@ class Monitor(object):
if self.stopped:
break
self.event.clear()
try:
self.rsc.refresh()
try:
self.rsc.refresh()
finally:
self.refreshed.set()
except AutoReconnect:
pass
# RSC has been collected or there
# was an unexpected error.
except:
@ -188,18 +357,25 @@ except ImportError:
class Member(object):
"""Represent one member of a replica set
"""Immutable representation of one member of a replica set.
:Parameters:
- `host`: A (host, port) pair
- `connection_pool`: A Pool instance
- `ismaster_response`: A dict, MongoDB's ismaster response
- `ping_time`: A MovingAverage instance
- `up`: Whether we think this member is available
"""
# For unittesting only. Use under no circumstances!
_host_to_ping_time = {}
def __init__(self, host, ismaster_response, ping_time, connection_pool):
def __init__(self, host, connection_pool, ismaster_response, ping_time, up):
self.host = host
self.pool = connection_pool
self.ping_time = MovingAverage(5)
self.update(ismaster_response, ping_time)
self.ismaster_response = ismaster_response
self.ping_time = ping_time
self.up = up
def update(self, ismaster_response, ping_time):
if ismaster_response['ismaster']:
self.state = PRIMARY
elif ismaster_response.get('secondary'):
@ -207,11 +383,22 @@ class Member(object):
else:
self.state = OTHER
self.tags = ismaster_response.get('tags', {})
self.max_bson_size = ismaster_response.get(
'maxBsonObjectSize', MAX_BSON_SIZE)
self.tags = ismaster_response.get('tags', {})
self.record_ping_time(ping_time)
self.up = True
def clone_with(self, ismaster_response, ping_time_sample):
"""Get a clone updated with ismaster response and a single ping time.
"""
ping_time = self.ping_time.clone_with(ping_time_sample)
return Member(self.host, self.pool, ismaster_response, ping_time, True)
def clone_down(self):
"""Get a clone of this Member, but with up=False.
"""
return Member(
self.host, self.pool, self.ismaster_response, self.ping_time,
False)
@property
def is_primary(self):
@ -222,7 +409,7 @@ class Member(object):
return self.state == SECONDARY
def get_avg_ping_time(self):
"""Get a moving average of this member's ping times
"""Get a moving average of this member's ping times.
"""
if self.host in Member._host_to_ping_time:
# Simulate ping times for unittesting
@ -230,9 +417,6 @@ class Member(object):
return self.ping_time.get()
def record_ping_time(self, ping_time):
self.ping_time.update(ping_time)
def matches_mode(self, mode):
if mode == ReadPreference.PRIMARY and not self.is_primary:
return False
@ -265,6 +449,10 @@ class Member(object):
return False
def __str__(self):
return '<Member "%s:%s" primary=%r up=%r>' % (
self.host[0], self.host[1], self.is_primary, self.up)
class MongoReplicaSetClient(common.BaseObject):
"""Connection to a MongoDB replica set.
@ -407,10 +595,6 @@ class MongoReplicaSetClient(common.BaseObject):
"""
self.__opts = {}
self.__seeds = set()
self.__hosts = None
self.__arbiters = set()
self.__writer = None
self.__members = {}
self.__index_cache = {}
self.__auth_credentials = {}
@ -458,13 +642,14 @@ class MongoReplicaSetClient(common.BaseObject):
"The gevent module is not available. "
"Install the gevent package from PyPI.")
self.__rs_state = RSState(self.__make_threadlocal())
self.__request_counter = thread_util.Counter(self.__use_greenlets)
self.__auto_start_request = self.__opts.get('auto_start_request', False)
if self.__auto_start_request:
self.start_request()
self.__reset_pinned_hosts()
self.__name = self.__opts.get('replicaset')
if not self.__name:
raise ConfigurationError("the replicaSet "
@ -479,7 +664,7 @@ class MongoReplicaSetClient(common.BaseObject):
self.__ssl_ca_certs = self.__opts.get('ssl_ca_certs', None)
ssl_kwarg_keys = [k for k in kwargs.keys() if k.startswith('ssl_')]
if self.__use_ssl == False and ssl_kwarg_keys:
if not self.__use_ssl and ssl_kwarg_keys:
raise ConfigurationError("ssl has not been enabled but the "
"following ssl parameters have been set: "
"%s. Please set `ssl=True` or remove."
@ -610,8 +795,13 @@ class MongoReplicaSetClient(common.BaseObject):
'to this database. You must logout first.')
# Try to authenticate even during failover.
members = self.__members.copy().values()
member = select_member(members, ReadPreference.PRIMARY_PREFERRED)
member = select_member(
self.__rs_state.members, ReadPreference.PRIMARY_PREFERRED)
if not member:
raise AutoReconnect(
"No replica set members available for authentication")
sock_info = self.__socket(member)
try:
# Since __check_auth was called in __socket
@ -650,6 +840,8 @@ class MongoReplicaSetClient(common.BaseObject):
@property
def seeds(self):
"""The seed list used to connect to this replica set.
A sequence of (host, port) pairs.
"""
return self.__seeds
@ -658,34 +850,38 @@ class MongoReplicaSetClient(common.BaseObject):
"""All active and passive (priority 0) replica set
members known to this client. This does not include
hidden or slaveDelay members, or arbiters.
A sequence of (host, port) pairs.
"""
return self.__hosts
return self.__rs_state.hosts
@property
def primary(self):
"""The current primary of the replica set.
"""The (host, port) of the current primary of the replica set.
Returns None if there is no primary.
"""
return self.__writer
return self.__rs_state.writer
@property
def secondaries(self):
"""The secondary members known to this client.
A sequence of (host, port) pairs.
"""
return set([
host for host, member in self.__members.items()
if member.is_secondary])
return self.__rs_state.secondaries
@property
def arbiters(self):
"""The arbiters known to this client.
A sequence of (host, port) pairs.
"""
return self.__arbiters
return self.__rs_state.arbiters
@property
def is_mongos(self):
"""If this instance is connected to mongos (always False)
"""If this instance is connected to mongos (always False).
.. versionadded:: 2.3
"""
@ -738,8 +934,9 @@ class MongoReplicaSetClient(common.BaseObject):
accepts in bytes. Defaults to 4MB in server < 1.7.4. Returns
0 if no primary is available.
"""
if self.__writer:
return self.__members[self.__writer].max_bson_size
rs_state = self.__rs_state
if rs_state.primary_member:
return rs_state.primary_member.max_bson_size
return 0
@property
@ -798,89 +995,66 @@ class MongoReplicaSetClient(common.BaseObject):
connection_pool.discard_socket(sock_info)
raise
def __update_pools(self):
"""Update the mapping of (host, port) pairs to connection pools.
def __schedule_refresh(self, sync=False):
"""Awake the monitor to update our view of the replica set's state.
If `sync` is True, block until the refresh completes.
If multiple application threads call __schedule_refresh while refresh
is in progress, the work of refreshing the state is only performed
once.
"""
primary = None
for host in self.__hosts:
member, sock_info = None, None
try:
if host in self.__members:
member = self.__members[host]
sock_info = self.__socket(member)
res, ping_time = self.__simple_command(
sock_info, 'admin', {'ismaster': 1})
member.pool.maybe_return_socket(sock_info)
member.update(res, ping_time)
else:
res, connection_pool, ping_time = self.__is_master(host)
self.__members[host] = Member(
host=host,
ismaster_response=res,
ping_time=ping_time,
connection_pool=connection_pool)
except (ConnectionFailure, socket.error):
if member:
member.pool.discard_socket(sock_info)
self.__members.pop(member.host, None)
continue
if res['ismaster']:
primary = host
if primary != self.__writer:
self.__reset_pinned_hosts()
self.__writer = primary
def __schedule_refresh(self):
self.__monitor.schedule_refresh()
if sync:
self.__monitor.wait_for_refresh(timeout_seconds=5)
def __pin_host(self, host, mode, tag_sets, latency):
# After first successful read in a request, continue reading from same
# member until read preferences change, host goes down, or
# end_request(). This offers a small assurance that reads won't jump
# around in time.
self.__threadlocal.host = host
self.__threadlocal.read_preference = (mode, tag_sets, latency)
def __keep_pinned_host(self, mode, tag_sets, latency):
# If read preferences have changed, return False
return getattr(self.__threadlocal, 'read_preference', None) == (
mode, tag_sets, latency)
def __pinned_host(self):
return getattr(self.__threadlocal, 'host', None)
def __unpin_host(self):
self.__threadlocal.host = self.__threadlocal.read_preference = None
def __reset_pinned_hosts(self):
def __make_threadlocal(self):
if self.__use_greenlets:
self.__threadlocal = gevent_local()
return gevent_local()
else:
self.__threadlocal = threading.local()
return threading.local()
def refresh(self):
"""Iterate through the existing host list, or possibly the
seed list, to update the list of hosts and arbiters in this
replica set.
"""
# Only one thread / greenlet calls refresh() at a time: the one
# running __init__() or the monitor. We won't modify the state, only
# replace it at the end.
rs_state = self.__rs_state
errors = []
nodes = self.__hosts or self.__seeds
if rs_state.hosts:
# Try first those hosts we think are up, then the down ones.
nodes = sorted(
rs_state.hosts, key=lambda host: rs_state.get(host).up)
else:
nodes = self.__seeds
hosts = set()
# This will become the new RSState.
members = {}
arbiters = set()
writer = None
# Look for first member from which we can get a list of all members.
for node in nodes:
member, sock_info = None, None
member, sock_info = rs_state.get(node), None
try:
if node in self.__members:
member = self.__members[node]
if member:
sock_info = self.__socket(member)
response, _ = self.__simple_command(
response, ping_time = self.__simple_command(
sock_info, 'admin', {'ismaster': 1})
member.pool.maybe_return_socket(sock_info)
new_member = member.clone_with(response, ping_time)
else:
response, _, _ = self.__is_master(node)
response, pool, ping_time = self.__is_master(node)
new_member = Member(
node, pool, response, MovingAverage([ping_time]), True)
if response['ismaster']:
writer = node
# Check that this host is part of the given replica set.
set_name = response.get('setName')
@ -893,86 +1067,87 @@ class MongoReplicaSetClient(common.BaseObject):
"replica set %s"
% (host, port, self.__name))
if "arbiters" in response:
self.__arbiters = set([_partition_node(h)
for h in response["arbiters"]])
arbiters = set([
_partition_node(h) for h in response["arbiters"]])
if "hosts" in response:
hosts.update([_partition_node(h)
for h in response["hosts"]])
if "passives" in response:
hosts.update([_partition_node(h)
for h in response["passives"]])
# Start off the new 'members' dict with this member.
members[node] = new_member
except (ConnectionFailure, socket.error), why:
if member:
member.pool.discard_socket(sock_info)
errors.append("%s:%d: %s" % (node[0], node[1], str(why)))
if hosts:
self.__hosts = hosts
break
else:
if errors:
raise AutoReconnect(', '.join(errors))
raise ConfigurationError('No suitable hosts found')
self.__update_pools()
# Ensure we have a pool for each member, and find the primary.
for host in hosts:
if host in members:
# This member was the first we connected to, in the loop above.
continue
def __check_is_primary(self, host):
"""Checks if this host is the primary for the replica set.
"""
member, sock_info = None, None
try:
if host in self.__members:
member = self.__members[host]
sock_info = self.__socket(member)
res, ping_time = self.__simple_command(
sock_info, 'admin', {'ismaster': 1}
)
else:
res, connection_pool, ping_time = self.__is_master(host)
self.__members[host] = Member(
host=host,
ismaster_response=res,
ping_time=ping_time,
connection_pool=connection_pool)
except (ConnectionFailure, socket.error), why:
if member:
member.pool.discard_socket(sock_info)
raise ConnectionFailure("%s:%d: %s" % (host[0], host[1], str(why)))
if member and sock_info:
member.pool.maybe_return_socket(sock_info)
if res["ismaster"]:
return host
elif "primary" in res:
candidate = _partition_node(res["primary"])
# Don't report the same connect failure multiple times.
member, sock_info = rs_state.get(host), None
try:
return self.__check_is_primary(candidate)
if member:
sock_info = self.__socket(member)
res, ping_time = self.__simple_command(
sock_info, 'admin', {'ismaster': 1})
member.pool.maybe_return_socket(sock_info)
new_member = member.clone_with(res, ping_time)
else:
res, connection_pool, ping_time = self.__is_master(host)
new_member = Member(
host, connection_pool, res, MovingAverage([ping_time]),
True)
members[host] = new_member
except (ConnectionFailure, socket.error):
pass
raise AutoReconnect('%s:%d: not primary' % host)
if member:
member.pool.discard_socket(sock_info)
continue
if res['ismaster']:
writer = host
if writer == rs_state.writer:
threadlocal = self.__rs_state.threadlocal
else:
# We unpin threads from members if the primary has changed, since
# no monotonic consistency can be promised now anyway.
threadlocal = self.__make_threadlocal()
# Replace old state with new.
self.__rs_state = RSState(threadlocal, members, arbiters, writer)
def __find_primary(self):
"""Returns a connection to the primary of this replica set,
if one exists.
if one exists, or raises AutoReconnect.
"""
if self.__writer:
primary = self.__members[self.__writer]
if primary.up:
return primary
primary = self.__rs_state.primary_member
if primary:
return primary
# This is either the first connection or we had a failover.
self.refresh()
# We had a failover.
self.__schedule_refresh(sync=True)
# Try again. This time copy the RSState reference so we're guaranteed
# primary_member and error_message are from the same state.
rs_state = self.__rs_state
if rs_state.primary_member:
return rs_state.primary_member
errors = []
for candidate in self.__hosts:
try:
self.__writer = self.__check_is_primary(candidate)
return self.__members[self.__writer]
except (ConnectionFailure, socket.error), why:
errors.append(str(why))
# Couldn't find the primary.
raise AutoReconnect(', '.join(errors))
raise AutoReconnect(rs_state.error_message)
def __socket(self, member):
"""Get a SocketInfo from the pool.
@ -990,13 +1165,15 @@ class MongoReplicaSetClient(common.BaseObject):
return sock_info
def disconnect(self):
"""Disconnect from the replica set primary and refresh our view of
the replica set.
"""Disconnect from the replica set primary, unpin all members, and
refresh our view of the replica set.
"""
member = self.__members.get(self.__writer)
if member:
member.pool.reset()
self.__writer = None
rs_state = self.__rs_state
if rs_state.primary_member:
rs_state.primary_member.pool.reset()
threadlocal = self.__make_threadlocal()
self.__rs_state = rs_state.clone_without_writer(threadlocal)
self.__schedule_refresh()
def close(self):
@ -1014,12 +1191,12 @@ class MongoReplicaSetClient(common.BaseObject):
The :meth:`close` method now terminates the replica set monitor.
"""
if self.__monitor:
self.__monitor.shutdown(None)
self.__monitor.shutdown()
# Use a reasonable timeout.
self.__monitor.join(1.0)
self.__monitor = None
self.__writer = None
self.__members = {}
self.__rs_state = RSState(self.__make_threadlocal())
def alive(self):
"""Return ``False`` if there has been an error communicating with the
@ -1148,7 +1325,7 @@ class MongoReplicaSetClient(common.BaseObject):
if _connection_to_use in (None, -1):
member = self.__find_primary()
else:
member = self.__members[_connection_to_use]
member = self.__rs_state.get(_connection_to_use)
sock_info = None
try:
@ -1179,6 +1356,8 @@ class MongoReplicaSetClient(common.BaseObject):
def __send_and_receive(self, member, msg, **kwargs):
"""Send a message on the given socket and return the response data.
Can raise socket.error.
"""
sock_info = None
try:
@ -1196,13 +1375,8 @@ class MongoReplicaSetClient(common.BaseObject):
member.pool.maybe_return_socket(sock_info)
return response
except (ConnectionFailure, socket.error), why:
host, port = member.pool.pair
member.pool.discard_socket(sock_info)
raise AutoReconnect("%s:%d: %s" % (host, port, str(why)))
except:
if sock_info:
sock_info.close()
member.pool.discard_socket(sock_info)
raise
def __try_read(self, member, msg, **kwargs):
@ -1211,10 +1385,23 @@ class MongoReplicaSetClient(common.BaseObject):
"""
try:
return self.__send_and_receive(member, msg, **kwargs)
except AutoReconnect:
member.up = False
except socket.timeout, e:
# Could be one slow query, don't refresh.
host, port = member.host
raise AutoReconnect("%s:%d: %s" % (host, port, e))
except (socket.error, ConnectionFailure), why:
# Try to replace our RSState with a clone where this member is
# marked "down", to reduce exceptions on other threads, or repeated
# exceptions on this thread. We accept that there's a race
# condition (another thread could be replacing our state with a
# different version concurrently) but this approach is simple and
# lock-free.
self.__rs_state = self.__rs_state.clone_with_host_down(
member.host, str(why))
self.__schedule_refresh()
raise
host, port = member.host
raise AutoReconnect("%s:%d: %s" % (host, port, why))
def _send_message_with_response(self, msg, _connection_to_use=None,
_must_use_master=False, **kwargs):
@ -1224,52 +1411,62 @@ class MongoReplicaSetClient(common.BaseObject):
:Parameters:
- `msg`: (request_id, data) pair making up the message to send
- `_connection_to_use`: Optional (host, port) of member for message,
used by Cursor for getMore and killCursors messages.
- `_must_use_master`: If True, send to primary.
"""
# If we've disconnected since last read, trigger refresh
try:
self.__find_primary()
except ConnectionFailure:
# We'll throw an error later
pass
rs_state = self.__rs_state
tag_sets = kwargs.get('tag_sets', [{}])
mode = kwargs.get('read_preference', ReadPreference.PRIMARY)
if _must_use_master:
mode = ReadPreference.PRIMARY
tag_sets = [{}]
if not rs_state.primary_member:
# Primary was down last we checked. Start a refresh if one is not
# already in progress. If caller requested the primary, wait to
# see if it's up, otherwise continue with known-good members.
sync = (mode == ReadPreference.PRIMARY)
self.__schedule_refresh(sync=sync)
rs_state = self.__rs_state
latency = kwargs.get(
'secondary_acceptable_latency_ms',
self.secondary_acceptable_latency_ms)
member = None
try:
if _connection_to_use is not None:
if _connection_to_use == -1:
member = self.__find_primary()
member = rs_state.primary_member
error_message = rs_state.error_message
else:
member = self.__members[_connection_to_use]
member = rs_state.get(_connection_to_use)
error_message = '%s:%s not available' % _connection_to_use
if not member:
raise AutoReconnect(error_message)
return member.pool.pair, self.__try_read(
member, msg, **kwargs)
except AutoReconnect:
if member == self.__members.get(self.__writer):
if _connection_to_use in (-1, rs_state.writer):
# Primary's down. Refresh.
self.disconnect()
raise
# To provide some monotonic consistency, we use the same member as
# long as this thread is in a request and all reads use the same
# mode, tags, and latency. The member gets unpinned if pref changes,
# if member changes state, if we detect a failover and call
# __reset_pinned_hosts(), or if this thread calls end_request().
# if member changes state, if we detect a failover, or if this thread
# calls end_request().
errors = []
pinned_member = self.__members.get(self.__pinned_host())
pinned_host = rs_state.pinned_host
pinned_member = rs_state.get(pinned_host)
if (pinned_member
and pinned_member.matches_mode(mode)
and pinned_member.matches_tag_sets(tag_sets)
and pinned_member.up
and self.__keep_pinned_host(mode, tag_sets, latency)
):
and pinned_member.matches_mode(mode)
and pinned_member.matches_tag_sets(tag_sets) # TODO: REMOVE?
and rs_state.keep_pinned_host(mode, tag_sets, latency)):
try:
return (
pinned_member.host,
@ -1282,10 +1479,9 @@ class MongoReplicaSetClient(common.BaseObject):
errors.append(str(why))
# No pinned member, or pinned member down or doesn't match read pref
self.__unpin_host()
members = self.__members.copy().values()
rs_state.unpin_host()
members = list(rs_state.members)
while len(errors) < MAX_RETRY:
member = select_member(
members=members,
@ -1306,7 +1502,7 @@ class MongoReplicaSetClient(common.BaseObject):
if self.in_request():
# Keep reading from this member in this thread / greenlet
# unless read preference changes
self.__pin_host(member.host, mode, tag_sets, latency)
rs_state.pin_host(member.host, mode, tag_sets, latency)
return member.host, response
except AutoReconnect, why:
errors.append(str(why))
@ -1360,7 +1556,7 @@ class MongoReplicaSetClient(common.BaseObject):
# exceed 1. This keeps things sane when we create and delete pools
# within a request.
if 1 == self.__request_counter.inc():
for member in self.__members.values():
for member in self.__rs_state.members:
member.pool.start_request()
return pool.Request(self)
@ -1386,12 +1582,13 @@ class MongoReplicaSetClient(common.BaseObject):
in the middle of a sequence of operations in which ordering is
important. This could lead to unexpected results.
"""
rs_state = self.__rs_state
if 0 == self.__request_counter.dec():
for member in self.__members.values():
for member in rs_state.members:
# No effect if not in a request
member.pool.end_request()
self.__unpin_host()
rs_state.unpin_host()
def __eq__(self, other):
# XXX: Implement this?
@ -1402,7 +1599,7 @@ class MongoReplicaSetClient(common.BaseObject):
def __repr__(self):
return "MongoReplicaSetClient(%r)" % (["%s:%d" % n
for n in self.__hosts],)
for n in self.hosts],)
def __getattr__(self, name):
"""Get a database by name.

View File

@ -15,8 +15,6 @@
"""Utilities for choosing which member of a replica set to read from."""
import random
import threading
from collections import deque
from pymongo.errors import ConfigurationError
@ -86,7 +84,6 @@ def mongos_mode(mode):
def mongos_enum(enum):
return _mongos_modes.index(enum)
def select_primary(members):
for member in members:
if member.is_primary:
@ -151,6 +148,7 @@ def select_member(
return select_primary(members)
elif mode == PRIMARY_PREFERRED:
# Recurse.
candidate_primary = select_member(members, PRIMARY, [{}], latency)
if candidate_primary:
return candidate_primary
@ -166,6 +164,7 @@ def select_member(
return None
elif mode == SECONDARY_PREFERRED:
# Recurse.
candidate_secondary = select_member(
members, SECONDARY, tag_sets, latency)
if candidate_secondary:
@ -196,32 +195,16 @@ secondary_ok_commands = frozenset([
class MovingAverage(object):
"""Tracks a moving average.
"""
def __init__(self, window_sz):
self.window_sz = window_sz
self.samples = deque()
self.total = 0
self.lock = threading.Lock()
def __init__(self, samples):
"""Immutable structure to track a 5-sample moving average.
"""
self.samples = samples[-5:]
assert self.samples
self.average = sum(self.samples) / float(len(self.samples))
def update(self, sample):
# One reason we synchronize MovingAverage is that Jython's
# popleft isn't safe: http://bugs.jython.org/issue2001
self.lock.acquire()
try:
self.samples.append(sample)
self.total += sample
if len(self.samples) > self.window_sz:
self.total -= self.samples.popleft()
finally:
self.lock.release()
def clone_with(self, sample):
"""Get a copy of this instance plus a new sample"""
return MovingAverage(self.samples + [sample])
def get(self):
self.lock.acquire()
try:
if self.samples:
return self.total / float(len(self.samples))
else:
return None
finally:
self.lock.release()
return self.average

View File

@ -20,14 +20,17 @@ import os
import pymongo
from pymongo.errors import ConnectionFailure
host = os.environ.get("DB_IP", 'localhost')
# hostnames retrieved by MongoReplicaSetClient from isMaster will be of unicode
# type in Python 2, so ensure these hostnames are unicodes, too. It makes tests
# like `test_repr` predictable.
host = unicode(os.environ.get("DB_IP", 'localhost'))
port = int(os.environ.get("DB_PORT", 27017))
pair = '%s:%d' % (host, port)
host2 = os.environ.get("DB_IP2", 'localhost')
host2 = unicode(os.environ.get("DB_IP2", 'localhost'))
port2 = int(os.environ.get("DB_PORT2", 27018))
host3 = os.environ.get("DB_IP3", 'localhost')
host3 = unicode(os.environ.get("DB_IP3", 'localhost'))
port3 = int(os.environ.get("DB_PORT3", 27019))

View File

@ -27,14 +27,11 @@ import ha_tools
from ha_tools import use_greenlets
from pymongo import (MongoReplicaSetClient,
ReadPreference)
from pymongo.mongo_replica_set_client import (
Member, Monitor, MongoReplicaSetClient)
from pymongo.mongo_client import _partition_node
from pymongo.mongo_client import MongoClient
from pymongo.errors import AutoReconnect, OperationFailure, ConnectionFailure
from pymongo.read_preferences import modes
from pymongo.mongo_replica_set_client import Member, Monitor
from pymongo.mongo_replica_set_client import MongoReplicaSetClient
from pymongo.mongo_client import MongoClient, _partition_node
from pymongo.read_preferences import ReadPreference, modes
from test import utils
from test.utils import one
@ -52,7 +49,15 @@ SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
NEAREST = ReadPreference.NEAREST
class TestDirectConnection(unittest.TestCase):
class HATestCase(unittest.TestCase):
"""A test case for connections to replica sets or mongos."""
def tearDown(self):
ha_tools.kill_all_members()
ha_tools.nodes.clear()
class TestDirectConnection(HATestCase):
def setUp(self):
members = [{}, {}, {'arbiterOnly': True}]
@ -145,10 +150,10 @@ class TestDirectConnection(unittest.TestCase):
def tearDown(self):
self.c.close()
ha_tools.kill_all_members()
super(TestDirectConnection, self).tearDown()
class TestPassiveAndHidden(unittest.TestCase):
class TestPassiveAndHidden(HATestCase):
def setUp(self):
members = [{},
@ -177,10 +182,10 @@ class TestPassiveAndHidden(unittest.TestCase):
def tearDown(self):
self.c.close()
ha_tools.kill_all_members()
super(TestPassiveAndHidden, self).tearDown()
class TestMonitorRemovesRecoveringMember(unittest.TestCase):
class TestMonitorRemovesRecoveringMember(HATestCase):
# Members in STARTUP2 or RECOVERING states are shown in the primary's
# isMaster response, but aren't secondaries and shouldn't be read from.
# Verify that if a secondary goes into RECOVERING mode, the Monitor removes
@ -211,10 +216,10 @@ class TestMonitorRemovesRecoveringMember(unittest.TestCase):
def tearDown(self):
self.c.close()
ha_tools.kill_all_members()
super(TestMonitorRemovesRecoveringMember, self).tearDown()
class TestTriggeredRefresh(unittest.TestCase):
class TestTriggeredRefresh(HATestCase):
# Verify that if a secondary goes into RECOVERING mode or if the primary
# changes, the next exception triggers an immediate refresh.
@ -282,14 +287,14 @@ class TestTriggeredRefresh(unittest.TestCase):
# We've detected the stepdown
self.assertTrue(
not c_find_one.primary
or primary != _partition_node(c_find_one.primary))
or _partition_node(primary) != c_find_one.primary)
def tearDown(self):
Monitor._refresh_interval = MONITOR_INTERVAL
ha_tools.kill_all_members()
super(TestTriggeredRefresh, self).tearDown()
class TestHealthMonitor(unittest.TestCase):
class TestHealthMonitor(HATestCase):
def setUp(self):
res = ha_tools.start_replica_set([{}, {}, {}])
@ -357,17 +362,10 @@ class TestHealthMonitor(unittest.TestCase):
ha_tools.stepdown_primary()
self.assertTrue(primary_changed())
# There can be a delay between finding the primary and updating
# secondaries
sleep(5)
self.assertNotEqual(secondaries, c.secondaries)
def tearDown(self):
ha_tools.kill_all_members()
class TestWritesWithFailover(unittest.TestCase):
class TestWritesWithFailover(HATestCase):
def setUp(self):
res = ha_tools.start_replica_set([{}, {}, {}])
@ -398,11 +396,8 @@ class TestWritesWithFailover(unittest.TestCase):
self.assertTrue(primary != c.primary)
self.assertEqual('baz', db.test.find_one({'bar': 'baz'})['bar'])
def tearDown(self):
ha_tools.kill_all_members()
class TestReadWithFailover(unittest.TestCase):
class TestReadWithFailover(HATestCase):
def setUp(self):
res = ha_tools.start_replica_set([{}, {}, {}])
@ -435,11 +430,8 @@ class TestReadWithFailover(unittest.TestCase):
self.assertTrue(iter_cursor(cursor))
self.assertEqual(10, cursor._Cursor__retrieved)
def tearDown(self):
ha_tools.kill_all_members()
class TestReadPreference(unittest.TestCase):
class TestReadPreference(HATestCase):
def setUp(self):
members = [
# primary
@ -784,11 +776,10 @@ class TestReadPreference(unittest.TestCase):
def tearDown(self):
self.c.close()
ha_tools.kill_all_members()
self.clear_ping_times()
super(TestReadPreference, self).tearDown()
class TestReplicaSetAuth(unittest.TestCase):
class TestReplicaSetAuth(HATestCase):
def setUp(self):
members = [
{},
@ -829,10 +820,10 @@ class TestReplicaSetAuth(unittest.TestCase):
def tearDown(self):
self.c.close()
ha_tools.kill_all_members()
super(TestReplicaSetAuth, self).tearDown()
class TestAlive(unittest.TestCase):
class TestAlive(HATestCase):
def setUp(self):
members = [{}, {}]
self.seed, self.name = ha_tools.start_replica_set(members)
@ -866,11 +857,8 @@ class TestAlive(unittest.TestCase):
finally:
rsc.close()
def tearDown(self):
ha_tools.kill_all_members()
class TestMongosHighAvailability(unittest.TestCase):
class TestMongosHighAvailability(HATestCase):
def setUp(self):
seed_list = ha_tools.create_sharded_cluster()
self.dbname = 'pymongo_mongos_ha'
@ -910,10 +898,10 @@ class TestMongosHighAvailability(unittest.TestCase):
def tearDown(self):
self.client.drop_database(self.dbname)
ha_tools.kill_all_members()
super(TestMongosHighAvailability, self).tearDown()
class TestReplicaSetRequest(unittest.TestCase):
class TestReplicaSetRequest(HATestCase):
def setUp(self):
members = [{}, {}, {'arbiterOnly': True}]
res = ha_tools.start_replica_set(members)
@ -928,8 +916,9 @@ class TestReplicaSetRequest(unittest.TestCase):
self.assertTrue(self.c.auto_start_request)
self.assertTrue(self.c.in_request())
primary_pool = self.c._MongoReplicaSetClient__members[primary].pool
secondary_pool = self.c._MongoReplicaSetClient__members[secondary].pool
rs_state = self.c._MongoReplicaSetClient__rs_state
primary_pool = rs_state.get(primary).pool
secondary_pool = rs_state.get(secondary).pool
# Trigger start_request on primary pool
utils.assertReadFrom(self, self.c, primary, PRIMARY)
@ -967,7 +956,7 @@ class TestReplicaSetRequest(unittest.TestCase):
def tearDown(self):
self.c.close()
ha_tools.kill_all_members()
super(TestReplicaSetRequest, self).tearDown()
if __name__ == '__main__':

View File

@ -100,7 +100,9 @@ class TestClient(unittest.TestCase, TestRequestMixin):
ConnectionFailure, MongoClient, "%s:1234567" % (host,), port)
def test_repr(self):
self.assertEqual(repr(MongoClient(host, port)),
# Making host a str avoids the 'u' prefix in Python 2, so the repr is
# the same in Python 2 and 3.
self.assertEqual(repr(MongoClient(str(host), port)),
"MongoClient('%s', %d)" % (host, port))
def test_getters(self):

View File

@ -180,7 +180,7 @@ class TestReadPreferences(TestReadPreferencesBase):
not_used = data_members.difference(used)
latencies = ', '.join(
'%s: %dms' % (member.host, member.ping_time.get())
for member in c._MongoReplicaSetClient__members.values())
for member in c._MongoReplicaSetClient__rs_state.members)
self.assertFalse(not_used,
"Expected to use primary and all secondaries for mode NEAREST,"
@ -426,51 +426,24 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase):
class TestMovingAverage(unittest.TestCase):
def test_empty_moving_average(self):
avg = MovingAverage(0)
self.assertEqual(None, avg.get())
avg.update(10)
self.assertEqual(None, avg.get())
def test_empty_init(self):
self.assertRaises(AssertionError, MovingAverage, [])
def test_trivial_moving_average(self):
avg = MovingAverage(1)
self.assertEqual(None, avg.get())
avg.update(10)
def test_moving_average(self):
avg = MovingAverage([10])
self.assertEqual(10, avg.get())
avg.update(20)
self.assertEqual(20, avg.get())
avg.update(0)
self.assertEqual(0, avg.get())
def test_2_sample_moving_average(self):
avg = MovingAverage(2)
self.assertEqual(None, avg.get())
avg.update(10)
self.assertEqual(10, avg.get())
avg.update(20)
self.assertEqual(15, avg.get())
avg.update(30)
self.assertEqual(25, avg.get())
avg.update(-100)
self.assertEqual(-35, avg.get())
def test_5_sample_moving_average(self):
avg = MovingAverage(5)
self.assertEqual(None, avg.get())
avg.update(10)
self.assertEqual(10, avg.get())
avg.update(20)
self.assertEqual(15, avg.get())
avg.update(30)
self.assertEqual(20, avg.get())
avg.update(-100)
self.assertEqual((10 + 20 + 30 - 100) / 4, avg.get())
avg.update(17)
self.assertEqual((10 + 20 + 30 - 100 + 17) / 5., avg.get())
avg.update(43)
self.assertEqual((20 + 30 - 100 + 17 + 43) / 5., avg.get())
avg.update(-1111)
self.assertEqual((30 - 100 + 17 + 43 - 1111) / 5., avg.get())
avg2 = avg.clone_with(20)
self.assertEqual(15, avg2.get())
avg3 = avg2.clone_with(30)
self.assertEqual(20, avg3.get())
avg4 = avg3.clone_with(-100)
self.assertEqual((10 + 20 + 30 - 100) / 4., avg4.get())
avg5 = avg4.clone_with(17)
self.assertEqual((10 + 20 + 30 - 100 + 17) / 5., avg5.get())
avg6 = avg5.clone_with(43)
self.assertEqual((20 + 30 - 100 + 17 + 43) / 5., avg6.get())
avg7 = avg6.clone_with(-1111)
self.assertEqual((30 - 100 + 17 + 43 - 1111) / 5., avg7.get())
class TestMongosConnection(unittest.TestCase):

View File

@ -14,6 +14,8 @@
"""Test the replica_set_connection module."""
# TODO: anywhere we wait for refresh in tests, consider just refreshing w/ sync
import copy
import datetime
import os
@ -35,6 +37,7 @@ from bson.tz_util import utc
from pymongo.mongo_client import MongoClient
from pymongo.read_preferences import ReadPreference
from pymongo.mongo_replica_set_client import MongoReplicaSetClient
from pymongo.mongo_replica_set_client import PRIMARY, SECONDARY, OTHER
from pymongo.mongo_replica_set_client import _partition_node, have_gevent
from pymongo.database import Database
from pymongo.pool import SocketInfo
@ -46,7 +49,7 @@ from pymongo.errors import (AutoReconnect,
from test import version, port, pair
from test.utils import (
delay, assertReadFrom, assertReadFromAll, read_from_which_host,
assertRaisesExactly, TestRequestMixin)
assertRaisesExactly, TestRequestMixin, one)
class TestReplicaSetClientAgainstStandalone(unittest.TestCase):
@ -526,6 +529,77 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
no_timeout.close()
timeout.close()
def test_socket_error_marks_member_down(self):
# A socket error (besides timeout) changes a member's state to "down".
c = self._get_client()
collection = c.pymongo_test.test
collection.insert({}, w=self.w)
previous_writer = c._MongoReplicaSetClient__rs_state.writer
def kill_sockets():
for member in c._MongoReplicaSetClient__rs_state.members:
for socket_info in member.pool.sockets:
socket_info.sock.close()
kill_sockets()
# Query the primary.
self.assertRaises(ConnectionFailure, collection.find_one)
# primary_member returns None if primary is marked "down".
rs_state = c._MongoReplicaSetClient__rs_state
self.assertEqual(None, rs_state.writer)
self.assertFalse(rs_state.get(previous_writer).up)
collection.find_one() # No error, we recovered.
rs_state = c._MongoReplicaSetClient__rs_state
self.assertTrue(rs_state.get(rs_state.writer).up)
kill_sockets()
# Query secondaries. Client marks them "down" as they fail, and tries
# up to 3 of them before raising.
self.assertRaises(
ConnectionFailure,
collection.find_one,
read_preference=SECONDARY)
# Secondaries were either removed from state or marked "down".
rs_state = c._MongoReplicaSetClient__rs_state
for secondary_host in rs_state.secondaries:
self.assertFalse(rs_state.get(secondary_host).up)
def test_timeout_does_not_mark_member_down(self):
# If a query times out, the RS client shouldn't mark the member "down".
c = self._get_client(socketTimeoutMS=1000)
collection = c.pymongo_test.test
collection.insert({}, w=self.w)
# Query the primary.
self.assertRaises(
ConnectionFailure,
collection.find_one,
{'$where': delay(5)})
# primary_member returns None if primary is marked "down".
rs_state = c._MongoReplicaSetClient__rs_state
self.assertTrue(rs_state.primary_member)
collection.find_one() # No error.
# Query the secondary.
self.assertRaises(
ConnectionFailure,
collection.find_one,
{'$where': delay(5)},
read_preference=SECONDARY)
rs_state = c._MongoReplicaSetClient__rs_state
secondary_host = one(rs_state.secondaries)
self.assertTrue(rs_state.get(secondary_host).up)
collection.find_one(read_preference=SECONDARY) # No error.
def test_tz_aware(self):
self.assertRaises(ConfigurationError, MongoReplicaSetClient,
tz_aware='foo', replicaSet=self.name)
@ -586,7 +660,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
self.assertNotEqual(0, cursor.cursor_id)
connection_id = cursor._Cursor__connection_id
writer = c._MongoReplicaSetClient__writer
writer = c._MongoReplicaSetClient__rs_state.writer
if read_pref == ReadPreference.PRIMARY:
msg = "Expected cursor's connection_id to be %s, got %s" % (
writer, connection_id)
@ -687,7 +761,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
# Ensure MongoReplicaSetClient doesn't close socket after it gets an
# error response to getLastError. PYTHON-395.
c = self._get_client(auto_start_request=False)
pool = c._MongoReplicaSetClient__members[self.primary].pool
pool = c._MongoReplicaSetClient__rs_state.get(self.primary).pool
self.assertEqual(1, len(pool.sockets))
old_sock_info = iter(pool.sockets).next()
c.pymongo_test.test.drop()
@ -707,7 +781,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
# error response to getLastError. PYTHON-395.
c = self._get_client(auto_start_request=True)
c.pymongo_test.test.find_one()
pool = c._MongoReplicaSetClient__members[self.primary].pool
pool = c._MongoReplicaSetClient__rs_state.get(self.primary).pool
# Client reserved a socket for this thread
self.assertTrue(isinstance(pool._get_request_state(), SocketInfo))
@ -732,13 +806,13 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
client = self._get_client(auto_start_request=True)
self.assertTrue(client.auto_start_request)
pools = [mongo.pool for mongo in
client._MongoReplicaSetClient__members.values()]
pools = [member.pool for member in
client._MongoReplicaSetClient__rs_state.members]
self.assertInRequestAndSameSock(client, pools)
primary_pool = \
client._MongoReplicaSetClient__members[client.primary].pool
client._MongoReplicaSetClient__rs_state.get(client.primary).pool
# Trigger the RSC to actually start a request on primary pool
client.pymongo_test.test.find_one()
@ -754,7 +828,8 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
pass
secondary = cursor._Cursor__connection_id
secondary_pool = client._MongoReplicaSetClient__members[secondary].pool
rs_state = client._MongoReplicaSetClient__rs_state
secondary_pool = rs_state.get(secondary).pool
self.assertTrue(secondary_pool.in_request())
client.end_request()
@ -767,7 +842,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
client = self._get_client()
pools = [mongo.pool for mongo in
client._MongoReplicaSetClient__members.values()]
client._MongoReplicaSetClient__rs_state.members]
self.assertNotInRequestAndDifferentSock(client, pools)
client.start_request()
@ -780,7 +855,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
client = self._get_client(auto_start_request=True)
try:
pools = [member.pool for member in
client._MongoReplicaSetClient__members.values()]
client._MongoReplicaSetClient__rs_state.members]
self.assertTrue(client.in_request())
# Start and end request - we're still in "outer" original request
@ -823,7 +898,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
client = self._get_client()
try:
pools = [member.pool for member in
client._MongoReplicaSetClient__members.values()]
client._MongoReplicaSetClient__rs_state.members]
self.assertNotInRequestAndDifferentSock(client, pools)
started_request, ended_request = threading.Event(), threading.Event()
@ -862,31 +937,17 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
client.close()
def test_schedule_refresh(self):
# Monitor thread starts waiting for _refresh_interval, 30 seconds
client = self._get_client()
new_rs_state = rs_state = client._MongoReplicaSetClient__rs_state
for host in rs_state.hosts:
new_rs_state = new_rs_state.clone_with_host_down(host, 'error!')
# Reconnect if necessary
client.pymongo_test.test.find_one()
secondaries = client.secondaries
for secondary in secondaries:
client._MongoReplicaSetClient__members[secondary].up = False
client._MongoReplicaSetClient__members[client.primary].up = False
# Wake up monitor thread
client._MongoReplicaSetClient__schedule_refresh()
# Refresh interval is 30 seconds; scheduling a refresh tells the
# monitor thread / greenlet to start a refresh now. We still need to
# sleep a few seconds for it to complete.
time.sleep(5)
for secondary in secondaries:
self.assertTrue(client._MongoReplicaSetClient__members[secondary].up,
"MongoReplicaSetClient didn't detect secondary is up")
self.assertTrue(client._MongoReplicaSetClient__members[client.primary].up,
"MongoReplicaSetClient didn't detect primary is up")
client._MongoReplicaSetClient__rs_state = new_rs_state
client._MongoReplicaSetClient__schedule_refresh(sync=True)
rs_state = client._MongoReplicaSetClient__rs_state
for member in rs_state.members:
self.assertTrue(
member.up, "MongoReplicaSetClient didn't detect member is up")
client.close()

View File

@ -32,9 +32,8 @@ def get_pool(client):
if isinstance(client, MongoClient):
return client._MongoClient__pool
elif isinstance(client, MongoReplicaSetClient):
writer = client._MongoReplicaSetClient__writer
pools = client._MongoReplicaSetClient__members
return pools[writer].pool
rs_state = client._MongoReplicaSetClient__rs_state
return rs_state[rs_state.writer].pool
else:
raise TypeError(str(client))