MOTOR-1087 motor_asyncio freezes when using multiprocessing (#196)

This commit is contained in:
Julius Park 2023-03-16 11:20:26 -07:00 committed by GitHub
parent f4422ddaaf
commit 935c543919
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 1 deletions

View File

@ -66,6 +66,17 @@ else:
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
def _reset_global_executor():
"""Re-initialize the global ThreadPoolExecutor"""
global _EXECUTOR
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
if hasattr(os, "register_at_fork"):
# We need this to make sure that creating new clients in subprocesses doesn't deadlock.
os.register_at_fork(after_in_child=_reset_global_executor)
def run_on_executor(loop, fn, *args, **kwargs):
if contextvars:
context = contextvars.copy_context()

View File

@ -61,6 +61,17 @@ else:
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
def _reset_global_executor():
"""Re-initialize the global ThreadPoolExecutor"""
global _EXECUTOR
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
if hasattr(os, "register_at_fork"):
# We need this to make sure that creating new clients in subprocesses doesn't deadlock.
os.register_at_fork(after_in_child=_reset_global_executor)
def run_on_executor(loop, fn, *args, **kwargs):
if contextvars:
context = contextvars.copy_context()

View File

@ -12,8 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import test
from abc import ABC
from asyncio import new_event_loop, set_event_loop
from multiprocessing import Pipe
from test.asyncio_tests import AsyncIOTestCase, asyncio_test
from test.utils import ignore_deprecations
@ -145,3 +148,22 @@ class AIOMotorTestBasic(AsyncIOTestCase):
coll = db["testcoll"]
self.assertIsInstance(coll, CollectionSubclass)
self.assertIsNotNone(await coll.insert_one({}))
class ExecutorForkTest(AsyncIOTestCase):
@asyncio_test()
async def test_executor_reset(self):
parent_conn, child_conn = Pipe()
lock_pid = os.fork()
if lock_pid == 0: # Child
set_event_loop(None)
self.loop = new_event_loop()
client = self.asyncio_client()
try:
self.loop.run_until_complete(client.db.command("ping"))
except Exception:
child_conn.send(False)
child_conn.send(True)
os._exit(0)
else: # Parent
self.assertTrue(parent_conn.recv(), "Child process did not complete.")

View File

@ -13,15 +13,17 @@
# limitations under the License.
"""Test Motor, an asynchronous driver for MongoDB and Tornado."""
import os
import test
from abc import ABC
from multiprocessing import Pipe
from test.tornado_tests import MotorTest
from test.utils import ignore_deprecations
import pymongo
from pymongo import WriteConcern
from pymongo.read_preferences import Nearest, ReadPreference, Secondary
from tornado.ioloop import IOLoop
from tornado.testing import gen_test
import motor
@ -146,3 +148,21 @@ class MotorTestBasic(MotorTest):
coll = db["testcoll"]
self.assertIsInstance(coll, CollectionSubclass)
self.assertIsNotNone(await coll.insert_one({}))
class ExecutorForkTest(MotorTest):
@gen_test()
async def test_executor_reset(self):
parent_conn, child_conn = Pipe()
lock_pid = os.fork()
if lock_pid == 0: # Child
self.loop = IOLoop.current()
client = self.motor_client()
try:
self.loop.spawn_callback(client.db.command, "ping")
except Exception:
child_conn.send(False)
child_conn.send(True)
os._exit(0)
else: # Parent
self.assertTrue(parent_conn.recv(), "Child process did not complete.")