This commit is contained in:
2024-12-04 13:35:57 +05:00
parent d346bf4b2a
commit 73ce681a55
7059 changed files with 1196501 additions and 0 deletions

View File

@ -0,0 +1,64 @@
from __future__ import annotations
import typing as t
from .accept import Accept as Accept
from .accept import CharsetAccept as CharsetAccept
from .accept import LanguageAccept as LanguageAccept
from .accept import MIMEAccept as MIMEAccept
from .auth import Authorization as Authorization
from .auth import WWWAuthenticate as WWWAuthenticate
from .cache_control import RequestCacheControl as RequestCacheControl
from .cache_control import ResponseCacheControl as ResponseCacheControl
from .csp import ContentSecurityPolicy as ContentSecurityPolicy
from .etag import ETags as ETags
from .file_storage import FileMultiDict as FileMultiDict
from .file_storage import FileStorage as FileStorage
from .headers import EnvironHeaders as EnvironHeaders
from .headers import Headers as Headers
from .mixins import ImmutableDictMixin as ImmutableDictMixin
from .mixins import ImmutableHeadersMixin as ImmutableHeadersMixin
from .mixins import ImmutableListMixin as ImmutableListMixin
from .mixins import ImmutableMultiDictMixin as ImmutableMultiDictMixin
from .mixins import UpdateDictMixin as UpdateDictMixin
from .range import ContentRange as ContentRange
from .range import IfRange as IfRange
from .range import Range as Range
from .structures import CallbackDict as CallbackDict
from .structures import CombinedMultiDict as CombinedMultiDict
from .structures import HeaderSet as HeaderSet
from .structures import ImmutableDict as ImmutableDict
from .structures import ImmutableList as ImmutableList
from .structures import ImmutableMultiDict as ImmutableMultiDict
from .structures import ImmutableTypeConversionDict as ImmutableTypeConversionDict
from .structures import iter_multi_items as iter_multi_items
from .structures import MultiDict as MultiDict
from .structures import TypeConversionDict as TypeConversionDict
def __getattr__(name: str) -> t.Any:
import warnings
if name == "OrderedMultiDict":
from .structures import _OrderedMultiDict
warnings.warn(
"'OrderedMultiDict' is deprecated and will be removed in Werkzeug"
" 3.2. Use 'MultiDict' instead.",
DeprecationWarning,
stacklevel=2,
)
return _OrderedMultiDict
if name == "ImmutableOrderedMultiDict":
from .structures import _ImmutableOrderedMultiDict
warnings.warn(
"'OrderedMultiDict' is deprecated and will be removed in Werkzeug"
" 3.2. Use 'ImmutableMultiDict' instead.",
DeprecationWarning,
stacklevel=2,
)
return _ImmutableOrderedMultiDict
raise AttributeError(name)

View File

@ -0,0 +1,350 @@
from __future__ import annotations
import codecs
import collections.abc as cabc
import re
import typing as t
from .structures import ImmutableList
class Accept(ImmutableList[tuple[str, float]]):
"""An :class:`Accept` object is just a list subclass for lists of
``(value, quality)`` tuples. It is automatically sorted by specificity
and quality.
All :class:`Accept` objects work similar to a list but provide extra
functionality for working with the data. Containment checks are
normalized to the rules of that header:
>>> a = CharsetAccept([('ISO-8859-1', 1), ('utf-8', 0.7)])
>>> a.best
'ISO-8859-1'
>>> 'iso-8859-1' in a
True
>>> 'UTF8' in a
True
>>> 'utf7' in a
False
To get the quality for an item you can use normal item lookup:
>>> print a['utf-8']
0.7
>>> a['utf7']
0
.. versionchanged:: 0.5
:class:`Accept` objects are forced immutable now.
.. versionchanged:: 1.0.0
:class:`Accept` internal values are no longer ordered
alphabetically for equal quality tags. Instead the initial
order is preserved.
"""
def __init__(
self, values: Accept | cabc.Iterable[tuple[str, float]] | None = ()
) -> None:
if values is None:
super().__init__()
self.provided = False
elif isinstance(values, Accept):
self.provided = values.provided
super().__init__(values)
else:
self.provided = True
values = sorted(
values, key=lambda x: (self._specificity(x[0]), x[1]), reverse=True
)
super().__init__(values)
def _specificity(self, value: str) -> tuple[bool, ...]:
"""Returns a tuple describing the value's specificity."""
return (value != "*",)
def _value_matches(self, value: str, item: str) -> bool:
"""Check if a value matches a given accept item."""
return item == "*" or item.lower() == value.lower()
@t.overload
def __getitem__(self, key: str) -> float: ...
@t.overload
def __getitem__(self, key: t.SupportsIndex) -> tuple[str, float]: ...
@t.overload
def __getitem__(self, key: slice) -> list[tuple[str, float]]: ...
def __getitem__(
self, key: str | t.SupportsIndex | slice
) -> float | tuple[str, float] | list[tuple[str, float]]:
"""Besides index lookup (getting item n) you can also pass it a string
to get the quality for the item. If the item is not in the list, the
returned quality is ``0``.
"""
if isinstance(key, str):
return self.quality(key)
return list.__getitem__(self, key)
def quality(self, key: str) -> float:
"""Returns the quality of the key.
.. versionadded:: 0.6
In previous versions you had to use the item-lookup syntax
(eg: ``obj[key]`` instead of ``obj.quality(key)``)
"""
for item, quality in self:
if self._value_matches(key, item):
return quality
return 0
def __contains__(self, value: str) -> bool: # type: ignore[override]
for item, _quality in self:
if self._value_matches(value, item):
return True
return False
def __repr__(self) -> str:
pairs_str = ", ".join(f"({x!r}, {y})" for x, y in self)
return f"{type(self).__name__}([{pairs_str}])"
def index(self, key: str | tuple[str, float]) -> int: # type: ignore[override]
"""Get the position of an entry or raise :exc:`ValueError`.
:param key: The key to be looked up.
.. versionchanged:: 0.5
This used to raise :exc:`IndexError`, which was inconsistent
with the list API.
"""
if isinstance(key, str):
for idx, (item, _quality) in enumerate(self):
if self._value_matches(key, item):
return idx
raise ValueError(key)
return list.index(self, key)
def find(self, key: str | tuple[str, float]) -> int:
"""Get the position of an entry or return -1.
:param key: The key to be looked up.
"""
try:
return self.index(key)
except ValueError:
return -1
def values(self) -> cabc.Iterator[str]:
"""Iterate over all values."""
for item in self:
yield item[0]
def to_header(self) -> str:
"""Convert the header set into an HTTP header string."""
result = []
for value, quality in self:
if quality != 1:
value = f"{value};q={quality}"
result.append(value)
return ",".join(result)
def __str__(self) -> str:
return self.to_header()
def _best_single_match(self, match: str) -> tuple[str, float] | None:
for client_item, quality in self:
if self._value_matches(match, client_item):
# self is sorted by specificity descending, we can exit
return client_item, quality
return None
@t.overload
def best_match(self, matches: cabc.Iterable[str]) -> str | None: ...
@t.overload
def best_match(self, matches: cabc.Iterable[str], default: str = ...) -> str: ...
def best_match(
self, matches: cabc.Iterable[str], default: str | None = None
) -> str | None:
"""Returns the best match from a list of possible matches based
on the specificity and quality of the client. If two items have the
same quality and specificity, the one is returned that comes first.
:param matches: a list of matches to check for
:param default: the value that is returned if none match
"""
result = default
best_quality: float = -1
best_specificity: tuple[float, ...] = (-1,)
for server_item in matches:
match = self._best_single_match(server_item)
if not match:
continue
client_item, quality = match
specificity = self._specificity(client_item)
if quality <= 0 or quality < best_quality:
continue
# better quality or same quality but more specific => better match
if quality > best_quality or specificity > best_specificity:
result = server_item
best_quality = quality
best_specificity = specificity
return result
@property
def best(self) -> str | None:
"""The best match as value."""
if self:
return self[0][0]
return None
_mime_split_re = re.compile(r"/|(?:\s*;\s*)")
def _normalize_mime(value: str) -> list[str]:
return _mime_split_re.split(value.lower())
class MIMEAccept(Accept):
"""Like :class:`Accept` but with special methods and behavior for
mimetypes.
"""
def _specificity(self, value: str) -> tuple[bool, ...]:
return tuple(x != "*" for x in _mime_split_re.split(value))
def _value_matches(self, value: str, item: str) -> bool:
# item comes from the client, can't match if it's invalid.
if "/" not in item:
return False
# value comes from the application, tell the developer when it
# doesn't look valid.
if "/" not in value:
raise ValueError(f"invalid mimetype {value!r}")
# Split the match value into type, subtype, and a sorted list of parameters.
normalized_value = _normalize_mime(value)
value_type, value_subtype = normalized_value[:2]
value_params = sorted(normalized_value[2:])
# "*/*" is the only valid value that can start with "*".
if value_type == "*" and value_subtype != "*":
raise ValueError(f"invalid mimetype {value!r}")
# Split the accept item into type, subtype, and parameters.
normalized_item = _normalize_mime(item)
item_type, item_subtype = normalized_item[:2]
item_params = sorted(normalized_item[2:])
# "*/not-*" from the client is invalid, can't match.
if item_type == "*" and item_subtype != "*":
return False
return (
(item_type == "*" and item_subtype == "*")
or (value_type == "*" and value_subtype == "*")
) or (
item_type == value_type
and (
item_subtype == "*"
or value_subtype == "*"
or (item_subtype == value_subtype and item_params == value_params)
)
)
@property
def accept_html(self) -> bool:
"""True if this object accepts HTML."""
return "text/html" in self or self.accept_xhtml # type: ignore[comparison-overlap]
@property
def accept_xhtml(self) -> bool:
"""True if this object accepts XHTML."""
return "application/xhtml+xml" in self or "application/xml" in self # type: ignore[comparison-overlap]
@property
def accept_json(self) -> bool:
"""True if this object accepts JSON."""
return "application/json" in self # type: ignore[comparison-overlap]
_locale_delim_re = re.compile(r"[_-]")
def _normalize_lang(value: str) -> list[str]:
"""Process a language tag for matching."""
return _locale_delim_re.split(value.lower())
class LanguageAccept(Accept):
"""Like :class:`Accept` but with normalization for language tags."""
def _value_matches(self, value: str, item: str) -> bool:
return item == "*" or _normalize_lang(value) == _normalize_lang(item)
@t.overload
def best_match(self, matches: cabc.Iterable[str]) -> str | None: ...
@t.overload
def best_match(self, matches: cabc.Iterable[str], default: str = ...) -> str: ...
def best_match(
self, matches: cabc.Iterable[str], default: str | None = None
) -> str | None:
"""Given a list of supported values, finds the best match from
the list of accepted values.
Language tags are normalized for the purpose of matching, but
are returned unchanged.
If no exact match is found, this will fall back to matching
the first subtag (primary language only), first with the
accepted values then with the match values. This partial is not
applied to any other language subtags.
The default is returned if no exact or fallback match is found.
:param matches: A list of supported languages to find a match.
:param default: The value that is returned if none match.
"""
# Look for an exact match first. If a client accepts "en-US",
# "en-US" is a valid match at this point.
result = super().best_match(matches)
if result is not None:
return result
# Fall back to accepting primary tags. If a client accepts
# "en-US", "en" is a valid match at this point. Need to use
# re.split to account for 2 or 3 letter codes.
fallback = Accept(
[(_locale_delim_re.split(item[0], 1)[0], item[1]) for item in self]
)
result = fallback.best_match(matches)
if result is not None:
return result
# Fall back to matching primary tags. If the client accepts
# "en", "en-US" is a valid match at this point.
fallback_matches = [_locale_delim_re.split(item, 1)[0] for item in matches]
result = super().best_match(fallback_matches)
# Return a value from the original match list. Find the first
# original value that starts with the matched primary tag.
if result is not None:
return next(item for item in matches if item.startswith(result))
return default
class CharsetAccept(Accept):
"""Like :class:`Accept` but with normalization for charsets."""
def _value_matches(self, value: str, item: str) -> bool:
def _normalize(name: str) -> str:
try:
return codecs.lookup(name).name
except LookupError:
return name.lower()
return item == "*" or _normalize(value) == _normalize(item)

