Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b9ce1eaeb | ||
|
|
aa78ed129e | ||
|
|
aced74e749 | ||
|
|
792adfbba6 | ||
|
|
95f08f0398 | ||
|
|
efa3cf485f | ||
|
|
34daa766f7 | ||
|
|
b0f443cfd7 | ||
|
|
44eb6b6d35 | ||
|
|
73d3536416 | ||
|
|
bb74bf3df4 | ||
|
|
b6f1204856 | ||
|
|
8f89f1ebe1 | ||
|
|
b9db28d744 | ||
|
|
68136d8546 | ||
|
|
45900540d1 | ||
|
|
408c292b79 | ||
|
|
e121cfae80 | ||
|
|
08ef305847 | ||
|
|
78271b9693 |
@ -3,6 +3,40 @@ Changelog
|
||||
|
||||
.. currentmodule:: motor.motor_tornado
|
||||
|
||||
Motor 1.2.5
|
||||
-----------
|
||||
|
||||
Fix a Python 3.7 compatibility bug caused by importing "async", which is a
|
||||
keyword in Python 3.7. Drop support for Python 3.4.3 and older.
|
||||
|
||||
Motor 1.2.4
|
||||
-----------
|
||||
|
||||
Fix a Python 3.7 compatibility bug in the :class:`MotorChangeStream` class
|
||||
returned by :meth:`MotorCollection.watch`. It is now possible to use change
|
||||
streams in ``async for`` loops in Python 3.7.
|
||||
|
||||
Motor 1.2.3
|
||||
-----------
|
||||
|
||||
Compatibility with latest Sphinx and document how to use the latest TLS
|
||||
protocols.
|
||||
|
||||
Motor 1.2.2
|
||||
-----------
|
||||
|
||||
Motor 1.2.x requires PyMongo 3.6 or later. The dependency was properly
|
||||
documented, but not enforced in ``setup.py``. PyMongo 3.6 is now an install-time
|
||||
requirement; thanks to Shane Harvey for the fix.
|
||||
|
||||
Motor 1.2.1
|
||||
-----------
|
||||
|
||||
An asyncio application that created a Change Stream with
|
||||
:meth:`MotorCollection.watch` and shut down while the Change Stream was open
|
||||
would print several errors. I have rewritten :meth:`MotorChangeStream.next`
|
||||
and some Motor internals to allow clean shutdown with asyncio.
|
||||
|
||||
Motor 1.2
|
||||
---------
|
||||
|
||||
|
||||
@ -31,7 +31,7 @@ master_doc = 'index'
|
||||
|
||||
# General information about the project.
|
||||
project = u'Motor'
|
||||
copyright = u'2016 MongoDB, Inc.'
|
||||
copyright = u'2016-present MongoDB, Inc.'
|
||||
|
||||
# The version info for the project you're documenting, acts as replacement for
|
||||
# |version| and |release|, also used in various other places throughout the
|
||||
|
||||
@ -1,6 +1,45 @@
|
||||
Configuration
|
||||
=============
|
||||
|
||||
TLS Protocol Version
|
||||
''''''''''''''''''''
|
||||
|
||||
Industry best practices, and some regulations, require the use
|
||||
of TLS 1.1 or newer. Though no application changes are required for
|
||||
Motor to make use of the newest protocols, some operating systems or
|
||||
versions may not provide an OpenSSL version new enough to support them.
|
||||
|
||||
Users of macOS older than 10.13 (High Sierra) will need to install Python
|
||||
from `python.org`_, `homebrew`_, `macports`_, or another similar source.
|
||||
|
||||
Users of Linux or other non-macOS Unix can check their OpenSSL version like
|
||||
this::
|
||||
|
||||
$ openssl version
|
||||
|
||||
If the version number is less than 1.0.1 support for TLS 1.1 or newer is not
|
||||
available. Contact your operating system vendor for a solution or upgrade to
|
||||
a newer distribution.
|
||||
|
||||
You can check your Python interpreter by installing the `requests`_ module
|
||||
and executing the following command::
|
||||
|
||||
python -c "import requests; print(requests.get('https://www.howsmyssl.com/a/check', verify=False).json()['tls_version'])"
|
||||
|
||||
You should see "TLS 1.X" where X is >= 1.
|
||||
|
||||
You can read more about TLS versions and their security implications here:
|
||||
|
||||
`<https://www.owasp.org/index.php/Transport_Layer_Protection_Cheat_Sheet#Rule_-_Only_Support_Strong_Protocols>`_
|
||||
|
||||
.. _python.org: https://www.python.org/downloads/
|
||||
.. _homebrew: https://brew.sh/
|
||||
.. _macports: https://www.macports.org/
|
||||
.. _requests: https://pypi.python.org/pypi/requests
|
||||
|
||||
Thread Pool Size
|
||||
''''''''''''''''
|
||||
|
||||
Motor uses the Python standard library's :class:`~concurrent.futures.ThreadPoolExecutor` to defer network
|
||||
operations to threads. By default, the executor uses at most five threads per CPU core on your
|
||||
system; to override the default set the environment variable ``MOTOR_MAX_WORKERS``.
|
||||
|
||||
@ -83,9 +83,15 @@ class ChangesHandler(tornado.websocket.WebSocketHandler):
|
||||
ChangesHandler.update_cache(change)
|
||||
|
||||
|
||||
change_stream = None
|
||||
|
||||
|
||||
async def watch(collection):
|
||||
async for change in collection.watch():
|
||||
ChangesHandler.on_change(change)
|
||||
global change_stream
|
||||
|
||||
async with collection.watch() as change_stream:
|
||||
async for change in change_stream:
|
||||
ChangesHandler.on_change(change)
|
||||
|
||||
|
||||
def main():
|
||||
@ -103,7 +109,13 @@ def main():
|
||||
loop = tornado.ioloop.IOLoop.current()
|
||||
# Start watching collection for changes.
|
||||
loop.add_callback(watch, collection)
|
||||
loop.start()
|
||||
try:
|
||||
loop.start()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
if change_stream is not None:
|
||||
change_stream.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
# Copyright 2009-2015 MongoDB, Inc.
|
||||
# Copyright 2009-present MongoDB, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -15,8 +15,8 @@
|
||||
"""MongoDB specific extensions to Sphinx."""
|
||||
|
||||
from docutils import nodes
|
||||
from docutils.parsers import rst
|
||||
from sphinx import addnodes
|
||||
from sphinx.util.compat import Directive
|
||||
|
||||
|
||||
class mongodoc(nodes.Admonition, nodes.Element):
|
||||
@ -28,7 +28,7 @@ class mongoref(nodes.reference):
|
||||
|
||||
|
||||
def visit_mongodoc_node(self, node):
|
||||
self.visit_admonition(node)
|
||||
self.visit_admonition(node, "seealso")
|
||||
|
||||
|
||||
def depart_mongodoc_node(self, node):
|
||||
@ -48,7 +48,7 @@ def depart_mongoref_node(self, node):
|
||||
self.body.append('\n')
|
||||
|
||||
|
||||
class MongodocDirective(Directive):
|
||||
class MongodocDirective(rst.Directive):
|
||||
|
||||
has_content = True
|
||||
required_arguments = 0
|
||||
@ -58,7 +58,7 @@ class MongodocDirective(Directive):
|
||||
|
||||
def run(self):
|
||||
node = mongodoc()
|
||||
title = 'See general MongoDB documentation'
|
||||
title = 'The MongoDB documentation on'
|
||||
node += nodes.title(title, title)
|
||||
self.state.nested_parse(self.content, self.content_offset, node)
|
||||
return [node]
|
||||
@ -94,6 +94,4 @@ def setup(app):
|
||||
html=(visit_mongoref_node, depart_mongoref_node))
|
||||
|
||||
app.add_directive("mongodoc", MongodocDirective)
|
||||
|
||||
app.connect("doctree-resolved", process_mongodoc_nodes)
|
||||
return {'parallel_write_safe': True, 'parallel_read_safe': True}
|
||||
|
||||
@ -20,12 +20,10 @@ import pymongo
|
||||
|
||||
from motor.motor_py3_compat import text_type
|
||||
|
||||
version_tuple = (1, 2, 0)
|
||||
version_tuple = (1, 2, 6, 'dev0')
|
||||
|
||||
|
||||
def get_version_string():
|
||||
if isinstance(version_tuple[-1], text_type):
|
||||
return '.'.join(map(str, version_tuple[:-1])) + version_tuple[-1]
|
||||
return '.'.join(map(str, version_tuple))
|
||||
|
||||
|
||||
|
||||
@ -462,15 +462,56 @@ class AgnosticCollection(AgnosticBaseProperties):
|
||||
Returns a :class:`~MotorChangeStream` cursor which iterates over changes
|
||||
on this collection. Introduced in MongoDB 3.6.
|
||||
|
||||
A change stream continues waiting indefinitely for matching change
|
||||
events. Code like the following allows a program to cancel the change
|
||||
stream and exit.
|
||||
|
||||
.. code-block:: python3
|
||||
|
||||
async with db.collection.watch() as stream:
|
||||
async for change in stream:
|
||||
print(change)
|
||||
change_stream = None
|
||||
|
||||
Using the change stream in an "async with" block as shown above ensures
|
||||
it is canceled promptly if your code breaks from the loop or throws an
|
||||
exception.
|
||||
async def watch_collection():
|
||||
global change_stream
|
||||
|
||||
# Using the change stream in an "async with" block
|
||||
# ensures it is canceled promptly if your code breaks
|
||||
# from the loop or throws an exception.
|
||||
async with db.collection.watch() as change_stream:
|
||||
async for change in stream:
|
||||
print(change)
|
||||
|
||||
# Tornado
|
||||
from tornado.ioloop import IOLoop
|
||||
|
||||
def main():
|
||||
loop = IOLoop.current()
|
||||
# Start watching collection for changes.
|
||||
loop.add_callback(watch_collection)
|
||||
try:
|
||||
loop.start()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
if change_stream is not None:
|
||||
change_stream.close()
|
||||
|
||||
# asyncio
|
||||
from asyncio import get_event_loop
|
||||
|
||||
def main():
|
||||
loop = get_event_loop()
|
||||
task = loop.create_task(watch_collection)
|
||||
|
||||
try:
|
||||
loop.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
if change_stream is not None:
|
||||
change_stream.close()
|
||||
|
||||
# Prevent "Task was destroyed but it is pending!"
|
||||
loop.run_until_complete(task)
|
||||
|
||||
The :class:`~MotorChangeStream` async iterable blocks
|
||||
until the next change document is returned or an error is raised. If
|
||||
@ -1248,22 +1289,15 @@ class AgnosticChangeStream(AgnosticBase):
|
||||
'collation': collation,
|
||||
'session': session}
|
||||
|
||||
def _next(self, future):
|
||||
# This method is run on a thread. asyncio prohibits future.set_exception
|
||||
# with a StopIteration, so we must handle this operation differently
|
||||
# from other async methods.
|
||||
def _next(self):
|
||||
# This method is run on a thread.
|
||||
try:
|
||||
if not self.delegate:
|
||||
self.delegate = self._collection.delegate.watch(**self._kwargs)
|
||||
|
||||
change = self.delegate.next()
|
||||
self._framework.call_soon(self.get_io_loop(),
|
||||
future.set_result,
|
||||
change)
|
||||
return self.delegate.next()
|
||||
except StopIteration:
|
||||
future.set_exception(StopAsyncIteration())
|
||||
except Exception as exc:
|
||||
future.set_exception(exc)
|
||||
raise StopAsyncIteration()
|
||||
|
||||
@coroutine_annotation(callback=False)
|
||||
def next(self):
|
||||
@ -1284,9 +1318,7 @@ class AgnosticChangeStream(AgnosticBase):
|
||||
|
||||
"""
|
||||
loop = self.get_io_loop()
|
||||
future = self._framework.get_future(loop)
|
||||
self._framework.run_on_executor(loop, self._next, future)
|
||||
return future
|
||||
return self._framework.run_on_executor(loop, self._next)
|
||||
|
||||
@coroutine_annotation(callback=False)
|
||||
def close(self):
|
||||
@ -1304,7 +1336,7 @@ class AgnosticChangeStream(AgnosticBase):
|
||||
|
||||
if PY35:
|
||||
exec(textwrap.dedent("""
|
||||
async def __aiter__(self):
|
||||
def __aiter__(self):
|
||||
return self
|
||||
|
||||
__anext__ = next
|
||||
@ -1313,7 +1345,8 @@ class AgnosticChangeStream(AgnosticBase):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
await self.close()
|
||||
if self.delegate:
|
||||
self.delegate.close()
|
||||
"""), globals(), locals())
|
||||
|
||||
def get_io_loop(self):
|
||||
|
||||
@ -26,11 +26,6 @@ import functools
|
||||
import multiprocessing
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
try:
|
||||
from asyncio import ensure_future
|
||||
except ImportError:
|
||||
from asyncio import async as ensure_future
|
||||
|
||||
CLASS_PREFIX = 'AsyncIO'
|
||||
|
||||
|
||||
@ -61,12 +56,42 @@ else:
|
||||
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
|
||||
|
||||
|
||||
def run_on_executor(loop, fn, self, *args, **kwargs):
|
||||
# Ensures the wrapped future is resolved on the main thread, though the
|
||||
# executor's future is resolved on a worker thread.
|
||||
return asyncio.futures.wrap_future(
|
||||
_EXECUTOR.submit(functools.partial(fn, self, *args, **kwargs)),
|
||||
loop=loop)
|
||||
def run_on_executor(loop, fn, *args, **kwargs):
|
||||
# Adapted from asyncio's wrap_future and _chain_future. Ensure the wrapped
|
||||
# future is resolved on the main thread when the executor's future is
|
||||
# resolved on a worker thread. asyncio's wrap_future does the same, but
|
||||
# throws an error if the loop is stopped. We want to avoid errors if a
|
||||
# background task completes after the loop stops, e.g. ChangeStream.next()
|
||||
# returns while the program is shutting down.
|
||||
def _set_state():
|
||||
if dest.cancelled():
|
||||
return
|
||||
|
||||
if source.cancelled():
|
||||
dest.cancel()
|
||||
else:
|
||||
exception = source.exception()
|
||||
if exception is not None:
|
||||
dest.set_exception(exception)
|
||||
else:
|
||||
result = source.result()
|
||||
dest.set_result(result)
|
||||
|
||||
def _call_check_cancel(_):
|
||||
if dest.cancelled():
|
||||
source.cancel()
|
||||
|
||||
def _call_set_state(_):
|
||||
if loop.is_closed():
|
||||
return
|
||||
|
||||
loop.call_soon_threadsafe(_set_state)
|
||||
|
||||
source = _EXECUTOR.submit(functools.partial(fn, *args, **kwargs))
|
||||
dest = asyncio.Future(loop=loop)
|
||||
dest.add_done_callback(_call_check_cancel)
|
||||
source.add_done_callback(_call_set_state)
|
||||
return dest
|
||||
|
||||
|
||||
_DEFAULT = object()
|
||||
|
||||
@ -57,11 +57,11 @@ else:
|
||||
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
|
||||
|
||||
|
||||
def run_on_executor(loop, fn, self, *args, **kwargs):
|
||||
def run_on_executor(loop, fn, *args, **kwargs):
|
||||
# Need a Tornado Future for "await" expressions. exec_fut is resolved on a
|
||||
# worker thread, loop.add_future ensures "future" is resolved on main.
|
||||
future = concurrent.Future()
|
||||
exec_fut = _EXECUTOR.submit(fn, self, *args, **kwargs)
|
||||
exec_fut = _EXECUTOR.submit(fn, *args, **kwargs)
|
||||
|
||||
def copy(_):
|
||||
if future.done():
|
||||
@ -75,6 +75,7 @@ def run_on_executor(loop, fn, self, *args, **kwargs):
|
||||
loop.add_future(exec_fut, copy)
|
||||
return future
|
||||
|
||||
|
||||
_DEFAULT = object()
|
||||
|
||||
|
||||
|
||||
4
setup.py
4
setup.py
@ -30,7 +30,7 @@ description = 'Non-blocking MongoDB driver for Tornado or asyncio'
|
||||
|
||||
long_description = open("README.rst").read()
|
||||
|
||||
install_requires = ['pymongo>=3.4,<4']
|
||||
install_requires = ['pymongo>=3.6,<4']
|
||||
|
||||
tests_require = ['mockupdb>=1.2.1']
|
||||
|
||||
@ -152,7 +152,7 @@ if sys.version_info[0] >= 3:
|
||||
packages.append('motor.aiohttp')
|
||||
|
||||
setup(name='motor',
|
||||
version='1.2.0',
|
||||
version='1.2.6.dev0',
|
||||
packages=packages,
|
||||
description=description,
|
||||
long_description=long_description,
|
||||
|
||||
@ -20,13 +20,9 @@ import gc
|
||||
import inspect
|
||||
import os
|
||||
import unittest
|
||||
from asyncio import ensure_future
|
||||
from unittest import SkipTest
|
||||
|
||||
try:
|
||||
from asyncio import ensure_future
|
||||
except ImportError:
|
||||
from asyncio import async as ensure_future
|
||||
|
||||
from mockupdb import MockupDB
|
||||
|
||||
from motor import motor_asyncio
|
||||
|
||||
Loading…
Reference in New Issue
Block a user