generated from daniil-berg/boilerplate-py
Compare commits
No commits in common. "c9e0e2f255636b4274a2e67ddeb3b3b13ef524cf" and "3d5d9bd8be06d299ac97625167d66168b1690d88" have entirely different histories.
c9e0e2f255
...
3d5d9bd8be
63
README.md
63
README.md
@ -4,66 +4,7 @@ Dynamically manage pools of asyncio tasks
|
|||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
Demo:
|
...
|
||||||
```python
|
|
||||||
import logging
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
from asyncio_taskpool.pool import TaskPool
|
|
||||||
|
|
||||||
|
|
||||||
logging.getLogger().setLevel(logging.NOTSET)
|
|
||||||
logging.getLogger('asyncio_taskpool').addHandler(logging.StreamHandler())
|
|
||||||
|
|
||||||
|
|
||||||
async def work(n):
|
|
||||||
for i in range(n):
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
print("did", i)
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
pool = TaskPool(work, (5,)) # initializes the pool
|
|
||||||
pool.start(3) # launches work tasks 0, 1, and 2
|
|
||||||
await asyncio.sleep(1.5)
|
|
||||||
pool.start() # launches work task 3
|
|
||||||
await asyncio.sleep(1.5)
|
|
||||||
pool.stop(2) # cancels tasks 3 and 2
|
|
||||||
await pool.gather() # awaits all tasks, then flushes the pool
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
asyncio.run(main())
|
|
||||||
```
|
|
||||||
|
|
||||||
Output:
|
|
||||||
```
|
|
||||||
Started work_pool_task_0
|
|
||||||
Started work_pool_task_1
|
|
||||||
Started work_pool_task_2
|
|
||||||
did 0
|
|
||||||
did 0
|
|
||||||
did 0
|
|
||||||
Started work_pool_task_3
|
|
||||||
did 1
|
|
||||||
did 1
|
|
||||||
did 1
|
|
||||||
did 0
|
|
||||||
did 2
|
|
||||||
did 2
|
|
||||||
Cancelling work_pool_task_2 ...
|
|
||||||
Cancelled work_pool_task_2
|
|
||||||
Exiting work_pool_task_2
|
|
||||||
Cancelling work_pool_task_3 ...
|
|
||||||
Cancelled work_pool_task_3
|
|
||||||
Exiting work_pool_task_3
|
|
||||||
did 3
|
|
||||||
did 3
|
|
||||||
Exiting work_pool_task_0
|
|
||||||
Exiting work_pool_task_1
|
|
||||||
did 4
|
|
||||||
did 4
|
|
||||||
```
|
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
@ -71,7 +12,7 @@ did 4
|
|||||||
|
|
||||||
## Dependencies
|
## Dependencies
|
||||||
|
|
||||||
Python Version 3.8+, tested on Linux
|
Python Version ..., OS ...
|
||||||
|
|
||||||
## Building from source
|
## Building from source
|
||||||
|
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
[metadata]
|
[metadata]
|
||||||
name = asyncio-taskpool
|
name = asyncio-taskpool
|
||||||
version = 0.0.1
|
version = 0.0.1
|
||||||
author = Daniil Fajnberg
|
author = Daniil
|
||||||
author_email = mail@daniil.fajnberg.de
|
author_email = mail@placeholder123.to
|
||||||
description = Dynamically manage pools of asyncio tasks
|
description = Dynamically manage pools of asyncio tasks
|
||||||
long_description = file: README.md
|
long_description = file: README.md
|
||||||
long_description_content_type = text/markdown
|
long_description_content_type = text/markdown
|
||||||
@ -17,7 +17,9 @@ classifiers =
|
|||||||
package_dir =
|
package_dir =
|
||||||
= src
|
= src
|
||||||
packages = find:
|
packages = find:
|
||||||
python_requires = >=3.8
|
python_requires = >=3
|
||||||
|
install_requires =
|
||||||
|
...
|
||||||
|
|
||||||
[options.extras_require]
|
[options.extras_require]
|
||||||
dev =
|
dev =
|
||||||
|
@ -1 +0,0 @@
|
|||||||
from .pool import TaskPool
|
|
@ -1,53 +0,0 @@
|
|||||||
import logging
|
|
||||||
from asyncio import gather
|
|
||||||
from asyncio.tasks import Task
|
|
||||||
from typing import Mapping, List, Iterable, Any
|
|
||||||
|
|
||||||
from .types import CoroutineFunc, FinalCallbackT, CancelCallbackT
|
|
||||||
from .task import start_task
|
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class TaskPool:
|
|
||||||
def __init__(self, func: CoroutineFunc, args: Iterable[Any] = (), kwargs: Mapping[str, Any] = None,
|
|
||||||
final_callback: FinalCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
|
|
||||||
self._func: CoroutineFunc = func
|
|
||||||
self._args: Iterable[Any] = args
|
|
||||||
self._kwargs: Mapping[str, Any] = kwargs if kwargs is not None else {}
|
|
||||||
self._final_callback: FinalCallbackT = final_callback
|
|
||||||
self._cancel_callback: CancelCallbackT = cancel_callback
|
|
||||||
self._tasks: List[Task] = []
|
|
||||||
|
|
||||||
@property
|
|
||||||
def func_name(self) -> str:
|
|
||||||
return self._func.__name__
|
|
||||||
|
|
||||||
@property
|
|
||||||
def size(self) -> int:
|
|
||||||
return len(self._tasks)
|
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
|
||||||
return f'<{self.__class__.__name__} func={self.func_name} size={self.size}>'
|
|
||||||
|
|
||||||
def _task_name(self, i: int) -> str:
|
|
||||||
return f'{self.func_name}_pool_task_{i}'
|
|
||||||
|
|
||||||
def _start_one(self) -> None:
|
|
||||||
self._tasks.append(start_task(self._func(*self._args, **self._kwargs), self._task_name(self.size),
|
|
||||||
final_callback=self._final_callback, cancel_callback=self._cancel_callback))
|
|
||||||
|
|
||||||
def start(self, num: int = 1) -> None:
|
|
||||||
for _ in range(num):
|
|
||||||
self._start_one()
|
|
||||||
|
|
||||||
def stop(self, num: int = 1) -> int:
|
|
||||||
if num < 1:
|
|
||||||
return 0
|
|
||||||
return sum(task.cancel() for task in reversed(self._tasks[-num:]))
|
|
||||||
|
|
||||||
async def gather(self, return_exceptions: bool = False):
|
|
||||||
results = await gather(*self._tasks, return_exceptions=return_exceptions)
|
|
||||||
self._tasks = []
|
|
||||||
return results
|
|
@ -1,30 +0,0 @@
|
|||||||
import logging
|
|
||||||
from asyncio.exceptions import CancelledError
|
|
||||||
from asyncio.tasks import Task, create_task
|
|
||||||
from typing import Awaitable, Any
|
|
||||||
|
|
||||||
from .types import FinalCallbackT, CancelCallbackT
|
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
async def wrap(awaitable: Awaitable, task_name: str, final_callback: FinalCallbackT = None,
|
|
||||||
cancel_callback: CancelCallbackT = None) -> Any:
|
|
||||||
log.info("Started %s", task_name)
|
|
||||||
try:
|
|
||||||
return await awaitable
|
|
||||||
except CancelledError:
|
|
||||||
log.info("Cancelling %s ...", task_name)
|
|
||||||
if callable(cancel_callback):
|
|
||||||
cancel_callback()
|
|
||||||
log.info("Cancelled %s", task_name)
|
|
||||||
finally:
|
|
||||||
if callable(final_callback):
|
|
||||||
final_callback()
|
|
||||||
log.info("Exiting %s", task_name)
|
|
||||||
|
|
||||||
|
|
||||||
def start_task(awaitable: Awaitable, task_name: str, final_callback: FinalCallbackT = None,
|
|
||||||
cancel_callback: CancelCallbackT = None) -> Task:
|
|
||||||
return create_task(wrap(awaitable, task_name, final_callback, cancel_callback), name=task_name)
|
|
@ -1,6 +0,0 @@
|
|||||||
from typing import Callable, Awaitable, Any
|
|
||||||
|
|
||||||
|
|
||||||
CoroutineFunc = Callable[[...], Awaitable[Any]]
|
|
||||||
FinalCallbackT = Callable
|
|
||||||
CancelCallbackT = Callable
|
|
0
src/package_name/__init__.py
Normal file
0
src/package_name/__init__.py
Normal file
Loading…
x
Reference in New Issue
Block a user