PYTHON-2328 Reset the connection pool in Topology.on_change (#470)

PYTHON-2304 Ensure _RttMonitor closes socket on when the client is closed
This commit is contained in:
Shane Harvey 2020-07-27 13:27:05 -07:00 committed by GitHub
parent c16b5b95a1
commit b04e3343cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 18 deletions

View File

@ -177,7 +177,10 @@ class Monitor(MonitorBase):
# discover that we've been cancelled.
self._executor.skip_sleep()
return
self._topology.on_change(self._server_description)
# Update the Topology and clear the server pool on error.
self._topology.on_change(self._server_description,
reset_pool=self._server_description.error)
if (self._server_description.is_server_type_known and
self._server_description.topology_version):
@ -185,12 +188,9 @@ class Monitor(MonitorBase):
# Immediately check for the next streaming response.
self._executor.skip_sleep()
if self._server_description.error:
# Reset the server pool only after marking the server Unknown.
self._topology.reset_pool(self._server_description.address)
if prev_sd.is_server_type_known:
# Immediately retry on network errors.
self._executor.skip_sleep()
if self._server_description.error and prev_sd.is_server_type_known:
# Immediately retry on network errors.
self._executor.skip_sleep()
except ReferenceError:
# Topology was garbage-collected.
self.close()
@ -377,6 +377,8 @@ class _RttMonitor(MonitorBase):
def _ping(self):
"""Run an "isMaster" command and return the RTT."""
with self._pool.get_socket({}) as sock_info:
if self._executor._stopped:
raise Exception('_RttMonitor closed')
start = _time()
sock_info.ismaster()
return _time() - start

View File

@ -265,7 +265,7 @@ class Topology(object):
server_selection_timeout,
address)
def _process_change(self, server_description):
def _process_change(self, server_description, reset_pool=False):
"""Process a new ServerDescription on an opened topology.
Hold the lock when calling this.
@ -303,10 +303,16 @@ class Topology(object):
SRV_POLLING_TOPOLOGIES):
self._srv_monitor.close()
# Clear the pool from a failed heartbeat.
if reset_pool:
server = self._servers.get(server_description.address)
if server:
server.pool.reset()
# Wake waiters in select_servers().
self._condition.notify_all()
def on_change(self, server_description):
def on_change(self, server_description, reset_pool=False):
"""Process a new ServerDescription after an ismaster call completes."""
# We do no I/O holding the lock.
with self._lock:
@ -320,7 +326,7 @@ class Topology(object):
# that didn't include this server.
if (self._opened and
self._description.has_server(server_description.address)):
self._process_change(server_description)
self._process_change(server_description, reset_pool)
def _process_srv_update(self, seedlist):
"""Process a new seedlist on an opened topology.
@ -414,20 +420,14 @@ class Topology(object):
self._request_check_all()
self._condition.wait(wait_time)
def reset_pool(self, address):
with self._lock:
server = self._servers.get(address)
if server:
server.pool.reset()
def handle_getlasterror(self, address, error_msg):
"""Clear our pool for a server, mark it Unknown, and check it soon."""
error = NotMasterError(error_msg, {'code': 10107, 'errmsg': error_msg})
with self._lock:
server = self._servers.get(address)
if server:
self._process_change(ServerDescription(address, error=error))
server.pool.reset()
self._process_change(
ServerDescription(address, error=error), True)
server.request_check()
def update_pool(self, all_credentials):