View File

@ -0,0 +1,317 @@
from __future__ import annotations
import base64
import binascii
import collections.abc as cabc
import typing as t
from ..http import dump_header
from ..http import parse_dict_header
from ..http import quote_header_value
from .structures import CallbackDict
if t.TYPE_CHECKING:
import typing_extensions as te
class Authorization:
"""Represents the parts of an ``Authorization`` request header.
:attr:`.Request.authorization` returns an instance if the header is set.
An instance can be used with the test :class:`.Client` request methods' ``auth``
parameter to send the header in test requests.
Depending on the auth scheme, either :attr:`parameters` or :attr:`token` will be
set. The ``Basic`` scheme's token is decoded into the ``username`` and ``password``
parameters.
For convenience, ``auth["key"]`` and ``auth.key`` both access the key in the
:attr:`parameters` dict, along with ``auth.get("key")`` and ``"key" in auth``.
.. versionchanged:: 2.3
The ``token`` parameter and attribute was added to support auth schemes that use
a token instead of parameters, such as ``Bearer``.
.. versionchanged:: 2.3
The object is no longer a ``dict``.
.. versionchanged:: 0.5
The object is an immutable dict.
"""
def __init__(
self,
auth_type: str,
data: dict[str, str | None] | None = None,
token: str | None = None,
) -> None:
self.type = auth_type
"""The authorization scheme, like ``basic``, ``digest``, or ``bearer``."""
if data is None:
data = {}
self.parameters = data
"""A dict of parameters parsed from the header. Either this or :attr:`token`
will have a value for a given scheme.
"""
self.token = token
"""A token parsed from the header. Either this or :attr:`parameters` will have a
value for a given scheme.
.. versionadded:: 2.3
"""
def __getattr__(self, name: str) -> str | None:
return self.parameters.get(name)
def __getitem__(self, name: str) -> str | None:
return self.parameters.get(name)
def get(self, key: str, default: str | None = None) -> str | None:
return self.parameters.get(key, default)
def __contains__(self, key: str) -> bool:
return key in self.parameters
def __eq__(self, other: object) -> bool:
if not isinstance(other, Authorization):
return NotImplemented
return (
other.type == self.type
and other.token == self.token
and other.parameters == self.parameters
)
@classmethod
def from_header(cls, value: str | None) -> te.Self | None:
"""Parse an ``Authorization`` header value and return an instance, or ``None``
if the value is empty.
:param value: The header value to parse.
.. versionadded:: 2.3
"""
if not value:
return None
scheme, _, rest = value.partition(" ")
scheme = scheme.lower()
rest = rest.strip()
if scheme == "basic":
try:
username, _, password = base64.b64decode(rest).decode().partition(":")
except (binascii.Error, UnicodeError):
return None
return cls(scheme, {"username": username, "password": password})
if "=" in rest.rstrip("="):
# = that is not trailing, this is parameters.
return cls(scheme, parse_dict_header(rest), None)
# No = or only trailing =, this is a token.
return cls(scheme, None, rest)
def to_header(self) -> str:
"""Produce an ``Authorization`` header value representing this data.
.. versionadded:: 2.0
"""
if self.type == "basic":
value = base64.b64encode(
f"{self.username}:{self.password}".encode()
).decode("ascii")
return f"Basic {value}"
if self.token is not None:
return f"{self.type.title()} {self.token}"
return f"{self.type.title()} {dump_header(self.parameters)}"
def __str__(self) -> str:
return self.to_header()
def __repr__(self) -> str:
return f"<{type(self).__name__} {self.to_header()}>"
class WWWAuthenticate:
"""Represents the parts of a ``WWW-Authenticate`` response header.
Set :attr:`.Response.www_authenticate` to an instance of list of instances to set
values for this header in the response. Modifying this instance will modify the
header value.
Depending on the auth scheme, either :attr:`parameters` or :attr:`token` should be
set. The ``Basic`` scheme will encode ``username`` and ``password`` parameters to a
token.
For convenience, ``auth["key"]`` and ``auth.key`` both act on the :attr:`parameters`
dict, and can be used to get, set, or delete parameters. ``auth.get("key")`` and
``"key" in auth`` are also provided.
.. versionchanged:: 2.3
The ``token`` parameter and attribute was added to support auth schemes that use
a token instead of parameters, such as ``Bearer``.
.. versionchanged:: 2.3
The object is no longer a ``dict``.
.. versionchanged:: 2.3
The ``on_update`` parameter was removed.
"""
def __init__(
self,
auth_type: str,
values: dict[str, str | None] | None = None,
token: str | None = None,
):
self._type = auth_type.lower()
self._parameters: dict[str, str | None] = CallbackDict(
values, lambda _: self._trigger_on_update()
)
self._token = token
self._on_update: cabc.Callable[[WWWAuthenticate], None] | None = None
def _trigger_on_update(self) -> None:
if self._on_update is not None:
self._on_update(self)
@property
def type(self) -> str:
"""The authorization scheme, like ``basic``, ``digest``, or ``bearer``."""
return self._type
@type.setter
def type(self, value: str) -> None:
self._type = value
self._trigger_on_update()
@property
def parameters(self) -> dict[str, str | None]:
"""A dict of parameters for the header. Only one of this or :attr:`token` should
have a value for a given scheme.
"""
return self._parameters
@parameters.setter
def parameters(self, value: dict[str, str]) -> None:
self._parameters = CallbackDict(value, lambda _: self._trigger_on_update())
self._trigger_on_update()
@property
def token(self) -> str | None:
"""A dict of parameters for the header. Only one of this or :attr:`token` should
have a value for a given scheme.
"""
return self._token
@token.setter
def token(self, value: str | None) -> None:
"""A token for the header. Only one of this or :attr:`parameters` should have a
value for a given scheme.
.. versionadded:: 2.3
"""
self._token = value
self._trigger_on_update()
def __getitem__(self, key: str) -> str | None:
return self.parameters.get(key)
def __setitem__(self, key: str, value: str | None) -> None:
if value is None:
if key in self.parameters:
del self.parameters[key]
else:
self.parameters[key] = value
self._trigger_on_update()
def __delitem__(self, key: str) -> None:
if key in self.parameters:
del self.parameters[key]
self._trigger_on_update()
def __getattr__(self, name: str) -> str | None:
return self[name]
def __setattr__(self, name: str, value: str | None) -> None:
if name in {"_type", "_parameters", "_token", "_on_update"}:
super().__setattr__(name, value)
else:
self[name] = value
def __delattr__(self, name: str) -> None:
del self[name]
def __contains__(self, key: str) -> bool:
return key in self.parameters
def __eq__(self, other: object) -> bool:
if not isinstance(other, WWWAuthenticate):
return NotImplemented
return (
other.type == self.type
and other.token == self.token
and other.parameters == self.parameters
)
def get(self, key: str, default: str | None = None) -> str | None:
return self.parameters.get(key, default)
@classmethod
def from_header(cls, value: str | None) -> te.Self | None:
"""Parse a ``WWW-Authenticate`` header value and return an instance, or ``None``
if the value is empty.
:param value: The header value to parse.
.. versionadded:: 2.3
"""
if not value:
return None
scheme, _, rest = value.partition(" ")
scheme = scheme.lower()
rest = rest.strip()
if "=" in rest.rstrip("="):
# = that is not trailing, this is parameters.
return cls(scheme, parse_dict_header(rest), None)
# No = or only trailing =, this is a token.
return cls(scheme, None, rest)
def to_header(self) -> str:
"""Produce a ``WWW-Authenticate`` header value representing this data."""
if self.token is not None:
return f"{self.type.title()} {self.token}"
if self.type == "digest":
items = []
for key, value in self.parameters.items():
if key in {"realm", "domain", "nonce", "opaque", "qop"}:
value = quote_header_value(value, allow_token=False)
else:
value = quote_header_value(value)
items.append(f"{key}={value}")
return f"Digest {', '.join(items)}"
return f"{self.type.title()} {dump_header(self.parameters)}"
def __str__(self) -> str:
return self.to_header()
def __repr__(self) -> str:
return f"<{type(self).__name__} {self.to_header()}>"

