Fix replica set monitor issues PYTHON-365.

Note: In CPython the monitor is no longer a daemon thread. This means that
care must be taken in the interactive shell and simple scripts to set
ReplicaSetConnection references to None so the monitor dies.

The monitor is still a daemon thread in Jython and PyPy.
This commit is contained in:
behackett 2012-07-03 12:56:01 -07:00
parent d21a98aeda
commit 1fe6029c5d
3 changed files with 70 additions and 65 deletions

View File

@ -34,6 +34,7 @@ attribute-style access:
import datetime
import socket
import struct
import sys
import threading
import time
import warnings
@ -73,6 +74,33 @@ def _partition_node(node):
return host, port
class Monitor(threading.Thread):
"""Thread based replica set monitor.
"""
def __init__(self, rsc, interval=5):
super(Monitor, self).__init__()
self.event = threading.Event()
def shutdown(dummy):
"""Weakref callback to kill the thread.
"""
self.event.set()
self.rsc = weakref.proxy(rsc, shutdown)
self.interval = interval
def run(self):
"""Run until the RSC is collected or an
unexpected error occurs.
"""
while not self.event.isSet():
try:
self.rsc.refresh()
except AutoReconnect:
pass
except:
break
self.event.wait(self.interval)
have_gevent = False
try:
import gevent
@ -80,12 +108,19 @@ try:
have_gevent = True
class GreenletMonitor(Greenlet):
"""Greenlet based replica set monitor.
"""
def __init__(self, obj, interval=5):
Greenlet.__init__(self)
self.obj = weakref.proxy(obj)
self.interval = interval
def _run(self):
"""Run until RSC is collected.
Note this method is called _run following
the gevent docs.
"""
while True:
try:
self.obj.refresh()
@ -211,7 +246,6 @@ class ReplicaSetConnection(common.BaseObject):
self.__pools = {}
self.__index_cache = {}
self.__auth_credentials = {}
self.__done = False
self.__max_pool_size = common.validate_positive_integer(
'max_pool_size', max_pool_size)
@ -285,14 +319,6 @@ class ReplicaSetConnection(common.BaseObject):
self.refresh()
if self.__opts.get('use_greenlets', False):
monitor = GreenletMonitor(self)
else:
monitor = threading.Thread(target=self.__refresh_loop)
monitor.setName("ReplicaSetMonitorThread")
monitor.setDaemon(True)
monitor.start()
if db_name and username is None:
warnings.warn("must provide a username and password "
"to authenticate to %s" % (db_name,))
@ -301,24 +327,23 @@ class ReplicaSetConnection(common.BaseObject):
if not self[db_name].authenticate(username, password):
raise ConfigurationError("authentication failed")
def __del__(self):
"""Shutdown the monitor thread.
"""
self.__done = True
# Start the monitor after we know the configuration is correct.
if self.__opts.get('use_greenlets', False):
monitor = GreenletMonitor(self)
else:
# NOTE: Don't ever make this a daemon thread in CPython. Daemon
# threads in CPython cause serious issues when the interpreter is
# torn down. Symptoms range from random exceptions to the
# interpreter dumping core.
monitor = Monitor(self)
monitor.setName("ReplicaSetMonitorThread")
# Sadly, weakrefs aren't totally reliable in PyPy and Jython
# so use a daemon thread there.
if (sys.platform.startswith('java') or
'PyPy' in sys.version):
monitor.setDaemon(True)
monitor.start()
def __refresh_loop(self):
"""Refresh loop used in the standard monitor thread.
"""
while True:
if not self.__done:
try:
self.refresh()
# Catch literally everything here to avoid
# exceptions when the interpreter shuts down.
except:
pass
if time:
time.sleep(5)
def _cached(self, dbname, coll, index):
"""Test if `index` is cached.
@ -525,19 +550,19 @@ class ReplicaSetConnection(common.BaseObject):
def __is_master(self, host):
"""Directly call ismaster.
"""
pool = self.pool_class(host, self.__max_pool_size,
self.__net_timeout, self.__conn_timeout,
self.__use_ssl)
sock_info = pool.get_socket()
mpool = self.pool_class(host, self.__max_pool_size,
self.__net_timeout, self.__conn_timeout,
self.__use_ssl)
sock_info = mpool.get_socket()
try:
response = self.__simple_command(
sock_info, 'admin', {'ismaster': 1}
)
pool.maybe_return_socket(sock_info)
return response, pool
mpool.maybe_return_socket(sock_info)
return response, mpool
except (ConnectionFailure, socket.error):
pool.discard_socket(sock_info)
mpool.discard_socket(sock_info)
raise
def __update_pools(self):
@ -589,7 +614,7 @@ class ReplicaSetConnection(common.BaseObject):
{'ismaster': 1})
mongo['pool'].maybe_return_socket(sock_info)
else:
response, conn = self.__is_master(node)
response, _ = self.__is_master(node)
# Check that this host is part of the given replica set.
set_name = response.get('setName')
@ -683,12 +708,12 @@ class ReplicaSetConnection(common.BaseObject):
def __socket(self, mongo):
"""Get a SocketInfo from the pool.
"""
pool = mongo['pool']
mpool = mongo['pool']
if self.__auto_start_request:
# No effect if a request already started
self.start_request()
sock_info = pool.get_socket()
sock_info = mpool.get_socket()
if self.__auth_credentials:
self.__check_auth(sock_info)

View File

@ -315,31 +315,6 @@ class TestConnection(TestConnectionReplicaSetBase):
self.assertRaises(TypeError, iterate)
connection.close()
# TODO this test is probably very dependent on the machine it's running on
# due to timing issues, but I want to get something in here.
def test_low_network_timeout(self):
c = None
i = 0
n = 10
while c is None and i < n:
try:
c = self._get_connection(socketTimeoutMS=0.1)
except AutoReconnect:
i += 1
if i == n:
raise SkipTest()
coll = c.pymongo_test.test
for _ in xrange(1000):
try:
coll.find_one()
except AutoReconnect:
pass
except AssertionError:
self.fail()
c.close()
def test_close(self):
c = self._get_connection()
coll = c.foo.bar

View File

@ -232,7 +232,9 @@ class BaseTestThreads(object):
self.db = self._get_connection().pymongo_test
def tearDown(self):
pass
# Clear connection reference so that RSC's monitor thread
# dies.
self.db = None
def _get_connection(self):
"""
@ -329,7 +331,6 @@ class BaseTestThreads(object):
#
# If we've fixed PYTHON-345, then only one AutoReconnect is raised,
# and all the threads get new request sockets.
cx = self.db.connection
self.assertTrue(cx.auto_start_request)
collection = self.db.pymongo_test
@ -401,9 +402,10 @@ class BaseTestThreadsAuth(object):
return get_connection()
def setUp(self):
self.conn = self._get_connection()
if not server_started_with_auth(self.conn):
conn = self._get_connection()
if not server_started_with_auth(conn):
raise SkipTest("Authentication is not enabled on server")
self.conn = conn
self.conn.admin.system.users.remove({})
self.conn.admin.add_user('admin-user', 'password')
self.conn.admin.authenticate("admin-user", "password")
@ -415,6 +417,9 @@ class BaseTestThreadsAuth(object):
self.conn.admin.authenticate("admin-user", "password")
self.conn.admin.system.users.remove({})
self.conn.auth_test.system.users.remove({})
# Clear connection reference so that RSC's monitor thread
# dies.
self.conn = None
def test_auto_auth_login(self):
conn = self._get_connection()