Compare commits
No commits in common. "master" and "9dbe472bbf306352b9cd74d6fdc96b1956df8f69" have entirely different histories.
master
...
9dbe472bbf
44
README.md
44
README.md
@ -1,45 +1,3 @@
|
||||
# HTTP forms defined in YAML
|
||||
|
||||
This module allows creating simple interfaces to forms/payloads for use in HTTP POST requests by defining them in highly readable and easily maintainable YAML files.
|
||||
|
||||
## Form definition
|
||||
|
||||
A form is defined by its fields.
|
||||
|
||||
A field is defined by its _name_, which is the parameter name in the payload sent during a `POST` request, and which typically corresponds to the `name` attribute of a `<select>` or `<input>` HTML tag.
|
||||
|
||||
Optionally, a field can have an _alias_ (for internal use), a _default_ value, value _options_ (as `<select>` tags do), and may be declared _required_.
|
||||
|
||||
A form definition in YAML format will consist of the field names as top-level keys, and either nothing/`null` or the corresponding fields' definitions as key-value-pairs below them.
|
||||
|
||||
## Example
|
||||
|
||||
### Definition
|
||||
```yaml
|
||||
# definition.yaml
|
||||
|
||||
way_too_long_field_name:
|
||||
alias: short_name
|
||||
foo:
|
||||
choice_field:
|
||||
options:
|
||||
value1: text for option 1
|
||||
value2: text for option 2
|
||||
default: value1
|
||||
mandatory_field:
|
||||
alias: special
|
||||
required: true
|
||||
```
|
||||
|
||||
### Usage
|
||||
```
|
||||
>>> from yamlhttpforms import load_form
|
||||
>>> form_interface = load_form('definition.yaml')
|
||||
>>> form_interface.get_payload(short_name='abc', foo='bar', special='420')
|
||||
{'way_too_long_field_name': 'abc', 'foo': 'bar', 'choice_field': 'value1', 'mandatory_field': '420'}
|
||||
|
||||
>>> form_interface.get_payload(short_name='abc', choice_field='baz', special='420')
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
ValueError: "baz" is not a valid option for <SelectField: name="choice_field", default="value1", options={'value1': 'text for option 1', 'value2': 'text for option 2'}>
|
||||
```
|
||||
...
|
||||
|
@ -1,3 +1,2 @@
|
||||
-r common.txt
|
||||
aiohttp
|
||||
git+https://git.fajnberg.de/daniil/webutils-df.git
|
@ -1,2 +1,3 @@
|
||||
-r full.txt
|
||||
-r aio.txt
|
||||
-r req.txt
|
||||
coverage
|
@ -1,3 +0,0 @@
|
||||
-r aio.txt
|
||||
-r req.txt
|
||||
-r html.txt
|
@ -1,2 +0,0 @@
|
||||
-r common.txt
|
||||
beautifulsoup4
|
@ -1,6 +1,6 @@
|
||||
[metadata]
|
||||
name = yamlhttpforms
|
||||
version = 0.1.2
|
||||
version = 0.0.1
|
||||
author = Daniil F.
|
||||
author_email = mail@placeholder123.to
|
||||
description = HTTP forms defined in YAML
|
||||
@ -26,9 +26,6 @@ req =
|
||||
requests
|
||||
aio =
|
||||
aiohttp
|
||||
webutils-df @ git+https://git.fajnberg.de/daniil/webutils-df.git
|
||||
html =
|
||||
beautifulsoup4
|
||||
|
||||
[options.packages.find]
|
||||
where = src
|
||||
|
@ -1 +0,0 @@
|
||||
from yamlhttpforms.form import Form, yaml_overload, load_form
|
@ -1,68 +1,41 @@
|
||||
from importlib import import_module
|
||||
from typing import Dict, Callable, Union, Optional, Any, TYPE_CHECKING
|
||||
from types import SimpleNamespace
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from aiohttp import ClientSession as AioSession
|
||||
from requests import Session as ReqSession
|
||||
from bs4.element import Tag as BS4Tag
|
||||
|
||||
from .utils import PathT, yaml_overload
|
||||
|
||||
|
||||
CallableDefaultT = Callable[[], Optional[str]]
|
||||
DefaultInitT = Union[str, Dict[str, str]]
|
||||
OptionsT = Dict[str, str]
|
||||
from yaml import safe_load
|
||||
|
||||
|
||||
class FormField:
|
||||
def __init__(self, name: str, default: DefaultInitT = None, options: OptionsT = None, required: bool = False):
|
||||
self.name: str = name
|
||||
self.default = default
|
||||
self.options: Optional[OptionsT] = None
|
||||
if options is not None:
|
||||
self.options = {str(k): str(v) for k, v in options.items()}
|
||||
self.required: bool = required
|
||||
def __init__(self, name: str, default: str = None, options: Dict[str, str] = None, hidden: bool = False):
|
||||
self.name = name
|
||||
self._default = default
|
||||
self.options = options
|
||||
self.hidden = hidden
|
||||
|
||||
def __repr__(self) -> str:
|
||||
s = '<InputField' if self.options is None else '<SelectField'
|
||||
s += f': name="{self.name}"'
|
||||
if self.hidden:
|
||||
s += ', hidden=True'
|
||||
if self._default is not None:
|
||||
s += f', default="{self.default}"'
|
||||
if self.required:
|
||||
s += ', required=True'
|
||||
if self.options is not None:
|
||||
s += f', options={self.options}'
|
||||
return s + '>'
|
||||
|
||||
def __str__(self) -> str:
|
||||
return repr(self)
|
||||
|
||||
@property
|
||||
def default(self) -> Optional[str]:
|
||||
def default(self) -> str:
|
||||
return self._default() if callable(self._default) else self._default
|
||||
|
||||
@default.setter
|
||||
def default(self, default: DefaultInitT) -> None:
|
||||
if isinstance(default, dict):
|
||||
try:
|
||||
module, function = default['module'], default['function']
|
||||
except KeyError:
|
||||
raise TypeError(f"Default for field '{self.name}' is invalid. The default must be either a string or "
|
||||
f"`None` or a dictionary with the special keys 'module' and 'function'.")
|
||||
obj = import_module(module)
|
||||
for attr in function.split('.'):
|
||||
obj = getattr(obj, attr)
|
||||
self._default = obj
|
||||
elif default is None:
|
||||
self._default = None
|
||||
else:
|
||||
self._default = str(default)
|
||||
@property
|
||||
def special(self) -> bool:
|
||||
return self.hidden and self._default is None
|
||||
|
||||
def valid_option(self, option: str) -> bool:
|
||||
return self.options is None or option in self.options.keys() or option in self.options.values()
|
||||
|
||||
def clean(self, value: Any) -> str:
|
||||
value = str(value)
|
||||
def clean_value(self, value: str) -> str:
|
||||
if self.options is None or value in self.options.keys():
|
||||
return value
|
||||
# Try to find an unambiguous match in the visible option texts:
|
||||
@ -76,45 +49,11 @@ class FormField:
|
||||
return key
|
||||
raise ValueError(f'"{value}" is not a valid option for {self}')
|
||||
|
||||
def check_with_html(self, field_tag: 'BS4Tag', check_defaults: bool = True) -> None:
|
||||
from .html import check_field_interface
|
||||
check_field_interface(field_tag, self, check_defaults=check_defaults)
|
||||
|
||||
def get_value_from_html_form(self, form_tag: 'BS4Tag') -> Optional[str]:
|
||||
from .html import get_field_value
|
||||
try:
|
||||
return get_field_value(form_tag, self.name)
|
||||
except AttributeError:
|
||||
return None
|
||||
|
||||
|
||||
class Form:
|
||||
|
||||
@staticmethod
|
||||
def fields_from_dict(definition: Dict[str, Optional[dict]]) -> Dict[str, FormField]:
|
||||
class Form(SimpleNamespace):
|
||||
def __init__(self, definition: Dict[str, Dict], full_payload: bool = True):
|
||||
"""
|
||||
Takes a dictionary defining form fields and creates `FormFields` objects from it.
|
||||
Every key in `definition` is interpreted as the field's name.
|
||||
The corresponding value can be `None` or a dictionary that can be unpacked into the FormField constructor call
|
||||
alongside the name.
|
||||
The special key `alias` in a field's dictionary is also allowed. If it is present, the corresponding value
|
||||
will be used as the key in the output dictionary; otherwise the field's name is used as the key.
|
||||
The constructed `FormField` objects are the values in the output dictionary.
|
||||
"""
|
||||
field_dict = {}
|
||||
for name, field_def in definition.items():
|
||||
if field_def is None:
|
||||
field_def = {}
|
||||
if isinstance(field_def, dict):
|
||||
alias = field_def.pop('alias', name)
|
||||
field_dict[alias] = FormField(name, **field_def)
|
||||
else:
|
||||
raise TypeError("Field definitions must be either dictionaries or `None`")
|
||||
return field_dict
|
||||
|
||||
def __init__(self, definition: Dict[str, Dict], full_payload: bool = True, url: str = None):
|
||||
"""
|
||||
Creates a form instance from a definition dictionary. Each element in the dictionary must define a field.
|
||||
...
|
||||
|
||||
Args:
|
||||
definition:
|
||||
@ -124,23 +63,22 @@ class Form:
|
||||
If True (default), the fields' predefined default values will be inserted into the payload,
|
||||
wherever no value (or `None`) is explicitly passed for that field.
|
||||
If False, fields for which no values were explicitly passed, will not be included in the payload.
|
||||
The exception is required fields; these will cause an error, if no value is provided.
|
||||
url (optional):
|
||||
Can be set in advance to the url that requests using this form's payload should be made to.
|
||||
The exception is special fields (i.e. hidden fields with no default value);
|
||||
these will cause an error, if no value is provided.
|
||||
"""
|
||||
self.fields: Dict[str, FormField] = self.fields_from_dict(definition)
|
||||
self.full_payload_always: bool = full_payload
|
||||
self.url: Optional[str] = url
|
||||
|
||||
def __repr__(self) -> str:
|
||||
fields = ', '.join(f"'{alias}': {field}" for alias, field in self.fields.items())
|
||||
return f'Form({fields})'
|
||||
self._full_payload_always = full_payload
|
||||
for alias, field_def in definition.items():
|
||||
setattr(self, alias, FormField(**field_def))
|
||||
|
||||
@property
|
||||
def required_fields(self) -> Dict[str, FormField]:
|
||||
return {alias: field for alias, field in self.fields.items() if field.required}
|
||||
def fields(self) -> Dict[str, FormField]:
|
||||
return {k: v for k, v in self.__dict__.items() if isinstance(v, FormField)}
|
||||
|
||||
def get_payload(self, **kwargs: str) -> Dict[str, str]:
|
||||
@property
|
||||
def special_fields(self) -> Dict[str, FormField]:
|
||||
return {k: v for k, v in self.__dict__.items() if isinstance(v, FormField) if v.special}
|
||||
|
||||
def get_payload(self, **kwargs) -> Dict[str, str]:
|
||||
"""
|
||||
Creates a request payload from the form's fields as a dictionary,
|
||||
where the keys represent the name attributes of the form fields and the values the actual values to be passed.
|
||||
@ -150,10 +88,7 @@ class Form:
|
||||
|
||||
Args:
|
||||
kwargs (optional):
|
||||
Every key must correspond to an alias or name of a field in the internal dictionary of fields;
|
||||
otherwise that key-value-pair is ignored.
|
||||
If both a field's alias and name are different and both are present as keys in `kwargs`,
|
||||
the alias-key takes precedence.
|
||||
Every key must correspond to a key in the internal fields dictionary.
|
||||
Values will be passed into the payload (if they pass validation).
|
||||
Select fields with predefined options will only allow one of the options to be passed.
|
||||
If `None` is passed as a value, the corresponding field's default value will be used in the payload.
|
||||
@ -162,91 +97,23 @@ class Form:
|
||||
Validated name-value-mapping to be used for HTTP requests from the form's fields.
|
||||
"""
|
||||
payload = {}
|
||||
for alias, field in self.fields.items():
|
||||
if alias in kwargs.keys():
|
||||
value = kwargs[alias]
|
||||
payload[field.name] = field.default if value is None else field.clean(value)
|
||||
elif alias != field.name and field.name in kwargs.keys():
|
||||
value = kwargs[field.name]
|
||||
payload[field.name] = field.default if value is None else field.clean(value)
|
||||
elif field.required:
|
||||
raise ValueError(f"`{alias}` is a required field, but no argument was passed.")
|
||||
elif self.full_payload_always:
|
||||
for key, field in self.fields.items():
|
||||
if key in kwargs.keys():
|
||||
value = kwargs[key]
|
||||
if value is None:
|
||||
payload[field.name] = field.default
|
||||
else:
|
||||
payload[field.name] = field.clean_value(value)
|
||||
elif field.special:
|
||||
raise ValueError(f"`{key}` is a special field (hidden, no-defeault), but no argument was passed.")
|
||||
elif self._full_payload_always:
|
||||
payload[field.name] = field.default
|
||||
return payload
|
||||
|
||||
async def post_aio(self, _aiohttp_session_obj: 'AioSession' = None, **kwargs: str) -> str:
|
||||
"""
|
||||
Uses `aiohttp` to perform a POST request to `.url` with the form's payload generated using `kwargs`.
|
||||
|
||||
Args:
|
||||
_aiohttp_session_obj (optional):
|
||||
Can be set to a pre-existing `aiohttp.ClientSession` instance that should be used for the request.
|
||||
kwargs:
|
||||
Passed directly into `.get_payload`.
|
||||
|
||||
Returns:
|
||||
The response text from the request.
|
||||
"""
|
||||
if self.url is None:
|
||||
raise AttributeError("`url` attribute not set")
|
||||
from aiohttp import ClientSession
|
||||
from webutils_df import in_async_session
|
||||
|
||||
@in_async_session
|
||||
async def post(url: str, data: dict, session: ClientSession = None) -> str:
|
||||
async with session.post(url, data=data) as response:
|
||||
return await response.text()
|
||||
return await post(self.url, self.get_payload(**kwargs), session=_aiohttp_session_obj)
|
||||
|
||||
def post_req(self, _requests_session_obj: 'ReqSession' = None, **kwargs: str) -> str:
|
||||
"""
|
||||
Uses `requests` to perform a POST request to `.url` with the form's payload generated using `kwargs`.
|
||||
|
||||
Args:
|
||||
_requests_session_obj (optional):
|
||||
Can be set to a pre-existing `requests.Session` instance that should be used for the request.
|
||||
kwargs:
|
||||
Passed directly into `.get_payload`.
|
||||
|
||||
Returns:
|
||||
The response text from the request.
|
||||
"""
|
||||
if self.url is None:
|
||||
raise AttributeError("`url` attribute not set")
|
||||
if _requests_session_obj is not None:
|
||||
return _requests_session_obj.post(self.url, data=self.get_payload(**kwargs)).text
|
||||
from requests import post
|
||||
return post(self.url, data=self.get_payload(**kwargs)).text
|
||||
|
||||
def check_with_html(self, form_tag: 'BS4Tag', check_defaults: bool = True) -> None:
|
||||
from .html import check_form_interface
|
||||
check_form_interface(form_tag, self, check_defaults=check_defaults)
|
||||
|
||||
def get_values_from_html_form(self, form_tag: 'BS4Tag', required_fields_only: bool = True) -> Dict[str, str]:
|
||||
from .html import get_field_values
|
||||
return get_field_values(form_tag, self, required_fields_only=required_fields_only)
|
||||
|
||||
|
||||
def load_form(*def_paths: PathT, dir_sort_key: Callable[[PathT], Any] = None, dir_sort_reverse: bool = False,
|
||||
full_payload: bool = True, url: str = None) -> Form:
|
||||
"""
|
||||
Creates a form instance from an arbitrary number of definition files in YAML format.
|
||||
|
||||
Args:
|
||||
def_paths:
|
||||
Paths to the YAML files containing the form definitions; see the `paths` parameter in `yaml_overload`
|
||||
dir_sort_key (optional):
|
||||
See `yaml_overload`
|
||||
dir_sort_reverse (optional):
|
||||
See `yaml_overload`
|
||||
full_payload (optional):
|
||||
See the `Form` constructor
|
||||
url (optional):
|
||||
See the `Form` constructor
|
||||
"""
|
||||
return Form(
|
||||
definition=yaml_overload(*def_paths, dir_sort_key=dir_sort_key, dir_sort_reverse=dir_sort_reverse),
|
||||
full_payload=full_payload,
|
||||
url=url
|
||||
)
|
||||
def load_form(*def_paths: Path, full_payload: bool = True) -> Form:
|
||||
definition = {}
|
||||
for path in def_paths:
|
||||
with open(path, 'r') as f:
|
||||
definition.update(safe_load(f))
|
||||
return Form(definition, full_payload=full_payload)
|
||||
|
@ -1,174 +0,0 @@
|
||||
import sys
|
||||
from typing import Dict, TYPE_CHECKING
|
||||
|
||||
from bs4.element import Tag
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .form import FormField, Form
|
||||
|
||||
|
||||
INPUT, SELECT, OPTION = 'input', 'select', 'option'
|
||||
NAME, VALUE, SELECTED = 'name', 'value', 'selected'
|
||||
|
||||
NON_PRINTABLE_TO_NONE = {code: None for code in range(sys.maxunicode + 1) if not chr(code).isprintable()}
|
||||
|
||||
|
||||
class WrongInterface(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FieldInterfaceWrong(WrongInterface):
|
||||
pass
|
||||
|
||||
|
||||
class SelectOptionsWrong(FieldInterfaceWrong):
|
||||
pass
|
||||
|
||||
|
||||
class IncompleteForm(WrongInterface):
|
||||
pass
|
||||
|
||||
|
||||
class UnknownField(WrongInterface):
|
||||
pass
|
||||
|
||||
|
||||
def printable_only(string: str) -> str:
|
||||
return string.translate(NON_PRINTABLE_TO_NONE)
|
||||
|
||||
|
||||
def check_select_field_options(field_tag: Tag, field_interface: 'FormField', check_defaults: bool = True) -> None:
|
||||
"""
|
||||
Compares the `options` and `default` attributes of a `'FormField'` object with the options of its HTML counterpart.
|
||||
|
||||
Args:
|
||||
field_tag: The `bs4.Tag` object representing the <select> HTML tag
|
||||
field_interface: The `'FormField'` to check against the HTML tag
|
||||
check_defaults (optional): If set to `False`, pre-selected options are not compared to the defined `default`
|
||||
|
||||
Raises:
|
||||
`SelectOptionsWrong`
|
||||
if the `options` dictionary is not entirely equal to a dictionary with the <option> tag `values` as keys
|
||||
and their visible text as values
|
||||
`FieldInterfaceWrong`
|
||||
if the `default` is not equal to the value of the <option> tag which has the `selected` attribute
|
||||
"""
|
||||
html_options = {(tag[VALUE], printable_only(tag.get_text(strip=True))) for tag in field_tag.find_all(OPTION)}
|
||||
interface_options = set(field_interface.options.items())
|
||||
missing_in_interface = html_options - interface_options
|
||||
not_in_html = interface_options - html_options
|
||||
s = ""
|
||||
if missing_in_interface:
|
||||
s += "\nThe following <options> HTML tags were not found in the interface:\n"
|
||||
s += "\n".join(str(tup) for tup in missing_in_interface)
|
||||
if not_in_html:
|
||||
s += "\nThe following options were defined, but are not present in the HTML:\n"
|
||||
s += "\n".join(str(tup) for tup in not_in_html)
|
||||
if s:
|
||||
raise SelectOptionsWrong(f"Wrong options in field '{field_interface.name}'." + s)
|
||||
if check_defaults:
|
||||
default_option = field_tag.find(lambda tag: tag.name == OPTION and tag.has_attr(SELECTED))
|
||||
if default_option is not None and default_option[VALUE] != field_interface.default:
|
||||
raise FieldInterfaceWrong(f"Default option '{default_option[VALUE]}' missing for {field_interface}")
|
||||
|
||||
|
||||
def check_field_interface(field_tag: Tag, field_interface: 'FormField', check_defaults: bool = True) -> None:
|
||||
"""
|
||||
Compares all attributes of a `'FormField'` object with its HTML counterpart.
|
||||
Calls `check_select_field_options` on select fields.
|
||||
|
||||
Args:
|
||||
field_tag: The `bs4.Tag` object representing the <input> or <select> HTML tag
|
||||
field_interface: The `'FormField'` to check against the HTML tag
|
||||
check_defaults (optional): If set to `False`, pre-set field values or pre-selected options are not checked
|
||||
|
||||
Raises:
|
||||
`FieldInterfaceWrong`
|
||||
when dealing with an <input> tag, but `options` were defined on the interface, or
|
||||
when the `default` does not match the tag's `value` attribute
|
||||
"""
|
||||
assert field_tag[NAME] == field_interface.name
|
||||
if field_tag.name == INPUT:
|
||||
if field_interface.options is not None:
|
||||
raise FieldInterfaceWrong(f"Options provided for input field '{field_interface.name}'")
|
||||
if check_defaults:
|
||||
default = field_tag.attrs.get(VALUE)
|
||||
if field_interface.default != default:
|
||||
raise FieldInterfaceWrong(f"Input field default not correct on '{field_interface}'")
|
||||
if field_tag.name == SELECT:
|
||||
check_select_field_options(field_tag, field_interface, check_defaults=check_defaults)
|
||||
|
||||
|
||||
def check_form_interface(form_tag: Tag, form_interface: 'Form', check_defaults: bool = True) -> None:
|
||||
"""
|
||||
Compares all fields of a `Form` object with the fields found in its HTML counterpart.
|
||||
Calls `check_field_interface` on each field.
|
||||
|
||||
Args:
|
||||
form_tag: The `bs4.Tag` object representing the <form> HTML tag
|
||||
form_interface: The `Form` to check against the HTML tag
|
||||
check_defaults (optional): If set to `False`, pre-set field values or pre-selected options are not checked
|
||||
|
||||
Raises:
|
||||
`UnknownField`
|
||||
if any of the fields' name does not correspond to the `name` attribute of a field in the HTML form
|
||||
`IncompleteForm`
|
||||
if a field in the HTML form has not been defined in the form interface
|
||||
"""
|
||||
field_tags: Dict[str, Tag] = {tag[NAME]: tag for tag in form_tag.find_all(INPUT) + form_tag.find_all(SELECT)}
|
||||
for field in form_interface.fields.values():
|
||||
tag = field_tags.pop(field.name, None)
|
||||
if tag is None:
|
||||
raise UnknownField(f"The defined field does not exist in the form: {field}")
|
||||
check_field_interface(tag, field, check_defaults=check_defaults)
|
||||
if len(field_tags) > 0:
|
||||
raise IncompleteForm(f"Form interface missing fields: {list(field_tags.keys())}")
|
||||
|
||||
|
||||
def get_field_value(form_tag: Tag, field_name: str) -> str:
|
||||
"""
|
||||
Returns the string value of the `value` attribute of a form field with a specified name.
|
||||
|
||||
Args:
|
||||
form_tag: The `bs4.Tag` object representing the <form> HTML tag
|
||||
field_name: The value of the `name` attribute of the form field of interest
|
||||
|
||||
Raises:
|
||||
`ValueError`
|
||||
if no field with the specified name exists in the HTML form
|
||||
`AttributeError`
|
||||
if the field with the specified name does not have a `value` attribute
|
||||
"""
|
||||
def form_field_has_the_name(tag: Tag):
|
||||
return tag.name in {INPUT, SELECT} and tag.has_attr(NAME) and tag[NAME] == field_name
|
||||
field = form_tag.find(form_field_has_the_name)
|
||||
if field is None:
|
||||
raise ValueError(f"No form field with the name '{field_name}' found")
|
||||
if not field.has_attr(VALUE):
|
||||
raise AttributeError(f"Field with the name '{field_name}' has no `value` attribute")
|
||||
return field[VALUE]
|
||||
|
||||
|
||||
def get_field_values(form_tag: Tag, form_interface: 'Form', required_fields_only: bool = True) -> Dict[str, str]:
|
||||
"""
|
||||
Goes through the fields of a `Form` object and tries to retrieve a value for each from an HTML <form>.
|
||||
|
||||
Args:
|
||||
form_tag: The `bs4.Tag` object representing the <form> HTML tag
|
||||
form_interface: The `Form` for which to get the field values
|
||||
required_fields_only (optional): If `True` (default), values will be retrieved only for the required fields
|
||||
|
||||
Returns:
|
||||
Dictionary with each key-value-pair representing a (required) field for which a `value` attribute existed in the
|
||||
HTML form, with the key being identical to that of the field in the `Form.required_fields` dictionary and the
|
||||
value being the actual `value` attribute's string value
|
||||
"""
|
||||
output_dict = {}
|
||||
for key, field in form_interface.fields.items():
|
||||
if required_fields_only and not field.required:
|
||||
continue
|
||||
try:
|
||||
output_dict[key] = get_field_value(form_tag, field.name)
|
||||
except AttributeError:
|
||||
pass
|
||||
return output_dict
|
@ -1,67 +0,0 @@
|
||||
from pathlib import Path
|
||||
from typing import Callable, IO, Union, Any
|
||||
|
||||
from yaml import SafeLoader, load, ScalarNode
|
||||
|
||||
|
||||
PathT = Union[Path, str]
|
||||
|
||||
INCLUDE_TAG = '!include'
|
||||
|
||||
|
||||
class RecursiveSafeLoader(SafeLoader):
|
||||
"""
|
||||
Custom `SafeLoader` for YAML streams that allows recursively referencing other files.
|
||||
This is done by honoring the special `INCLUDE_TAG` followed by a path to a readable YAML file.
|
||||
"""
|
||||
def __init__(self, stream: IO) -> None:
|
||||
try:
|
||||
self._root = Path(stream.name).parent
|
||||
except AttributeError:
|
||||
self._root = Path()
|
||||
super().__init__(stream)
|
||||
|
||||
def include(self, node: ScalarNode) -> Any:
|
||||
"""
|
||||
To be used in the constructor of a node that has the `INCLUDE_TAG` in it.
|
||||
The path following that tag can be either relative (to the file's own parent directory) or absolute.
|
||||
Since the constructor uses this class to load the node, arbitrary nesting of YAML file inclusions is possible.
|
||||
"""
|
||||
path = Path(self.construct_scalar(node))
|
||||
if not path.is_absolute():
|
||||
path = Path(self._root, path)
|
||||
with open(path, 'r') as f:
|
||||
return load(f, RecursiveSafeLoader)
|
||||
|
||||
|
||||
RecursiveSafeLoader.add_constructor(INCLUDE_TAG, RecursiveSafeLoader.include)
|
||||
|
||||
|
||||
def yaml_overload(*paths: PathT, dir_sort_key: Callable[[PathT], Any] = None, dir_sort_reverse: bool = False) -> dict:
|
||||
"""
|
||||
Loads YAML files from any number of paths, recursively going through any directories.
|
||||
|
||||
Args:
|
||||
paths:
|
||||
Each argument should be a path to either a YAML file or a directory containing YAML files;
|
||||
only files with the extension `.yaml` are loaded.
|
||||
dir_sort_key (optional):
|
||||
If one of the paths is a directory, its contents are sorted, before recursively passing them into this
|
||||
function; to apply a specific comparison key for each sub-path, a callable can be used here, which is
|
||||
passed into the builtin `sorted` function.
|
||||
dir_sort_reverse (optional):
|
||||
Same as with the parameter above, this argument is also passed into the `sorted` function.
|
||||
|
||||
Returns:
|
||||
Dictionary comprised of the contents of iteratively loaded YAML files.
|
||||
NOTE: Since it is updated each time a file is loaded, their load order matters!
|
||||
"""
|
||||
output_dict = {}
|
||||
for path in paths:
|
||||
path = Path(path)
|
||||
if path.is_dir():
|
||||
output_dict.update(yaml_overload(*sorted(path.iterdir(), key=dir_sort_key, reverse=dir_sort_reverse)))
|
||||
elif path.suffix == '.yaml':
|
||||
with open(path, 'r') as f:
|
||||
output_dict.update(load(f, RecursiveSafeLoader))
|
||||
return output_dict
|
Loading…
x
Reference in New Issue
Block a user