View File

@ -0,0 +1,273 @@
from __future__ import annotations
import collections.abc as cabc
import typing as t
from inspect import cleandoc
from .mixins import ImmutableDictMixin
from .structures import CallbackDict
def cache_control_property(
key: str, empty: t.Any, type: type[t.Any] | None, *, doc: str | None = None
) -> t.Any:
"""Return a new property object for a cache header. Useful if you
want to add support for a cache extension in a subclass.
:param key: The attribute name present in the parsed cache-control header dict.
:param empty: The value to use if the key is present without a value.
:param type: The type to convert the string value to instead of a string. If
conversion raises a ``ValueError``, the returned value is ``None``.
:param doc: The docstring for the property. If not given, it is generated
based on the other params.
.. versionchanged:: 3.1
Added the ``doc`` param.
.. versionchanged:: 2.0
Renamed from ``cache_property``.
"""
if doc is None:
parts = [f"The ``{key}`` attribute."]
if type is bool:
parts.append("A ``bool``, either present or not.")
else:
if type is None:
parts.append("A ``str``,")
else:
parts.append(f"A ``{type.__name__}``,")
if empty is not None:
parts.append(f"``{empty!r}`` if present with no value,")
parts.append("or ``None`` if not present.")
doc = " ".join(parts)
return property(
lambda x: x._get_cache_value(key, empty, type),
lambda x, v: x._set_cache_value(key, v, type),
lambda x: x._del_cache_value(key),
doc=cleandoc(doc),
)
class _CacheControl(CallbackDict[str, t.Optional[str]]):
"""Subclass of a dict that stores values for a Cache-Control header. It
has accessors for all the cache-control directives specified in RFC 2616.
The class does not differentiate between request and response directives.
Because the cache-control directives in the HTTP header use dashes the
python descriptors use underscores for that.
To get a header of the :class:`CacheControl` object again you can convert
the object into a string or call the :meth:`to_header` method. If you plan
to subclass it and add your own items have a look at the sourcecode for
that class.
.. versionchanged:: 3.1
Dict values are always ``str | None``. Setting properties will
convert the value to a string. Setting a non-bool property to
``False`` is equivalent to setting it to ``None``. Getting typed
properties will return ``None`` if conversion raises
``ValueError``, rather than the string.
.. versionchanged:: 2.1
Setting int properties such as ``max_age`` will convert the
value to an int.
.. versionchanged:: 0.4
Setting ``no_cache`` or ``private`` to ``True`` will set the
implicit value ``"*"``.
"""
no_store: bool = cache_control_property("no-store", None, bool)
max_age: int | None = cache_control_property("max-age", None, int)
no_transform: bool = cache_control_property("no-transform", None, bool)
stale_if_error: int | None = cache_control_property("stale-if-error", None, int)
def __init__(
self,
values: cabc.Mapping[str, t.Any] | cabc.Iterable[tuple[str, t.Any]] | None = (),
on_update: cabc.Callable[[_CacheControl], None] | None = None,
):
super().__init__(values, on_update)
self.provided = values is not None
def _get_cache_value(
self, key: str, empty: t.Any, type: type[t.Any] | None
) -> t.Any:
"""Used internally by the accessor properties."""
if type is bool:
return key in self
if key not in self:
return None
if (value := self[key]) is None:
return empty
if type is not None:
try:
value = type(value)
except ValueError:
return None
return value
def _set_cache_value(
self, key: str, value: t.Any, type: type[t.Any] | None
) -> None:
"""Used internally by the accessor properties."""
if type is bool:
if value:
self[key] = None
else:
self.pop(key, None)
elif value is None or value is False:
self.pop(key, None)
elif value is True:
self[key] = None
else:
if type is not None:
value = type(value)
self[key] = str(value)
def _del_cache_value(self, key: str) -> None:
"""Used internally by the accessor properties."""
if key in self:
del self[key]
def to_header(self) -> str:
"""Convert the stored values into a cache control header."""
return http.dump_header(self)
def __str__(self) -> str:
return self.to_header()
def __repr__(self) -> str:
kv_str = " ".join(f"{k}={v!r}" for k, v in sorted(self.items()))
return f"<{type(self).__name__} {kv_str}>"
cache_property = staticmethod(cache_control_property)
class RequestCacheControl(ImmutableDictMixin[str, t.Optional[str]], _CacheControl): # type: ignore[misc]
"""A cache control for requests. This is immutable and gives access
to all the request-relevant cache control headers.
To get a header of the :class:`RequestCacheControl` object again you can
convert the object into a string or call the :meth:`to_header` method. If
you plan to subclass it and add your own items have a look at the sourcecode
for that class.
.. versionchanged:: 3.1
Dict values are always ``str | None``. Setting properties will
convert the value to a string. Setting a non-bool property to
``False`` is equivalent to setting it to ``None``. Getting typed
properties will return ``None`` if conversion raises
``ValueError``, rather than the string.
.. versionchanged:: 3.1
``max_age`` is ``None`` if present without a value, rather
than ``-1``.
.. versionchanged:: 3.1
``no_cache`` is a boolean, it is ``True`` instead of ``"*"``
when present.
.. versionchanged:: 3.1
``max_stale`` is ``True`` if present without a value, rather
than ``"*"``.
.. versionchanged:: 3.1
``no_transform`` is a boolean. Previously it was mistakenly
always ``None``.
.. versionchanged:: 3.1
``min_fresh`` is ``None`` if present without a value, rather
than ``"*"``.
.. versionchanged:: 2.1
Setting int properties such as ``max_age`` will convert the
value to an int.
.. versionadded:: 0.5
Response-only properties are not present on this request class.
"""
no_cache: bool = cache_control_property("no-cache", None, bool)
max_stale: int | t.Literal[True] | None = cache_control_property(
"max-stale",
True,
int,
)
min_fresh: int | None = cache_control_property("min-fresh", None, int)
only_if_cached: bool = cache_control_property("only-if-cached", None, bool)
class ResponseCacheControl(_CacheControl):
"""A cache control for responses. Unlike :class:`RequestCacheControl`
this is mutable and gives access to response-relevant cache control
headers.
To get a header of the :class:`ResponseCacheControl` object again you can
convert the object into a string or call the :meth:`to_header` method. If
you plan to subclass it and add your own items have a look at the sourcecode
for that class.
.. versionchanged:: 3.1
Dict values are always ``str | None``. Setting properties will
convert the value to a string. Setting a non-bool property to
``False`` is equivalent to setting it to ``None``. Getting typed
properties will return ``None`` if conversion raises
``ValueError``, rather than the string.
.. versionchanged:: 3.1
``no_cache`` is ``True`` if present without a value, rather than
``"*"``.
.. versionchanged:: 3.1
``private`` is ``True`` if present without a value, rather than
``"*"``.
.. versionchanged:: 3.1
``no_transform`` is a boolean. Previously it was mistakenly
always ``None``.
.. versionchanged:: 3.1
Added the ``must_understand``, ``stale_while_revalidate``, and
``stale_if_error`` properties.
.. versionchanged:: 2.1.1
``s_maxage`` converts the value to an int.
.. versionchanged:: 2.1
Setting int properties such as ``max_age`` will convert the
value to an int.
.. versionadded:: 0.5
Request-only properties are not present on this response class.
"""
no_cache: str | t.Literal[True] | None = cache_control_property(
"no-cache", True, None
)
public: bool = cache_control_property("public", None, bool)
private: str | t.Literal[True] | None = cache_control_property(
"private", True, None
)
must_revalidate: bool = cache_control_property("must-revalidate", None, bool)
proxy_revalidate: bool = cache_control_property("proxy-revalidate", None, bool)
s_maxage: int | None = cache_control_property("s-maxage", None, int)
immutable: bool = cache_control_property("immutable", None, bool)
must_understand: bool = cache_control_property("must-understand", None, bool)
stale_while_revalidate: int | None = cache_control_property(
"stale-while-revalidate", None, int
)
# circular dependencies
from .. import http

