PYTHON-525 Restart monitor threads after fork.

A reimplementation of PYTHON-549 for PyMongo 3's new MongoClient.
This commit is contained in:
A. Jesse Jiryu Davis 2014-09-08 12:05:27 -04:00
parent fd3154d8a1
commit a801c38d4a
7 changed files with 71 additions and 89 deletions

View File

@ -18,6 +18,7 @@ import random
import threading
import time
from bson.py3compat import itervalues
from pymongo import common
from pymongo.cluster_description import (updated_cluster_description,
CLUSTER_TYPE,
@ -44,7 +45,7 @@ class Cluster(object):
self._servers = {}
def open(self):
"""Start monitoring.
"""Start monitoring, or restart after a fork.
No effect if called multiple times.
"""
@ -204,10 +205,17 @@ class Cluster(object):
return self._description
def _ensure_opened(self):
"""Start monitors. Hold the lock when calling this."""
"""Start monitors, or restart after a fork.
Hold the lock when calling this.
"""
if not self._opened:
self._opened = True
self._update_servers()
else:
# Restart monitors if we forked since previous call.
for server in itervalues(self._servers):
server.open()
def _request_check_all(self):
"""Wake all monitors. Hold the lock when calling this."""

View File

@ -27,7 +27,7 @@ from pymongo.read_preferences import MovingAverage
from pymongo.server_description import ServerDescription
class Monitor(threading.Thread):
class Monitor(object):
def __init__(
self,
server_description,
@ -43,13 +43,31 @@ class Monitor(threading.Thread):
Monitor.
"""
super(Monitor, self).__init__()
self.daemon = True # Python 2.6's way to do setDaemon(True).
self._server_description = server_description
self._cluster = weakref.proxy(cluster)
self._pool = pool
self._settings = cluster_settings
self._stopped = False
self._event = thread_util.Event(self._settings.condition_class)
self._thread = None
def open(self):
"""Start monitoring, or restart after a fork.
Multiple calls have no effect.
"""
started = False
try:
started = self._thread and self._thread.is_alive()
except ReferenceError:
# Thread terminated.
pass
if not started:
thread = threading.Thread(target=self.run)
thread.daemon = True
self._thread = weakref.proxy(thread)
thread.start()
def close(self):
"""Disconnect and stop monitoring.
@ -62,6 +80,14 @@ class Monitor(threading.Thread):
# Awake the thread so it notices that _stopped is True.
self.request_check()
def join(self, timeout=None):
if self._thread is not None:
try:
self._thread.join(timeout)
except ReferenceError:
# Thread already terminated.
pass
def request_check(self):
"""If the monitor is sleeping, wake and check the server soon."""
self._event.set()

View File

@ -29,7 +29,11 @@ class Server(object):
self._monitor = monitor
def open(self):
self._monitor.start()
"""Start monitoring, or restart after a fork.
Multiple calls have no effect.
"""
self._monitor.open()
def close(self):
self._monitor.close()

View File

@ -16,11 +16,13 @@
import contextlib
import datetime
import multiprocessing
import os
import threading
import socket
import sys
import time
import traceback
import warnings
sys.path[0:0] = [""]
@ -38,7 +40,8 @@ from pymongo.errors import (AutoReconnect,
InvalidName,
OperationFailure,
CursorNotFound)
from pymongo.server_selectors import writable_server_selector
from pymongo.server_selectors import (any_server_selector,
writable_server_selector)
from pymongo.server_type import SERVER_TYPE
from test import (client_context,
client_knobs,
@ -442,56 +445,38 @@ class TestClient(IntegrationTest, TestRequestMixin):
if sys.platform == "win32":
raise SkipTest("Can't fork on windows")
try:
from multiprocessing import Process, Pipe
except ImportError:
raise SkipTest("No multiprocessing module")
db = self.client.pymongo_test
# Failure occurs if the client is used before the fork
# Ensure a socket is opened before the fork.
db.test.find_one()
db.connection.end_request()
def loop(pipe):
while True:
try:
db.test.insert({"a": "b"})
for _ in db.test.find():
pass
except:
pipe.send(True)
os._exit(1)
def f(pipe):
try:
servers = self.client._cluster.select_servers(
any_server_selector)
cp1, cc1 = Pipe()
cp2, cc2 = Pipe()
# In child, only the thread that called fork() is alive.
assert not any(s._monitor._thread.is_alive()
for s in servers)
p1 = Process(target=loop, args=(cc1,))
p2 = Process(target=loop, args=(cc2,))
db.test.find_one()
p1.start()
p2.start()
wait_until(
lambda: all(s._monitor._thread.is_alive() for s in servers),
"restart monitor threads")
except:
traceback.print_exc() # Aid debugging.
pipe.send(True)
p1.join(1)
p2.join(1)
parent_pipe, child_pipe = multiprocessing.Pipe()
p = multiprocessing.Process(target=f, args=(child_pipe,))
p.start()
p.join(10)
child_pipe.close()
p1.terminate()
p2.terminate()
p1.join()
p2.join()
cc1.close()
cc2.close()
# recv will only have data if the subprocess failed
# Pipe will only have data if the child process failed.
try:
cp1.recv()
self.fail()
except EOFError:
pass
try:
cp2.recv()
parent_pipe.recv()
self.fail()
except EOFError:
pass

View File

@ -70,7 +70,7 @@ class MockMonitor(object):
self._server_description = server_description
self._cluster = cluster
def start(self):
def open(self):
pass
def request_check(self):

View File

@ -69,7 +69,7 @@ class MockMonitor(object):
self._server_description = server_description
self._cluster = cluster
def start(self):
def open(self):
pass
def request_check(self):

View File

@ -520,47 +520,6 @@ class TestReplicaSetClient(TestReplicaSetClientBase, TestRequestMixin):
self.assertRaises(InvalidOperation, c.db.collection.find_one)
self.assertRaises(InvalidOperation, c.db.collection.insert, {})
def test_fork(self):
# After a fork the monitor thread is gone.
# Verify that schedule_refresh restarts it.
if sys.platform == "win32":
raise SkipTest("Can't fork on Windows")
try:
from multiprocessing import Process, Pipe
except ImportError:
raise SkipTest("No multiprocessing module")
client = client_context.rs_client
def f(pipe):
try:
# Trigger a refresh.
self.assertFalse(
client._MongoReplicaSetClient__monitor.isAlive())
client.disconnect()
self.assertSoon(
lambda: client._MongoReplicaSetClient__monitor.isAlive())
client.db.collection.find_one() # No error.
except:
traceback.print_exc()
pipe.send(True)
cp, cc = Pipe()
p = Process(target=f, args=(cc,))
p.start()
p.join(10)
cc.close()
# recv will only have data if the subprocess failed
try:
cp.recv()
self.fail()
except EOFError:
pass
def test_document_class(self):
c = client_context.rs_client
db = c.pymongo_test