From cba554561f99ed343bb9172eb1623690d0b02794 Mon Sep 17 00:00:00 2001 From: Daniil Fajnberg Date: Wed, 26 Jan 2022 09:23:36 +0100 Subject: [PATCH] added the `attempt` decorator --- README.md | 6 ++++- setup.cfg | 2 +- src/webutils/util.py | 64 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 69 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 09e29d2..d0c9e12 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,14 @@ Makes it more convenient to run awaitable objects in concurrent batches. ## Decorators -### in_async_session +### @in_async_session Handles starting and closing of a temporary `aiohttp.ClientSession` instance around any async function that makes use of such an object to perform requests. +### @attempt + +Allows defining for an async function to be called repeatedly if it throws a specific kind of exception. + ## Building Clone this repo, install `build` via pip, then run `python -m build` from the repository's root directory. This should produce a `dist/` subdirectory with a wheel (build) and archive (source) distribution. diff --git a/setup.cfg b/setup.cfg index 528bc2a..f3f9430 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = webutils-df -version = 0.0.5 +version = 0.0.6 author = Daniil F. author_email = mail@placeholder123.to description = Miscellaneous web utilities diff --git a/src/webutils/util.py b/src/webutils/util.py index 43a6ca1..7ed5def 100644 --- a/src/webutils/util.py +++ b/src/webutils/util.py @@ -2,7 +2,9 @@ import logging import asyncio from functools import wraps from inspect import signature -from typing import Callable, Awaitable, Dict, Tuple, Any, TypeVar +from math import inf +from timeit import default_timer +from typing import Callable, Awaitable, Dict, Tuple, Sequence, Any, Type, Union, TypeVar from aiohttp.client import ClientSession @@ -11,6 +13,7 @@ LOGGER_NAME = 'webutils' logger = logging.getLogger(LOGGER_NAME) AsyncFunction = TypeVar('AsyncFunction') +AttemptCallbackT = Callable[[AsyncFunction, Exception, int, float, tuple, dict], Awaitable[None]] def _get_param_idx_and_default(function: Callable, param_name: str) -> Tuple[int, Any]: @@ -85,6 +88,65 @@ def in_async_session(_func: AsyncFunction = None, *, return decorator if _func is None else decorator(_func) +def attempt(_func: AsyncFunction = None, *, + exception: Union[Type[Exception], Sequence[Type[Exception]]] = Exception, + max_attempts: float = inf, + timeout_seconds: float = inf, + seconds_between: float = 0, + callback: AttemptCallbackT = None) -> AsyncFunction: + """ + Decorator allowing an async function to be called repeatedly, if previous attempts cause specific exceptions. + Note: If no limiting arguments are passed to the decorator, the decorated function **will** be called repeatedly in + a potentially infinite loop, as long as it keeps throwing an exception. + + Args: + _func: + Control parameter; allows using the decorator with or without arguments. + If this decorator is used *with any* arguments, this will always be the decorated function itself. + exception (optional): + An `Exception` (sub-)class or a sequence thereof; a failed call of the decorated function will only be + repeated if it fails with a matching exception. Defaults to `Exception`, i.e. any exception. + max_attempts (optional): + The maximum number of (re-)attempts at calling the decorated function; if it is called `max_attempts` times + and fails, the exception will be propagated. The number of attempts is unlimited by default. + timeout_seconds (optional): + Defines the cutoff time (in seconds) for the entirety of attempts at executing the decorated function; + if the attempts take longer in total and fail, the exception will be propagated. No timeout by default. + seconds_between (optional): + Sets a sleep interval (in seconds) between each attempt to call the decorated function. Defaults to 0. + callback (optional): + If passed an async function (with matching parameters), a failed **and caught** attempt will call it with + the following positional arguments (in that order): + - the decorated async function itself + - the exception class encountered and caught + - the total number of failed attempts up to that point + - the `seconds_between` argument + - positional and keyword arguments (as tuple and dictionary respectively) passed to the decorated function + + Raises: + Any exceptions that do **not** match those passed to the `exception` parameter are immediately propagated. + Those that were specified in `exception` are propagated when `max_attempts` or `timeout_seconds` are reached. + """ + def decorator(function: AsyncFunction) -> AsyncFunction: + # Using `functools.wraps` to preserve information about the actual function being decorated + # More details: https://docs.python.org/3/library/functools.html#functools.wraps + @wraps(function) + async def wrapper(*args, **kwargs) -> Any: + start, failed_attempts = default_timer(), 0 + while True: + try: + return await function(*args, **kwargs) + except exception as e: + failed_attempts += 1 + if default_timer() - start >= timeout_seconds or failed_attempts >= max_attempts: + raise e + if callback: + await callback(function, e, failed_attempts, seconds_between, args, kwargs) + await asyncio.sleep(seconds_between) + return wrapper + return decorator if _func is None else decorator(_func) + + async def gather_in_batches(batch_size: int, *aws: Awaitable, return_exceptions: bool = False) -> list: """ Simple extension of the `asyncio.gather` function to make it easy to run awaitable objects in concurrent batches.