View File

@ -0,0 +1,100 @@
from __future__ import annotations
import collections.abc as cabc
import typing as t
from .structures import CallbackDict
def csp_property(key: str) -> t.Any:
"""Return a new property object for a content security policy header.
Useful if you want to add support for a csp extension in a
subclass.
"""
return property(
lambda x: x._get_value(key),
lambda x, v: x._set_value(key, v),
lambda x: x._del_value(key),
f"accessor for {key!r}",
)
class ContentSecurityPolicy(CallbackDict[str, str]):
"""Subclass of a dict that stores values for a Content Security Policy
header. It has accessors for all the level 3 policies.
Because the csp directives in the HTTP header use dashes the
python descriptors use underscores for that.
To get a header of the :class:`ContentSecuirtyPolicy` object again
you can convert the object into a string or call the
:meth:`to_header` method. If you plan to subclass it and add your
own items have a look at the sourcecode for that class.
.. versionadded:: 1.0.0
Support for Content Security Policy headers was added.
"""
base_uri: str | None = csp_property("base-uri")
child_src: str | None = csp_property("child-src")
connect_src: str | None = csp_property("connect-src")
default_src: str | None = csp_property("default-src")
font_src: str | None = csp_property("font-src")
form_action: str | None = csp_property("form-action")
frame_ancestors: str | None = csp_property("frame-ancestors")
frame_src: str | None = csp_property("frame-src")
img_src: str | None = csp_property("img-src")
manifest_src: str | None = csp_property("manifest-src")
media_src: str | None = csp_property("media-src")
navigate_to: str | None = csp_property("navigate-to")
object_src: str | None = csp_property("object-src")
prefetch_src: str | None = csp_property("prefetch-src")
plugin_types: str | None = csp_property("plugin-types")
report_to: str | None = csp_property("report-to")
report_uri: str | None = csp_property("report-uri")
sandbox: str | None = csp_property("sandbox")
script_src: str | None = csp_property("script-src")
script_src_attr: str | None = csp_property("script-src-attr")
script_src_elem: str | None = csp_property("script-src-elem")
style_src: str | None = csp_property("style-src")
style_src_attr: str | None = csp_property("style-src-attr")
style_src_elem: str | None = csp_property("style-src-elem")
worker_src: str | None = csp_property("worker-src")
def __init__(
self,
values: cabc.Mapping[str, str] | cabc.Iterable[tuple[str, str]] | None = (),
on_update: cabc.Callable[[ContentSecurityPolicy], None] | None = None,
) -> None:
super().__init__(values, on_update)
self.provided = values is not None
def _get_value(self, key: str) -> str | None:
"""Used internally by the accessor properties."""
return self.get(key)
def _set_value(self, key: str, value: str | None) -> None:
"""Used internally by the accessor properties."""
if value is None:
self.pop(key, None)
else:
self[key] = value
def _del_value(self, key: str) -> None:
"""Used internally by the accessor properties."""
if key in self:
del self[key]
def to_header(self) -> str:
"""Convert the stored values into a cache control header."""
from ..http import dump_csp_header
return dump_csp_header(self)
def __str__(self) -> str:
return self.to_header()
def __repr__(self) -> str:
kv_str = " ".join(f"{k}={v!r}" for k, v in sorted(self.items()))
return f"<{type(self).__name__} {kv_str}>"

View File

@ -0,0 +1,106 @@
from __future__ import annotations
import collections.abc as cabc
class ETags(cabc.Collection[str]):
"""A set that can be used to check if one etag is present in a collection
of etags.
"""
def __init__(
self,
strong_etags: cabc.Iterable[str] | None = None,
weak_etags: cabc.Iterable[str] | None = None,
star_tag: bool = False,
):
if not star_tag and strong_etags:
self._strong = frozenset(strong_etags)
else:
self._strong = frozenset()
self._weak = frozenset(weak_etags or ())
self.star_tag = star_tag
def as_set(self, include_weak: bool = False) -> set[str]:
"""Convert the `ETags` object into a python set. Per default all the
weak etags are not part of this set."""
rv = set(self._strong)
if include_weak:
rv.update(self._weak)
return rv
def is_weak(self, etag: str) -> bool:
"""Check if an etag is weak."""
return etag in self._weak
def is_strong(self, etag: str) -> bool:
"""Check if an etag is strong."""
return etag in self._strong
def contains_weak(self, etag: str) -> bool:
"""Check if an etag is part of the set including weak and strong tags."""
return self.is_weak(etag) or self.contains(etag)
def contains(self, etag: str) -> bool:
"""Check if an etag is part of the set ignoring weak tags.
It is also possible to use the ``in`` operator.
"""
if self.star_tag:
return True
return self.is_strong(etag)
def contains_raw(self, etag: str) -> bool:
"""When passed a quoted tag it will check if this tag is part of the
set. If the tag is weak it is checked against weak and strong tags,
otherwise strong only."""
from ..http import unquote_etag
etag, weak = unquote_etag(etag)
if weak:
return self.contains_weak(etag)
return self.contains(etag)
def to_header(self) -> str:
"""Convert the etags set into a HTTP header string."""
if self.star_tag:
return "*"
return ", ".join(
[f'"{x}"' for x in self._strong] + [f'W/"{x}"' for x in self._weak]
)
def __call__(
self,
etag: str | None = None,
data: bytes | None = None,
include_weak: bool = False,
) -> bool:
if etag is None:
if data is None:
raise TypeError("'data' is required when 'etag' is not given.")
from ..http import generate_etag
etag = generate_etag(data)
if include_weak:
if etag in self._weak:
return True
return etag in self._strong
def __bool__(self) -> bool:
return bool(self.star_tag or self._strong or self._weak)
def __str__(self) -> str:
return self.to_header()
def __len__(self) -> int:
return len(self._strong)
def __iter__(self) -> cabc.Iterator[str]:
return iter(self._strong)
def __contains__(self, etag: str) -> bool: # type: ignore[override]
return self.contains(etag)
def __repr__(self) -> str:
return f"<{type(self).__name__} {str(self)!r}>"

View File

