diff --git a/pymongo/mongo_replica_set_client.py b/pymongo/mongo_replica_set_client.py index b33cc6667..cc25aa303 100644 --- a/pymongo/mongo_replica_set_client.py +++ b/pymongo/mongo_replica_set_client.py @@ -106,13 +106,171 @@ def _partition_node(node): return host, port +# Concurrency notes: A MongoReplicaSetClient keeps its view of the replica-set +# state in an RSState instance. RSStates are immutable, except for +# host-pinning. Pools, which are internally thread / greenlet safe, can be +# copied from old to new RSStates safely. The client updates its view of the +# set's state not by modifying its RSState but by replacing it with an updated +# copy. + +# In __init__, MongoReplicaSetClient gets a list of potential members called +# 'seeds' from its initial parameters, and calls refresh(). refresh() iterates +# over the the seeds in arbitrary order looking for a member it can connect to. +# Once it finds one, it calls 'ismaster' and sets self.__hosts to the list of +# members in the response, and connects to the rest of the members. refresh() +# sets the MongoReplicaSetClient's RSState. Finally, __init__ launches the +# replica-set monitor. + +# The monitor calls refresh() every 30 seconds, or whenever the client has +# encountered an error that prompts it to wake the monitor. + +# Every method that accesses the RSState multiple times within the method makes +# a local reference first and uses that throughout, so it's isolated from a +# concurrent method replacing the RSState with an updated copy. This technique +# avoids the need to lock around accesses to the RSState. + + +class RSState(object): + def __init__( + self, threadlocal, host_to_member=None, arbiters=None, writer=None, + error_message='No primary available'): + """An immutable snapshot of the client's view of the replica set state. + + :Parameters: + - `threadlocal`: Thread- or greenlet-local storage + - `host_to_member`: Optional dict: (host, port) -> Member instance + - `arbiters`: Optional sequence of arbiters as (host, port) + - `writer`: Optional (host, port) of primary + - `error_message`: Optional error if `writer` is None + """ + self._threadlocal = threadlocal # threading.local or gevent local + self._arbiters = frozenset(arbiters or []) # set of (host, port) + self._writer = writer # (host, port) of the primary, or None + self._error_message = error_message + self._host_to_member = host_to_member or {} + self._hosts = frozenset(self._host_to_member) + self._members = frozenset(self._host_to_member.values()) + + if writer and self._host_to_member[writer].up: + self._primary_member = self._host_to_member[writer] + else: + self._primary_member = None + + def clone_with_host_down(self, host, error_message): + """Get a clone, marking as "down" the member with the given (host, port) + """ + members = self._host_to_member.copy() + down_member = members.pop(host, None) + if down_member: + members[host] = down_member.clone_down() + + if host == self.writer: + # The primary went down; record the error message. + return RSState( + self._threadlocal, members, self._arbiters, + None, error_message) + else: + # Some other host went down. Keep our current primary or, if it's + # already down, keep our current error message. + return RSState( + self._threadlocal, members, self._arbiters, + self._writer, self._error_message) + + def clone_without_writer(self, threadlocal): + """Get a clone without a primary. Unpins all threads. + + :Parameters: + - `threadlocal`: Thread- or greenlet-local storage + """ + return RSState( + threadlocal, self._host_to_member.copy(), self._arbiters, None) + + @property + def arbiters(self): + """Set of (host, port) pairs.""" + return self._arbiters + + @property + def writer(self): + """(host, port) of primary, or None.""" + return self._writer + + @property + def primary_member(self): + return self._primary_member + + @property + def hosts(self): + """Set of (host, port) tuples of data members of the replica set.""" + return self._hosts + + @property + def members(self): + """Set of Member instances.""" + return self._members + + @property + def error_message(self): + """The error, if any, raised when trying to connect to the primary""" + return self._error_message + + @property + def secondaries(self): + """Set of (host, port) pairs.""" + # Unlike the other properties, this isn't cached because it isn't used + # in regular operations. + return set([ + host for host, member in self._host_to_member.items() + if member.is_secondary]) + + def get(self, host): + """Return a Member instance or None for the given (host, port).""" + return self._host_to_member.get(host) + + def pin_host(self, host, mode, tag_sets, latency): + """Pin this thread / greenlet to a member. + + `host` is a (host, port) pair. The remaining parameters are a read + preference. + """ + # Fun fact: Unlike in thread_util.ThreadIdent, we needn't lock around + # assignment here. Assignment to a threadlocal is only unsafe if it + # can cause other Python code to run implicitly. + self._threadlocal.host = host + self._threadlocal.read_preference = (mode, tag_sets, latency) + + def keep_pinned_host(self, mode, tag_sets, latency): + """Does a read pref match the last used by this thread / greenlet?""" + return self._threadlocal.read_preference == (mode, tag_sets, latency) + + @property + def pinned_host(self): + """The (host, port) last used by this thread / greenlet, or None.""" + return getattr(self._threadlocal, 'host', None) + + def unpin_host(self): + """Forget this thread / greenlet's last used member.""" + self._threadlocal.host = self._threadlocal.read_preference = None + + @property + def threadlocal(self): + return self._threadlocal + + def __str__(self): + return '' % ( + ', '.join(str(member) for member in self._host_to_member.itervalues()), + self.writer and '%s:%s' % self.writer or None) + + class Monitor(object): """Base class for replica set monitors. """ _refresh_interval = 30 + def __init__(self, rsc, event_class): self.rsc = weakref.proxy(rsc, self.shutdown) self.event = event_class() + self.refreshed = event_class() self.stopped = False def shutdown(self, dummy=None): @@ -124,8 +282,14 @@ class Monitor(object): def schedule_refresh(self): """Refresh immediately """ + self.refreshed.clear() self.event.set() + def wait_for_refresh(self, timeout_seconds): + """Block until a scheduled refresh completes + """ + self.refreshed.wait(timeout_seconds) + def monitor(self): """Run until the RSC is collected or an unexpected error occurs. @@ -135,10 +299,15 @@ class Monitor(object): if self.stopped: break self.event.clear() + try: - self.rsc.refresh() + try: + self.rsc.refresh() + finally: + self.refreshed.set() except AutoReconnect: pass + # RSC has been collected or there # was an unexpected error. except: @@ -188,18 +357,25 @@ except ImportError: class Member(object): - """Represent one member of a replica set + """Immutable representation of one member of a replica set. + + :Parameters: + - `host`: A (host, port) pair + - `connection_pool`: A Pool instance + - `ismaster_response`: A dict, MongoDB's ismaster response + - `ping_time`: A MovingAverage instance + - `up`: Whether we think this member is available """ # For unittesting only. Use under no circumstances! _host_to_ping_time = {} - def __init__(self, host, ismaster_response, ping_time, connection_pool): + def __init__(self, host, connection_pool, ismaster_response, ping_time, up): self.host = host self.pool = connection_pool - self.ping_time = MovingAverage(5) - self.update(ismaster_response, ping_time) + self.ismaster_response = ismaster_response + self.ping_time = ping_time + self.up = up - def update(self, ismaster_response, ping_time): if ismaster_response['ismaster']: self.state = PRIMARY elif ismaster_response.get('secondary'): @@ -207,11 +383,22 @@ class Member(object): else: self.state = OTHER + self.tags = ismaster_response.get('tags', {}) self.max_bson_size = ismaster_response.get( 'maxBsonObjectSize', MAX_BSON_SIZE) - self.tags = ismaster_response.get('tags', {}) - self.record_ping_time(ping_time) - self.up = True + + def clone_with(self, ismaster_response, ping_time_sample): + """Get a clone updated with ismaster response and a single ping time. + """ + ping_time = self.ping_time.clone_with(ping_time_sample) + return Member(self.host, self.pool, ismaster_response, ping_time, True) + + def clone_down(self): + """Get a clone of this Member, but with up=False. + """ + return Member( + self.host, self.pool, self.ismaster_response, self.ping_time, + False) @property def is_primary(self): @@ -222,7 +409,7 @@ class Member(object): return self.state == SECONDARY def get_avg_ping_time(self): - """Get a moving average of this member's ping times + """Get a moving average of this member's ping times. """ if self.host in Member._host_to_ping_time: # Simulate ping times for unittesting @@ -230,9 +417,6 @@ class Member(object): return self.ping_time.get() - def record_ping_time(self, ping_time): - self.ping_time.update(ping_time) - def matches_mode(self, mode): if mode == ReadPreference.PRIMARY and not self.is_primary: return False @@ -265,6 +449,10 @@ class Member(object): return False + def __str__(self): + return '' % ( + self.host[0], self.host[1], self.is_primary, self.up) + class MongoReplicaSetClient(common.BaseObject): """Connection to a MongoDB replica set. @@ -407,10 +595,6 @@ class MongoReplicaSetClient(common.BaseObject): """ self.__opts = {} self.__seeds = set() - self.__hosts = None - self.__arbiters = set() - self.__writer = None - self.__members = {} self.__index_cache = {} self.__auth_credentials = {} @@ -458,13 +642,14 @@ class MongoReplicaSetClient(common.BaseObject): "The gevent module is not available. " "Install the gevent package from PyPI.") + self.__rs_state = RSState(self.__make_threadlocal()) + self.__request_counter = thread_util.Counter(self.__use_greenlets) self.__auto_start_request = self.__opts.get('auto_start_request', False) if self.__auto_start_request: self.start_request() - self.__reset_pinned_hosts() self.__name = self.__opts.get('replicaset') if not self.__name: raise ConfigurationError("the replicaSet " @@ -479,7 +664,7 @@ class MongoReplicaSetClient(common.BaseObject): self.__ssl_ca_certs = self.__opts.get('ssl_ca_certs', None) ssl_kwarg_keys = [k for k in kwargs.keys() if k.startswith('ssl_')] - if self.__use_ssl == False and ssl_kwarg_keys: + if not self.__use_ssl and ssl_kwarg_keys: raise ConfigurationError("ssl has not been enabled but the " "following ssl parameters have been set: " "%s. Please set `ssl=True` or remove." @@ -610,8 +795,13 @@ class MongoReplicaSetClient(common.BaseObject): 'to this database. You must logout first.') # Try to authenticate even during failover. - members = self.__members.copy().values() - member = select_member(members, ReadPreference.PRIMARY_PREFERRED) + member = select_member( + self.__rs_state.members, ReadPreference.PRIMARY_PREFERRED) + + if not member: + raise AutoReconnect( + "No replica set members available for authentication") + sock_info = self.__socket(member) try: # Since __check_auth was called in __socket @@ -650,6 +840,8 @@ class MongoReplicaSetClient(common.BaseObject): @property def seeds(self): """The seed list used to connect to this replica set. + + A sequence of (host, port) pairs. """ return self.__seeds @@ -658,34 +850,38 @@ class MongoReplicaSetClient(common.BaseObject): """All active and passive (priority 0) replica set members known to this client. This does not include hidden or slaveDelay members, or arbiters. + + A sequence of (host, port) pairs. """ - return self.__hosts + return self.__rs_state.hosts @property def primary(self): - """The current primary of the replica set. + """The (host, port) of the current primary of the replica set. Returns None if there is no primary. """ - return self.__writer + return self.__rs_state.writer @property def secondaries(self): """The secondary members known to this client. + + A sequence of (host, port) pairs. """ - return set([ - host for host, member in self.__members.items() - if member.is_secondary]) + return self.__rs_state.secondaries @property def arbiters(self): """The arbiters known to this client. + + A sequence of (host, port) pairs. """ - return self.__arbiters + return self.__rs_state.arbiters @property def is_mongos(self): - """If this instance is connected to mongos (always False) + """If this instance is connected to mongos (always False). .. versionadded:: 2.3 """ @@ -738,8 +934,9 @@ class MongoReplicaSetClient(common.BaseObject): accepts in bytes. Defaults to 4MB in server < 1.7.4. Returns 0 if no primary is available. """ - if self.__writer: - return self.__members[self.__writer].max_bson_size + rs_state = self.__rs_state + if rs_state.primary_member: + return rs_state.primary_member.max_bson_size return 0 @property @@ -798,89 +995,66 @@ class MongoReplicaSetClient(common.BaseObject): connection_pool.discard_socket(sock_info) raise - def __update_pools(self): - """Update the mapping of (host, port) pairs to connection pools. + def __schedule_refresh(self, sync=False): + """Awake the monitor to update our view of the replica set's state. + + If `sync` is True, block until the refresh completes. + + If multiple application threads call __schedule_refresh while refresh + is in progress, the work of refreshing the state is only performed + once. """ - primary = None - for host in self.__hosts: - member, sock_info = None, None - try: - if host in self.__members: - member = self.__members[host] - sock_info = self.__socket(member) - res, ping_time = self.__simple_command( - sock_info, 'admin', {'ismaster': 1}) - member.pool.maybe_return_socket(sock_info) - member.update(res, ping_time) - else: - res, connection_pool, ping_time = self.__is_master(host) - self.__members[host] = Member( - host=host, - ismaster_response=res, - ping_time=ping_time, - connection_pool=connection_pool) - except (ConnectionFailure, socket.error): - if member: - member.pool.discard_socket(sock_info) - self.__members.pop(member.host, None) - continue - - if res['ismaster']: - primary = host - - if primary != self.__writer: - self.__reset_pinned_hosts() - - self.__writer = primary - - def __schedule_refresh(self): self.__monitor.schedule_refresh() + if sync: + self.__monitor.wait_for_refresh(timeout_seconds=5) - def __pin_host(self, host, mode, tag_sets, latency): - # After first successful read in a request, continue reading from same - # member until read preferences change, host goes down, or - # end_request(). This offers a small assurance that reads won't jump - # around in time. - self.__threadlocal.host = host - self.__threadlocal.read_preference = (mode, tag_sets, latency) - - def __keep_pinned_host(self, mode, tag_sets, latency): - # If read preferences have changed, return False - return getattr(self.__threadlocal, 'read_preference', None) == ( - mode, tag_sets, latency) - - def __pinned_host(self): - return getattr(self.__threadlocal, 'host', None) - - def __unpin_host(self): - self.__threadlocal.host = self.__threadlocal.read_preference = None - - def __reset_pinned_hosts(self): + def __make_threadlocal(self): if self.__use_greenlets: - self.__threadlocal = gevent_local() + return gevent_local() else: - self.__threadlocal = threading.local() + return threading.local() def refresh(self): """Iterate through the existing host list, or possibly the seed list, to update the list of hosts and arbiters in this replica set. """ + # Only one thread / greenlet calls refresh() at a time: the one + # running __init__() or the monitor. We won't modify the state, only + # replace it at the end. + rs_state = self.__rs_state errors = [] - nodes = self.__hosts or self.__seeds + if rs_state.hosts: + # Try first those hosts we think are up, then the down ones. + nodes = sorted( + rs_state.hosts, key=lambda host: rs_state.get(host).up) + else: + nodes = self.__seeds + hosts = set() + # This will become the new RSState. + members = {} + arbiters = set() + writer = None + + # Look for first member from which we can get a list of all members. for node in nodes: - member, sock_info = None, None + member, sock_info = rs_state.get(node), None try: - if node in self.__members: - member = self.__members[node] + if member: sock_info = self.__socket(member) - response, _ = self.__simple_command( + response, ping_time = self.__simple_command( sock_info, 'admin', {'ismaster': 1}) member.pool.maybe_return_socket(sock_info) + new_member = member.clone_with(response, ping_time) else: - response, _, _ = self.__is_master(node) + response, pool, ping_time = self.__is_master(node) + new_member = Member( + node, pool, response, MovingAverage([ping_time]), True) + + if response['ismaster']: + writer = node # Check that this host is part of the given replica set. set_name = response.get('setName') @@ -893,86 +1067,87 @@ class MongoReplicaSetClient(common.BaseObject): "replica set %s" % (host, port, self.__name)) if "arbiters" in response: - self.__arbiters = set([_partition_node(h) - for h in response["arbiters"]]) + arbiters = set([ + _partition_node(h) for h in response["arbiters"]]) if "hosts" in response: hosts.update([_partition_node(h) for h in response["hosts"]]) if "passives" in response: hosts.update([_partition_node(h) for h in response["passives"]]) + + # Start off the new 'members' dict with this member. + members[node] = new_member except (ConnectionFailure, socket.error), why: if member: member.pool.discard_socket(sock_info) errors.append("%s:%d: %s" % (node[0], node[1], str(why))) if hosts: - self.__hosts = hosts break else: if errors: raise AutoReconnect(', '.join(errors)) raise ConfigurationError('No suitable hosts found') - self.__update_pools() + # Ensure we have a pool for each member, and find the primary. + for host in hosts: + if host in members: + # This member was the first we connected to, in the loop above. + continue - def __check_is_primary(self, host): - """Checks if this host is the primary for the replica set. - """ - member, sock_info = None, None - try: - if host in self.__members: - member = self.__members[host] - sock_info = self.__socket(member) - res, ping_time = self.__simple_command( - sock_info, 'admin', {'ismaster': 1} - ) - else: - res, connection_pool, ping_time = self.__is_master(host) - self.__members[host] = Member( - host=host, - ismaster_response=res, - ping_time=ping_time, - connection_pool=connection_pool) - except (ConnectionFailure, socket.error), why: - if member: - member.pool.discard_socket(sock_info) - raise ConnectionFailure("%s:%d: %s" % (host[0], host[1], str(why))) - - if member and sock_info: - member.pool.maybe_return_socket(sock_info) - - if res["ismaster"]: - return host - elif "primary" in res: - candidate = _partition_node(res["primary"]) - # Don't report the same connect failure multiple times. + member, sock_info = rs_state.get(host), None try: - return self.__check_is_primary(candidate) + if member: + sock_info = self.__socket(member) + res, ping_time = self.__simple_command( + sock_info, 'admin', {'ismaster': 1}) + member.pool.maybe_return_socket(sock_info) + new_member = member.clone_with(res, ping_time) + else: + res, connection_pool, ping_time = self.__is_master(host) + new_member = Member( + host, connection_pool, res, MovingAverage([ping_time]), + True) + + members[host] = new_member + except (ConnectionFailure, socket.error): - pass - raise AutoReconnect('%s:%d: not primary' % host) + if member: + member.pool.discard_socket(sock_info) + continue + + if res['ismaster']: + writer = host + + if writer == rs_state.writer: + threadlocal = self.__rs_state.threadlocal + else: + # We unpin threads from members if the primary has changed, since + # no monotonic consistency can be promised now anyway. + threadlocal = self.__make_threadlocal() + + # Replace old state with new. + self.__rs_state = RSState(threadlocal, members, arbiters, writer) def __find_primary(self): """Returns a connection to the primary of this replica set, - if one exists. + if one exists, or raises AutoReconnect. """ - if self.__writer: - primary = self.__members[self.__writer] - if primary.up: - return primary + primary = self.__rs_state.primary_member + if primary: + return primary - # This is either the first connection or we had a failover. - self.refresh() + # We had a failover. + self.__schedule_refresh(sync=True) + + # Try again. This time copy the RSState reference so we're guaranteed + # primary_member and error_message are from the same state. + rs_state = self.__rs_state + if rs_state.primary_member: + return rs_state.primary_member - errors = [] - for candidate in self.__hosts: - try: - self.__writer = self.__check_is_primary(candidate) - return self.__members[self.__writer] - except (ConnectionFailure, socket.error), why: - errors.append(str(why)) # Couldn't find the primary. - raise AutoReconnect(', '.join(errors)) + raise AutoReconnect(rs_state.error_message) def __socket(self, member): """Get a SocketInfo from the pool. @@ -990,13 +1165,15 @@ class MongoReplicaSetClient(common.BaseObject): return sock_info def disconnect(self): - """Disconnect from the replica set primary and refresh our view of - the replica set. + """Disconnect from the replica set primary, unpin all members, and + refresh our view of the replica set. """ - member = self.__members.get(self.__writer) - if member: - member.pool.reset() - self.__writer = None + rs_state = self.__rs_state + if rs_state.primary_member: + rs_state.primary_member.pool.reset() + + threadlocal = self.__make_threadlocal() + self.__rs_state = rs_state.clone_without_writer(threadlocal) self.__schedule_refresh() def close(self): @@ -1014,12 +1191,12 @@ class MongoReplicaSetClient(common.BaseObject): The :meth:`close` method now terminates the replica set monitor. """ if self.__monitor: - self.__monitor.shutdown(None) + self.__monitor.shutdown() # Use a reasonable timeout. self.__monitor.join(1.0) self.__monitor = None - self.__writer = None - self.__members = {} + + self.__rs_state = RSState(self.__make_threadlocal()) def alive(self): """Return ``False`` if there has been an error communicating with the @@ -1148,7 +1325,7 @@ class MongoReplicaSetClient(common.BaseObject): if _connection_to_use in (None, -1): member = self.__find_primary() else: - member = self.__members[_connection_to_use] + member = self.__rs_state.get(_connection_to_use) sock_info = None try: @@ -1179,6 +1356,8 @@ class MongoReplicaSetClient(common.BaseObject): def __send_and_receive(self, member, msg, **kwargs): """Send a message on the given socket and return the response data. + + Can raise socket.error. """ sock_info = None try: @@ -1196,13 +1375,8 @@ class MongoReplicaSetClient(common.BaseObject): member.pool.maybe_return_socket(sock_info) return response - except (ConnectionFailure, socket.error), why: - host, port = member.pool.pair - member.pool.discard_socket(sock_info) - raise AutoReconnect("%s:%d: %s" % (host, port, str(why))) except: - if sock_info: - sock_info.close() + member.pool.discard_socket(sock_info) raise def __try_read(self, member, msg, **kwargs): @@ -1211,10 +1385,23 @@ class MongoReplicaSetClient(common.BaseObject): """ try: return self.__send_and_receive(member, msg, **kwargs) - except AutoReconnect: - member.up = False + except socket.timeout, e: + # Could be one slow query, don't refresh. + host, port = member.host + raise AutoReconnect("%s:%d: %s" % (host, port, e)) + except (socket.error, ConnectionFailure), why: + # Try to replace our RSState with a clone where this member is + # marked "down", to reduce exceptions on other threads, or repeated + # exceptions on this thread. We accept that there's a race + # condition (another thread could be replacing our state with a + # different version concurrently) but this approach is simple and + # lock-free. + self.__rs_state = self.__rs_state.clone_with_host_down( + member.host, str(why)) + self.__schedule_refresh() - raise + host, port = member.host + raise AutoReconnect("%s:%d: %s" % (host, port, why)) def _send_message_with_response(self, msg, _connection_to_use=None, _must_use_master=False, **kwargs): @@ -1224,52 +1411,62 @@ class MongoReplicaSetClient(common.BaseObject): :Parameters: - `msg`: (request_id, data) pair making up the message to send + - `_connection_to_use`: Optional (host, port) of member for message, + used by Cursor for getMore and killCursors messages. + - `_must_use_master`: If True, send to primary. """ - - # If we've disconnected since last read, trigger refresh - try: - self.__find_primary() - except ConnectionFailure: - # We'll throw an error later - pass - + rs_state = self.__rs_state tag_sets = kwargs.get('tag_sets', [{}]) mode = kwargs.get('read_preference', ReadPreference.PRIMARY) if _must_use_master: mode = ReadPreference.PRIMARY tag_sets = [{}] + if not rs_state.primary_member: + # Primary was down last we checked. Start a refresh if one is not + # already in progress. If caller requested the primary, wait to + # see if it's up, otherwise continue with known-good members. + sync = (mode == ReadPreference.PRIMARY) + self.__schedule_refresh(sync=sync) + rs_state = self.__rs_state + latency = kwargs.get( 'secondary_acceptable_latency_ms', self.secondary_acceptable_latency_ms) - member = None try: if _connection_to_use is not None: if _connection_to_use == -1: - member = self.__find_primary() + member = rs_state.primary_member + error_message = rs_state.error_message else: - member = self.__members[_connection_to_use] + member = rs_state.get(_connection_to_use) + error_message = '%s:%s not available' % _connection_to_use + + if not member: + raise AutoReconnect(error_message) + return member.pool.pair, self.__try_read( member, msg, **kwargs) except AutoReconnect: - if member == self.__members.get(self.__writer): + if _connection_to_use in (-1, rs_state.writer): + # Primary's down. Refresh. self.disconnect() raise # To provide some monotonic consistency, we use the same member as # long as this thread is in a request and all reads use the same # mode, tags, and latency. The member gets unpinned if pref changes, - # if member changes state, if we detect a failover and call - # __reset_pinned_hosts(), or if this thread calls end_request(). + # if member changes state, if we detect a failover, or if this thread + # calls end_request(). errors = [] - pinned_member = self.__members.get(self.__pinned_host()) + + pinned_host = rs_state.pinned_host + pinned_member = rs_state.get(pinned_host) if (pinned_member - and pinned_member.matches_mode(mode) - and pinned_member.matches_tag_sets(tag_sets) - and pinned_member.up - and self.__keep_pinned_host(mode, tag_sets, latency) - ): + and pinned_member.matches_mode(mode) + and pinned_member.matches_tag_sets(tag_sets) # TODO: REMOVE? + and rs_state.keep_pinned_host(mode, tag_sets, latency)): try: return ( pinned_member.host, @@ -1282,10 +1479,9 @@ class MongoReplicaSetClient(common.BaseObject): errors.append(str(why)) # No pinned member, or pinned member down or doesn't match read pref - self.__unpin_host() - - members = self.__members.copy().values() + rs_state.unpin_host() + members = list(rs_state.members) while len(errors) < MAX_RETRY: member = select_member( members=members, @@ -1306,7 +1502,7 @@ class MongoReplicaSetClient(common.BaseObject): if self.in_request(): # Keep reading from this member in this thread / greenlet # unless read preference changes - self.__pin_host(member.host, mode, tag_sets, latency) + rs_state.pin_host(member.host, mode, tag_sets, latency) return member.host, response except AutoReconnect, why: errors.append(str(why)) @@ -1360,7 +1556,7 @@ class MongoReplicaSetClient(common.BaseObject): # exceed 1. This keeps things sane when we create and delete pools # within a request. if 1 == self.__request_counter.inc(): - for member in self.__members.values(): + for member in self.__rs_state.members: member.pool.start_request() return pool.Request(self) @@ -1386,12 +1582,13 @@ class MongoReplicaSetClient(common.BaseObject): in the middle of a sequence of operations in which ordering is important. This could lead to unexpected results. """ + rs_state = self.__rs_state if 0 == self.__request_counter.dec(): - for member in self.__members.values(): + for member in rs_state.members: # No effect if not in a request member.pool.end_request() - self.__unpin_host() + rs_state.unpin_host() def __eq__(self, other): # XXX: Implement this? @@ -1402,7 +1599,7 @@ class MongoReplicaSetClient(common.BaseObject): def __repr__(self): return "MongoReplicaSetClient(%r)" % (["%s:%d" % n - for n in self.__hosts],) + for n in self.hosts],) def __getattr__(self, name): """Get a database by name. diff --git a/pymongo/read_preferences.py b/pymongo/read_preferences.py index ce5fefd3a..241342f10 100644 --- a/pymongo/read_preferences.py +++ b/pymongo/read_preferences.py @@ -15,8 +15,6 @@ """Utilities for choosing which member of a replica set to read from.""" import random -import threading -from collections import deque from pymongo.errors import ConfigurationError @@ -86,7 +84,6 @@ def mongos_mode(mode): def mongos_enum(enum): return _mongos_modes.index(enum) - def select_primary(members): for member in members: if member.is_primary: @@ -151,6 +148,7 @@ def select_member( return select_primary(members) elif mode == PRIMARY_PREFERRED: + # Recurse. candidate_primary = select_member(members, PRIMARY, [{}], latency) if candidate_primary: return candidate_primary @@ -166,6 +164,7 @@ def select_member( return None elif mode == SECONDARY_PREFERRED: + # Recurse. candidate_secondary = select_member( members, SECONDARY, tag_sets, latency) if candidate_secondary: @@ -196,32 +195,16 @@ secondary_ok_commands = frozenset([ class MovingAverage(object): - """Tracks a moving average. - """ - def __init__(self, window_sz): - self.window_sz = window_sz - self.samples = deque() - self.total = 0 - self.lock = threading.Lock() + def __init__(self, samples): + """Immutable structure to track a 5-sample moving average. + """ + self.samples = samples[-5:] + assert self.samples + self.average = sum(self.samples) / float(len(self.samples)) - def update(self, sample): - # One reason we synchronize MovingAverage is that Jython's - # popleft isn't safe: http://bugs.jython.org/issue2001 - self.lock.acquire() - try: - self.samples.append(sample) - self.total += sample - if len(self.samples) > self.window_sz: - self.total -= self.samples.popleft() - finally: - self.lock.release() + def clone_with(self, sample): + """Get a copy of this instance plus a new sample""" + return MovingAverage(self.samples + [sample]) def get(self): - self.lock.acquire() - try: - if self.samples: - return self.total / float(len(self.samples)) - else: - return None - finally: - self.lock.release() + return self.average diff --git a/test/__init__.py b/test/__init__.py index a6db843ae..51a2e6b8d 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -20,14 +20,17 @@ import os import pymongo from pymongo.errors import ConnectionFailure -host = os.environ.get("DB_IP", 'localhost') +# hostnames retrieved by MongoReplicaSetClient from isMaster will be of unicode +# type in Python 2, so ensure these hostnames are unicodes, too. It makes tests +# like `test_repr` predictable. +host = unicode(os.environ.get("DB_IP", 'localhost')) port = int(os.environ.get("DB_PORT", 27017)) pair = '%s:%d' % (host, port) -host2 = os.environ.get("DB_IP2", 'localhost') +host2 = unicode(os.environ.get("DB_IP2", 'localhost')) port2 = int(os.environ.get("DB_PORT2", 27018)) -host3 = os.environ.get("DB_IP3", 'localhost') +host3 = unicode(os.environ.get("DB_IP3", 'localhost')) port3 = int(os.environ.get("DB_PORT3", 27019)) diff --git a/test/high_availability/test_ha.py b/test/high_availability/test_ha.py index 72f772423..e258b46bc 100644 --- a/test/high_availability/test_ha.py +++ b/test/high_availability/test_ha.py @@ -27,14 +27,11 @@ import ha_tools from ha_tools import use_greenlets -from pymongo import (MongoReplicaSetClient, - ReadPreference) -from pymongo.mongo_replica_set_client import ( - Member, Monitor, MongoReplicaSetClient) -from pymongo.mongo_client import _partition_node -from pymongo.mongo_client import MongoClient from pymongo.errors import AutoReconnect, OperationFailure, ConnectionFailure -from pymongo.read_preferences import modes +from pymongo.mongo_replica_set_client import Member, Monitor +from pymongo.mongo_replica_set_client import MongoReplicaSetClient +from pymongo.mongo_client import MongoClient, _partition_node +from pymongo.read_preferences import ReadPreference, modes from test import utils from test.utils import one @@ -52,7 +49,15 @@ SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED NEAREST = ReadPreference.NEAREST -class TestDirectConnection(unittest.TestCase): +class HATestCase(unittest.TestCase): + """A test case for connections to replica sets or mongos.""" + + def tearDown(self): + ha_tools.kill_all_members() + ha_tools.nodes.clear() + + +class TestDirectConnection(HATestCase): def setUp(self): members = [{}, {}, {'arbiterOnly': True}] @@ -145,10 +150,10 @@ class TestDirectConnection(unittest.TestCase): def tearDown(self): self.c.close() - ha_tools.kill_all_members() + super(TestDirectConnection, self).tearDown() -class TestPassiveAndHidden(unittest.TestCase): +class TestPassiveAndHidden(HATestCase): def setUp(self): members = [{}, @@ -177,10 +182,10 @@ class TestPassiveAndHidden(unittest.TestCase): def tearDown(self): self.c.close() - ha_tools.kill_all_members() + super(TestPassiveAndHidden, self).tearDown() -class TestMonitorRemovesRecoveringMember(unittest.TestCase): +class TestMonitorRemovesRecoveringMember(HATestCase): # Members in STARTUP2 or RECOVERING states are shown in the primary's # isMaster response, but aren't secondaries and shouldn't be read from. # Verify that if a secondary goes into RECOVERING mode, the Monitor removes @@ -211,10 +216,10 @@ class TestMonitorRemovesRecoveringMember(unittest.TestCase): def tearDown(self): self.c.close() - ha_tools.kill_all_members() + super(TestMonitorRemovesRecoveringMember, self).tearDown() -class TestTriggeredRefresh(unittest.TestCase): +class TestTriggeredRefresh(HATestCase): # Verify that if a secondary goes into RECOVERING mode or if the primary # changes, the next exception triggers an immediate refresh. @@ -282,14 +287,14 @@ class TestTriggeredRefresh(unittest.TestCase): # We've detected the stepdown self.assertTrue( not c_find_one.primary - or primary != _partition_node(c_find_one.primary)) + or _partition_node(primary) != c_find_one.primary) def tearDown(self): Monitor._refresh_interval = MONITOR_INTERVAL - ha_tools.kill_all_members() + super(TestTriggeredRefresh, self).tearDown() -class TestHealthMonitor(unittest.TestCase): +class TestHealthMonitor(HATestCase): def setUp(self): res = ha_tools.start_replica_set([{}, {}, {}]) @@ -357,17 +362,10 @@ class TestHealthMonitor(unittest.TestCase): ha_tools.stepdown_primary() self.assertTrue(primary_changed()) - - # There can be a delay between finding the primary and updating - # secondaries - sleep(5) self.assertNotEqual(secondaries, c.secondaries) - def tearDown(self): - ha_tools.kill_all_members() - -class TestWritesWithFailover(unittest.TestCase): +class TestWritesWithFailover(HATestCase): def setUp(self): res = ha_tools.start_replica_set([{}, {}, {}]) @@ -398,11 +396,8 @@ class TestWritesWithFailover(unittest.TestCase): self.assertTrue(primary != c.primary) self.assertEqual('baz', db.test.find_one({'bar': 'baz'})['bar']) - def tearDown(self): - ha_tools.kill_all_members() - -class TestReadWithFailover(unittest.TestCase): +class TestReadWithFailover(HATestCase): def setUp(self): res = ha_tools.start_replica_set([{}, {}, {}]) @@ -435,11 +430,8 @@ class TestReadWithFailover(unittest.TestCase): self.assertTrue(iter_cursor(cursor)) self.assertEqual(10, cursor._Cursor__retrieved) - def tearDown(self): - ha_tools.kill_all_members() - -class TestReadPreference(unittest.TestCase): +class TestReadPreference(HATestCase): def setUp(self): members = [ # primary @@ -784,11 +776,10 @@ class TestReadPreference(unittest.TestCase): def tearDown(self): self.c.close() - ha_tools.kill_all_members() - self.clear_ping_times() + super(TestReadPreference, self).tearDown() -class TestReplicaSetAuth(unittest.TestCase): +class TestReplicaSetAuth(HATestCase): def setUp(self): members = [ {}, @@ -829,10 +820,10 @@ class TestReplicaSetAuth(unittest.TestCase): def tearDown(self): self.c.close() - ha_tools.kill_all_members() + super(TestReplicaSetAuth, self).tearDown() -class TestAlive(unittest.TestCase): +class TestAlive(HATestCase): def setUp(self): members = [{}, {}] self.seed, self.name = ha_tools.start_replica_set(members) @@ -866,11 +857,8 @@ class TestAlive(unittest.TestCase): finally: rsc.close() - def tearDown(self): - ha_tools.kill_all_members() - -class TestMongosHighAvailability(unittest.TestCase): +class TestMongosHighAvailability(HATestCase): def setUp(self): seed_list = ha_tools.create_sharded_cluster() self.dbname = 'pymongo_mongos_ha' @@ -910,10 +898,10 @@ class TestMongosHighAvailability(unittest.TestCase): def tearDown(self): self.client.drop_database(self.dbname) - ha_tools.kill_all_members() + super(TestMongosHighAvailability, self).tearDown() -class TestReplicaSetRequest(unittest.TestCase): +class TestReplicaSetRequest(HATestCase): def setUp(self): members = [{}, {}, {'arbiterOnly': True}] res = ha_tools.start_replica_set(members) @@ -928,8 +916,9 @@ class TestReplicaSetRequest(unittest.TestCase): self.assertTrue(self.c.auto_start_request) self.assertTrue(self.c.in_request()) - primary_pool = self.c._MongoReplicaSetClient__members[primary].pool - secondary_pool = self.c._MongoReplicaSetClient__members[secondary].pool + rs_state = self.c._MongoReplicaSetClient__rs_state + primary_pool = rs_state.get(primary).pool + secondary_pool = rs_state.get(secondary).pool # Trigger start_request on primary pool utils.assertReadFrom(self, self.c, primary, PRIMARY) @@ -967,7 +956,7 @@ class TestReplicaSetRequest(unittest.TestCase): def tearDown(self): self.c.close() - ha_tools.kill_all_members() + super(TestReplicaSetRequest, self).tearDown() if __name__ == '__main__': diff --git a/test/test_client.py b/test/test_client.py index d838e9693..807869ab2 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -100,7 +100,9 @@ class TestClient(unittest.TestCase, TestRequestMixin): ConnectionFailure, MongoClient, "%s:1234567" % (host,), port) def test_repr(self): - self.assertEqual(repr(MongoClient(host, port)), + # Making host a str avoids the 'u' prefix in Python 2, so the repr is + # the same in Python 2 and 3. + self.assertEqual(repr(MongoClient(str(host), port)), "MongoClient('%s', %d)" % (host, port)) def test_getters(self): diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index b2ca7d3a4..b23c49e28 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -180,7 +180,7 @@ class TestReadPreferences(TestReadPreferencesBase): not_used = data_members.difference(used) latencies = ', '.join( '%s: %dms' % (member.host, member.ping_time.get()) - for member in c._MongoReplicaSetClient__members.values()) + for member in c._MongoReplicaSetClient__rs_state.members) self.assertFalse(not_used, "Expected to use primary and all secondaries for mode NEAREST," @@ -426,51 +426,24 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase): class TestMovingAverage(unittest.TestCase): - def test_empty_moving_average(self): - avg = MovingAverage(0) - self.assertEqual(None, avg.get()) - avg.update(10) - self.assertEqual(None, avg.get()) + def test_empty_init(self): + self.assertRaises(AssertionError, MovingAverage, []) - def test_trivial_moving_average(self): - avg = MovingAverage(1) - self.assertEqual(None, avg.get()) - avg.update(10) + def test_moving_average(self): + avg = MovingAverage([10]) self.assertEqual(10, avg.get()) - avg.update(20) - self.assertEqual(20, avg.get()) - avg.update(0) - self.assertEqual(0, avg.get()) - - def test_2_sample_moving_average(self): - avg = MovingAverage(2) - self.assertEqual(None, avg.get()) - avg.update(10) - self.assertEqual(10, avg.get()) - avg.update(20) - self.assertEqual(15, avg.get()) - avg.update(30) - self.assertEqual(25, avg.get()) - avg.update(-100) - self.assertEqual(-35, avg.get()) - - def test_5_sample_moving_average(self): - avg = MovingAverage(5) - self.assertEqual(None, avg.get()) - avg.update(10) - self.assertEqual(10, avg.get()) - avg.update(20) - self.assertEqual(15, avg.get()) - avg.update(30) - self.assertEqual(20, avg.get()) - avg.update(-100) - self.assertEqual((10 + 20 + 30 - 100) / 4, avg.get()) - avg.update(17) - self.assertEqual((10 + 20 + 30 - 100 + 17) / 5., avg.get()) - avg.update(43) - self.assertEqual((20 + 30 - 100 + 17 + 43) / 5., avg.get()) - avg.update(-1111) - self.assertEqual((30 - 100 + 17 + 43 - 1111) / 5., avg.get()) + avg2 = avg.clone_with(20) + self.assertEqual(15, avg2.get()) + avg3 = avg2.clone_with(30) + self.assertEqual(20, avg3.get()) + avg4 = avg3.clone_with(-100) + self.assertEqual((10 + 20 + 30 - 100) / 4., avg4.get()) + avg5 = avg4.clone_with(17) + self.assertEqual((10 + 20 + 30 - 100 + 17) / 5., avg5.get()) + avg6 = avg5.clone_with(43) + self.assertEqual((20 + 30 - 100 + 17 + 43) / 5., avg6.get()) + avg7 = avg6.clone_with(-1111) + self.assertEqual((30 - 100 + 17 + 43 - 1111) / 5., avg7.get()) class TestMongosConnection(unittest.TestCase): diff --git a/test/test_replica_set_client.py b/test/test_replica_set_client.py index 20be233de..45b65297b 100644 --- a/test/test_replica_set_client.py +++ b/test/test_replica_set_client.py @@ -14,6 +14,8 @@ """Test the replica_set_connection module.""" +# TODO: anywhere we wait for refresh in tests, consider just refreshing w/ sync + import copy import datetime import os @@ -35,6 +37,7 @@ from bson.tz_util import utc from pymongo.mongo_client import MongoClient from pymongo.read_preferences import ReadPreference from pymongo.mongo_replica_set_client import MongoReplicaSetClient +from pymongo.mongo_replica_set_client import PRIMARY, SECONDARY, OTHER from pymongo.mongo_replica_set_client import _partition_node, have_gevent from pymongo.database import Database from pymongo.pool import SocketInfo @@ -46,7 +49,7 @@ from pymongo.errors import (AutoReconnect, from test import version, port, pair from test.utils import ( delay, assertReadFrom, assertReadFromAll, read_from_which_host, - assertRaisesExactly, TestRequestMixin) + assertRaisesExactly, TestRequestMixin, one) class TestReplicaSetClientAgainstStandalone(unittest.TestCase): @@ -526,6 +529,77 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): no_timeout.close() timeout.close() + def test_socket_error_marks_member_down(self): + # A socket error (besides timeout) changes a member's state to "down". + c = self._get_client() + collection = c.pymongo_test.test + collection.insert({}, w=self.w) + previous_writer = c._MongoReplicaSetClient__rs_state.writer + + def kill_sockets(): + for member in c._MongoReplicaSetClient__rs_state.members: + for socket_info in member.pool.sockets: + socket_info.sock.close() + + kill_sockets() + + # Query the primary. + self.assertRaises(ConnectionFailure, collection.find_one) + + # primary_member returns None if primary is marked "down". + rs_state = c._MongoReplicaSetClient__rs_state + + self.assertEqual(None, rs_state.writer) + self.assertFalse(rs_state.get(previous_writer).up) + + collection.find_one() # No error, we recovered. + rs_state = c._MongoReplicaSetClient__rs_state + self.assertTrue(rs_state.get(rs_state.writer).up) + + kill_sockets() + + # Query secondaries. Client marks them "down" as they fail, and tries + # up to 3 of them before raising. + self.assertRaises( + ConnectionFailure, + collection.find_one, + read_preference=SECONDARY) + + # Secondaries were either removed from state or marked "down". + rs_state = c._MongoReplicaSetClient__rs_state + for secondary_host in rs_state.secondaries: + self.assertFalse(rs_state.get(secondary_host).up) + + def test_timeout_does_not_mark_member_down(self): + # If a query times out, the RS client shouldn't mark the member "down". + c = self._get_client(socketTimeoutMS=1000) + collection = c.pymongo_test.test + collection.insert({}, w=self.w) + + # Query the primary. + self.assertRaises( + ConnectionFailure, + collection.find_one, + {'$where': delay(5)}) + + # primary_member returns None if primary is marked "down". + rs_state = c._MongoReplicaSetClient__rs_state + self.assertTrue(rs_state.primary_member) + + collection.find_one() # No error. + + # Query the secondary. + self.assertRaises( + ConnectionFailure, + collection.find_one, + {'$where': delay(5)}, + read_preference=SECONDARY) + + rs_state = c._MongoReplicaSetClient__rs_state + secondary_host = one(rs_state.secondaries) + self.assertTrue(rs_state.get(secondary_host).up) + collection.find_one(read_preference=SECONDARY) # No error. + def test_tz_aware(self): self.assertRaises(ConfigurationError, MongoReplicaSetClient, tz_aware='foo', replicaSet=self.name) @@ -586,7 +660,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): self.assertNotEqual(0, cursor.cursor_id) connection_id = cursor._Cursor__connection_id - writer = c._MongoReplicaSetClient__writer + writer = c._MongoReplicaSetClient__rs_state.writer if read_pref == ReadPreference.PRIMARY: msg = "Expected cursor's connection_id to be %s, got %s" % ( writer, connection_id) @@ -687,7 +761,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): # Ensure MongoReplicaSetClient doesn't close socket after it gets an # error response to getLastError. PYTHON-395. c = self._get_client(auto_start_request=False) - pool = c._MongoReplicaSetClient__members[self.primary].pool + pool = c._MongoReplicaSetClient__rs_state.get(self.primary).pool self.assertEqual(1, len(pool.sockets)) old_sock_info = iter(pool.sockets).next() c.pymongo_test.test.drop() @@ -707,7 +781,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): # error response to getLastError. PYTHON-395. c = self._get_client(auto_start_request=True) c.pymongo_test.test.find_one() - pool = c._MongoReplicaSetClient__members[self.primary].pool + pool = c._MongoReplicaSetClient__rs_state.get(self.primary).pool # Client reserved a socket for this thread self.assertTrue(isinstance(pool._get_request_state(), SocketInfo)) @@ -732,13 +806,13 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): client = self._get_client(auto_start_request=True) self.assertTrue(client.auto_start_request) - pools = [mongo.pool for mongo in - client._MongoReplicaSetClient__members.values()] + pools = [member.pool for member in + client._MongoReplicaSetClient__rs_state.members] self.assertInRequestAndSameSock(client, pools) primary_pool = \ - client._MongoReplicaSetClient__members[client.primary].pool + client._MongoReplicaSetClient__rs_state.get(client.primary).pool # Trigger the RSC to actually start a request on primary pool client.pymongo_test.test.find_one() @@ -754,7 +828,8 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): pass secondary = cursor._Cursor__connection_id - secondary_pool = client._MongoReplicaSetClient__members[secondary].pool + rs_state = client._MongoReplicaSetClient__rs_state + secondary_pool = rs_state.get(secondary).pool self.assertTrue(secondary_pool.in_request()) client.end_request() @@ -767,7 +842,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): client = self._get_client() pools = [mongo.pool for mongo in - client._MongoReplicaSetClient__members.values()] + client._MongoReplicaSetClient__rs_state.members] self.assertNotInRequestAndDifferentSock(client, pools) client.start_request() @@ -780,7 +855,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): client = self._get_client(auto_start_request=True) try: pools = [member.pool for member in - client._MongoReplicaSetClient__members.values()] + client._MongoReplicaSetClient__rs_state.members] self.assertTrue(client.in_request()) # Start and end request - we're still in "outer" original request @@ -823,7 +898,7 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): client = self._get_client() try: pools = [member.pool for member in - client._MongoReplicaSetClient__members.values()] + client._MongoReplicaSetClient__rs_state.members] self.assertNotInRequestAndDifferentSock(client, pools) started_request, ended_request = threading.Event(), threading.Event() @@ -862,31 +937,17 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin): client.close() def test_schedule_refresh(self): - # Monitor thread starts waiting for _refresh_interval, 30 seconds client = self._get_client() + new_rs_state = rs_state = client._MongoReplicaSetClient__rs_state + for host in rs_state.hosts: + new_rs_state = new_rs_state.clone_with_host_down(host, 'error!') - # Reconnect if necessary - client.pymongo_test.test.find_one() - - secondaries = client.secondaries - for secondary in secondaries: - client._MongoReplicaSetClient__members[secondary].up = False - - client._MongoReplicaSetClient__members[client.primary].up = False - - # Wake up monitor thread - client._MongoReplicaSetClient__schedule_refresh() - - # Refresh interval is 30 seconds; scheduling a refresh tells the - # monitor thread / greenlet to start a refresh now. We still need to - # sleep a few seconds for it to complete. - time.sleep(5) - for secondary in secondaries: - self.assertTrue(client._MongoReplicaSetClient__members[secondary].up, - "MongoReplicaSetClient didn't detect secondary is up") - - self.assertTrue(client._MongoReplicaSetClient__members[client.primary].up, - "MongoReplicaSetClient didn't detect primary is up") + client._MongoReplicaSetClient__rs_state = new_rs_state + client._MongoReplicaSetClient__schedule_refresh(sync=True) + rs_state = client._MongoReplicaSetClient__rs_state + for member in rs_state.members: + self.assertTrue( + member.up, "MongoReplicaSetClient didn't detect member is up") client.close() diff --git a/test/test_threads.py b/test/test_threads.py index 84397a20f..ceefd9d0c 100644 --- a/test/test_threads.py +++ b/test/test_threads.py @@ -32,9 +32,8 @@ def get_pool(client): if isinstance(client, MongoClient): return client._MongoClient__pool elif isinstance(client, MongoReplicaSetClient): - writer = client._MongoReplicaSetClient__writer - pools = client._MongoReplicaSetClient__members - return pools[writer].pool + rs_state = client._MongoReplicaSetClient__rs_state + return rs_state[rs_state.writer].pool else: raise TypeError(str(client))