Extend jinja environment and loaders by adding async APIs

This commit is contained in:
fatimah alhumaidhi 2026-01-04 12:26:48 +03:00
parent 5ef70112a1
commit 681b7810f7
5 changed files with 550 additions and 2 deletions

View File

@ -134,6 +134,13 @@ class BytecodeCache:
"""
raise NotImplementedError()
async def load_bytecode_async(self, bucket: Bucket) -> None:
"""Subclasses have to override this method to load bytecode into a
bucket asynchronously. If they are not able to find code in the cache for the
bucket, it must not do anything.
"""
return self.load_bytecode(bucket)
def dump_bytecode(self, bucket: Bucket) -> None:
"""Subclasses have to override this method to write the bytecode
from a bucket back to the cache. If it unable to do so it must not
@ -141,6 +148,13 @@ class BytecodeCache:
"""
raise NotImplementedError()
async def dump_bytecode_async(self, bucket: Bucket) -> None:
"""Subclasses have to override this method to write the bytecode
from a bucket back to the cache asynchronously. If it unable to do so it must not
fail silently but raise an exception.
"""
return self.dump_bytecode(bucket)
def clear(self) -> None:
"""Clears the cache. This method is not used by Jinja but should be
implemented to allow applications to clear the bytecode cache used
@ -176,11 +190,24 @@ class BytecodeCache:
self.load_bytecode(bucket)
return bucket
async def get_bucket_async(
self, environment: Environment, name: str, filename: str | None, source: str
) -> Bucket:
"""Asynchronously return a cache bucket for the given template. All arguments are
mandatory but filename may be `None`.
"""
return self.get_bucket(self, environment, name, filename, source)
def set_bucket(self, bucket: Bucket) -> None:
"""Put the bucket into the cache."""
self.dump_bytecode(bucket)
async def set_bucket_async(self, bucket: Bucket) -> None:
"""Asynchronously put the bucket into the cache."""
self.set_bucket(bucket)
class FileSystemBytecodeCache(BytecodeCache):
"""A bytecode cache that stores bytecode on the filesystem. It accepts
two arguments: The directory where the cache items are stored and a

View File

@ -1990,3 +1990,107 @@ class CodeGenerator(NodeVisitor):
self.visit(child, frame)
frame.eval_ctx.revert(saved_ctx)
self.writeline(f"context.eval_ctx.revert({old_ctx_name})")
class AsyncCodeGenerator(CodeGenerator):
def choose_async(self, async_value: str = "async ", sync_value: str = "") -> str:
return async_value # AsyncEnvironemet is always async
def visit_Extends(self, node: nodes.Extends, frame: Frame) -> None:
"""Calls the extender."""
if not frame.toplevel:
self.fail("cannot use extend from a non top-level scope", node.lineno)
# if the number of extends statements in general is zero so
# far, we don't have to add a check if something extended
# the template before this one.
if self.extends_so_far > 0:
# if we have a known extends we just add a template runtime
# error into the generated code. We could catch that at compile
# time too, but i welcome it not to confuse users by throwing the
# same error at different times just "because we can".
if not self.has_known_extends:
self.writeline("if parent_template is not None:")
self.indent()
self.writeline('raise TemplateRuntimeError("extended multiple times")')
# if we have a known extends already we don't need that code here
# as we know that the template execution will end here.
if self.has_known_extends:
raise CompilerExit()
else:
self.outdent()
self.writeline("parent_template = await environment.get_template(", node) # awaitable for AsyncEnvironemet
self.visit(node.template, frame)
self.write(f", {self.name!r})")
self.writeline("for name, parent_block in parent_template.blocks.items():")
self.indent()
self.writeline("context.blocks.setdefault(name, []).append(parent_block)")
self.outdent()
# if this extends statement was in the root level we can take
# advantage of that information and simplify the generated code
# in the top level from this point onwards
if frame.rootlevel:
self.has_known_extends = True
# and now we have one more
self.extends_so_far += 1
def visit_Include(self, node: nodes.Include, frame: Frame) -> None:
if node.ignore_missing:
self.writeline("try:")
self.indent()
func_name = "get_or_select_template"
if isinstance(node.template, nodes.Const):
if isinstance(node.template.value, str):
func_name = "get_template"
elif isinstance(node.template.value, (tuple, list)):
func_name = "select_template"
elif isinstance(node.template, (nodes.Tuple, nodes.List)):
func_name = "select_template"
self.writeline(f"template = await environment.{func_name}(", node) # awaitable for AsyncEnvironemet
self.visit(node.template, frame)
self.write(f", {self.name!r})")
if node.ignore_missing:
self.outdent()
self.writeline("except TemplateNotFound:")
self.indent()
self.writeline("pass")
self.outdent()
self.writeline("else:")
self.indent()
def loop_body() -> None:
self.indent()
self.simple_write("event", frame)
self.outdent()
if node.with_context:
self.writeline(
f"gen = template.root_render_func("
"template.new_context(context.get_all(), True,"
f" {self.dump_local_context(frame)}))"
)
self.writeline("try:")
self.indent()
self.writeline(f"{self.choose_async()}for event in gen:")
loop_body()
self.outdent()
self.writeline(
f"finally: {self.choose_async('await gen.aclose()', 'gen.close()')}"
)
else:
self.writeline(
"for event in (await template._get_default_module_async())"
"._body_stream:"
)
loop_body()
if node.ignore_missing:
self.outdent()

View File

@ -17,6 +17,7 @@ from markupsafe import Markup
from . import nodes
from .compiler import CodeGenerator
from .compiler import AsyncCodeGenerator
from .compiler import generate
from .defaults import BLOCK_END_STRING
from .defaults import BLOCK_START_STRING
@ -895,6 +896,87 @@ class Environment:
log_function("Finished compiling templates")
async def compile_templates_async(
self,
target: t.Union[str, "os.PathLike[str]"],
extensions: t.Collection[str] | None = None,
filter_func: t.Callable[[str], bool] | None = None,
zip: str | None = "deflated",
log_function: t.Callable[[str], None] | None = None,
ignore_errors: bool = True,
) -> None:
"""Asynchronously finds all the templates the loader can find, compiles them
and stores them in `target`. If `zip` is `None`, instead of in a
zipfile, the templates will be stored in a directory.
By default a deflate zip algorithm is used. To switch to
the stored algorithm, `zip` can be set to ``'stored'``.
`extensions` and `filter_func` are passed to :meth:`list_templates_async`.
Each template returned will be compiled to the target folder or
zipfile.
By default template compilation errors are ignored. In case a
log function is provided, errors are logged. If you want template
syntax errors to abort the compilation you can set `ignore_errors`
to `False` and you will get an exception on syntax errors.
.. versionadded:: 2.4
"""
from .loaders import ModuleLoader
if log_function is None:
def log_function(x: str) -> None:
pass
assert log_function is not None
assert self.loader is not None, "No loader configured."
def write_file(filename: str, data: str) -> None:
if zip:
info = ZipInfo(filename)
info.external_attr = 0o755 << 16
zip_file.writestr(info, data)
else:
with open(os.path.join(target, filename), "wb") as f:
f.write(data.encode("utf8"))
if zip is not None:
from zipfile import ZIP_DEFLATED
from zipfile import ZIP_STORED
from zipfile import ZipFile
from zipfile import ZipInfo
zip_file = ZipFile(
target, "w", dict(deflated=ZIP_DEFLATED, stored=ZIP_STORED)[zip]
)
log_function(f"Compiling into Zip archive {target!r}")
else:
if not os.path.isdir(target):
os.makedirs(target)
log_function(f"Compiling into folder {target!r}")
try:
async for name in await self.list_templates_async(extensions, filter_func):
source, filename, _ = await self.loader.get_source_async(self, name)
try:
code = self.compile(source, name, filename, True, True)
except TemplateSyntaxError as e:
if not ignore_errors:
raise
log_function(f'Could not compile "{name}": {e}')
continue
filename = ModuleLoader.get_module_filename(name)
write_file(filename, code)
log_function(f'Compiled "{name}" as {filename}')
finally:
if zip:
zip_file.close()
log_function("Finished compiling templates")
def list_templates(
self,
extensions: t.Collection[str] | None = None,
@ -932,6 +1014,43 @@ class Environment:
return names
async def list_templates_async(
self,
extensions: t.Collection[str] | None = None,
filter_func: t.Callable[[str], bool] | None = None,
) -> list[str]:
"""Asynchronously returns a list of templates for this environment. This requires
that the loader supports the loader's
:meth:`~BaseLoader.list_templates_async` method.
If there are other files in the template folder besides the
actual templates, the returned list can be filtered. There are two
ways: either `extensions` is set to a list of file extensions for
templates, or a `filter_func` can be provided which is a callable that
is passed a template name and should return `True` if it should end up
in the result list.
If the loader does not support that, a :exc:`TypeError` is raised.
.. versionadded:: 2.4
"""
assert self.loader is not None, "No loader configured."
names = await self.loader.list_templates_async()
if extensions is not None:
if filter_func is not None:
raise TypeError(
"either extensions or filter_func can be passed, but not both"
)
def filter_func(x: str) -> bool:
return "." in x and x.rsplit(".", 1)[1] in extensions
if filter_func is not None:
names = [name for name in names if filter_func(name)]
return names
def handle_exception(self, source: str | None = None) -> "te.NoReturn":
"""Exception handling helper. This is used internally to either raise
rewritten exceptions or return a rendered traceback for the template.
@ -977,6 +1096,31 @@ class Environment:
self.cache[cache_key] = template
return template
@internalcode
async def _load_template_async(
self,
name: str,
globals: t.MutableMapping[str, t.Any] | None,
) -> "Template":
if self.loader is None:
raise TypeError("no loader for this environment specified")
cache_key = (weakref.ref(self.loader), name)
if self.cache is not None:
template = self.cache.get(cache_key)
if template is not None and (
not self.auto_reload or template.is_up_to_date
):
if globals:
template.globals.update(globals)
return template
template = await self.loader.load_async(self, name, self.make_globals(globals))
if self.cache is not None:
self.cache[cache_key] = template
return template
@internalcode
def get_template(
self,
@ -1014,6 +1158,43 @@ class Environment:
return self._load_template(name, globals)
@internalcode
async def get_template_async(
self,
name: t.Union[str, "Template"],
parent: str | None = None,
globals: t.MutableMapping[str, t.Any] | None = None,
) -> "Template":
"""Asynchronously load a template by name with :attr:`loader` and return a
:class:`Template`. If the template does not exist a
:exc:`TemplateNotFound` exception is raised.
:param name: Name of the template to load. When loading
templates from the filesystem, "/" is used as the path
separator, even on Windows.
:param parent: The name of the parent template importing this
template. :meth:`join_path` can be used to implement name
transformations with this.
:param globals: Extend the environment :attr:`globals` with
these extra variables available for all renders of this
template. If the template has already been loaded and
cached, its globals are updated with any new items.
.. versionchanged:: 3.0
If a template is loaded from cache, ``globals`` will update
the template's globals instead of ignoring the new values.
.. versionchanged:: 2.4
If ``name`` is a :class:`Template` object it is returned
unchanged.
"""
if isinstance(name, Template):
return name
if parent is not None:
name = self.join_path(name, parent)
return await self._load_template_async(name, globals)
@internalcode
def select_template(
self,
@ -1068,6 +1249,60 @@ class Environment:
pass
raise TemplatesNotFound(names) # type: ignore
@internalcode
async def select_template_async(
self,
names: t.Iterable[t.Union[str, "Template"]],
parent: str | None = None,
globals: t.MutableMapping[str, t.Any] | None = None,
) -> "Template":
"""Like :meth:`get_template_async`, but asynchronously tries loading multiple names.
If none of the names can be loaded a :exc:`TemplatesNotFound`
exception is raised.
:param names: List of template names to try loading in order.
:param parent: The name of the parent template importing this
template. :meth:`join_path` can be used to implement name
transformations with this.
:param globals: Extend the environment :attr:`globals` with
these extra variables available for all renders of this
template. If the template has already been loaded and
cached, its globals are updated with any new items.
.. versionchanged:: 3.0
If a template is loaded from cache, ``globals`` will update
the template's globals instead of ignoring the new values.
.. versionchanged:: 2.11
If ``names`` is :class:`Undefined`, an :exc:`UndefinedError`
is raised instead. If no templates were found and ``names``
contains :class:`Undefined`, the message is more helpful.
.. versionchanged:: 2.4
If ``names`` contains a :class:`Template` object it is
returned unchanged.
.. versionadded:: 2.3
"""
if isinstance(names, Undefined):
names._fail_with_undefined_error()
if not names:
raise TemplatesNotFound(
message="Tried to select from an empty list of templates."
)
for name in names:
if isinstance(name, Template):
return name
if parent is not None:
name = self.join_path(name, parent)
try:
return await self._load_template_async(name, globals)
except (TemplateNotFound, UndefinedError):
pass
raise TemplatesNotFound(names) # type: ignore
@internalcode
def get_or_select_template(
self,
@ -1086,6 +1321,24 @@ class Environment:
return template_name_or_list
return self.select_template(template_name_or_list, parent, globals)
@internalcode
async def get_or_select_template_async(
self,
template_name_or_list: t.Union[str, "Template", list[t.Union[str, "Template"]]],
parent: str | None = None,
globals: t.MutableMapping[str, t.Any] | None = None,
) -> "Template":
"""Use :meth:`select_template_async` if an iterable of template names
is given, or :meth:`get_template_async` if one name is given.
.. versionadded:: 2.3
"""
if isinstance(template_name_or_list, (str, Undefined)):
return await self.get_template_async(template_name_or_list, parent, globals)
elif isinstance(template_name_or_list, Template):
return template_name_or_list
return await self.select_template_async(template_name_or_list, parent, globals)
def from_string(
self,
source: str | nodes.Template,
@ -1129,6 +1382,17 @@ class Environment:
return ChainMap(d, self.globals)
class AsyncEnvironment(Environment):
code_generator_class: type["CodeGenerator"] = AsyncCodeGenerator
get_template = Environment.get_template_async
select_template = Environment.select_template_async
get_or_select_template = Environment.get_or_select_template_async
list_templates = Environment.list_templates_async
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs, enable_async=True)
class Template:
"""A compiled template that can be rendered.
@ -1184,7 +1448,7 @@ class Template:
enable_async: bool = False,
) -> t.Any: # it returns a `Template`, but this breaks the sphinx build...
env = get_spontaneous_environment(
cls.environment_class, # type: ignore
type(self.environment),
block_start_string,
block_end_string,
variable_start_string,

View File

@ -12,6 +12,7 @@ import zipimport
from collections import abc
from hashlib import sha1
from importlib import import_module
from inspect import iscoroutinefunction
from types import ModuleType
from .exceptions import TemplateNotFound
@ -98,12 +99,40 @@ class BaseLoader:
)
raise TemplateNotFound(template)
async def get_source_async(
self, environment: "Environment", template: str
) -> tuple[str, str | None, t.Callable[[], bool] | None]:
"""Asynchronously get the template source, filename and reload helper for a template.
It's passed the environment and template name and has to return a
tuple in the form ``(source, filename, uptodate)`` or raise a
`TemplateNotFound` error if it can't locate the template.
The source part of the returned tuple must be the source of the
template as a string. The filename should be the name of the
file on the filesystem if it was loaded from there, otherwise
``None``. The filename is used by Python for the tracebacks
if no loader extension is used.
The last item in the tuple is the `uptodate` function. If auto
reloading is enabled it's always called to check if the template
changed. No arguments are passed so the function must store the
old state somewhere (for example in a closure). If it returns `False`
the template will be reloaded.
"""
return self.get_source(environment, template)
def list_templates(self) -> list[str]:
"""Iterates over all templates. If the loader does not support that
it should raise a :exc:`TypeError` which is the default behavior.
"""
raise TypeError("this loader cannot iterate over all templates")
async def list_templates_async(self) -> list[str]:
"""Asynchronously iterates over all templates. If the loader does not support that
it should raise a :exc:`TypeError` which is the default behavior.
"""
return self.list_templates()
@internalcode
def load(
self,
@ -148,6 +177,50 @@ class BaseLoader:
environment, code, globals, uptodate
)
@internalcode
async def load_async(
self,
environment: "Environment",
name: str,
globals: t.MutableMapping[str, t.Any] | None = None,
) -> "Template":
"""Asynchronously loads a template. This method looks up the template in the cache
or loads one by calling :meth:`get_source_async`. Subclasses should not
override this method as loaders working on collections of other
loaders (such as :class:`PrefixLoader` or :class:`ChoiceLoader`)
will not call this method but `get_source_async` directly.
"""
code = None
if globals is None:
globals = {}
# first we try to get the source for this template together
# with the filename and the uptodate function.
source, filename, uptodate = await self.get_source_async(environment, name)
# try to load the code from the bytecode cache if there is a
# bytecode cache configured.
bcc = environment.bytecode_cache
if bcc is not None:
bucket = await bcc.get_bucket_async(environment, name, filename, source)
code = bucket.code
# if we don't have code so far (not cached, no longer up to
# date) etc. we compile the template
if code is None:
code = environment.compile(source, name, filename)
# if the bytecode cache is available and the bucket doesn't
# have a code so far, we give the bucket the new code and put
# it back to the bytecode cache.
if bcc is not None and bucket.code is None:
bucket.code = code
await bcc.set_bucket_async(bucket)
return environment.template_class.from_code(
environment, code, globals, uptodate
)
class FileSystemLoader(BaseLoader):
"""Load templates from a directory in the file system.
@ -475,10 +548,12 @@ class FunctionLoader(BaseLoader):
self,
load_func: t.Callable[
[str],
str | tuple[str, str | None, t.Callable[[], bool] | None] | None,
str | tuple[str, str | None, t.Callable[[], bool] | None] | None
| t.Awaitable[str | tuple[str, str | None, t.Callable[[], bool] | None] | None],
],
) -> None:
self.load_func = load_func
self.is_async = iscoroutinefunction(self.load_func)
def get_source(
self, environment: "Environment", template: str
@ -493,6 +568,22 @@ class FunctionLoader(BaseLoader):
return rv
async def get_source_async(
self, environment: "Environment", template: str
) -> tuple[str, str | None, t.Callable[[], bool] | None]:
if self.is_async:
rv = await self.load_func(template)
else:
rv = self.load_func(template)
if rv is None:
raise TemplateNotFound(template)
if isinstance(rv, str):
return rv, None, None
return rv
class PrefixLoader(BaseLoader):
"""A loader that is passed a dict of loaders where each loader is bound
@ -534,6 +625,15 @@ class PrefixLoader(BaseLoader):
# (the one that includes the prefix)
raise TemplateNotFound(template) from e
async def get_source_async(
self, environment: "Environment", template: str
) -> tuple[str, str | None, t.Callable[[], bool] | None]:
loader, name = self.get_loader(template)
try:
return await loader.get_source_async(environment, name)
except TemplateNotFound as e:
raise TemplateNotFound(template) from e
@internalcode
def load(
self,
@ -549,6 +649,19 @@ class PrefixLoader(BaseLoader):
# (the one that includes the prefix)
raise TemplateNotFound(name) from e
@internalcode
async def load_async(
self,
environment: "Environment",
name: str,
globals: t.MutableMapping[str, t.Any] | None = None,
) -> "Template":
loader, local_name = self.get_loader(name)
try:
return await loader.load_async(environment, local_name, globals)
except TemplateNotFound as e:
raise TemplateNotFound(name) from e
def list_templates(self) -> list[str]:
result = []
for prefix, loader in self.mapping.items():
@ -556,6 +669,13 @@ class PrefixLoader(BaseLoader):
result.append(prefix + self.delimiter + template)
return result
async def list_templates_async(self) -> list[str]:
result = []
for prefix, loader in self.mapping.items():
async for template in await loader.list_templates_async():
result.append(prefix + self.delimiter + template)
return result
class ChoiceLoader(BaseLoader):
"""This loader works like the `PrefixLoader` just that no prefix is
@ -584,6 +704,16 @@ class ChoiceLoader(BaseLoader):
pass
raise TemplateNotFound(template)
async def get_source_async(
self, environment: "Environment", template: str
) -> tuple[str, str | None, t.Callable[[], bool] | None]:
for loader in self.loaders:
try:
return await loader.get_source_async(environment, template)
except TemplateNotFound:
pass
raise TemplateNotFound(template)
@internalcode
def load(
self,
@ -598,12 +728,32 @@ class ChoiceLoader(BaseLoader):
pass
raise TemplateNotFound(name)
@internalcode
async def load_async(
self,
environment: "Environment",
name: str,
globals: t.MutableMapping[str, t.Any] | None = None,
) -> "Template":
for loader in self.loaders:
try:
return await loader.load_async(environment, name, globals)
except TemplateNotFound:
pass
raise TemplateNotFound(name)
def list_templates(self) -> list[str]:
found = set()
for loader in self.loaders:
found.update(loader.list_templates())
return sorted(found)
async def list_templates_async(self) -> list[str]:
found = set()
for loader in self.loaders:
found.update(await loader.list_templates_async())
return sorted(found)
class _TemplateModule(ModuleType):
"""Like a normal module but with support for weak references"""

View File

@ -15,6 +15,7 @@ from markupsafe import EscapeFormatter
from markupsafe import Markup
from .environment import Environment
from .environment import AsyncEnvironment
from .exceptions import SecurityError
from .runtime import Context
from .runtime import Undefined
@ -398,6 +399,8 @@ class SandboxedEnvironment(Environment):
raise SecurityError(f"{__obj!r} is not safely callable")
return __context.call(__obj, *args, **kwargs)
class AsyncSandboxedEnvironment(AsyncEnvironment, SandboxedEnvironment):
pass
class ImmutableSandboxedEnvironment(SandboxedEnvironment):
"""Works exactly like the regular `SandboxedEnvironment` but does not