@ -0,0 +1,209 @@
from __future__ import annotations
import collections.abc as cabc
import mimetypes
import os
import typing as t
from io import BytesIO
from os import fsdecode
from os import fspath
from .._internal import _plain_int
from .headers import Headers
from .structures import MultiDict
class FileStorage:
"""The :class:`FileStorage` class is a thin wrapper over incoming files.
It is used by the request object to represent uploaded files. All the
attributes of the wrapper stream are proxied by the file storage so
it's possible to do ``storage.read()`` instead of the long form
``storage.stream.read()``.
"""
def __init__(
self,
stream: t.IO[bytes] | None = None,
filename: str | None = None,
name: str | None = None,
content_type: str | None = None,
content_length: int | None = None,
headers: Headers | None = None,
):
self.name = name
self.stream = stream or BytesIO()
# If no filename is provided, attempt to get the filename from
# the stream object. Python names special streams like
# ``<stderr>`` with angular brackets, skip these streams.
if filename is None:
filename = getattr(stream, "name", None)
if filename is not None:
filename = fsdecode(filename)
if filename and filename[0] == "<" and filename[-1] == ">":
filename = None
else:
filename = fsdecode(filename)
self.filename = filename
if headers is None:
headers = Headers()
self.headers = headers
if content_type is not None:
headers["Content-Type"] = content_type
if content_length is not None:
headers["Content-Length"] = str(content_length)
def _parse_content_type(self) -> None:
if not hasattr(self, "_parsed_content_type"):
self._parsed_content_type = http.parse_options_header(self.content_type)
@property
def content_type(self) -> str | None:
"""The content-type sent in the header. Usually not available"""
return self.headers.get("content-type")
@property
def content_length(self) -> int:
"""The content-length sent in the header. Usually not available"""
if "content-length" in self.headers:
try:
return _plain_int(self.headers["content-length"])
except ValueError:
pass
return 0
@property
def mimetype(self) -> str:
"""Like :attr:`content_type`, but without parameters (eg, without
charset, type etc.) and always lowercase. For example if the content
type is ``text/HTML; charset=utf-8`` the mimetype would be
``'text/html'``.
.. versionadded:: 0.7
"""
self._parse_content_type()
return self._parsed_content_type[0].lower()
@property
def mimetype_params(self) -> dict[str, str]:
"""The mimetype parameters as dict. For example if the content
type is ``text/html; charset=utf-8`` the params would be
``{'charset': 'utf-8'}``.
.. versionadded:: 0.7
"""
self._parse_content_type()
return self._parsed_content_type[1]
def save(
self, dst: str | os.PathLike[str] | t.IO[bytes], buffer_size: int = 16384
) -> None:
"""Save the file to a destination path or file object. If the
destination is a file object you have to close it yourself after the
call. The buffer size is the number of bytes held in memory during
the copy process. It defaults to 16KB.
For secure file saving also have a look at :func:`secure_filename`.
:param dst: a filename, :class:`os.PathLike`, or open file
object to write to.
:param buffer_size: Passed as the ``length`` parameter of
:func:`shutil.copyfileobj`.
.. versionchanged:: 1.0
Supports :mod:`pathlib`.
"""
from shutil import copyfileobj
close_dst = False
if hasattr(dst, "__fspath__"):
dst = fspath(dst)
if isinstance(dst, str):
dst = open(dst, "wb")
close_dst = True
try:
copyfileobj(self.stream, dst, buffer_size)
finally:
if close_dst:
dst.close()
def close(self) -> None:
"""Close the underlying file if possible."""
try:
self.stream.close()
except Exception:
pass
def __bool__(self) -> bool:
return bool(self.filename)
def __getattr__(self, name: str) -> t.Any:
try:
return getattr(self.stream, name)
except AttributeError:
# SpooledTemporaryFile on Python < 3.11 doesn't implement IOBase,
# get the attribute from its backing file instead.
if hasattr(self.stream, "_file"):
return getattr(self.stream._file, name)
raise
def __iter__(self) -> cabc.Iterator[bytes]:
return iter(self.stream)
def __repr__(self) -> str:
return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>"
class FileMultiDict(MultiDict[str, FileStorage]):
"""A special :class:`MultiDict` that has convenience methods to add
files to it. This is used for :class:`EnvironBuilder` and generally
useful for unittesting.
.. versionadded:: 0.5
"""
def add_file(
self,
name: str,
file: str | os.PathLike[str] | t.IO[bytes] | FileStorage,
filename: str | None = None,
content_type: str | None = None,
) -> None:
"""Adds a new file to the dict. `file` can be a file name or
a :class:`file`-like or a :class:`FileStorage` object.
:param name: the name of the field.
:param file: a filename or :class:`file`-like object
:param filename: an optional filename
:param content_type: an optional content type
"""
if isinstance(file, FileStorage):
self.add(name, file)
return
if isinstance(file, (str, os.PathLike)):
if filename is None:
filename = os.fspath(file)
file_obj: t.IO[bytes] = open(file, "rb")
else:
file_obj = file # type: ignore[assignment]
if filename and content_type is None:
content_type = (
mimetypes.guess_type(filename)[0] or "application/octet-stream"
)
self.add(name, FileStorage(file_obj, filename, name, content_type))
# circular dependencies
from .. import http

View File

