Compare commits

...

20 Commits
master ... r1.2

Author SHA1 Message Date
A. Jesse Jiryu Davis
0b9ce1eaeb Version -> dev0 2018-07-11 14:23:57 -04:00
A. Jesse Jiryu Davis
aa78ed129e
BUMP 1.2.5
Signed-off-by: A. Jesse Jiryu Davis <jesse@mongodb.com>
2018-07-11 14:23:16 -04:00
A. Jesse Jiryu Davis
aced74e749 MOTOR-248 Don't use "async" as variable name
In Python 3.7, "async" becomes a full-fledged keyword.
2018-07-11 14:22:00 -04:00
A. Jesse Jiryu Davis
792adfbba6 Version -> dev0 2018-07-11 03:17:30 -04:00
A. Jesse Jiryu Davis
95f08f0398
BUMP 1.2.4
Signed-off-by: A. Jesse Jiryu Davis <jesse@mongodb.com>
2018-07-11 03:00:32 -04:00
A. Jesse Jiryu Davis
efa3cf485f MOTOR-255 Change streams use new async iter style 2018-07-08 11:58:11 -07:00
A. Jesse Jiryu Davis
34daa766f7 Version -> dev0 2018-05-21 15:06:31 -04:00
A. Jesse Jiryu Davis
b0f443cfd7
BUMP 1.2.3
Signed-off-by: A. Jesse Jiryu Davis <jesse@mongodb.com>
2018-05-21 15:05:25 -04:00
A. Jesse Jiryu Davis
44eb6b6d35 MOTOR-200 - Warn about old TLS versions 2018-05-21 15:01:51 -04:00
A. Jesse Jiryu Davis
73d3536416 Update extension for latest Sphinx 2018-05-21 14:58:19 -04:00
A. Jesse Jiryu Davis
bb74bf3df4 Version -> dev0 2018-05-21 14:52:10 -04:00
A. Jesse Jiryu Davis
b6f1204856
BUMP 1.2.2
Signed-off-by: A. Jesse Jiryu Davis <jesse@mongodb.com>
2018-05-21 14:40:13 -04:00
Shane Harvey
8f89f1ebe1
MOTOR-223 Motor 1.2 requires PyMongo 3.6+ (#39) 2018-05-21 11:03:40 -07:00
A. Jesse Jiryu Davis
b9db28d744 Version -> 1.2.2.dev0 2018-01-18 06:05:17 -05:00
A. Jesse Jiryu Davis
68136d8546
BUMP 1.2.1
Signed-off-by: A. Jesse Jiryu Davis <jesse@mongodb.com>
2018-01-18 05:57:27 -05:00
A. Jesse Jiryu Davis
45900540d1 Update docs copyright notice 2018-01-18 05:52:57 -05:00
A. Jesse Jiryu Davis
408c292b79 MOTOR-185 Example clean shutdown w/ change stream 2018-01-18 05:52:42 -05:00
A. Jesse Jiryu Davis
e121cfae80 MOTOR-185 Allow clean shutdown with change stream 2018-01-18 05:52:34 -05:00
A. Jesse Jiryu Davis
08ef305847 MOTOR-185 Deadlock closing Change Stream 2018-01-18 05:52:27 -05:00
A. Jesse Jiryu Davis
78271b9693 Version -> 1.2.1.dev0 2018-01-18 05:52:06 -05:00
11 changed files with 192 additions and 56 deletions

View File

@ -3,6 +3,40 @@ Changelog
.. currentmodule:: motor.motor_tornado
Motor 1.2.5
-----------
Fix a Python 3.7 compatibility bug caused by importing "async", which is a
keyword in Python 3.7. Drop support for Python 3.4.3 and older.
Motor 1.2.4
-----------
Fix a Python 3.7 compatibility bug in the :class:`MotorChangeStream` class
returned by :meth:`MotorCollection.watch`. It is now possible to use change
streams in ``async for`` loops in Python 3.7.
Motor 1.2.3
-----------
Compatibility with latest Sphinx and document how to use the latest TLS
protocols.
Motor 1.2.2
-----------
Motor 1.2.x requires PyMongo 3.6 or later. The dependency was properly
documented, but not enforced in ``setup.py``. PyMongo 3.6 is now an install-time
requirement; thanks to Shane Harvey for the fix.
Motor 1.2.1
-----------
An asyncio application that created a Change Stream with
:meth:`MotorCollection.watch` and shut down while the Change Stream was open
would print several errors. I have rewritten :meth:`MotorChangeStream.next`
and some Motor internals to allow clean shutdown with asyncio.
Motor 1.2
---------

View File

@ -31,7 +31,7 @@ master_doc = 'index'
# General information about the project.
project = u'Motor'
copyright = u'2016 MongoDB, Inc.'
copyright = u'2016-present MongoDB, Inc.'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the

View File

@ -1,6 +1,45 @@
Configuration
=============
TLS Protocol Version
''''''''''''''''''''
Industry best practices, and some regulations, require the use
of TLS 1.1 or newer. Though no application changes are required for
Motor to make use of the newest protocols, some operating systems or
versions may not provide an OpenSSL version new enough to support them.
Users of macOS older than 10.13 (High Sierra) will need to install Python
from `python.org`_, `homebrew`_, `macports`_, or another similar source.
Users of Linux or other non-macOS Unix can check their OpenSSL version like
this::
$ openssl version
If the version number is less than 1.0.1 support for TLS 1.1 or newer is not
available. Contact your operating system vendor for a solution or upgrade to
a newer distribution.
You can check your Python interpreter by installing the `requests`_ module
and executing the following command::
python -c "import requests; print(requests.get('https://www.howsmyssl.com/a/check', verify=False).json()['tls_version'])"
You should see "TLS 1.X" where X is >= 1.
You can read more about TLS versions and their security implications here:
`<https://www.owasp.org/index.php/Transport_Layer_Protection_Cheat_Sheet#Rule_-_Only_Support_Strong_Protocols>`_
.. _python.org: https://www.python.org/downloads/
.. _homebrew: https://brew.sh/
.. _macports: https://www.macports.org/
.. _requests: https://pypi.python.org/pypi/requests
Thread Pool Size
''''''''''''''''
Motor uses the Python standard library's :class:`~concurrent.futures.ThreadPoolExecutor` to defer network
operations to threads. By default, the executor uses at most five threads per CPU core on your
system; to override the default set the environment variable ``MOTOR_MAX_WORKERS``.

View File

@ -83,9 +83,15 @@ class ChangesHandler(tornado.websocket.WebSocketHandler):
ChangesHandler.update_cache(change)
change_stream = None
async def watch(collection):
async for change in collection.watch():
ChangesHandler.on_change(change)
global change_stream
async with collection.watch() as change_stream:
async for change in change_stream:
ChangesHandler.on_change(change)
def main():
@ -103,7 +109,13 @@ def main():
loop = tornado.ioloop.IOLoop.current()
# Start watching collection for changes.
loop.add_callback(watch, collection)
loop.start()
try:
loop.start()
except KeyboardInterrupt:
pass
finally:
if change_stream is not None:
change_stream.close()
if __name__ == "__main__":

View File

@ -1,4 +1,4 @@
# Copyright 2009-2015 MongoDB, Inc.
# Copyright 2009-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,8 +15,8 @@
"""MongoDB specific extensions to Sphinx."""
from docutils import nodes
from docutils.parsers import rst
from sphinx import addnodes
from sphinx.util.compat import Directive
class mongodoc(nodes.Admonition, nodes.Element):
@ -28,7 +28,7 @@ class mongoref(nodes.reference):
def visit_mongodoc_node(self, node):
self.visit_admonition(node)
self.visit_admonition(node, "seealso")
def depart_mongodoc_node(self, node):
@ -48,7 +48,7 @@ def depart_mongoref_node(self, node):
self.body.append('\n')
class MongodocDirective(Directive):
class MongodocDirective(rst.Directive):
has_content = True
required_arguments = 0
@ -58,7 +58,7 @@ class MongodocDirective(Directive):
def run(self):
node = mongodoc()
title = 'See general MongoDB documentation'
title = 'The MongoDB documentation on'
node += nodes.title(title, title)
self.state.nested_parse(self.content, self.content_offset, node)
return [node]
@ -94,6 +94,4 @@ def setup(app):
html=(visit_mongoref_node, depart_mongoref_node))
app.add_directive("mongodoc", MongodocDirective)
app.connect("doctree-resolved", process_mongodoc_nodes)
return {'parallel_write_safe': True, 'parallel_read_safe': True}

View File

@ -20,12 +20,10 @@ import pymongo
from motor.motor_py3_compat import text_type
version_tuple = (1, 2, 0)
version_tuple = (1, 2, 6, 'dev0')
def get_version_string():
if isinstance(version_tuple[-1], text_type):
return '.'.join(map(str, version_tuple[:-1])) + version_tuple[-1]
return '.'.join(map(str, version_tuple))

View File

@ -462,15 +462,56 @@ class AgnosticCollection(AgnosticBaseProperties):
Returns a :class:`~MotorChangeStream` cursor which iterates over changes
on this collection. Introduced in MongoDB 3.6.
A change stream continues waiting indefinitely for matching change
events. Code like the following allows a program to cancel the change
stream and exit.
.. code-block:: python3
async with db.collection.watch() as stream:
async for change in stream:
print(change)
change_stream = None
Using the change stream in an "async with" block as shown above ensures
it is canceled promptly if your code breaks from the loop or throws an
exception.
async def watch_collection():
global change_stream
# Using the change stream in an "async with" block
# ensures it is canceled promptly if your code breaks
# from the loop or throws an exception.
async with db.collection.watch() as change_stream:
async for change in stream:
print(change)
# Tornado
from tornado.ioloop import IOLoop
def main():
loop = IOLoop.current()
# Start watching collection for changes.
loop.add_callback(watch_collection)
try:
loop.start()
except KeyboardInterrupt:
pass
finally:
if change_stream is not None:
change_stream.close()
# asyncio
from asyncio import get_event_loop
def main():
loop = get_event_loop()
task = loop.create_task(watch_collection)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
if change_stream is not None:
change_stream.close()
# Prevent "Task was destroyed but it is pending!"
loop.run_until_complete(task)
The :class:`~MotorChangeStream` async iterable blocks
until the next change document is returned or an error is raised. If
@ -1248,22 +1289,15 @@ class AgnosticChangeStream(AgnosticBase):
'collation': collation,
'session': session}
def _next(self, future):
# This method is run on a thread. asyncio prohibits future.set_exception
# with a StopIteration, so we must handle this operation differently
# from other async methods.
def _next(self):
# This method is run on a thread.
try:
if not self.delegate:
self.delegate = self._collection.delegate.watch(**self._kwargs)
change = self.delegate.next()
self._framework.call_soon(self.get_io_loop(),
future.set_result,
change)
return self.delegate.next()
except StopIteration:
future.set_exception(StopAsyncIteration())
except Exception as exc:
future.set_exception(exc)
raise StopAsyncIteration()
@coroutine_annotation(callback=False)
def next(self):
@ -1284,9 +1318,7 @@ class AgnosticChangeStream(AgnosticBase):
"""
loop = self.get_io_loop()
future = self._framework.get_future(loop)
self._framework.run_on_executor(loop, self._next, future)
return future
return self._framework.run_on_executor(loop, self._next)
@coroutine_annotation(callback=False)
def close(self):
@ -1304,7 +1336,7 @@ class AgnosticChangeStream(AgnosticBase):
if PY35:
exec(textwrap.dedent("""
async def __aiter__(self):
def __aiter__(self):
return self
__anext__ = next
@ -1313,7 +1345,8 @@ class AgnosticChangeStream(AgnosticBase):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
if self.delegate:
self.delegate.close()
"""), globals(), locals())
def get_io_loop(self):

View File

@ -26,11 +26,6 @@ import functools
import multiprocessing
from concurrent.futures import ThreadPoolExecutor
try:
from asyncio import ensure_future
except ImportError:
from asyncio import async as ensure_future
CLASS_PREFIX = 'AsyncIO'
@ -61,12 +56,42 @@ else:
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
def run_on_executor(loop, fn, self, *args, **kwargs):
# Ensures the wrapped future is resolved on the main thread, though the
# executor's future is resolved on a worker thread.
return asyncio.futures.wrap_future(
_EXECUTOR.submit(functools.partial(fn, self, *args, **kwargs)),
loop=loop)
def run_on_executor(loop, fn, *args, **kwargs):
# Adapted from asyncio's wrap_future and _chain_future. Ensure the wrapped
# future is resolved on the main thread when the executor's future is
# resolved on a worker thread. asyncio's wrap_future does the same, but
# throws an error if the loop is stopped. We want to avoid errors if a
# background task completes after the loop stops, e.g. ChangeStream.next()
# returns while the program is shutting down.
def _set_state():
if dest.cancelled():
return
if source.cancelled():
dest.cancel()
else:
exception = source.exception()
if exception is not None:
dest.set_exception(exception)
else:
result = source.result()
dest.set_result(result)
def _call_check_cancel(_):
if dest.cancelled():
source.cancel()
def _call_set_state(_):
if loop.is_closed():
return
loop.call_soon_threadsafe(_set_state)
source = _EXECUTOR.submit(functools.partial(fn, *args, **kwargs))
dest = asyncio.Future(loop=loop)
dest.add_done_callback(_call_check_cancel)
source.add_done_callback(_call_set_state)
return dest
_DEFAULT = object()

View File

@ -57,11 +57,11 @@ else:
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
def run_on_executor(loop, fn, self, *args, **kwargs):
def run_on_executor(loop, fn, *args, **kwargs):
# Need a Tornado Future for "await" expressions. exec_fut is resolved on a
# worker thread, loop.add_future ensures "future" is resolved on main.
future = concurrent.Future()
exec_fut = _EXECUTOR.submit(fn, self, *args, **kwargs)
exec_fut = _EXECUTOR.submit(fn, *args, **kwargs)
def copy(_):
if future.done():
@ -75,6 +75,7 @@ def run_on_executor(loop, fn, self, *args, **kwargs):
loop.add_future(exec_fut, copy)
return future
_DEFAULT = object()

View File

@ -30,7 +30,7 @@ description = 'Non-blocking MongoDB driver for Tornado or asyncio'
long_description = open("README.rst").read()
install_requires = ['pymongo>=3.4,<4']
install_requires = ['pymongo>=3.6,<4']
tests_require = ['mockupdb>=1.2.1']
@ -152,7 +152,7 @@ if sys.version_info[0] >= 3:
packages.append('motor.aiohttp')
setup(name='motor',
version='1.2.0',
version='1.2.6.dev0',
packages=packages,
description=description,
long_description=long_description,

View File

@ -20,13 +20,9 @@ import gc
import inspect
import os
import unittest
from asyncio import ensure_future
from unittest import SkipTest
try:
from asyncio import ensure_future
except ImportError:
from asyncio import async as ensure_future
from mockupdb import MockupDB
from motor import motor_asyncio