Compare commits

...

4 Commits
master ... v3.1

Author SHA1 Message Date
Steven Silvester
14f381cddc BUMP 3.1.3.dev0 2023-04-03 12:24:47 -05:00
Steven Silvester
c2228bfb3d BUMP 3.1.2 2023-04-03 12:23:56 -05:00
Julius Park
decdb2b528 MOTOR-1106 ERROR: module 'os' has no attribute 'fork' (AttributeError) (#199)
(cherry picked from commit 16c476e618)
2023-04-03 12:18:54 -05:00
Julius Park
c920be5a31 MOTOR-1087 motor_asyncio freezes when using multiprocessing (#196)
(cherry picked from commit 935c543919)
2023-04-03 12:18:32 -05:00
7 changed files with 78 additions and 4 deletions

View File

@ -3,6 +3,13 @@ Changelog
.. currentmodule:: motor.motor_tornado .. currentmodule:: motor.motor_tornado
Motor 3.1.2
-----------
Motor 3.1.2 fixes a bug when using Motor with ``multiprocessing``.
Motor 3.1.1 Motor 3.1.1
----------- -----------

View File

@ -14,7 +14,7 @@
"""Motor, an asynchronous driver for MongoDB.""" """Motor, an asynchronous driver for MongoDB."""
version_tuple = (3, 1, 1) version_tuple = (3, 1, 3, 'dev0')
def get_version_string(): def get_version_string():

View File

@ -66,6 +66,17 @@ else:
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers) _EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
def _reset_global_executor():
"""Re-initialize the global ThreadPoolExecutor"""
global _EXECUTOR
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
if hasattr(os, "register_at_fork"):
# We need this to make sure that creating new clients in subprocesses doesn't deadlock.
os.register_at_fork(after_in_child=_reset_global_executor)
def run_on_executor(loop, fn, *args, **kwargs): def run_on_executor(loop, fn, *args, **kwargs):
if contextvars: if contextvars:
context = contextvars.copy_context() context = contextvars.copy_context()

View File

@ -61,6 +61,17 @@ else:
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers) _EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
def _reset_global_executor():
"""Re-initialize the global ThreadPoolExecutor"""
global _EXECUTOR
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
if hasattr(os, "register_at_fork"):
# We need this to make sure that creating new clients in subprocesses doesn't deadlock.
os.register_at_fork(after_in_child=_reset_global_executor)
def run_on_executor(loop, fn, *args, **kwargs): def run_on_executor(loop, fn, *args, **kwargs):
if contextvars: if contextvars:
context = contextvars.copy_context() context = contextvars.copy_context()

View File

@ -151,7 +151,7 @@ packages = [
setup( setup(
name="motor", name="motor",
version="3.1.1", version="3.1.3.dev0",
packages=packages, packages=packages,
description=description, description=description,
long_description=long_description, long_description=long_description,

View File

@ -11,9 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os
import test import test
import unittest
from abc import ABC from abc import ABC
from asyncio import new_event_loop, set_event_loop
from multiprocessing import Pipe
from test.asyncio_tests import AsyncIOTestCase, asyncio_test from test.asyncio_tests import AsyncIOTestCase, asyncio_test
from test.utils import ignore_deprecations from test.utils import ignore_deprecations
@ -145,3 +148,23 @@ class AIOMotorTestBasic(AsyncIOTestCase):
coll = db["testcoll"] coll = db["testcoll"]
self.assertIsInstance(coll, CollectionSubclass) self.assertIsInstance(coll, CollectionSubclass)
self.assertIsNotNone(await coll.insert_one({})) self.assertIsNotNone(await coll.insert_one({}))
class ExecutorForkTest(AsyncIOTestCase):
@unittest.skipUnless(hasattr(os, "fork"), "This test requires fork")
@asyncio_test()
async def test_executor_reset(self):
parent_conn, child_conn = Pipe()
lock_pid = os.fork()
if lock_pid == 0: # Child
set_event_loop(None)
self.loop = new_event_loop()
client = self.asyncio_client()
try:
self.loop.run_until_complete(client.db.command("ping"))
except Exception:
child_conn.send(False)
child_conn.send(True)
os._exit(0)
else: # Parent
self.assertTrue(parent_conn.recv(), "Child process did not complete.")

View File

@ -13,15 +13,18 @@
# limitations under the License. # limitations under the License.
"""Test Motor, an asynchronous driver for MongoDB and Tornado.""" """Test Motor, an asynchronous driver for MongoDB and Tornado."""
import os
import test import test
import unittest
from abc import ABC from abc import ABC
from multiprocessing import Pipe
from test.tornado_tests import MotorTest from test.tornado_tests import MotorTest
from test.utils import ignore_deprecations from test.utils import ignore_deprecations
import pymongo import pymongo
from pymongo import WriteConcern from pymongo import WriteConcern
from pymongo.read_preferences import Nearest, ReadPreference, Secondary from pymongo.read_preferences import Nearest, ReadPreference, Secondary
from tornado.ioloop import IOLoop
from tornado.testing import gen_test from tornado.testing import gen_test
import motor import motor
@ -146,3 +149,22 @@ class MotorTestBasic(MotorTest):
coll = db["testcoll"] coll = db["testcoll"]
self.assertIsInstance(coll, CollectionSubclass) self.assertIsInstance(coll, CollectionSubclass)
self.assertIsNotNone(await coll.insert_one({})) self.assertIsNotNone(await coll.insert_one({}))
class ExecutorForkTest(MotorTest):
@unittest.skipUnless(hasattr(os, "fork"), "This test requires fork")
@gen_test()
async def test_executor_reset(self):
parent_conn, child_conn = Pipe()
lock_pid = os.fork()
if lock_pid == 0: # Child
self.loop = IOLoop.current()
client = self.motor_client()
try:
self.loop.spawn_callback(client.db.command, "ping")
except Exception:
child_conn.send(False)
child_conn.send(True)
os._exit(0)
else: # Parent
self.assertTrue(parent_conn.recv(), "Child process did not complete.")