@ -0,0 +1,662 @@
from __future__ import annotations
import collections.abc as cabc
import re
import typing as t
from .._internal import _missing
from ..exceptions import BadRequestKeyError
from .mixins import ImmutableHeadersMixin
from .structures import iter_multi_items
from .structures import MultiDict
if t.TYPE_CHECKING:
import typing_extensions as te
from _typeshed.wsgi import WSGIEnvironment
T = t.TypeVar("T")
class Headers:
"""An object that stores some headers. It has a dict-like interface,
but is ordered, can store the same key multiple times, and iterating
yields ``(key, value)`` pairs instead of only keys.
This data structure is useful if you want a nicer way to handle WSGI
headers which are stored as tuples in a list.
From Werkzeug 0.3 onwards, the :exc:`KeyError` raised by this class is
also a subclass of the :class:`~exceptions.BadRequest` HTTP exception
and will render a page for a ``400 BAD REQUEST`` if caught in a
catch-all for HTTP exceptions.
Headers is mostly compatible with the Python :class:`wsgiref.headers.Headers`
class, with the exception of `__getitem__`. :mod:`wsgiref` will return
`None` for ``headers['missing']``, whereas :class:`Headers` will raise
a :class:`KeyError`.
To create a new ``Headers`` object, pass it a list, dict, or
other ``Headers`` object with default values. These values are
validated the same way values added later are.
:param defaults: The list of default values for the :class:`Headers`.
.. versionchanged:: 3.1
Implement ``|`` and ``|=`` operators.
.. versionchanged:: 2.1.0
Default values are validated the same as values added later.
.. versionchanged:: 0.9
This data structure now stores unicode values similar to how the
multi dicts do it. The main difference is that bytes can be set as
well which will automatically be latin1 decoded.
.. versionchanged:: 0.9
The :meth:`linked` function was removed without replacement as it
was an API that does not support the changes to the encoding model.
"""
def __init__(
self,
defaults: (
Headers
| MultiDict[str, t.Any]
| cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
| cabc.Iterable[tuple[str, t.Any]]
| None
) = None,
) -> None:
self._list: list[tuple[str, str]] = []
if defaults is not None:
self.extend(defaults)
@t.overload
def __getitem__(self, key: str) -> str: ...
@t.overload
def __getitem__(self, key: int) -> tuple[str, str]: ...
@t.overload
def __getitem__(self, key: slice) -> te.Self: ...
def __getitem__(self, key: str | int | slice) -> str | tuple[str, str] | te.Self:
if isinstance(key, str):
return self._get_key(key)
if isinstance(key, int):
return self._list[key]
return self.__class__(self._list[key])
def _get_key(self, key: str) -> str:
ikey = key.lower()
for k, v in self._list:
if k.lower() == ikey:
return v
raise BadRequestKeyError(key)
def __eq__(self, other: object) -> bool:
if other.__class__ is not self.__class__:
return NotImplemented
def lowered(item: tuple[str, ...]) -> tuple[str, ...]:
return item[0].lower(), *item[1:]
return set(map(lowered, other._list)) == set(map(lowered, self._list)) # type: ignore[attr-defined]
__hash__ = None # type: ignore[assignment]
@t.overload
def get(self, key: str) -> str | None: ...
@t.overload
def get(self, key: str, default: str) -> str: ...
@t.overload
def get(self, key: str, default: T) -> str | T: ...
@t.overload
def get(self, key: str, type: cabc.Callable[[str], T]) -> T | None: ...
@t.overload
def get(self, key: str, default: T, type: cabc.Callable[[str], T]) -> T: ...
def get( # type: ignore[misc]
self,
key: str,
default: str | T | None = None,
type: cabc.Callable[[str], T] | None = None,
) -> str | T | None:
"""Return the default value if the requested data doesn't exist.
If `type` is provided and is a callable it should convert the value,
return it or raise a :exc:`ValueError` if that is not possible. In
this case the function will return the default as if the value was not
found:
>>> d = Headers([('Content-Length', '42')])
>>> d.get('Content-Length', type=int)
42
:param key: The key to be looked up.
:param default: The default value to be returned if the key can't
be looked up. If not further specified `None` is
returned.
:param type: A callable that is used to cast the value in the
:class:`Headers`. If a :exc:`ValueError` is raised
by this callable the default value is returned.
.. versionchanged:: 3.0
The ``as_bytes`` parameter was removed.
.. versionchanged:: 0.9
The ``as_bytes`` parameter was added.
"""
try:
rv = self._get_key(key)
except KeyError:
return default
if type is None:
return rv
try:
return type(rv)
except ValueError:
return default
@t.overload
def getlist(self, key: str) -> list[str]: ...
@t.overload
def getlist(self, key: str, type: cabc.Callable[[str], T]) -> list[T]: ...
def getlist(
self, key: str, type: cabc.Callable[[str], T] | None = None
) -> list[str] | list[T]:
"""Return the list of items for a given key. If that key is not in the
:class:`Headers`, the return value will be an empty list. Just like
:meth:`get`, :meth:`getlist` accepts a `type` parameter. All items will
be converted with the callable defined there.
:param key: The key to be looked up.
:param type: A callable that is used to cast the value in the
:class:`Headers`. If a :exc:`ValueError` is raised
by this callable the value will be removed from the list.
:return: a :class:`list` of all the values for the key.
.. versionchanged:: 3.0
The ``as_bytes`` parameter was removed.
.. versionchanged:: 0.9
The ``as_bytes`` parameter was added.
"""
ikey = key.lower()
if type is not None:
result = []
for k, v in self:
if k.lower() == ikey:
try:
result.append(type(v))
except ValueError:
continue
return result
return [v for k, v in self if k.lower() == ikey]
def get_all(self, name: str) -> list[str]:
"""Return a list of all the values for the named field.
This method is compatible with the :mod:`wsgiref`
:meth:`~wsgiref.headers.Headers.get_all` method.
"""
return self.getlist(name)
def items(self, lower: bool = False) -> t.Iterable[tuple[str, str]]:
for key, value in self:
if lower:
key = key.lower()
yield key, value
def keys(self, lower: bool = False) -> t.Iterable[str]:
for key, _ in self.items(lower):
yield key
def values(self) -> t.Iterable[str]:
for _, value in self.items():
yield value
def extend(
self,
arg: (
Headers
| MultiDict[str, t.Any]
| cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
| cabc.Iterable[tuple[str, t.Any]]
| None
) = None,
/,
**kwargs: str,
) -> None:
"""Extend headers in this object with items from another object
containing header items as well as keyword arguments.
To replace existing keys instead of extending, use
:meth:`update` instead.
If provided, the first argument can be another :class:`Headers`
object, a :class:`MultiDict`, :class:`dict`, or iterable of
pairs.
.. versionchanged:: 1.0
Support :class:`MultiDict`. Allow passing ``kwargs``.
"""
if arg is not None:
for key, value in iter_multi_items(arg):
self.add(key, value)
for key, value in iter_multi_items(kwargs):
self.add(key, value)
def __delitem__(self, key: str | int | slice) -> None:
if isinstance(key, str):
self._del_key(key)
return
del self._list[key]
def _del_key(self, key: str) -> None:
key = key.lower()
new = []
for k, v in self._list:
if k.lower() != key:
new.append((k, v))
self._list[:] = new
def remove(self, key: str) -> None:
"""Remove a key.
:param key: The key to be removed.
"""
return self._del_key(key)
@t.overload
def pop(self) -> tuple[str, str]: ...
@t.overload
def pop(self, key: str) -> str: ...
@t.overload
def pop(self, key: int | None = ...) -> tuple[str, str]: ...
@t.overload
def pop(self, key: str, default: str) -> str: ...
@t.overload
def pop(self, key: str, default: T) -> str | T: ...
def pop(
self,
key: str | int | None = None,
default: str | T = _missing, # type: ignore[assignment]
) -> str | tuple[str, str] | T:
"""Removes and returns a key or index.
:param key: The key to be popped. If this is an integer the item at
that position is removed, if it's a string the value for
that key is. If the key is omitted or `None` the last
item is removed.
:return: an item.
"""
if key is None:
return self._list.pop()
if isinstance(key, int):
return self._list.pop(key)
try:
rv = self._get_key(key)
except KeyError:
if default is not _missing:
return default
raise
self.remove(key)
return rv
def popitem(self) -> tuple[str, str]:
"""Removes a key or index and returns a (key, value) item."""
return self._list.pop()
def __contains__(self, key: str) -> bool:
"""Check if a key is present."""
try:
self._get_key(key)
except KeyError:
return False
return True
def __iter__(self) -> t.Iterator[tuple[str, str]]:
"""Yield ``(key, value)`` tuples."""
return iter(self._list)
def __len__(self) -> int:
return len(self._list)
def add(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
"""Add a new header tuple to the list.
Keyword arguments can specify additional parameters for the header
value, with underscores converted to dashes::
>>> d = Headers()
>>> d.add('Content-Type', 'text/plain')
>>> d.add('Content-Disposition', 'attachment', filename='foo.png')
The keyword argument dumping uses :func:`dump_options_header`
behind the scenes.
.. versionchanged:: 0.4.1
keyword arguments were added for :mod:`wsgiref` compatibility.
"""
if kwargs:
value = _options_header_vkw(value, kwargs)
value_str = _str_header_value(value)
self._list.append((key, value_str))
def add_header(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
"""Add a new header tuple to the list.
An alias for :meth:`add` for compatibility with the :mod:`wsgiref`
:meth:`~wsgiref.headers.Headers.add_header` method.
"""
self.add(key, value, **kwargs)
def clear(self) -> None:
"""Clears all headers."""
self._list.clear()
def set(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
"""Remove all header tuples for `key` and add a new one. The newly
added key either appears at the end of the list if there was no
entry or replaces the first one.
Keyword arguments can specify additional parameters for the header
value, with underscores converted to dashes. See :meth:`add` for
more information.
.. versionchanged:: 0.6.1
:meth:`set` now accepts the same arguments as :meth:`add`.
:param key: The key to be inserted.
:param value: The value to be inserted.
"""
if kwargs:
value = _options_header_vkw(value, kwargs)
value_str = _str_header_value(value)
if not self._list:
self._list.append((key, value_str))
return
iter_list = iter(self._list)
ikey = key.lower()
for idx, (old_key, _) in enumerate(iter_list):
if old_key.lower() == ikey:
# replace first occurrence
self._list[idx] = (key, value_str)
break
else:
# no existing occurrences
self._list.append((key, value_str))
return
# remove remaining occurrences
self._list[idx + 1 :] = [t for t in iter_list if t[0].lower() != ikey]
def setlist(self, key: str, values: cabc.Iterable[t.Any]) -> None:
"""Remove any existing values for a header and add new ones.
:param key: The header key to set.
:param values: An iterable of values to set for the key.
.. versionadded:: 1.0
"""
if values:
values_iter = iter(values)
self.set(key, next(values_iter))
for value in values_iter:
self.add(key, value)
else:
self.remove(key)
def setdefault(self, key: str, default: t.Any) -> str:
"""Return the first value for the key if it is in the headers,
otherwise set the header to the value given by ``default`` and
return that.
:param key: The header key to get.
:param default: The value to set for the key if it is not in the
headers.
"""
try:
return self._get_key(key)
except KeyError:
pass
self.set(key, default)
return self._get_key(key)
def setlistdefault(self, key: str, default: cabc.Iterable[t.Any]) -> list[str]:
"""Return the list of values for the key if it is in the
headers, otherwise set the header to the list of values given
by ``default`` and return that.
Unlike :meth:`MultiDict.setlistdefault`, modifying the returned
list will not affect the headers.
:param key: The header key to get.
:param default: An iterable of values to set for the key if it
is not in the headers.
.. versionadded:: 1.0
"""
if key not in self:
self.setlist(key, default)
return self.getlist(key)
@t.overload
def __setitem__(self, key: str, value: t.Any) -> None: ...
@t.overload
def __setitem__(self, key: int, value: tuple[str, t.Any]) -> None: ...
@t.overload
def __setitem__(
self, key: slice, value: cabc.Iterable[tuple[str, t.Any]]
) -> None: ...
def __setitem__(
self,
key: str | int | slice,
value: t.Any | tuple[str, t.Any] | cabc.Iterable[tuple[str, t.Any]],
) -> None:
"""Like :meth:`set` but also supports index/slice based setting."""
if isinstance(key, str):
self.set(key, value)
elif isinstance(key, int):
self._list[key] = value[0], _str_header_value(value[1]) # type: ignore[index]
else:
self._list[key] = [(k, _str_header_value(v)) for k, v in value] # type: ignore[misc]
def update(
self,
arg: (
Headers
| MultiDict[str, t.Any]
| cabc.Mapping[
str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
]
| cabc.Iterable[tuple[str, t.Any]]
| None
) = None,
/,
**kwargs: t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any],
) -> None:
"""Replace headers in this object with items from another
headers object and keyword arguments.
To extend existing keys instead of replacing, use :meth:`extend`
instead.
If provided, the first argument can be another :class:`Headers`
object, a :class:`MultiDict`, :class:`dict`, or iterable of
pairs.
.. versionadded:: 1.0
"""
if arg is not None:
if isinstance(arg, (Headers, MultiDict)):
for key in arg.keys():
self.setlist(key, arg.getlist(key))
elif isinstance(arg, cabc.Mapping):
for key, value in arg.items():
if isinstance(value, (list, tuple, set)):
self.setlist(key, value)
else:
self.set(key, value)
else:
for key, value in arg:
self.set(key, value)
for key, value in kwargs.items():
if isinstance(value, (list, tuple, set)):
self.setlist(key, value)
else:
self.set(key, value)
def __or__(
self,
other: cabc.Mapping[
str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
],
) -> te.Self:
if not isinstance(other, cabc.Mapping):
return NotImplemented
rv = self.copy()
rv.update(other)
return rv
def __ior__(
self,
other: (
cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]]
| cabc.Iterable[tuple[str, t.Any]]
),
) -> te.Self:
if not isinstance(other, (cabc.Mapping, cabc.Iterable)):
return NotImplemented
self.update(other)
return self
def to_wsgi_list(self) -> list[tuple[str, str]]:
"""Convert the headers into a list suitable for WSGI.
:return: list
"""
return list(self)
def copy(self) -> te.Self:
return self.__class__(self._list)
def __copy__(self) -> te.Self:
return self.copy()
def __str__(self) -> str:
"""Returns formatted headers suitable for HTTP transmission."""
strs = []
for key, value in self.to_wsgi_list():
strs.append(f"{key}: {value}")
strs.append("\r\n")
return "\r\n".join(strs)
def __repr__(self) -> str:
return f"{type(self).__name__}({list(self)!r})"
def _options_header_vkw(value: str, kw: dict[str, t.Any]) -> str:
return http.dump_options_header(
value, {k.replace("_", "-"): v for k, v in kw.items()}
)
_newline_re = re.compile(r"[\r\n]")
def _str_header_value(value: t.Any) -> str:
if not isinstance(value, str):
value = str(value)
if _newline_re.search(value) is not None:
raise ValueError("Header values must not contain newline characters.")
return value # type: ignore[no-any-return]
class EnvironHeaders(ImmutableHeadersMixin, Headers): # type: ignore[misc]
"""Read only version of the headers from a WSGI environment. This
provides the same interface as `Headers` and is constructed from
a WSGI environment.
From Werkzeug 0.3 onwards, the `KeyError` raised by this class is also a
subclass of the :exc:`~exceptions.BadRequest` HTTP exception and will
render a page for a ``400 BAD REQUEST`` if caught in a catch-all for
HTTP exceptions.
"""
def __init__(self, environ: WSGIEnvironment) -> None:
super().__init__()
self.environ = environ
def __eq__(self, other: object) -> bool:
if not isinstance(other, EnvironHeaders):
return NotImplemented
return self.environ is other.environ
__hash__ = None # type: ignore[assignment]
def __getitem__(self, key: str) -> str: # type: ignore[override]
return self._get_key(key)
def _get_key(self, key: str) -> str:
if not isinstance(key, str):
raise BadRequestKeyError(key)
key = key.upper().replace("-", "_")
if key in {"CONTENT_TYPE", "CONTENT_LENGTH"}:
return self.environ[key] # type: ignore[no-any-return]
return self.environ[f"HTTP_{key}"] # type: ignore[no-any-return]
def __len__(self) -> int:
return sum(1 for _ in self)
def __iter__(self) -> cabc.Iterator[tuple[str, str]]:
for key, value in self.environ.items():
if key.startswith("HTTP_") and key not in {
"HTTP_CONTENT_TYPE",
"HTTP_CONTENT_LENGTH",
}:
yield key[5:].replace("_", "-").title(), value
elif key in {"CONTENT_TYPE", "CONTENT_LENGTH"} and value:
yield key.replace("_", "-").title(), value
def copy(self) -> t.NoReturn:
raise TypeError(f"cannot create {type(self).__name__!r} copies")
def __or__(self, other: t.Any) -> t.NoReturn:
raise TypeError(f"cannot create {type(self).__name__!r} copies")
# circular dependencies
from .. import http

