From c9e0e2f255636b4274a2e67ddeb3b3b13ef524cf Mon Sep 17 00:00:00 2001 From: Daniil Fajnberg Date: Fri, 4 Feb 2022 11:07:02 +0100 Subject: [PATCH] first working draft --- README.md | 61 +++++++++++++++++++++++++++++++- src/asyncio_taskpool/__init__.py | 1 + src/asyncio_taskpool/pool.py | 53 +++++++++++++++++++++++++++ src/asyncio_taskpool/task.py | 30 ++++++++++++++++ src/asyncio_taskpool/types.py | 6 ++++ 5 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 src/asyncio_taskpool/pool.py create mode 100644 src/asyncio_taskpool/task.py create mode 100644 src/asyncio_taskpool/types.py diff --git a/README.md b/README.md index e9aa470..373990c 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,66 @@ Dynamically manage pools of asyncio tasks ## Usage -... +Demo: +```python +import logging +import asyncio + +from asyncio_taskpool.pool import TaskPool + + +logging.getLogger().setLevel(logging.NOTSET) +logging.getLogger('asyncio_taskpool').addHandler(logging.StreamHandler()) + + +async def work(n): + for i in range(n): + await asyncio.sleep(1) + print("did", i) + + +async def main(): + pool = TaskPool(work, (5,)) # initializes the pool + pool.start(3) # launches work tasks 0, 1, and 2 + await asyncio.sleep(1.5) + pool.start() # launches work task 3 + await asyncio.sleep(1.5) + pool.stop(2) # cancels tasks 3 and 2 + await pool.gather() # awaits all tasks, then flushes the pool + + +if __name__ == '__main__': + asyncio.run(main()) +``` + +Output: +``` +Started work_pool_task_0 +Started work_pool_task_1 +Started work_pool_task_2 +did 0 +did 0 +did 0 +Started work_pool_task_3 +did 1 +did 1 +did 1 +did 0 +did 2 +did 2 +Cancelling work_pool_task_2 ... +Cancelled work_pool_task_2 +Exiting work_pool_task_2 +Cancelling work_pool_task_3 ... +Cancelled work_pool_task_3 +Exiting work_pool_task_3 +did 3 +did 3 +Exiting work_pool_task_0 +Exiting work_pool_task_1 +did 4 +did 4 +``` ## Installation diff --git a/src/asyncio_taskpool/__init__.py b/src/asyncio_taskpool/__init__.py index e69de29..4a64d4c 100644 --- a/src/asyncio_taskpool/__init__.py +++ b/src/asyncio_taskpool/__init__.py @@ -0,0 +1 @@ +from .pool import TaskPool diff --git a/src/asyncio_taskpool/pool.py b/src/asyncio_taskpool/pool.py new file mode 100644 index 0000000..b0bc38e --- /dev/null +++ b/src/asyncio_taskpool/pool.py @@ -0,0 +1,53 @@ +import logging +from asyncio import gather +from asyncio.tasks import Task +from typing import Mapping, List, Iterable, Any + +from .types import CoroutineFunc, FinalCallbackT, CancelCallbackT +from .task import start_task + + +log = logging.getLogger(__name__) + + +class TaskPool: + def __init__(self, func: CoroutineFunc, args: Iterable[Any] = (), kwargs: Mapping[str, Any] = None, + final_callback: FinalCallbackT = None, cancel_callback: CancelCallbackT = None) -> None: + self._func: CoroutineFunc = func + self._args: Iterable[Any] = args + self._kwargs: Mapping[str, Any] = kwargs if kwargs is not None else {} + self._final_callback: FinalCallbackT = final_callback + self._cancel_callback: CancelCallbackT = cancel_callback + self._tasks: List[Task] = [] + + @property + def func_name(self) -> str: + return self._func.__name__ + + @property + def size(self) -> int: + return len(self._tasks) + + def __repr__(self) -> str: + return f'<{self.__class__.__name__} func={self.func_name} size={self.size}>' + + def _task_name(self, i: int) -> str: + return f'{self.func_name}_pool_task_{i}' + + def _start_one(self) -> None: + self._tasks.append(start_task(self._func(*self._args, **self._kwargs), self._task_name(self.size), + final_callback=self._final_callback, cancel_callback=self._cancel_callback)) + + def start(self, num: int = 1) -> None: + for _ in range(num): + self._start_one() + + def stop(self, num: int = 1) -> int: + if num < 1: + return 0 + return sum(task.cancel() for task in reversed(self._tasks[-num:])) + + async def gather(self, return_exceptions: bool = False): + results = await gather(*self._tasks, return_exceptions=return_exceptions) + self._tasks = [] + return results diff --git a/src/asyncio_taskpool/task.py b/src/asyncio_taskpool/task.py new file mode 100644 index 0000000..d006212 --- /dev/null +++ b/src/asyncio_taskpool/task.py @@ -0,0 +1,30 @@ +import logging +from asyncio.exceptions import CancelledError +from asyncio.tasks import Task, create_task +from typing import Awaitable, Any + +from .types import FinalCallbackT, CancelCallbackT + + +log = logging.getLogger(__name__) + + +async def wrap(awaitable: Awaitable, task_name: str, final_callback: FinalCallbackT = None, + cancel_callback: CancelCallbackT = None) -> Any: + log.info("Started %s", task_name) + try: + return await awaitable + except CancelledError: + log.info("Cancelling %s ...", task_name) + if callable(cancel_callback): + cancel_callback() + log.info("Cancelled %s", task_name) + finally: + if callable(final_callback): + final_callback() + log.info("Exiting %s", task_name) + + +def start_task(awaitable: Awaitable, task_name: str, final_callback: FinalCallbackT = None, + cancel_callback: CancelCallbackT = None) -> Task: + return create_task(wrap(awaitable, task_name, final_callback, cancel_callback), name=task_name) diff --git a/src/asyncio_taskpool/types.py b/src/asyncio_taskpool/types.py new file mode 100644 index 0000000..f2c16d4 --- /dev/null +++ b/src/asyncio_taskpool/types.py @@ -0,0 +1,6 @@ +from typing import Callable, Awaitable, Any + + +CoroutineFunc = Callable[[...], Awaitable[Any]] +FinalCallbackT = Callable +CancelCallbackT = Callable