bugfix for TaskPool._map; usage example for TaskPool

Daniil Fajnberg 2022-02-08 16:15:55 +01:00
parent 727f0b7c8b
commit 410e73e68b
4 changed files with 195 additions and 18 deletions


@@ -1,6 +1,6 @@
[metadata]
name = asyncio-taskpool
-version = 0.1.6
+version = 0.1.7
author = Daniil Fajnberg
author_email = mail@daniil.fajnberg.de
description = Dynamically manage pools of asyncio tasks


@@ -438,7 +438,7 @@ class TaskPool(BaseTaskPool):
                break
            await q.put(arg)  # This blocks as long as the queue is full.

-    async def _queue_consumer(self, q: Queue, func: CoroutineFunc, arg_stars: int = 0,
+    async def _queue_consumer(self, q: Queue, first_batch_started: Event, func: CoroutineFunc, arg_stars: int = 0,
                              end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
        """
        Wrapper around the `_start_task()` taking the next element from the arguments queue set up in `_map()`.
@@ -447,6 +447,9 @@ class TaskPool(BaseTaskPool):
        Args:
            q:
                The queue of function arguments to consume for starting the next task.
+            first_batch_started:
+                The event flag to wait for before launching the next consumer.
+                It can only be set by the `_map()` method, which happens after the first batch of tasks has been started.
            func:
                The coroutine function to use for spawning the tasks within the task pool.
            arg_stars (optional):
@@ -466,15 +469,17 @@ class TaskPool(BaseTaskPool):
            await self._start_task(
                star_function(func, arg, arg_stars=arg_stars),
                ignore_closed=True,
-                end_callback=partial(TaskPool._queue_callback, self, q=q, func=func, arg_stars=arg_stars,
-                                     end_callback=end_callback, cancel_callback=cancel_callback),
+                end_callback=partial(TaskPool._queue_callback, self, q=q, first_batch_started=first_batch_started,
+                                     func=func, arg_stars=arg_stars, end_callback=end_callback,
+                                     cancel_callback=cancel_callback),
                cancel_callback=cancel_callback
            )
        finally:
            q.task_done()

-    async def _queue_callback(self, task_id: int, q: Queue, func: CoroutineFunc, arg_stars: int = 0,
-                              end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
+    async def _queue_callback(self, task_id: int, q: Queue, first_batch_started: Event, func: CoroutineFunc,
+                              arg_stars: int = 0, end_callback: EndCallbackT = None,
+                              cancel_callback: CancelCallbackT = None) -> None:
        """
        Wrapper around an end callback function passed into the `_map()` method.
        Triggers the next `_queue_consumer` with the same arguments.
@@ -484,6 +489,9 @@ class TaskPool(BaseTaskPool):
                The ID of the ending task.
            q:
                The queue of function arguments to consume for starting the next task.
+            first_batch_started:
+                The event flag to wait for before launching the next consumer.
+                It can only be set by the `_map()` method, which happens after the first batch of tasks has been started.
            func:
                The coroutine function to use for spawning the tasks within the task pool.
            arg_stars (optional):
@@ -495,7 +503,9 @@ class TaskPool(BaseTaskPool):
                The callback that was specified to execute after cancellation of the task (and the next one).
                It is run with the `task_id` as its only positional argument.
        """
-        await self._queue_consumer(q, func, arg_stars, end_callback=end_callback, cancel_callback=cancel_callback)
+        await first_batch_started.wait()
+        await self._queue_consumer(q, first_batch_started, func, arg_stars,
+                                   end_callback=end_callback, cancel_callback=cancel_callback)
        await execute_optional(end_callback, args=(task_id,))

    def _set_up_args_queue(self, args_iter: ArgsT, num_tasks: int) -> Queue:
@@ -574,10 +584,18 @@ class TaskPool(BaseTaskPool):
        if not self.is_open:
            raise exceptions.PoolIsClosed("Cannot start new tasks")
        args_queue = self._set_up_args_queue(args_iter, num_tasks)
+        # We need a flag to ensure that starting all tasks from the first batch here will not be blocked by the
+        # `_queue_callback` triggered by one or more of them.
+        # This could happen, e.g. if the pool has just enough room for one more task, but the queue here contains
+        # more than one element, and the pool remains full until after the first task of the first batch ends. Then
+        # the callback might trigger the next `_queue_consumer` before this method can, which will keep it blocked.
+        first_batch_started = Event()
        for _ in range(args_queue.qsize()):
            # This is where blocking can occur, if the pool is full.
-            await self._queue_consumer(args_queue, func,
+            await self._queue_consumer(args_queue, first_batch_started, func,
                                       arg_stars=arg_stars, end_callback=end_callback, cancel_callback=cancel_callback)
+        # Now the callbacks can immediately trigger more tasks.
+        first_batch_started.set()

    async def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_tasks: int = 1,
                  end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
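To see what the new flag buys in isolation, here is a self-contained sketch (not part of the commit; the names `work`, `consume_next`, and `slots` are made up for illustration, and a plain semaphore stands in for the pool's size limit). Gating the end-of-task callbacks on an `Event` guarantees that the loop starting the first batch acquires the pool capacity it is waiting for before any callback may consume from the queue:

```python
import asyncio


async def main() -> None:
    q: asyncio.Queue = asyncio.Queue()
    for arg in range(6):
        q.put_nowait(arg)
    slots = asyncio.Semaphore(2)           # stands in for the pool's size limit
    first_batch_started = asyncio.Event()  # the flag introduced by this commit
    tasks = []

    async def work(arg: int) -> None:
        await asyncio.sleep(0.1)
        print("finished", arg)
        slots.release()                    # the ending task frees its pool slot
        # The end-of-task callback waits for the flag before consuming the
        # next argument, so it cannot race the first-batch loop below.
        await first_batch_started.wait()
        await consume_next()

    async def consume_next() -> None:
        if q.empty():
            return
        arg = q.get_nowait()
        await slots.acquire()              # blocks while the pool is full
        tasks.append(asyncio.create_task(work(arg)))

    # Start the first batch of three tasks; the third `consume_next()` call
    # blocks until a slot frees up, and the unset flag keeps the callbacks of
    # the first two tasks from snatching that slot away from this loop.
    for _ in range(3):
        await consume_next()
    first_batch_started.set()              # callbacks may now start more tasks
    while tasks:
        await tasks.pop()                  # also awaits tasks added meanwhile


if __name__ == '__main__':
    asyncio.run(main())
```

Without the `wait()` call, a first-batch task that ends early could hand its freed capacity to its own callback instead of the loop that is still trying to start the rest of the batch, which is exactly the blocking scenario described in the comment above. The real pool's slot bookkeeping differs from this semaphore sketch, but the ordering hazard is the same.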


@@ -446,14 +446,14 @@ class TaskPoolTestCase(CommonTestCase):
        q, arg = Queue(), 420.69
        q.put_nowait(arg)
        mock_func, stars = MagicMock(), 3
-        end_cb, cancel_cb = MagicMock(), MagicMock()
-        self.assertIsNone(await self.task_pool._queue_consumer(q, mock_func, stars, end_cb, cancel_cb))
+        mock_flag, end_cb, cancel_cb = MagicMock(), MagicMock(), MagicMock()
+        self.assertIsNone(await self.task_pool._queue_consumer(q, mock_flag, mock_func, stars, end_cb, cancel_cb))
        self.assertTrue(q.empty())
        mock__start_task.assert_awaited_once_with(awaitable, ignore_closed=True,
                                                  end_callback=queue_callback, cancel_callback=cancel_cb)
        mock_star_function.assert_called_once_with(mock_func, arg, arg_stars=stars)
        mock_partial.assert_called_once_with(pool.TaskPool._queue_callback, self.task_pool,
-                                             q=q, func=mock_func, arg_stars=stars,
+                                             q=q, first_batch_started=mock_flag, func=mock_func, arg_stars=stars,
                                             end_callback=end_cb, cancel_callback=cancel_cb)
        mock__start_task.reset_mock()
        mock_star_function.reset_mock()
@@ -470,9 +470,13 @@ class TaskPoolTestCase(CommonTestCase):
    async def test__queue_callback(self, mock__queue_consumer: AsyncMock, mock_execute_optional: AsyncMock):
        task_id, mock_q = 420, MagicMock()
        mock_func, stars = MagicMock(), 3
+        mock_wait = AsyncMock()
+        mock_flag = MagicMock(wait=mock_wait)
        end_cb, cancel_cb = MagicMock(), MagicMock()
-        self.assertIsNone(await self.task_pool._queue_callback(task_id, mock_q, mock_func, stars, end_cb, cancel_cb))
-        mock__queue_consumer.assert_awaited_once_with(mock_q, mock_func, stars,
+        self.assertIsNone(await self.task_pool._queue_callback(task_id, mock_q, mock_flag, mock_func, stars,
+                                                               end_cb, cancel_cb))
+        mock_wait.assert_awaited_once_with()
+        mock__queue_consumer.assert_awaited_once_with(mock_q, mock_flag, mock_func, stars,
                                                      end_callback=end_cb, cancel_callback=cancel_cb)
        mock_execute_optional.assert_awaited_once_with(end_cb, args=(task_id,))
@@ -521,13 +525,16 @@ class TaskPoolTestCase(CommonTestCase):
        mock__queue_producer.assert_not_called()
        mock_create_task.assert_not_called()

+    @patch.object(pool, 'Event')
    @patch.object(pool.TaskPool, '_queue_consumer')
    @patch.object(pool.TaskPool, '_set_up_args_queue')
    @patch.object(pool.TaskPool, 'is_open', new_callable=PropertyMock)
    async def test__map(self, mock_is_open: MagicMock, mock__set_up_args_queue: MagicMock,
-                        mock__queue_consumer: AsyncMock):
+                        mock__queue_consumer: AsyncMock, mock_event_cls: MagicMock):
        qsize = 4
        mock__set_up_args_queue.return_value = mock_q = MagicMock(qsize=MagicMock(return_value=qsize))
+        mock_flag_set = MagicMock()
+        mock_event_cls.return_value = mock_flag = MagicMock(set=mock_flag_set)
        mock_func, stars = MagicMock(), 3
        args_iter, num_tasks = (FOO, BAR, 1, 2, 3), 2
@@ -539,14 +546,16 @@ class TaskPoolTestCase(CommonTestCase):
        mock_is_open.assert_called_once_with()
        mock__set_up_args_queue.assert_not_called()
        mock__queue_consumer.assert_not_awaited()
+        mock_flag_set.assert_not_called()
        mock_is_open.reset_mock()

        mock_is_open.return_value = True
        self.assertIsNone(await self.task_pool._map(mock_func, args_iter, stars, num_tasks, end_cb, cancel_cb))
        mock__set_up_args_queue.assert_called_once_with(args_iter, num_tasks)
-        mock__queue_consumer.assert_has_awaits(qsize * [call(mock_q, mock_func, arg_stars=stars,
+        mock__queue_consumer.assert_has_awaits(qsize * [call(mock_q, mock_flag, mock_func, arg_stars=stars,
                                                             end_callback=end_cb, cancel_callback=cancel_cb)])
+        mock_flag_set.assert_called_once_with()

    @patch.object(pool.TaskPool, '_map')
    async def test_map(self, mock__map: AsyncMock):
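A note on the testing technique used for `test__map`: `Event` is patched where the `pool` module looks it up, not on `asyncio` itself, and because stacked `patch` decorators apply bottom-up, the mock for the topmost decorator arrives as the last test method parameter (`mock_event_cls`). The same pattern in a self-contained sketch; `pool_module` and `start_batch` are hypothetical stand-ins for the module and method under test:

```python
import asyncio
import types
import unittest
from unittest.mock import AsyncMock, MagicMock, patch

# Stand-in for the module under test: `Event` is resolved as a module-level
# name there, which is exactly what patching it on the module replaces.
pool_module = types.SimpleNamespace(Event=asyncio.Event)


async def start_batch() -> None:
    """Hypothetical code under test: create the flag, then set it."""
    flag = pool_module.Event()
    flag.set()


class EventPatchDemo(unittest.IsolatedAsyncioTestCase):
    @patch.object(pool_module, 'Event')
    async def test_start_batch(self, mock_event_cls: MagicMock) -> None:
        mock_flag_set = MagicMock()
        # `wait` gets an AsyncMock so that an `await flag.wait()` would work too.
        mock_event_cls.return_value = MagicMock(set=mock_flag_set, wait=AsyncMock())
        self.assertIsNone(await start_batch())
        mock_event_cls.assert_called_once_with()
        mock_flag_set.assert_called_once_with()


if __name__ == '__main__':
    unittest.main()
```

Replacing the class with a mock also makes the returned flag a `MagicMock`, so assertions like `mock_flag_set.assert_called_once_with()` can verify the flag is set without ever touching a real `asyncio.Event`.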


@@ -2,7 +2,7 @@

## Minimal example for `SimpleTaskPool`

-The minimum required setup is a "worker" coroutine function that can do something asynchronously, a main coroutine function that sets up the `SimpleTaskPool` and starts/stops the tasks as desired, eventually awaiting them all.
+The minimum required setup is a "worker" coroutine function that can do something asynchronously, and a main coroutine function that sets up the `SimpleTaskPool`, starts/stops the tasks as desired, and eventually awaits them all.

The following demo code enables full log output first for additional clarity. It is complete and should work as is.

@@ -77,6 +77,156 @@ did 4
did 4
```

-## Advanced example
+## Advanced example for `TaskPool`

-...
+This time, we want to start tasks from _different_ coroutine functions **and** with _different_ arguments. For this we need an instance of the more generalized `TaskPool` class.

As with the simple example, we need "worker" coroutine functions that can do something asynchronously, as well as a main coroutine function that sets up the pool, starts the tasks, and eventually awaits them.

The following demo code enables full log output first for additional clarity. It is complete and should work as is.

### Code
```python
import logging
import asyncio
from asyncio_taskpool.pool import TaskPool

logging.getLogger().setLevel(logging.NOTSET)
logging.getLogger('asyncio_taskpool').addHandler(logging.StreamHandler())


async def work(start: int, stop: int, step: int = 1) -> None:
    """Pseudo-worker function counting through a range with a second of sleep in between each iteration."""
    for i in range(start, stop, step):
        await asyncio.sleep(1)
        print("work with", i)


async def other_work(a: int, b: int) -> None:
    """Different pseudo-worker counting through a range with half a second of sleep in between each iteration."""
    for i in range(a, b):
        await asyncio.sleep(0.5)
        print("other_work with", i)


async def main() -> None:
    # Initialize a new task pool instance and limit its size to 3 tasks.
    pool = TaskPool(3)
    # Queue up two tasks (IDs 0 and 1) to run concurrently (with the same keyword arguments).
    print("Called `apply`")
    await pool.apply(work, kwargs={'start': 100, 'stop': 200, 'step': 10}, num=2)
    # Let the tasks work for a bit.
    await asyncio.sleep(1.5)
    # Now, let us enqueue four more tasks (which will receive IDs 2, 3, 4, and 5), each created with different
    # positional arguments by using `starmap`, but have **no more than two of those** run concurrently.
    # Since we set our pool size to 3, and already have two tasks working within the pool,
    # only the first one of these will start immediately (and receive ID 2).
    # The second one will start (with ID 3) only once there is room in the pool,
    # which -- in this example -- will be the case after ID 2 ends;
    # until then the `starmap` method call **will block**!
    # Once there is room in the pool again, the third one will start immediately (and receive ID 4).
    # The last one will start (with ID 5) **only** once there is room in the pool **and** no more than one of these
    # last four tasks is running.
    args_list = [(0, 10), (10, 20), (20, 30), (30, 40)]
    print("Calling `starmap`...")
    await pool.starmap(other_work, args_list, num_tasks=2)
    print("`starmap` returned")
    # Now we close the pool, so that we can safely await all our tasks.
    pool.close()
    # Finally, we block, until all tasks have ended.
    print("Called `gather`")
    await pool.gather()
    print("Done.")


if __name__ == '__main__':
    asyncio.run(main())
```
### Output
Additional comments for the output are provided with `<---` next to the output lines.
(Keep in mind that the logger and `print` asynchronously write to `stdout`.)
```
TaskPool-0 initialized
Started TaskPool-0_Task-0
Started TaskPool-0_Task-1
Called `apply`
work with 100
work with 100
Calling `starmap`... <--- notice that this blocks as expected
Started TaskPool-0_Task-2
work with 110
work with 110
other_work with 0
other_work with 1
work with 120
work with 120
other_work with 2
other_work with 3
work with 130
work with 130
other_work with 4
other_work with 5
work with 140
work with 140
other_work with 6
other_work with 7
work with 150
work with 150
other_work with 8
Ended TaskPool-0_Task-2 <--- here Task-2 makes room in the pool and unblocks `main()`
TaskPool-0 is closed!
Started TaskPool-0_Task-3
other_work with 9
`starmap` returned
Called `gather`
work with 160
work with 160
other_work with 10
other_work with 11
work with 170
work with 170
other_work with 12
other_work with 13
work with 180
work with 180
other_work with 14
other_work with 15
Ended TaskPool-0_Task-0
Ended TaskPool-0_Task-1 <--- even though there is room in the pool now, Task-5 will not start
Started TaskPool-0_Task-4
work with 190
work with 190
other_work with 16
other_work with 20
other_work with 17
other_work with 21
other_work with 18
other_work with 22
other_work with 19
Ended TaskPool-0_Task-3 <--- now that only Task-4 is left, Task-5 will start
Started TaskPool-0_Task-5
other_work with 23
other_work with 30
other_work with 24
other_work with 31
other_work with 25
other_work with 32
other_work with 26
other_work with 33
other_work with 27
other_work with 34
other_work with 28
other_work with 35
Ended TaskPool-0_Task-4
other_work with 29
other_work with 36
other_work with 37
other_work with 38
other_work with 39
Done.
Ended TaskPool-0_Task-5
```