Compare commits

...

4 Commits
master ... v3.1

Author SHA1 Message Date
Steven Silvester
14f381cddc BUMP 3.1.3.dev0 2023-04-03 12:24:47 -05:00
Steven Silvester
c2228bfb3d BUMP 3.1.2 2023-04-03 12:23:56 -05:00
Julius Park
decdb2b528 MOTOR-1106 ERROR: module 'os' has no attribute 'fork' (AttributeError) (#199)
(cherry picked from commit 16c476e618)
2023-04-03 12:18:54 -05:00
Julius Park
c920be5a31 MOTOR-1087 motor_asyncio freezes when using multiprocessing (#196)
(cherry picked from commit 935c543919)
2023-04-03 12:18:32 -05:00
7 changed files with 78 additions and 4 deletions

View File

@ -3,6 +3,13 @@ Changelog
.. currentmodule:: motor.motor_tornado
Motor 3.1.2
-----------
Motor 3.1.2 fixes a bug when using Motor with ``multiprocessing``.
Motor 3.1.1
-----------

View File

@ -14,7 +14,7 @@
"""Motor, an asynchronous driver for MongoDB."""
version_tuple = (3, 1, 1)
version_tuple = (3, 1, 3, 'dev0')
def get_version_string():

View File

@ -66,6 +66,17 @@ else:
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
def _reset_global_executor():
"""Re-initialize the global ThreadPoolExecutor"""
global _EXECUTOR
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
if hasattr(os, "register_at_fork"):
# We need this to make sure that creating new clients in subprocesses doesn't deadlock.
os.register_at_fork(after_in_child=_reset_global_executor)
def run_on_executor(loop, fn, *args, **kwargs):
if contextvars:
context = contextvars.copy_context()

View File

@ -61,6 +61,17 @@ else:
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
def _reset_global_executor():
"""Re-initialize the global ThreadPoolExecutor"""
global _EXECUTOR
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
if hasattr(os, "register_at_fork"):
# We need this to make sure that creating new clients in subprocesses doesn't deadlock.
os.register_at_fork(after_in_child=_reset_global_executor)
def run_on_executor(loop, fn, *args, **kwargs):
if contextvars:
context = contextvars.copy_context()

View File

@ -151,7 +151,7 @@ packages = [
setup(
name="motor",
version="3.1.1",
version="3.1.3.dev0",
packages=packages,
description=description,
long_description=long_description,

View File

@ -11,9 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import test
import unittest
from abc import ABC
from asyncio import new_event_loop, set_event_loop
from multiprocessing import Pipe
from test.asyncio_tests import AsyncIOTestCase, asyncio_test
from test.utils import ignore_deprecations
@ -145,3 +148,23 @@ class AIOMotorTestBasic(AsyncIOTestCase):
coll = db["testcoll"]
self.assertIsInstance(coll, CollectionSubclass)
self.assertIsNotNone(await coll.insert_one({}))
class ExecutorForkTest(AsyncIOTestCase):
@unittest.skipUnless(hasattr(os, "fork"), "This test requires fork")
@asyncio_test()
async def test_executor_reset(self):
parent_conn, child_conn = Pipe()
lock_pid = os.fork()
if lock_pid == 0: # Child
set_event_loop(None)
self.loop = new_event_loop()
client = self.asyncio_client()
try:
self.loop.run_until_complete(client.db.command("ping"))
except Exception:
child_conn.send(False)
child_conn.send(True)
os._exit(0)
else: # Parent
self.assertTrue(parent_conn.recv(), "Child process did not complete.")

View File

@ -13,15 +13,18 @@
# limitations under the License.
"""Test Motor, an asynchronous driver for MongoDB and Tornado."""
import os
import test
import unittest
from abc import ABC
from multiprocessing import Pipe
from test.tornado_tests import MotorTest
from test.utils import ignore_deprecations
import pymongo
from pymongo import WriteConcern
from pymongo.read_preferences import Nearest, ReadPreference, Secondary
from tornado.ioloop import IOLoop
from tornado.testing import gen_test
import motor
@ -146,3 +149,22 @@ class MotorTestBasic(MotorTest):
coll = db["testcoll"]
self.assertIsInstance(coll, CollectionSubclass)
self.assertIsNotNone(await coll.insert_one({}))
class ExecutorForkTest(MotorTest):
@unittest.skipUnless(hasattr(os, "fork"), "This test requires fork")
@gen_test()
async def test_executor_reset(self):
parent_conn, child_conn = Pipe()
lock_pid = os.fork()
if lock_pid == 0: # Child
self.loop = IOLoop.current()
client = self.motor_client()
try:
self.loop.spawn_callback(client.db.command, "ping")
except Exception:
child_conn.send(False)
child_conn.send(True)
os._exit(0)
else: # Parent
self.assertTrue(parent_conn.recv(), "Child process did not complete.")