From f45fef6497ac7cc8b7058e755adff9f87caefadf Mon Sep 17 00:00:00 2001
From: Daniil Fajnberg
Date: Fri, 4 Feb 2022 19:23:09 +0100
Subject: [PATCH] usage example; pool close instead of gather

---
 README.md                    | 61 +--------------------------
 src/asyncio_taskpool/pool.py |  2 +-
 usage/USAGE.md               | 80 ++++++++++++++++++++++++++++++++++++
 usage/example_server.py      |  2 +-
 4 files changed, 83 insertions(+), 62 deletions(-)
 create mode 100644 usage/USAGE.md

diff --git a/README.md b/README.md
index 373990c..aca62ec 100644
--- a/README.md
+++ b/README.md
@@ -4,66 +4,7 @@ Dynamically manage pools of asyncio tasks
 
 ## Usage
 
-Demo:
-```python
-import logging
-import asyncio
-
-from asyncio_taskpool.pool import TaskPool
-
-
-logging.getLogger().setLevel(logging.NOTSET)
-logging.getLogger('asyncio_taskpool').addHandler(logging.StreamHandler())
-
-
-async def work(n):
-    for i in range(n):
-        await asyncio.sleep(1)
-        print("did", i)
-
-
-async def main():
-    pool = TaskPool(work, (5,))  # initializes the pool
-    pool.start(3)  # launches work tasks 0, 1, and 2
-    await asyncio.sleep(1.5)
-    pool.start()  # launches work task 3
-    await asyncio.sleep(1.5)
-    pool.stop(2)  # cancels tasks 3 and 2
-    await pool.gather()  # awaits all tasks, then flushes the pool
-
-
-if __name__ == '__main__':
-    asyncio.run(main())
-```
-
-Output:
-```
-Started work_pool_task_0
-Started work_pool_task_1
-Started work_pool_task_2
-did 0
-did 0
-did 0
-Started work_pool_task_3
-did 1
-did 1
-did 1
-did 0
-did 2
-did 2
-Cancelling work_pool_task_2 ...
-Cancelled work_pool_task_2
-Exiting work_pool_task_2
-Cancelling work_pool_task_3 ...
-Cancelled work_pool_task_3
-Exiting work_pool_task_3
-did 3
-did 3
-Exiting work_pool_task_0
-Exiting work_pool_task_1
-did 4
-did 4
-```
+See [USAGE.md](usage/USAGE.md)
 
 ## Installation
 
diff --git a/src/asyncio_taskpool/pool.py b/src/asyncio_taskpool/pool.py
index 421bb5d..f26bfc8 100644
--- a/src/asyncio_taskpool/pool.py
+++ b/src/asyncio_taskpool/pool.py
@@ -71,7 +71,7 @@ class TaskPool:
     def stop_all(self) -> int:
         return self.stop(self.size)
 
-    async def gather(self, return_exceptions: bool = False):
+    async def close(self, return_exceptions: bool = False):
         results = await gather(*self._tasks, *self._cancelled, return_exceptions=return_exceptions)
         self._tasks = self._cancelled = []
         return results
diff --git a/usage/USAGE.md b/usage/USAGE.md
new file mode 100644
index 0000000..e9b9aab
--- /dev/null
+++ b/usage/USAGE.md
@@ -0,0 +1,80 @@
+# Using `asyncio-taskpool`
+
+## Simple example
+
+The minimum required setup is a "worker" coroutine function that can do something asynchronously, and a main coroutine function that sets up the `TaskPool`, starts/stops the tasks as desired, and eventually awaits them all.
+
+The following demo code enables full log output first for additional clarity. It is complete and should work as is.
+
+### Code
+```python
+import logging
+import asyncio
+
+from asyncio_taskpool.pool import TaskPool
+
+
+logging.getLogger().setLevel(logging.NOTSET)
+logging.getLogger('asyncio_taskpool').addHandler(logging.StreamHandler())
+
+
+async def work(n: int) -> None:
+    """
+    Pseudo-worker function.
+    Counts up to an integer with a second of sleep before each iteration.
+    In a real-world use case, a worker function should probably have access
+    to some synchronisation primitive or shared resource to distribute work
+    between an arbitrary number of workers.
+    """
+    for i in range(n):
+        await asyncio.sleep(1)
+        print("did", i)
+
+
+async def main() -> None:
+    pool = TaskPool(work, (5,))  # initializes the pool; no work is being done yet
+    pool.start(3)  # launches work tasks 0, 1, and 2
+    await asyncio.sleep(1.5)  # lets the tasks work for a bit
+    pool.start()  # launches work task 3
+    await asyncio.sleep(1.5)  # lets the tasks work for a bit
+    pool.stop(2)  # cancels tasks 3 and 2
+    await pool.close()  # awaits all tasks, then flushes the pool
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
+```
+
+### Output
+Additional comments indicated with `<--`
+```
+Started work_pool_task_0
+Started work_pool_task_1
+Started work_pool_task_2
+did 0
+did 0
+did 0
+Started work_pool_task_3
+did 1
+did 1
+did 1
+did 0 <-- notice that the newly created task begins counting at 0
+did 2
+did 2 <-- two tasks were stopped; only tasks 0 and 1 continue "working"
+Cancelling work_pool_task_2 ...
+Cancelled work_pool_task_2
+Exiting work_pool_task_2
+Cancelling work_pool_task_3 ...
+Cancelled work_pool_task_3
+Exiting work_pool_task_3
+did 3
+did 3
+Exiting work_pool_task_0
+Exiting work_pool_task_1
+did 4
+did 4
+```
+
+## Advanced example
+
+...
diff --git a/usage/example_server.py b/usage/example_server.py
index ffd1e0f..08dfb26 100644
--- a/usage/example_server.py
+++ b/usage/example_server.py
@@ -56,7 +56,7 @@ async def main() -> None:
     # Finally we allow for all tasks to do their cleanup, if they need to do any, upon being cancelled.
     # We block until they all return or raise an exception, but since we are not interested in any of their exceptions,
     # we just silently collect their exceptions along with their return values.
-    await pool.gather(return_exceptions=True)
+    await pool.close(return_exceptions=True)
     await control_server_task
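
The docstring of `work()` in the new USAGE.md notes that a real-world worker would usually share a synchronisation primitive or resource with its siblings so that work can be distributed among an arbitrary number of them. A minimal sketch of that pattern, assuming only the `TaskPool` methods that appear in this patch (`start`, `stop_all`, and the renamed `close`); the queue-based `worker` and `main` shown here are purely illustrative and not part of the patch:

```python
import asyncio

from asyncio_taskpool.pool import TaskPool


async def worker(q: asyncio.Queue) -> None:
    """Illustrative worker: pulls items off a shared queue until cancelled."""
    while True:
        item = await q.get()
        await asyncio.sleep(1)  # stand-in for real asynchronous work
        print("processed", item)
        q.task_done()


async def main() -> None:
    q = asyncio.Queue()
    for item in range(20):
        q.put_nowait(item)
    pool = TaskPool(worker, (q,))  # every task started from this pool shares the same queue
    pool.start(4)                  # four workers now compete for queue items
    await q.join()                 # block until every queued item has been processed
    pool.stop_all()                # cancel the now-idle workers waiting on q.get()
    await pool.close(return_exceptions=True)  # collect CancelledErrors along with return values


if __name__ == '__main__':
    asyncio.run(main())
```

Because the workers block on `q.get()` once the queue is drained, `stop_all()` only cancels idle tasks, and `close(return_exceptions=True)` silently collects the resulting `CancelledError`s, mirroring the shutdown sequence in `example_server.py` above.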