usage example; pool close instead of gather

This commit is contained in:
Daniil Fajnberg 2022-02-04 19:23:09 +01:00
parent 7020493d53
commit f45fef6497
4 changed files with 83 additions and 62 deletions

View File

@ -4,66 +4,7 @@ Dynamically manage pools of asyncio tasks
## Usage ## Usage
Demo: See [USAGE.md](usage/USAGE.md)
```python
import logging
import asyncio
from asyncio_taskpool.pool import TaskPool
logging.getLogger().setLevel(logging.NOTSET)
logging.getLogger('asyncio_taskpool').addHandler(logging.StreamHandler())
async def work(n):
for i in range(n):
await asyncio.sleep(1)
print("did", i)
async def main():
pool = TaskPool(work, (5,)) # initializes the pool
pool.start(3) # launches work tasks 0, 1, and 2
await asyncio.sleep(1.5)
pool.start() # launches work task 3
await asyncio.sleep(1.5)
pool.stop(2) # cancels tasks 3 and 2
await pool.gather() # awaits all tasks, then flushes the pool
if __name__ == '__main__':
asyncio.run(main())
```
Output:
```
Started work_pool_task_0
Started work_pool_task_1
Started work_pool_task_2
did 0
did 0
did 0
Started work_pool_task_3
did 1
did 1
did 1
did 0
did 2
did 2
Cancelling work_pool_task_2 ...
Cancelled work_pool_task_2
Exiting work_pool_task_2
Cancelling work_pool_task_3 ...
Cancelled work_pool_task_3
Exiting work_pool_task_3
did 3
did 3
Exiting work_pool_task_0
Exiting work_pool_task_1
did 4
did 4
```
## Installation ## Installation

View File

@ -71,7 +71,7 @@ class TaskPool:
def stop_all(self) -> int: def stop_all(self) -> int:
return self.stop(self.size) return self.stop(self.size)
async def gather(self, return_exceptions: bool = False): async def close(self, return_exceptions: bool = False):
results = await gather(*self._tasks, *self._cancelled, return_exceptions=return_exceptions) results = await gather(*self._tasks, *self._cancelled, return_exceptions=return_exceptions)
self._tasks = self._cancelled = [] self._tasks = self._cancelled = []
return results return results

80
usage/USAGE.md Normal file
View File

@ -0,0 +1,80 @@
# Using `asyncio-taskpool`
## Simple example
The minimum required setup is a "worker" coroutine function that can do something asynchronously, a main coroutine function that sets up the `TaskPool` and starts/stops the tasks as desired, eventually awaiting them all.
The following demo code enables full log output first for additional clarity. It is complete and should work as is.
### Code
```python
import logging
import asyncio
from asyncio_taskpool.pool import TaskPool
logging.getLogger().setLevel(logging.NOTSET)
logging.getLogger('asyncio_taskpool').addHandler(logging.StreamHandler())
async def work(n: int) -> None:
"""
Pseudo-worker function.
Counts up to an integer with a second of sleep before each iteration.
In a real-world use case, a worker function should probably have access
to some synchronisation primitive or shared resource to distribute work
between an arbitrary number of workers.
"""
for i in range(n):
await asyncio.sleep(1)
print("did", i)
async def main() -> None:
pool = TaskPool(work, (5,)) # initializes the pool; no work is being done yet
pool.start(3) # launches work tasks 0, 1, and 2
await asyncio.sleep(1.5) # lets the tasks work for a bit
pool.start() # launches work task 3
await asyncio.sleep(1.5) # lets the tasks work for a bit
pool.stop(2) # cancels tasks 3 and 2
await pool.close() # awaits all tasks, then flushes the pool
if __name__ == '__main__':
asyncio.run(main())
```
### Output
Additional comments indicated with `<--`
```
Started work_pool_task_0
Started work_pool_task_1
Started work_pool_task_2
did 0
did 0
did 0
Started work_pool_task_3
did 1
did 1
did 1
did 0 <-- notice that the newly created task begins counting at 0
did 2
did 2 <-- two taks were stopped; only tasks 0 and 1 continue "working"
Cancelling work_pool_task_2 ...
Cancelled work_pool_task_2
Exiting work_pool_task_2
Cancelling work_pool_task_3 ...
Cancelled work_pool_task_3
Exiting work_pool_task_3
did 3
did 3
Exiting work_pool_task_0
Exiting work_pool_task_1
did 4
did 4
```
## Advanced example
...

View File

@ -56,7 +56,7 @@ async def main() -> None:
# Finally we allow for all tasks to do do their cleanup, if they need to do any, upon being cancelled. # Finally we allow for all tasks to do do their cleanup, if they need to do any, upon being cancelled.
# We block until they all return or raise an exception, but since we are not interested in any of their exceptions, # We block until they all return or raise an exception, but since we are not interested in any of their exceptions,
# we just silently collect their exceptions along with their return values. # we just silently collect their exceptions along with their return values.
await pool.gather(return_exceptions=True) await pool.close(return_exceptions=True)
await control_server_task await control_server_task