added the `attempt` decorator

This commit is contained in:
Daniil Fajnberg 2022-01-26 09:23:36 +01:00
parent 73133e3f74
commit cba554561f
3 changed files with 69 additions and 3 deletions

View File

@ -10,10 +10,14 @@ Makes it more convenient to run awaitable objects in concurrent batches.
## Decorators ## Decorators
### in_async_session ### @in_async_session
Handles starting and closing of a temporary `aiohttp.ClientSession` instance around any async function that makes use of such an object to perform requests. Handles starting and closing of a temporary `aiohttp.ClientSession` instance around any async function that makes use of such an object to perform requests.
### @attempt
Allows defining for an async function to be called repeatedly if it throws a specific kind of exception.
## Building ## Building
Clone this repo, install `build` via pip, then run `python -m build` from the repository's root directory. This should produce a `dist/` subdirectory with a wheel (build) and archive (source) distribution. Clone this repo, install `build` via pip, then run `python -m build` from the repository's root directory. This should produce a `dist/` subdirectory with a wheel (build) and archive (source) distribution.

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = webutils-df name = webutils-df
version = 0.0.5 version = 0.0.6
author = Daniil F. author = Daniil F.
author_email = mail@placeholder123.to author_email = mail@placeholder123.to
description = Miscellaneous web utilities description = Miscellaneous web utilities

View File

@ -2,7 +2,9 @@ import logging
import asyncio import asyncio
from functools import wraps from functools import wraps
from inspect import signature from inspect import signature
from typing import Callable, Awaitable, Dict, Tuple, Any, TypeVar from math import inf
from timeit import default_timer
from typing import Callable, Awaitable, Dict, Tuple, Sequence, Any, Type, Union, TypeVar
from aiohttp.client import ClientSession from aiohttp.client import ClientSession
@ -11,6 +13,7 @@ LOGGER_NAME = 'webutils'
logger = logging.getLogger(LOGGER_NAME) logger = logging.getLogger(LOGGER_NAME)
AsyncFunction = TypeVar('AsyncFunction') AsyncFunction = TypeVar('AsyncFunction')
AttemptCallbackT = Callable[[AsyncFunction, Exception, int, float, tuple, dict], Awaitable[None]]
def _get_param_idx_and_default(function: Callable, param_name: str) -> Tuple[int, Any]: def _get_param_idx_and_default(function: Callable, param_name: str) -> Tuple[int, Any]:
@ -85,6 +88,65 @@ def in_async_session(_func: AsyncFunction = None, *,
return decorator if _func is None else decorator(_func) return decorator if _func is None else decorator(_func)
def attempt(_func: AsyncFunction = None, *,
exception: Union[Type[Exception], Sequence[Type[Exception]]] = Exception,
max_attempts: float = inf,
timeout_seconds: float = inf,
seconds_between: float = 0,
callback: AttemptCallbackT = None) -> AsyncFunction:
"""
Decorator allowing an async function to be called repeatedly, if previous attempts cause specific exceptions.
Note: If no limiting arguments are passed to the decorator, the decorated function **will** be called repeatedly in
a potentially infinite loop, as long as it keeps throwing an exception.
Args:
_func:
Control parameter; allows using the decorator with or without arguments.
If this decorator is used *with any* arguments, this will always be the decorated function itself.
exception (optional):
An `Exception` (sub-)class or a sequence thereof; a failed call of the decorated function will only be
repeated if it fails with a matching exception. Defaults to `Exception`, i.e. any exception.
max_attempts (optional):
The maximum number of (re-)attempts at calling the decorated function; if it is called `max_attempts` times
and fails, the exception will be propagated. The number of attempts is unlimited by default.
timeout_seconds (optional):
Defines the cutoff time (in seconds) for the entirety of attempts at executing the decorated function;
if the attempts take longer in total and fail, the exception will be propagated. No timeout by default.
seconds_between (optional):
Sets a sleep interval (in seconds) between each attempt to call the decorated function. Defaults to 0.
callback (optional):
If passed an async function (with matching parameters), a failed **and caught** attempt will call it with
the following positional arguments (in that order):
- the decorated async function itself
- the exception class encountered and caught
- the total number of failed attempts up to that point
- the `seconds_between` argument
- positional and keyword arguments (as tuple and dictionary respectively) passed to the decorated function
Raises:
Any exceptions that do **not** match those passed to the `exception` parameter are immediately propagated.
Those that were specified in `exception` are propagated when `max_attempts` or `timeout_seconds` are reached.
"""
def decorator(function: AsyncFunction) -> AsyncFunction:
# Using `functools.wraps` to preserve information about the actual function being decorated
# More details: https://docs.python.org/3/library/functools.html#functools.wraps
@wraps(function)
async def wrapper(*args, **kwargs) -> Any:
start, failed_attempts = default_timer(), 0
while True:
try:
return await function(*args, **kwargs)
except exception as e:
failed_attempts += 1
if default_timer() - start >= timeout_seconds or failed_attempts >= max_attempts:
raise e
if callback:
await callback(function, e, failed_attempts, seconds_between, args, kwargs)
await asyncio.sleep(seconds_between)
return wrapper
return decorator if _func is None else decorator(_func)
async def gather_in_batches(batch_size: int, *aws: Awaitable, return_exceptions: bool = False) -> list: async def gather_in_batches(batch_size: int, *aws: Awaitable, return_exceptions: bool = False) -> list:
""" """
Simple extension of the `asyncio.gather` function to make it easy to run awaitable objects in concurrent batches. Simple extension of the `asyncio.gather` function to make it easy to run awaitable objects in concurrent batches.