Compare commits

...

2 Commits

Author SHA1 Message Date
c9e0e2f255 first working draft 2022-02-04 11:07:02 +01:00
2e57447f5c meta adjustments 2022-02-03 20:10:26 +01:00
7 changed files with 154 additions and 7 deletions

View File

@ -4,7 +4,66 @@ Dynamically manage pools of asyncio tasks
## Usage
...
Demo:
```python
import logging
import asyncio
from asyncio_taskpool.pool import TaskPool
logging.getLogger().setLevel(logging.NOTSET)
logging.getLogger('asyncio_taskpool').addHandler(logging.StreamHandler())
async def work(n):
for i in range(n):
await asyncio.sleep(1)
print("did", i)
async def main():
pool = TaskPool(work, (5,)) # initializes the pool
pool.start(3) # launches work tasks 0, 1, and 2
await asyncio.sleep(1.5)
pool.start() # launches work task 3
await asyncio.sleep(1.5)
pool.stop(2) # cancels tasks 3 and 2
await pool.gather() # awaits all tasks, then flushes the pool
if __name__ == '__main__':
asyncio.run(main())
```
Output:
```
Started work_pool_task_0
Started work_pool_task_1
Started work_pool_task_2
did 0
did 0
did 0
Started work_pool_task_3
did 1
did 1
did 1
did 0
did 2
did 2
Cancelling work_pool_task_2 ...
Cancelled work_pool_task_2
Exiting work_pool_task_2
Cancelling work_pool_task_3 ...
Cancelled work_pool_task_3
Exiting work_pool_task_3
did 3
did 3
Exiting work_pool_task_0
Exiting work_pool_task_1
did 4
did 4
```
## Installation
@ -12,7 +71,7 @@ Dynamically manage pools of asyncio tasks
## Dependencies
Python Version ..., OS ...
Python Version 3.8+, tested on Linux
## Building from source

View File

@ -1,8 +1,8 @@
[metadata]
name = asyncio-taskpool
version = 0.0.1
author = Daniil
author_email = mail@placeholder123.to
author = Daniil Fajnberg
author_email = mail@daniil.fajnberg.de
description = Dynamically manage pools of asyncio tasks
long_description = file: README.md
long_description_content_type = text/markdown
@ -17,9 +17,7 @@ classifiers =
package_dir =
= src
packages = find:
python_requires = >=3
install_requires =
...
python_requires = >=3.8
[options.extras_require]
dev =

View File

@ -0,0 +1 @@
from .pool import TaskPool

View File

@ -0,0 +1,53 @@
import logging
from asyncio import gather
from asyncio.tasks import Task
from typing import Mapping, List, Iterable, Any
from .types import CoroutineFunc, FinalCallbackT, CancelCallbackT
from .task import start_task
log = logging.getLogger(__name__)
class TaskPool:
def __init__(self, func: CoroutineFunc, args: Iterable[Any] = (), kwargs: Mapping[str, Any] = None,
final_callback: FinalCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
self._func: CoroutineFunc = func
self._args: Iterable[Any] = args
self._kwargs: Mapping[str, Any] = kwargs if kwargs is not None else {}
self._final_callback: FinalCallbackT = final_callback
self._cancel_callback: CancelCallbackT = cancel_callback
self._tasks: List[Task] = []
@property
def func_name(self) -> str:
return self._func.__name__
@property
def size(self) -> int:
return len(self._tasks)
def __repr__(self) -> str:
return f'<{self.__class__.__name__} func={self.func_name} size={self.size}>'
def _task_name(self, i: int) -> str:
return f'{self.func_name}_pool_task_{i}'
def _start_one(self) -> None:
self._tasks.append(start_task(self._func(*self._args, **self._kwargs), self._task_name(self.size),
final_callback=self._final_callback, cancel_callback=self._cancel_callback))
def start(self, num: int = 1) -> None:
for _ in range(num):
self._start_one()
def stop(self, num: int = 1) -> int:
if num < 1:
return 0
return sum(task.cancel() for task in reversed(self._tasks[-num:]))
async def gather(self, return_exceptions: bool = False):
results = await gather(*self._tasks, return_exceptions=return_exceptions)
self._tasks = []
return results

View File

@ -0,0 +1,30 @@
import logging
from asyncio.exceptions import CancelledError
from asyncio.tasks import Task, create_task
from typing import Awaitable, Any
from .types import FinalCallbackT, CancelCallbackT
log = logging.getLogger(__name__)
async def wrap(awaitable: Awaitable, task_name: str, final_callback: FinalCallbackT = None,
cancel_callback: CancelCallbackT = None) -> Any:
log.info("Started %s", task_name)
try:
return await awaitable
except CancelledError:
log.info("Cancelling %s ...", task_name)
if callable(cancel_callback):
cancel_callback()
log.info("Cancelled %s", task_name)
finally:
if callable(final_callback):
final_callback()
log.info("Exiting %s", task_name)
def start_task(awaitable: Awaitable, task_name: str, final_callback: FinalCallbackT = None,
cancel_callback: CancelCallbackT = None) -> Task:
return create_task(wrap(awaitable, task_name, final_callback, cancel_callback), name=task_name)

View File

@ -0,0 +1,6 @@
from typing import Callable, Awaitable, Any
CoroutineFunc = Callable[[...], Awaitable[Any]]
FinalCallbackT = Callable
CancelCallbackT = Callable