View File

@ -0,0 +1,317 @@
from __future__ import annotations
import collections.abc as cabc
import typing as t
from functools import update_wrapper
from itertools import repeat
from .._internal import _missing
if t.TYPE_CHECKING:
import typing_extensions as te
K = t.TypeVar("K")
V = t.TypeVar("V")
T = t.TypeVar("T")
F = t.TypeVar("F", bound=cabc.Callable[..., t.Any])
def _immutable_error(self: t.Any) -> t.NoReturn:
raise TypeError(f"{type(self).__name__!r} objects are immutable")
class ImmutableListMixin:
"""Makes a :class:`list` immutable.
.. versionadded:: 0.5
:private:
"""
_hash_cache: int | None = None
def __hash__(self) -> int:
if self._hash_cache is not None:
return self._hash_cache
rv = self._hash_cache = hash(tuple(self)) # type: ignore[arg-type]
return rv
def __reduce_ex__(self, protocol: t.SupportsIndex) -> t.Any:
return type(self), (list(self),) # type: ignore[call-overload]
def __delitem__(self, key: t.Any) -> t.NoReturn:
_immutable_error(self)
def __iadd__(self, other: t.Any) -> t.NoReturn:
_immutable_error(self)
def __imul__(self, other: t.Any) -> t.NoReturn:
_immutable_error(self)
def __setitem__(self, key: t.Any, value: t.Any) -> t.NoReturn:
_immutable_error(self)
def append(self, item: t.Any) -> t.NoReturn:
_immutable_error(self)
def remove(self, item: t.Any) -> t.NoReturn:
_immutable_error(self)
def extend(self, iterable: t.Any) -> t.NoReturn:
_immutable_error(self)
def insert(self, pos: t.Any, value: t.Any) -> t.NoReturn:
_immutable_error(self)
def pop(self, index: t.Any = -1) -> t.NoReturn:
_immutable_error(self)
def reverse(self: t.Any) -> t.NoReturn:
_immutable_error(self)
def sort(self, key: t.Any = None, reverse: t.Any = False) -> t.NoReturn:
_immutable_error(self)
class ImmutableDictMixin(t.Generic[K, V]):
"""Makes a :class:`dict` immutable.
.. versionchanged:: 3.1
Disallow ``|=`` operator.
.. versionadded:: 0.5
:private:
"""
_hash_cache: int | None = None
@classmethod
@t.overload
def fromkeys(
cls, keys: cabc.Iterable[K], value: None
) -> ImmutableDictMixin[K, t.Any | None]: ...
@classmethod
@t.overload
def fromkeys(cls, keys: cabc.Iterable[K], value: V) -> ImmutableDictMixin[K, V]: ...
@classmethod
def fromkeys(
cls, keys: cabc.Iterable[K], value: V | None = None
) -> ImmutableDictMixin[K, t.Any | None] | ImmutableDictMixin[K, V]:
instance = super().__new__(cls)
instance.__init__(zip(keys, repeat(value))) # type: ignore[misc]
return instance
def __reduce_ex__(self, protocol: t.SupportsIndex) -> t.Any:
return type(self), (dict(self),) # type: ignore[call-overload]
def _iter_hashitems(self) -> t.Iterable[t.Any]:
return self.items() # type: ignore[attr-defined,no-any-return]
def __hash__(self) -> int:
if self._hash_cache is not None:
return self._hash_cache
rv = self._hash_cache = hash(frozenset(self._iter_hashitems()))
return rv
def setdefault(self, key: t.Any, default: t.Any = None) -> t.NoReturn:
_immutable_error(self)
def update(self, arg: t.Any, /, **kwargs: t.Any) -> t.NoReturn:
_immutable_error(self)
def __ior__(self, other: t.Any) -> t.NoReturn:
_immutable_error(self)
def pop(self, key: t.Any, default: t.Any = None) -> t.NoReturn:
_immutable_error(self)
def popitem(self) -> t.NoReturn:
_immutable_error(self)
def __setitem__(self, key: t.Any, value: t.Any) -> t.NoReturn:
_immutable_error(self)
def __delitem__(self, key: t.Any) -> t.NoReturn:
_immutable_error(self)
def clear(self) -> t.NoReturn:
_immutable_error(self)
class ImmutableMultiDictMixin(ImmutableDictMixin[K, V]):
"""Makes a :class:`MultiDict` immutable.
.. versionadded:: 0.5
:private:
"""
def __reduce_ex__(self, protocol: t.SupportsIndex) -> t.Any:
return type(self), (list(self.items(multi=True)),) # type: ignore[attr-defined]
def _iter_hashitems(self) -> t.Iterable[t.Any]:
return self.items(multi=True) # type: ignore[attr-defined,no-any-return]
def add(self, key: t.Any, value: t.Any) -> t.NoReturn:
_immutable_error(self)
def popitemlist(self) -> t.NoReturn:
_immutable_error(self)
def poplist(self, key: t.Any) -> t.NoReturn:
_immutable_error(self)
def setlist(self, key: t.Any, new_list: t.Any) -> t.NoReturn:
_immutable_error(self)
def setlistdefault(self, key: t.Any, default_list: t.Any = None) -> t.NoReturn:
_immutable_error(self)
class ImmutableHeadersMixin:
"""Makes a :class:`Headers` immutable. We do not mark them as
hashable though since the only usecase for this datastructure
in Werkzeug is a view on a mutable structure.
.. versionchanged:: 3.1
Disallow ``|=`` operator.
.. versionadded:: 0.5
:private:
"""
def __delitem__(self, key: t.Any, **kwargs: t.Any) -> t.NoReturn:
_immutable_error(self)
def __setitem__(self, key: t.Any, value: t.Any) -> t.NoReturn:
_immutable_error(self)
def set(self, key: t.Any, value: t.Any, /, **kwargs: t.Any) -> t.NoReturn:
_immutable_error(self)
def setlist(self, key: t.Any, values: t.Any) -> t.NoReturn:
_immutable_error(self)
def add(self, key: t.Any, value: t.Any, /, **kwargs: t.Any) -> t.NoReturn:
_immutable_error(self)
def add_header(self, key: t.Any, value: t.Any, /, **kwargs: t.Any) -> t.NoReturn:
_immutable_error(self)
def remove(self, key: t.Any) -> t.NoReturn:
_immutable_error(self)
def extend(self, arg: t.Any, /, **kwargs: t.Any) -> t.NoReturn:
_immutable_error(self)
def update(self, arg: t.Any, /, **kwargs: t.Any) -> t.NoReturn:
_immutable_error(self)
def __ior__(self, other: t.Any) -> t.NoReturn:
_immutable_error(self)
def insert(self, pos: t.Any, value: t.Any) -> t.NoReturn:
_immutable_error(self)
def pop(self, key: t.Any = None, default: t.Any = _missing) -> t.NoReturn:
_immutable_error(self)
def popitem(self) -> t.NoReturn:
_immutable_error(self)
def setdefault(self, key: t.Any, default: t.Any) -> t.NoReturn:
_immutable_error(self)
def setlistdefault(self, key: t.Any, default: t.Any) -> t.NoReturn:
_immutable_error(self)
def _always_update(f: F) -> F:
def wrapper(
self: UpdateDictMixin[t.Any, t.Any], /, *args: t.Any, **kwargs: t.Any
) -> t.Any:
rv = f(self, *args, **kwargs)
if self.on_update is not None:
self.on_update(self)
return rv
return update_wrapper(wrapper, f) # type: ignore[return-value]
class UpdateDictMixin(dict[K, V]):
"""Makes dicts call `self.on_update` on modifications.
.. versionchanged:: 3.1
Implement ``|=`` operator.
.. versionadded:: 0.5
:private:
"""
on_update: cabc.Callable[[te.Self], None] | None = None
def setdefault(self: te.Self, key: K, default: V | None = None) -> V:
modified = key not in self
rv = super().setdefault(key, default) # type: ignore[arg-type]
if modified and self.on_update is not None:
self.on_update(self)
return rv
@t.overload
def pop(self: te.Self, key: K) -> V: ...
@t.overload
def pop(self: te.Self, key: K, default: V) -> V: ...
@t.overload
def pop(self: te.Self, key: K, default: T) -> T: ...
def pop(
self: te.Self,
key: K,
default: V | T = _missing, # type: ignore[assignment]
) -> V | T:
modified = key in self
if default is _missing:
rv = super().pop(key)
else:
rv = super().pop(key, default) # type: ignore[arg-type]
if modified and self.on_update is not None:
self.on_update(self)
return rv
@_always_update
def __setitem__(self, key: K, value: V) -> None:
super().__setitem__(key, value)
@_always_update
def __delitem__(self, key: K) -> None:
super().__delitem__(key)
@_always_update
def clear(self) -> None:
super().clear()
@_always_update
def popitem(self) -> tuple[K, V]:
return super().popitem()
@_always_update
def update( # type: ignore[override]
self,
arg: cabc.Mapping[K, V] | cabc.Iterable[tuple[K, V]] | None = None,
/,
**kwargs: V,
) -> None:
if arg is None:
super().update(**kwargs)
else:
super().update(arg, **kwargs)
@_always_update
def __ior__( # type: ignore[override]
self, other: cabc.Mapping[K, V] | cabc.Iterable[tuple[K, V]]
) -> te.Self:
return super().__ior__(other)

