satisfy formatter, linter, and strict mypy
This commit is contained in:
parent
20477c6357
commit
0ee5eb41d1
@ -2,6 +2,7 @@
|
||||
non-XML syntax that supports inline expressions and an optional
|
||||
sandboxed environment.
|
||||
"""
|
||||
|
||||
from .bccache import BytecodeCache as BytecodeCache
|
||||
from .bccache import FileSystemBytecodeCache as FileSystemBytecodeCache
|
||||
from .bccache import MemcachedBytecodeCache as MemcachedBytecodeCache
|
||||
|
||||
@ -47,7 +47,7 @@ def async_variant(normal_func): # type: ignore
|
||||
if need_eval_context:
|
||||
wrapper = pass_eval_context(wrapper)
|
||||
|
||||
wrapper.jinja_async_variant = True
|
||||
wrapper.jinja_async_variant = True # type: ignore[attr-defined]
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
@ -5,6 +5,7 @@ slows down your application too much.
|
||||
Situations where this is useful are often forking web applications that
|
||||
are initialized on the first request.
|
||||
"""
|
||||
|
||||
import errno
|
||||
import fnmatch
|
||||
import marshal
|
||||
@ -20,14 +21,15 @@ from types import CodeType
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
import typing_extensions as te
|
||||
|
||||
from .environment import Environment
|
||||
|
||||
class _MemcachedClient(te.Protocol):
|
||||
def get(self, key: str) -> bytes:
|
||||
...
|
||||
def get(self, key: str) -> bytes: ...
|
||||
|
||||
def set(self, key: str, value: bytes, timeout: t.Optional[int] = None) -> None:
|
||||
...
|
||||
def set(
|
||||
self, key: str, value: bytes, timeout: t.Optional[int] = None
|
||||
) -> None: ...
|
||||
|
||||
|
||||
bc_version = 5
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
"""Compiles nodes from the parser into Python code."""
|
||||
|
||||
import typing as t
|
||||
from contextlib import contextmanager
|
||||
from functools import update_wrapper
|
||||
@ -24,6 +25,7 @@ from .visitor import NodeVisitor
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
import typing_extensions as te
|
||||
|
||||
from .environment import Environment
|
||||
|
||||
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
|
||||
@ -60,8 +62,7 @@ def _make_binop(op: str) -> t.Callable[["CodeGenerator", nodes.BinExpr, "Frame"]
|
||||
@optimizeconst
|
||||
def visitor(self: "CodeGenerator", node: nodes.BinExpr, frame: Frame) -> None:
|
||||
if (
|
||||
self.environment.sandboxed
|
||||
and op in self.environment.intercepted_binops # type: ignore
|
||||
self.environment.sandboxed and op in self.environment.intercepted_binops # type: ignore
|
||||
):
|
||||
self.write(f"environment.call_binop(context, {op!r}, ")
|
||||
self.visit(node.left, frame)
|
||||
@ -84,8 +85,7 @@ def _make_unop(
|
||||
@optimizeconst
|
||||
def visitor(self: "CodeGenerator", node: nodes.UnaryExpr, frame: Frame) -> None:
|
||||
if (
|
||||
self.environment.sandboxed
|
||||
and op in self.environment.intercepted_unops # type: ignore
|
||||
self.environment.sandboxed and op in self.environment.intercepted_unops # type: ignore
|
||||
):
|
||||
self.write(f"environment.call_unop(context, {op!r}, ")
|
||||
self.visit(node.node, frame)
|
||||
@ -133,7 +133,7 @@ def has_safe_repr(value: t.Any) -> bool:
|
||||
if type(value) in {tuple, list, set, frozenset}:
|
||||
return all(has_safe_repr(v) for v in value)
|
||||
|
||||
if type(value) is dict:
|
||||
if type(value) is dict: # noqa E721
|
||||
return all(has_safe_repr(k) and has_safe_repr(v) for k, v in value.items())
|
||||
|
||||
return False
|
||||
@ -551,10 +551,13 @@ class CodeGenerator(NodeVisitor):
|
||||
for node in nodes:
|
||||
visitor.visit(node)
|
||||
|
||||
for id_map, names, dependency in (self.filters, visitor.filters, "filters"), (
|
||||
self.tests,
|
||||
visitor.tests,
|
||||
"tests",
|
||||
for id_map, names, dependency in (
|
||||
(self.filters, visitor.filters, "filters"),
|
||||
(
|
||||
self.tests,
|
||||
visitor.tests,
|
||||
"tests",
|
||||
),
|
||||
):
|
||||
for name in sorted(names):
|
||||
if name not in id_map:
|
||||
@ -829,7 +832,8 @@ class CodeGenerator(NodeVisitor):
|
||||
assert frame is None, "no root frame allowed"
|
||||
eval_ctx = EvalContext(self.environment, self.name)
|
||||
|
||||
from .runtime import exported, async_exported
|
||||
from .runtime import async_exported
|
||||
from .runtime import exported
|
||||
|
||||
if self.environment.is_async:
|
||||
exported_names = sorted(exported + async_exported)
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
"""Classes for managing templates and their runtime and compile time
|
||||
options.
|
||||
"""
|
||||
|
||||
import os
|
||||
import typing
|
||||
import typing as t
|
||||
@ -20,10 +21,10 @@ from .defaults import BLOCK_END_STRING
|
||||
from .defaults import BLOCK_START_STRING
|
||||
from .defaults import COMMENT_END_STRING
|
||||
from .defaults import COMMENT_START_STRING
|
||||
from .defaults import DEFAULT_FILTERS
|
||||
from .defaults import DEFAULT_FILTERS # type: ignore[attr-defined]
|
||||
from .defaults import DEFAULT_NAMESPACE
|
||||
from .defaults import DEFAULT_POLICIES
|
||||
from .defaults import DEFAULT_TESTS
|
||||
from .defaults import DEFAULT_TESTS # type: ignore[attr-defined]
|
||||
from .defaults import KEEP_TRAILING_NEWLINE
|
||||
from .defaults import LINE_COMMENT_PREFIX
|
||||
from .defaults import LINE_STATEMENT_PREFIX
|
||||
@ -55,6 +56,7 @@ from .utils import missing
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
import typing_extensions as te
|
||||
|
||||
from .bccache import BytecodeCache
|
||||
from .ext import Extension
|
||||
from .loaders import BaseLoader
|
||||
@ -79,7 +81,7 @@ def get_spontaneous_environment(cls: t.Type[_env_bound], *args: t.Any) -> _env_b
|
||||
|
||||
def create_cache(
|
||||
size: int,
|
||||
) -> t.Optional[t.MutableMapping[t.Tuple[weakref.ref, str], "Template"]]:
|
||||
) -> t.Optional[t.MutableMapping[t.Tuple["weakref.ref[t.Any]", str], "Template"]]:
|
||||
"""Return the cache class for the given size."""
|
||||
if size == 0:
|
||||
return None
|
||||
@ -91,13 +93,13 @@ def create_cache(
|
||||
|
||||
|
||||
def copy_cache(
|
||||
cache: t.Optional[t.MutableMapping],
|
||||
) -> t.Optional[t.MutableMapping[t.Tuple[weakref.ref, str], "Template"]]:
|
||||
cache: t.Optional[t.MutableMapping[t.Any, t.Any]],
|
||||
) -> t.Optional[t.MutableMapping[t.Tuple["weakref.ref[t.Any]", str], "Template"]]:
|
||||
"""Create an empty copy of the given cache."""
|
||||
if cache is None:
|
||||
return None
|
||||
|
||||
if type(cache) is dict:
|
||||
if type(cache) is dict: # noqa E721
|
||||
return {}
|
||||
|
||||
return LRUCache(cache.capacity) # type: ignore
|
||||
@ -670,7 +672,7 @@ class Environment:
|
||||
stream = ext.filter_stream(stream) # type: ignore
|
||||
|
||||
if not isinstance(stream, TokenStream):
|
||||
stream = TokenStream(stream, name, filename) # type: ignore
|
||||
stream = TokenStream(stream, name, filename)
|
||||
|
||||
return stream
|
||||
|
||||
@ -711,8 +713,7 @@ class Environment:
|
||||
filename: t.Optional[str] = None,
|
||||
raw: "te.Literal[False]" = False,
|
||||
defer_init: bool = False,
|
||||
) -> CodeType:
|
||||
...
|
||||
) -> CodeType: ...
|
||||
|
||||
@typing.overload
|
||||
def compile(
|
||||
@ -722,8 +723,7 @@ class Environment:
|
||||
filename: t.Optional[str] = None,
|
||||
raw: "te.Literal[True]" = ...,
|
||||
defer_init: bool = False,
|
||||
) -> str:
|
||||
...
|
||||
) -> str: ...
|
||||
|
||||
@internalcode
|
||||
def compile(
|
||||
@ -814,7 +814,7 @@ class Environment:
|
||||
|
||||
def compile_templates(
|
||||
self,
|
||||
target: t.Union[str, os.PathLike],
|
||||
target: t.Union[str, "os.PathLike[str]"],
|
||||
extensions: t.Optional[t.Collection[str]] = None,
|
||||
filter_func: t.Optional[t.Callable[[str], bool]] = None,
|
||||
zip: t.Optional[str] = "deflated",
|
||||
@ -858,7 +858,10 @@ class Environment:
|
||||
f.write(data.encode("utf8"))
|
||||
|
||||
if zip is not None:
|
||||
from zipfile import ZipFile, ZipInfo, ZIP_DEFLATED, ZIP_STORED
|
||||
from zipfile import ZIP_DEFLATED
|
||||
from zipfile import ZIP_STORED
|
||||
from zipfile import ZipFile
|
||||
from zipfile import ZipInfo
|
||||
|
||||
zip_file = ZipFile(
|
||||
target, "w", dict(deflated=ZIP_DEFLATED, stored=ZIP_STORED)[zip]
|
||||
@ -1417,7 +1420,9 @@ class Template:
|
||||
"""
|
||||
ctx = self.new_context(vars, shared, locals)
|
||||
return TemplateModule(
|
||||
self, ctx, [x async for x in self.root_render_func(ctx)] # type: ignore
|
||||
self,
|
||||
ctx,
|
||||
[x async for x in self.root_render_func(ctx)], # type: ignore
|
||||
)
|
||||
|
||||
@internalcode
|
||||
@ -1588,7 +1593,7 @@ class TemplateStream:
|
||||
|
||||
def dump(
|
||||
self,
|
||||
fp: t.Union[str, t.IO],
|
||||
fp: t.Union[str, t.IO[bytes]],
|
||||
encoding: t.Optional[str] = None,
|
||||
errors: t.Optional[str] = "strict",
|
||||
) -> None:
|
||||
@ -1606,22 +1611,25 @@ class TemplateStream:
|
||||
if encoding is None:
|
||||
encoding = "utf-8"
|
||||
|
||||
fp = open(fp, "wb")
|
||||
real_fp: t.IO[bytes] = open(fp, "wb")
|
||||
close = True
|
||||
else:
|
||||
real_fp = fp
|
||||
|
||||
try:
|
||||
if encoding is not None:
|
||||
iterable = (x.encode(encoding, errors) for x in self) # type: ignore
|
||||
else:
|
||||
iterable = self # type: ignore
|
||||
|
||||
if hasattr(fp, "writelines"):
|
||||
fp.writelines(iterable)
|
||||
if hasattr(real_fp, "writelines"):
|
||||
real_fp.writelines(iterable)
|
||||
else:
|
||||
for item in iterable:
|
||||
fp.write(item)
|
||||
real_fp.write(item)
|
||||
finally:
|
||||
if close:
|
||||
fp.close()
|
||||
real_fp.close()
|
||||
|
||||
def disable_buffering(self) -> None:
|
||||
"""Disable the output buffering."""
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
"""Extension API for adding custom tags and behavior."""
|
||||
|
||||
import pprint
|
||||
import re
|
||||
import typing as t
|
||||
@ -18,23 +19,23 @@ from .utils import pass_context
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
import typing_extensions as te
|
||||
|
||||
from .lexer import Token
|
||||
from .lexer import TokenStream
|
||||
from .parser import Parser
|
||||
|
||||
class _TranslationsBasic(te.Protocol):
|
||||
def gettext(self, message: str) -> str:
|
||||
...
|
||||
def gettext(self, message: str) -> str: ...
|
||||
|
||||
def ngettext(self, singular: str, plural: str, n: int) -> str:
|
||||
pass
|
||||
|
||||
class _TranslationsContext(_TranslationsBasic):
|
||||
def pgettext(self, context: str, message: str) -> str:
|
||||
...
|
||||
def pgettext(self, context: str, message: str) -> str: ...
|
||||
|
||||
def npgettext(self, context: str, singular: str, plural: str, n: int) -> str:
|
||||
...
|
||||
def npgettext(
|
||||
self, context: str, singular: str, plural: str, n: int
|
||||
) -> str: ...
|
||||
|
||||
_SupportedTranslations = t.Union[_TranslationsBasic, _TranslationsContext]
|
||||
|
||||
@ -218,7 +219,7 @@ def _make_new_pgettext(func: t.Callable[[str, str], str]) -> t.Callable[..., str
|
||||
|
||||
|
||||
def _make_new_npgettext(
|
||||
func: t.Callable[[str, str, str, int], str]
|
||||
func: t.Callable[[str, str, str, int], str],
|
||||
) -> t.Callable[..., str]:
|
||||
@pass_context
|
||||
def npgettext(
|
||||
@ -294,14 +295,14 @@ class InternationalizationExtension(Extension):
|
||||
pgettext = translations.pgettext
|
||||
else:
|
||||
|
||||
def pgettext(c: str, s: str) -> str:
|
||||
def pgettext(c: str, s: str) -> str: # type: ignore[misc]
|
||||
return s
|
||||
|
||||
if hasattr(translations, "npgettext"):
|
||||
npgettext = translations.npgettext
|
||||
else:
|
||||
|
||||
def npgettext(c: str, s: str, p: str, n: int) -> str:
|
||||
def npgettext(c: str, s: str, p: str, n: int) -> str: # type: ignore[misc]
|
||||
return s if n == 1 else p
|
||||
|
||||
self._install_callables(
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
"""Built-in template filters used with the ``|`` operator."""
|
||||
|
||||
import math
|
||||
import random
|
||||
import re
|
||||
@ -28,6 +29,7 @@ from .utils import urlize
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
import typing_extensions as te
|
||||
|
||||
from .environment import Environment
|
||||
from .nodes import EvalContext
|
||||
from .runtime import Context
|
||||
@ -122,7 +124,7 @@ def make_multi_attrgetter(
|
||||
|
||||
|
||||
def _prepare_attribute_parts(
|
||||
attr: t.Optional[t.Union[str, int]]
|
||||
attr: t.Optional[t.Union[str, int]],
|
||||
) -> t.List[t.Union[str, int]]:
|
||||
if attr is None:
|
||||
return []
|
||||
@ -142,7 +144,7 @@ def do_forceescape(value: "t.Union[str, HasHTML]") -> Markup:
|
||||
|
||||
|
||||
def do_urlencode(
|
||||
value: t.Union[str, t.Mapping[str, t.Any], t.Iterable[t.Tuple[str, t.Any]]]
|
||||
value: t.Union[str, t.Mapping[str, t.Any], t.Iterable[t.Tuple[str, t.Any]]],
|
||||
) -> str:
|
||||
"""Quote data for use in a URL path or query using UTF-8.
|
||||
|
||||
@ -552,7 +554,7 @@ def do_default(
|
||||
@pass_eval_context
|
||||
def sync_do_join(
|
||||
eval_ctx: "EvalContext",
|
||||
value: t.Iterable,
|
||||
value: t.Iterable[t.Any],
|
||||
d: str = "",
|
||||
attribute: t.Optional[t.Union[str, int]] = None,
|
||||
) -> str:
|
||||
@ -610,7 +612,7 @@ def sync_do_join(
|
||||
@async_variant(sync_do_join) # type: ignore
|
||||
async def do_join(
|
||||
eval_ctx: "EvalContext",
|
||||
value: t.Union[t.AsyncIterable, t.Iterable],
|
||||
value: t.Union[t.AsyncIterable[t.Any], t.Iterable[t.Any]],
|
||||
d: str = "",
|
||||
attribute: t.Optional[t.Union[str, int]] = None,
|
||||
) -> str:
|
||||
@ -1160,7 +1162,7 @@ def do_round(
|
||||
|
||||
class _GroupTuple(t.NamedTuple):
|
||||
grouper: t.Any
|
||||
list: t.List
|
||||
list: t.List[t.Any]
|
||||
|
||||
# Use the regular tuple repr to hide this subclass if users print
|
||||
# out the value during debugging.
|
||||
@ -1356,13 +1358,11 @@ def do_mark_unsafe(value: str) -> str:
|
||||
|
||||
|
||||
@typing.overload
|
||||
def do_reverse(value: str) -> str:
|
||||
...
|
||||
def do_reverse(value: str) -> str: ...
|
||||
|
||||
|
||||
@typing.overload
|
||||
def do_reverse(value: "t.Iterable[V]") -> "t.Iterable[V]":
|
||||
...
|
||||
def do_reverse(value: "t.Iterable[V]") -> "t.Iterable[V]": ...
|
||||
|
||||
|
||||
def do_reverse(value: t.Union[str, t.Iterable[V]]) -> t.Union[str, t.Iterable[V]]:
|
||||
@ -1416,26 +1416,28 @@ def do_attr(
|
||||
|
||||
@typing.overload
|
||||
def sync_do_map(
|
||||
context: "Context", value: t.Iterable, name: str, *args: t.Any, **kwargs: t.Any
|
||||
) -> t.Iterable:
|
||||
...
|
||||
context: "Context",
|
||||
value: t.Iterable[t.Any],
|
||||
name: str,
|
||||
*args: t.Any,
|
||||
**kwargs: t.Any,
|
||||
) -> t.Iterable[t.Any]: ...
|
||||
|
||||
|
||||
@typing.overload
|
||||
def sync_do_map(
|
||||
context: "Context",
|
||||
value: t.Iterable,
|
||||
value: t.Iterable[t.Any],
|
||||
*,
|
||||
attribute: str = ...,
|
||||
default: t.Optional[t.Any] = None,
|
||||
) -> t.Iterable:
|
||||
...
|
||||
) -> t.Iterable[t.Any]: ...
|
||||
|
||||
|
||||
@pass_context
|
||||
def sync_do_map(
|
||||
context: "Context", value: t.Iterable, *args: t.Any, **kwargs: t.Any
|
||||
) -> t.Iterable:
|
||||
context: "Context", value: t.Iterable[t.Any], *args: t.Any, **kwargs: t.Any
|
||||
) -> t.Iterable[t.Any]:
|
||||
"""Applies a filter on a sequence of objects or looks up an attribute.
|
||||
This is useful when dealing with lists of objects but you are really
|
||||
only interested in a certain value of it.
|
||||
@ -1485,32 +1487,30 @@ def sync_do_map(
|
||||
@typing.overload
|
||||
def do_map(
|
||||
context: "Context",
|
||||
value: t.Union[t.AsyncIterable, t.Iterable],
|
||||
value: t.Union[t.AsyncIterable[t.Any], t.Iterable[t.Any]],
|
||||
name: str,
|
||||
*args: t.Any,
|
||||
**kwargs: t.Any,
|
||||
) -> t.Iterable:
|
||||
...
|
||||
) -> t.Iterable[t.Any]: ...
|
||||
|
||||
|
||||
@typing.overload
|
||||
def do_map(
|
||||
context: "Context",
|
||||
value: t.Union[t.AsyncIterable, t.Iterable],
|
||||
value: t.Union[t.AsyncIterable[t.Any], t.Iterable[t.Any]],
|
||||
*,
|
||||
attribute: str = ...,
|
||||
default: t.Optional[t.Any] = None,
|
||||
) -> t.Iterable:
|
||||
...
|
||||
) -> t.Iterable[t.Any]: ...
|
||||
|
||||
|
||||
@async_variant(sync_do_map) # type: ignore
|
||||
async def do_map(
|
||||
context: "Context",
|
||||
value: t.Union[t.AsyncIterable, t.Iterable],
|
||||
value: t.Union[t.AsyncIterable[t.Any], t.Iterable[t.Any]],
|
||||
*args: t.Any,
|
||||
**kwargs: t.Any,
|
||||
) -> t.AsyncIterable:
|
||||
) -> t.AsyncIterable[t.Any]:
|
||||
if value:
|
||||
func = prepare_map(context, args, kwargs)
|
||||
|
||||
@ -1703,7 +1703,7 @@ def do_tojson(
|
||||
|
||||
|
||||
def prepare_map(
|
||||
context: "Context", args: t.Tuple, kwargs: t.Dict[str, t.Any]
|
||||
context: "Context", args: t.Tuple[t.Any, ...], kwargs: t.Dict[str, t.Any]
|
||||
) -> t.Callable[[t.Any], t.Any]:
|
||||
if not args and "attribute" in kwargs:
|
||||
attribute = kwargs.pop("attribute")
|
||||
@ -1732,7 +1732,7 @@ def prepare_map(
|
||||
|
||||
def prepare_select_or_reject(
|
||||
context: "Context",
|
||||
args: t.Tuple,
|
||||
args: t.Tuple[t.Any, ...],
|
||||
kwargs: t.Dict[str, t.Any],
|
||||
modfunc: t.Callable[[t.Any], t.Any],
|
||||
lookup_attr: bool,
|
||||
@ -1767,7 +1767,7 @@ def prepare_select_or_reject(
|
||||
def select_or_reject(
|
||||
context: "Context",
|
||||
value: "t.Iterable[V]",
|
||||
args: t.Tuple,
|
||||
args: t.Tuple[t.Any, ...],
|
||||
kwargs: t.Dict[str, t.Any],
|
||||
modfunc: t.Callable[[t.Any], t.Any],
|
||||
lookup_attr: bool,
|
||||
@ -1783,7 +1783,7 @@ def select_or_reject(
|
||||
async def async_select_or_reject(
|
||||
context: "Context",
|
||||
value: "t.Union[t.AsyncIterable[V], t.Iterable[V]]",
|
||||
args: t.Tuple,
|
||||
args: t.Tuple[t.Any, ...],
|
||||
kwargs: t.Dict[str, t.Any],
|
||||
modfunc: t.Callable[[t.Any], t.Any],
|
||||
lookup_attr: bool,
|
||||
|
||||
@ -3,6 +3,7 @@ is used to do some preprocessing. It filters out invalid operators like
|
||||
the bitshift operators we don't allow in templates. It separates
|
||||
template code and python code in expressions.
|
||||
"""
|
||||
|
||||
import re
|
||||
import typing as t
|
||||
from ast import literal_eval
|
||||
@ -15,6 +16,7 @@ from .utils import LRUCache
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
import typing_extensions as te
|
||||
|
||||
from .environment import Environment
|
||||
|
||||
# cache for the lexers. Exists in order to be able to have multiple
|
||||
@ -447,7 +449,7 @@ def get_lexer(environment: "Environment") -> "Lexer":
|
||||
return lexer
|
||||
|
||||
|
||||
class OptionalLStrip(tuple):
|
||||
class OptionalLStrip(tuple): # type: ignore[type-arg]
|
||||
"""A special tuple for marking a point in the state that can have
|
||||
lstrip applied.
|
||||
"""
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
"""API and implementations for loading templates from different data
|
||||
sources.
|
||||
"""
|
||||
|
||||
import importlib.util
|
||||
import os
|
||||
import posixpath
|
||||
@ -177,7 +178,9 @@ class FileSystemLoader(BaseLoader):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
searchpath: t.Union[str, os.PathLike, t.Sequence[t.Union[str, os.PathLike]]],
|
||||
searchpath: t.Union[
|
||||
str, "os.PathLike[str]", t.Sequence[t.Union[str, "os.PathLike[str]"]]
|
||||
],
|
||||
encoding: str = "utf-8",
|
||||
followlinks: bool = False,
|
||||
) -> None:
|
||||
@ -601,7 +604,10 @@ class ModuleLoader(BaseLoader):
|
||||
has_source_access = False
|
||||
|
||||
def __init__(
|
||||
self, path: t.Union[str, os.PathLike, t.Sequence[t.Union[str, os.PathLike]]]
|
||||
self,
|
||||
path: t.Union[
|
||||
str, "os.PathLike[str]", t.Sequence[t.Union[str, "os.PathLike[str]"]]
|
||||
],
|
||||
) -> None:
|
||||
package_name = f"_jinja2_module_templates_{id(self):x}"
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
"""Functions that expose information about templates that might be
|
||||
interesting for introspection.
|
||||
"""
|
||||
|
||||
import typing as t
|
||||
|
||||
from . import nodes
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
some node tree helper functions used by the parser and compiler in order
|
||||
to normalize nodes.
|
||||
"""
|
||||
|
||||
import inspect
|
||||
import operator
|
||||
import typing as t
|
||||
@ -13,6 +14,7 @@ from .utils import _PassArg
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
import typing_extensions as te
|
||||
|
||||
from .environment import Environment
|
||||
|
||||
_NodeBound = t.TypeVar("_NodeBound", bound="Node")
|
||||
@ -56,7 +58,7 @@ class NodeType(type):
|
||||
|
||||
def __new__(mcs, name, bases, d): # type: ignore
|
||||
for attr in "fields", "attributes":
|
||||
storage = []
|
||||
storage: t.List[t.Tuple[str, ...]] = []
|
||||
storage.extend(getattr(bases[0] if bases else object, attr, ()))
|
||||
storage.extend(d.get(attr, ()))
|
||||
assert len(bases) <= 1, "multiple inheritance not allowed"
|
||||
|
||||
@ -7,6 +7,7 @@ want. For example, loop unrolling doesn't work because unrolled loops
|
||||
would have a different scope. The solution would be a second syntax tree
|
||||
that stored the scoping rules.
|
||||
"""
|
||||
|
||||
import typing as t
|
||||
|
||||
from . import nodes
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
"""Parse tokens from the lexer into nodes for the compiler."""
|
||||
|
||||
import typing
|
||||
import typing as t
|
||||
|
||||
@ -10,6 +11,7 @@ from .lexer import describe_token_expr
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
import typing_extensions as te
|
||||
|
||||
from .environment import Environment
|
||||
|
||||
_ImportInclude = t.TypeVar("_ImportInclude", nodes.Import, nodes.Include)
|
||||
@ -457,8 +459,7 @@ class Parser:
|
||||
@typing.overload
|
||||
def parse_assign_target(
|
||||
self, with_tuple: bool = ..., name_only: "te.Literal[True]" = ...
|
||||
) -> nodes.Name:
|
||||
...
|
||||
) -> nodes.Name: ...
|
||||
|
||||
@typing.overload
|
||||
def parse_assign_target(
|
||||
@ -467,8 +468,7 @@ class Parser:
|
||||
name_only: bool = False,
|
||||
extra_end_rules: t.Optional[t.Tuple[str, ...]] = None,
|
||||
with_namespace: bool = False,
|
||||
) -> t.Union[nodes.NSRef, nodes.Name, nodes.Tuple]:
|
||||
...
|
||||
) -> t.Union[nodes.NSRef, nodes.Name, nodes.Tuple]: ...
|
||||
|
||||
def parse_assign_target(
|
||||
self,
|
||||
@ -861,7 +861,14 @@ class Parser:
|
||||
|
||||
return nodes.Slice(lineno=lineno, *args) # noqa: B026
|
||||
|
||||
def parse_call_args(self) -> t.Tuple:
|
||||
def parse_call_args(
|
||||
self,
|
||||
) -> t.Tuple[
|
||||
t.List[nodes.Expr],
|
||||
t.List[nodes.Keyword],
|
||||
t.Optional[nodes.Expr],
|
||||
t.Optional[nodes.Expr],
|
||||
]:
|
||||
token = self.stream.expect("lparen")
|
||||
args = []
|
||||
kwargs = []
|
||||
@ -952,7 +959,7 @@ class Parser:
|
||||
next(self.stream)
|
||||
name += "." + self.stream.expect("name").value
|
||||
dyn_args = dyn_kwargs = None
|
||||
kwargs = []
|
||||
kwargs: t.List[nodes.Keyword] = []
|
||||
if self.stream.current.type == "lparen":
|
||||
args, kwargs, dyn_args, dyn_kwargs = self.parse_call_args()
|
||||
elif self.stream.current.type in {
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
"""The runtime functions and state used by compiled templates."""
|
||||
|
||||
import functools
|
||||
import sys
|
||||
import typing as t
|
||||
@ -28,7 +29,9 @@ F = t.TypeVar("F", bound=t.Callable[..., t.Any])
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
import logging
|
||||
|
||||
import typing_extensions as te
|
||||
|
||||
from .environment import Environment
|
||||
|
||||
class LoopRenderFunc(te.Protocol):
|
||||
@ -37,8 +40,7 @@ if t.TYPE_CHECKING:
|
||||
reciter: t.Iterable[V],
|
||||
loop_render_func: "LoopRenderFunc",
|
||||
depth: int = 0,
|
||||
) -> str:
|
||||
...
|
||||
) -> str: ...
|
||||
|
||||
|
||||
# these variables are exported to the template runtime
|
||||
@ -259,7 +261,10 @@ class Context:
|
||||
|
||||
@internalcode
|
||||
def call(
|
||||
__self, __obj: t.Callable, *args: t.Any, **kwargs: t.Any # noqa: B902
|
||||
__self,
|
||||
__obj: t.Callable[..., t.Any],
|
||||
*args: t.Any,
|
||||
**kwargs: t.Any, # noqa: B902
|
||||
) -> t.Union[t.Any, "Undefined"]:
|
||||
"""Call the callable with the arguments and keyword arguments
|
||||
provided but inject the active context or environment as first
|
||||
@ -586,7 +591,7 @@ class AsyncLoopContext(LoopContext):
|
||||
|
||||
@staticmethod
|
||||
def _to_iterator( # type: ignore
|
||||
iterable: t.Union[t.Iterable[V], t.AsyncIterable[V]]
|
||||
iterable: t.Union[t.Iterable[V], t.AsyncIterable[V]],
|
||||
) -> t.AsyncIterator[V]:
|
||||
return auto_aiter(iterable)
|
||||
|
||||
|
||||
@ -1,14 +1,15 @@
|
||||
"""A sandbox layer that ensures unsafe operations cannot be performed.
|
||||
Useful when the template itself comes from an untrusted source.
|
||||
"""
|
||||
|
||||
import operator
|
||||
import types
|
||||
import typing as t
|
||||
from _string import formatter_field_name_split # type: ignore
|
||||
from collections import abc
|
||||
from collections import deque
|
||||
from string import Formatter
|
||||
|
||||
from _string import formatter_field_name_split # type: ignore
|
||||
from markupsafe import EscapeFormatter
|
||||
from markupsafe import Markup
|
||||
|
||||
@ -37,7 +38,7 @@ UNSAFE_COROUTINE_ATTRIBUTES = {"cr_frame", "cr_code"}
|
||||
#: unsafe attributes on async generators
|
||||
UNSAFE_ASYNC_GENERATOR_ATTRIBUTES = {"ag_code", "ag_frame"}
|
||||
|
||||
_mutable_spec: t.Tuple[t.Tuple[t.Type, t.FrozenSet[str]], ...] = (
|
||||
_mutable_spec: t.Tuple[t.Tuple[t.Type[t.Any], t.FrozenSet[str]], ...] = (
|
||||
(
|
||||
abc.MutableSet,
|
||||
frozenset(
|
||||
@ -80,7 +81,7 @@ _mutable_spec: t.Tuple[t.Tuple[t.Type, t.FrozenSet[str]], ...] = (
|
||||
)
|
||||
|
||||
|
||||
def inspect_format_method(callable: t.Callable) -> t.Optional[str]:
|
||||
def inspect_format_method(callable: t.Callable[..., t.Any]) -> t.Optional[str]:
|
||||
if not isinstance(
|
||||
callable, (types.MethodType, types.BuiltinMethodType)
|
||||
) or callable.__name__ not in ("format", "format_map"):
|
||||
@ -350,7 +351,7 @@ class SandboxedEnvironment(Environment):
|
||||
s: str,
|
||||
args: t.Tuple[t.Any, ...],
|
||||
kwargs: t.Dict[str, t.Any],
|
||||
format_func: t.Optional[t.Callable] = None,
|
||||
format_func: t.Optional[t.Callable[..., t.Any]] = None,
|
||||
) -> str:
|
||||
"""If a format call is detected, then this is routed through this
|
||||
method so that our safety sandbox can be used for it.
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
"""Built-in template tests used with the ``is`` operator."""
|
||||
|
||||
import operator
|
||||
import typing as t
|
||||
from collections import abc
|
||||
@ -169,7 +170,7 @@ def test_sequence(value: t.Any) -> bool:
|
||||
"""
|
||||
try:
|
||||
len(value)
|
||||
value.__getitem__
|
||||
value.__getitem__ # noqa B018
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@ -204,7 +205,7 @@ def test_escaped(value: t.Any) -> bool:
|
||||
return hasattr(value, "__html__")
|
||||
|
||||
|
||||
def test_in(value: t.Any, seq: t.Container) -> bool:
|
||||
def test_in(value: t.Any, seq: t.Container[t.Any]) -> bool:
|
||||
"""Check if value is in seq.
|
||||
|
||||
.. versionadded:: 2.10
|
||||
|
||||
@ -152,7 +152,7 @@ def import_string(import_name: str, silent: bool = False) -> t.Any:
|
||||
raise
|
||||
|
||||
|
||||
def open_if_exists(filename: str, mode: str = "rb") -> t.Optional[t.IO]:
|
||||
def open_if_exists(filename: str, mode: str = "rb") -> t.Optional[t.IO[t.Any]]:
|
||||
"""Returns a file descriptor for the filename if that file exists,
|
||||
otherwise ``None``.
|
||||
"""
|
||||
@ -450,7 +450,7 @@ class LRUCache:
|
||||
self.__dict__.update(d)
|
||||
self._postinit()
|
||||
|
||||
def __getnewargs__(self) -> t.Tuple:
|
||||
def __getnewargs__(self) -> t.Tuple[t.Any, ...]:
|
||||
return (self.capacity,)
|
||||
|
||||
def copy(self) -> "LRUCache":
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
"""API for traversing the AST nodes. Implemented by the compiler and
|
||||
meta introspection.
|
||||
"""
|
||||
|
||||
import typing as t
|
||||
|
||||
from .nodes import Node
|
||||
@ -9,8 +10,7 @@ if t.TYPE_CHECKING:
|
||||
import typing_extensions as te
|
||||
|
||||
class VisitCallable(te.Protocol):
|
||||
def __call__(self, node: Node, *args: t.Any, **kwargs: t.Any) -> t.Any:
|
||||
...
|
||||
def __call__(self, node: Node, *args: t.Any, **kwargs: t.Any) -> t.Any: ...
|
||||
|
||||
|
||||
class NodeVisitor:
|
||||
|
||||
@ -150,7 +150,8 @@ class TestExtendedAPI:
|
||||
assert t.render(foo="<foo>") == "<foo>"
|
||||
|
||||
def test_sandbox_max_range(self, env):
|
||||
from jinja2.sandbox import SandboxedEnvironment, MAX_RANGE
|
||||
from jinja2.sandbox import MAX_RANGE
|
||||
from jinja2.sandbox import SandboxedEnvironment
|
||||
|
||||
env = SandboxedEnvironment()
|
||||
t = env.from_string("{% for item in range(total) %}{{ item }}{% endfor %}")
|
||||
@ -264,7 +265,7 @@ class TestUndefined:
|
||||
|
||||
def test_undefined_and_special_attributes(self):
|
||||
with pytest.raises(AttributeError):
|
||||
Undefined("Foo").__dict__
|
||||
Undefined("Foo").__dict__ # noqa B018
|
||||
|
||||
def test_undefined_attribute_error(self):
|
||||
# Django's LazyObject turns the __class__ attribute into a
|
||||
|
||||
@ -365,11 +365,10 @@ class TestInheritance:
|
||||
|
||||
class TestBugFix:
|
||||
def test_fixed_macro_scoping_bug(self, env):
|
||||
assert (
|
||||
Environment(
|
||||
loader=DictLoader(
|
||||
{
|
||||
"test.html": """\
|
||||
assert Environment(
|
||||
loader=DictLoader(
|
||||
{
|
||||
"test.html": """\
|
||||
{% extends 'details.html' %}
|
||||
|
||||
{% macro my_macro() %}
|
||||
@ -380,7 +379,7 @@ class TestBugFix:
|
||||
{{ my_macro() }}
|
||||
{% endblock %}
|
||||
""",
|
||||
"details.html": """\
|
||||
"details.html": """\
|
||||
{% extends 'standard.html' %}
|
||||
|
||||
{% macro my_macro() %}
|
||||
@ -396,17 +395,12 @@ class TestBugFix:
|
||||
{% endblock %}
|
||||
{% endblock %}
|
||||
""",
|
||||
"standard.html": """
|
||||
"standard.html": """
|
||||
{% block content %} {% endblock %}
|
||||
""",
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
.get_template("test.html")
|
||||
.render()
|
||||
.split()
|
||||
== ["outer_box", "my_macro"]
|
||||
)
|
||||
).get_template("test.html").render().split() == ["outer_box", "my_macro"]
|
||||
|
||||
def test_double_extends(self, env):
|
||||
"""Ensures that a template with more than 1 {% extends ... %} usage
|
||||
|
||||
@ -599,6 +599,7 @@ class TestBug:
|
||||
|
||||
def test_markup_and_chainable_undefined(self):
|
||||
from markupsafe import Markup
|
||||
|
||||
from jinja2.runtime import ChainableUndefined
|
||||
|
||||
assert str(Markup(ChainableUndefined())) == ""
|
||||
|
||||
Loading…
Reference in New Issue
Block a user