Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
14f381cddc | ||
|
|
c2228bfb3d | ||
|
|
decdb2b528 | ||
|
|
c920be5a31 |
@ -3,6 +3,13 @@ Changelog
|
||||
|
||||
.. currentmodule:: motor.motor_tornado
|
||||
|
||||
|
||||
Motor 3.1.2
|
||||
-----------
|
||||
|
||||
Motor 3.1.2 fixes a bug when using Motor with ``multiprocessing``.
|
||||
|
||||
|
||||
Motor 3.1.1
|
||||
-----------
|
||||
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
|
||||
"""Motor, an asynchronous driver for MongoDB."""
|
||||
|
||||
version_tuple = (3, 1, 1)
|
||||
version_tuple = (3, 1, 3, 'dev0')
|
||||
|
||||
|
||||
def get_version_string():
|
||||
|
||||
@ -66,6 +66,17 @@ else:
|
||||
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
|
||||
|
||||
|
||||
def _reset_global_executor():
|
||||
"""Re-initialize the global ThreadPoolExecutor"""
|
||||
global _EXECUTOR
|
||||
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
|
||||
|
||||
|
||||
if hasattr(os, "register_at_fork"):
|
||||
# We need this to make sure that creating new clients in subprocesses doesn't deadlock.
|
||||
os.register_at_fork(after_in_child=_reset_global_executor)
|
||||
|
||||
|
||||
def run_on_executor(loop, fn, *args, **kwargs):
|
||||
if contextvars:
|
||||
context = contextvars.copy_context()
|
||||
|
||||
@ -61,6 +61,17 @@ else:
|
||||
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
|
||||
|
||||
|
||||
def _reset_global_executor():
|
||||
"""Re-initialize the global ThreadPoolExecutor"""
|
||||
global _EXECUTOR
|
||||
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
|
||||
|
||||
|
||||
if hasattr(os, "register_at_fork"):
|
||||
# We need this to make sure that creating new clients in subprocesses doesn't deadlock.
|
||||
os.register_at_fork(after_in_child=_reset_global_executor)
|
||||
|
||||
|
||||
def run_on_executor(loop, fn, *args, **kwargs):
|
||||
if contextvars:
|
||||
context = contextvars.copy_context()
|
||||
|
||||
2
setup.py
2
setup.py
@ -151,7 +151,7 @@ packages = [
|
||||
|
||||
setup(
|
||||
name="motor",
|
||||
version="3.1.1",
|
||||
version="3.1.3.dev0",
|
||||
packages=packages,
|
||||
description=description,
|
||||
long_description=long_description,
|
||||
|
||||
@ -11,9 +11,12 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import test
|
||||
import unittest
|
||||
from abc import ABC
|
||||
from asyncio import new_event_loop, set_event_loop
|
||||
from multiprocessing import Pipe
|
||||
from test.asyncio_tests import AsyncIOTestCase, asyncio_test
|
||||
from test.utils import ignore_deprecations
|
||||
|
||||
@ -145,3 +148,23 @@ class AIOMotorTestBasic(AsyncIOTestCase):
|
||||
coll = db["testcoll"]
|
||||
self.assertIsInstance(coll, CollectionSubclass)
|
||||
self.assertIsNotNone(await coll.insert_one({}))
|
||||
|
||||
|
||||
class ExecutorForkTest(AsyncIOTestCase):
|
||||
@unittest.skipUnless(hasattr(os, "fork"), "This test requires fork")
|
||||
@asyncio_test()
|
||||
async def test_executor_reset(self):
|
||||
parent_conn, child_conn = Pipe()
|
||||
lock_pid = os.fork()
|
||||
if lock_pid == 0: # Child
|
||||
set_event_loop(None)
|
||||
self.loop = new_event_loop()
|
||||
client = self.asyncio_client()
|
||||
try:
|
||||
self.loop.run_until_complete(client.db.command("ping"))
|
||||
except Exception:
|
||||
child_conn.send(False)
|
||||
child_conn.send(True)
|
||||
os._exit(0)
|
||||
else: # Parent
|
||||
self.assertTrue(parent_conn.recv(), "Child process did not complete.")
|
||||
|
||||
@ -13,15 +13,18 @@
|
||||
# limitations under the License.
|
||||
|
||||
"""Test Motor, an asynchronous driver for MongoDB and Tornado."""
|
||||
|
||||
import os
|
||||
import test
|
||||
import unittest
|
||||
from abc import ABC
|
||||
from multiprocessing import Pipe
|
||||
from test.tornado_tests import MotorTest
|
||||
from test.utils import ignore_deprecations
|
||||
|
||||
import pymongo
|
||||
from pymongo import WriteConcern
|
||||
from pymongo.read_preferences import Nearest, ReadPreference, Secondary
|
||||
from tornado.ioloop import IOLoop
|
||||
from tornado.testing import gen_test
|
||||
|
||||
import motor
|
||||
@ -146,3 +149,22 @@ class MotorTestBasic(MotorTest):
|
||||
coll = db["testcoll"]
|
||||
self.assertIsInstance(coll, CollectionSubclass)
|
||||
self.assertIsNotNone(await coll.insert_one({}))
|
||||
|
||||
|
||||
class ExecutorForkTest(MotorTest):
|
||||
@unittest.skipUnless(hasattr(os, "fork"), "This test requires fork")
|
||||
@gen_test()
|
||||
async def test_executor_reset(self):
|
||||
parent_conn, child_conn = Pipe()
|
||||
lock_pid = os.fork()
|
||||
if lock_pid == 0: # Child
|
||||
self.loop = IOLoop.current()
|
||||
client = self.motor_client()
|
||||
try:
|
||||
self.loop.spawn_callback(client.db.command, "ping")
|
||||
except Exception:
|
||||
child_conn.send(False)
|
||||
child_conn.send(True)
|
||||
os._exit(0)
|
||||
else: # Parent
|
||||
self.assertTrue(parent_conn.recv(), "Child process did not complete.")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user