View File

@ -0,0 +1,214 @@
from __future__ import annotations
import collections.abc as cabc
import typing as t
from datetime import datetime
if t.TYPE_CHECKING:
import typing_extensions as te
T = t.TypeVar("T")
class IfRange:
"""Very simple object that represents the `If-Range` header in parsed
form. It will either have neither a etag or date or one of either but
never both.
.. versionadded:: 0.7
"""
def __init__(self, etag: str | None = None, date: datetime | None = None):
#: The etag parsed and unquoted. Ranges always operate on strong
#: etags so the weakness information is not necessary.
self.etag = etag
#: The date in parsed format or `None`.
self.date = date
def to_header(self) -> str:
"""Converts the object back into an HTTP header."""
if self.date is not None:
return http.http_date(self.date)
if self.etag is not None:
return http.quote_etag(self.etag)
return ""
def __str__(self) -> str:
return self.to_header()
def __repr__(self) -> str:
return f"<{type(self).__name__} {str(self)!r}>"
class Range:
"""Represents a ``Range`` header. All methods only support only
bytes as the unit. Stores a list of ranges if given, but the methods
only work if only one range is provided.
:raise ValueError: If the ranges provided are invalid.
.. versionchanged:: 0.15
The ranges passed in are validated.
.. versionadded:: 0.7
"""
def __init__(
self, units: str, ranges: cabc.Sequence[tuple[int, int | None]]
) -> None:
#: The units of this range. Usually "bytes".
self.units = units
#: A list of ``(begin, end)`` tuples for the range header provided.
#: The ranges are non-inclusive.
self.ranges = ranges
for start, end in ranges:
if start is None or (end is not None and (start < 0 or start >= end)):
raise ValueError(f"{(start, end)} is not a valid range.")
def range_for_length(self, length: int | None) -> tuple[int, int] | None:
"""If the range is for bytes, the length is not None and there is
exactly one range and it is satisfiable it returns a ``(start, stop)``
tuple, otherwise `None`.
"""
if self.units != "bytes" or length is None or len(self.ranges) != 1:
return None
start, end = self.ranges[0]
if end is None:
end = length
if start < 0:
start += length
if http.is_byte_range_valid(start, end, length):
return start, min(end, length)
return None
def make_content_range(self, length: int | None) -> ContentRange | None:
"""Creates a :class:`~werkzeug.datastructures.ContentRange` object
from the current range and given content length.
"""
rng = self.range_for_length(length)
if rng is not None:
return ContentRange(self.units, rng[0], rng[1], length)
return None
def to_header(self) -> str:
"""Converts the object back into an HTTP header."""
ranges = []
for begin, end in self.ranges:
if end is None:
ranges.append(f"{begin}-" if begin >= 0 else str(begin))
else:
ranges.append(f"{begin}-{end - 1}")
return f"{self.units}={','.join(ranges)}"
def to_content_range_header(self, length: int | None) -> str | None:
"""Converts the object into `Content-Range` HTTP header,
based on given length
"""
range = self.range_for_length(length)
if range is not None:
return f"{self.units} {range[0]}-{range[1] - 1}/{length}"
return None
def __str__(self) -> str:
return self.to_header()
def __repr__(self) -> str:
return f"<{type(self).__name__} {str(self)!r}>"
class _CallbackProperty(t.Generic[T]):
def __set_name__(self, owner: type[ContentRange], name: str) -> None:
self.attr = f"_{name}"
@t.overload
def __get__(self, instance: None, owner: None) -> te.Self: ...
@t.overload
def __get__(self, instance: ContentRange, owner: type[ContentRange]) -> T: ...
def __get__(
self, instance: ContentRange | None, owner: type[ContentRange] | None
) -> te.Self | T:
if instance is None:
return self
return instance.__dict__[self.attr] # type: ignore[no-any-return]
def __set__(self, instance: ContentRange, value: T) -> None:
instance.__dict__[self.attr] = value
if instance.on_update is not None:
instance.on_update(instance)
class ContentRange:
"""Represents the content range header.
.. versionadded:: 0.7
"""
def __init__(
self,
units: str | None,
start: int | None,
stop: int | None,
length: int | None = None,
on_update: cabc.Callable[[ContentRange], None] | None = None,
) -> None:
self.on_update = on_update
self.set(start, stop, length, units)
#: The units to use, usually "bytes"
units: str | None = _CallbackProperty() # type: ignore[assignment]
#: The start point of the range or `None`.
start: int | None = _CallbackProperty() # type: ignore[assignment]
#: The stop point of the range (non-inclusive) or `None`. Can only be
#: `None` if also start is `None`.
stop: int | None = _CallbackProperty() # type: ignore[assignment]
#: The length of the range or `None`.
length: int | None = _CallbackProperty() # type: ignore[assignment]
def set(
self,
start: int | None,
stop: int | None,
length: int | None = None,
units: str | None = "bytes",
) -> None:
"""Simple method to update the ranges."""
assert http.is_byte_range_valid(start, stop, length), "Bad range provided"
self._units: str | None = units
self._start: int | None = start
self._stop: int | None = stop
self._length: int | None = length
if self.on_update is not None:
self.on_update(self)
def unset(self) -> None:
"""Sets the units to `None` which indicates that the header should
no longer be used.
"""
self.set(None, None, units=None)
def to_header(self) -> str:
if self._units is None:
return ""
if self._length is None:
length: str | int = "*"
else:
length = self._length
if self._start is None:
return f"{self._units} */{length}"
return f"{self._units} {self._start}-{self._stop - 1}/{length}" # type: ignore[operator]
def __bool__(self) -> bool:
return self._units is not None
def __str__(self) -> str:
return self.to_header()
def __repr__(self) -> str:
return f"<{type(self).__name__} {str(self)!r}>"
# circular dependencies
from .. import http

File diff suppressed because it is too large Load Diff