Compare commits

...

5 Commits

8 changed files with 348 additions and 96 deletions

View File

@ -14,6 +14,11 @@ See [USAGE.md](usage/USAGE.md)
Python Version 3.8+, tested on Linux
## Testing
Install `dev` dependencies or just manually install `coverage` with `pip`.
Run the [`coverage.sh`](coverage.sh) shell script to execute all unit tests and receive the coverage report.
## Building from source
Run `python -m build`

View File

@ -1,6 +1,6 @@
[metadata]
name = asyncio-taskpool
version = 0.1.6
version = 0.2.0
author = Daniil Fajnberg
author_email = mail@daniil.fajnberg.de
description = Dynamically manage pools of asyncio tasks

View File

@ -2,7 +2,7 @@ class PoolException(Exception):
pass
class PoolIsClosed(PoolException):
class PoolIsLocked(PoolException):
pass
@ -22,7 +22,7 @@ class InvalidTaskID(PoolException):
pass
class PoolStillOpen(PoolException):
class PoolStillUnlocked(PoolException):
pass

View File

@ -31,7 +31,7 @@ class BaseTaskPool:
"""Initializes the necessary internal attributes and adds the new pool to the general pools list."""
self._enough_room: Semaphore = Semaphore()
self.pool_size = pool_size
self._open: bool = True
self._locked: bool = False
self._counter: int = 0
self._running: Dict[int, Task] = {}
self._cancelled: Dict[int, Task] = {}
@ -71,9 +71,21 @@ class BaseTaskPool:
self._pool_size = value
@property
def is_open(self) -> bool:
"""Returns `True` if more the pool has not been closed yet."""
return self._open
def is_locked(self) -> bool:
"""Returns `True` if more the pool has been locked (see below)."""
return self._locked
def lock(self) -> None:
"""Disallows any more tasks to be started in the pool."""
if not self._locked:
self._locked = True
log.info("%s is locked!", str(self))
def unlock(self) -> None:
"""Allows new tasks to be started in the pool."""
if self._locked:
self._locked = False
log.info("%s was unlocked.", str(self))
@property
def num_running(self) -> int:
@ -187,7 +199,7 @@ class BaseTaskPool:
finally:
await self._task_ending(task_id, custom_callback=end_callback)
async def _start_task(self, awaitable: Awaitable, ignore_closed: bool = False, end_callback: EndCallbackT = None,
async def _start_task(self, awaitable: Awaitable, ignore_lock: bool = False, end_callback: EndCallbackT = None,
cancel_callback: CancelCallbackT = None) -> int:
"""
Starts a coroutine as a new task in the pool.
@ -197,8 +209,8 @@ class BaseTaskPool:
Args:
awaitable:
The actual coroutine to be run within the task pool.
ignore_closed (optional):
If `True`, even if the pool is closed, the task will still be started.
ignore_lock (optional):
If `True`, even if the pool is locked, the task will still be started.
end_callback (optional):
A callback to execute after the task has ended.
It is run with the task's ID as its only positional argument.
@ -208,12 +220,12 @@ class BaseTaskPool:
Raises:
`asyncio_taskpool.exceptions.NotCoroutine` if `awaitable` is not a coroutine.
`asyncio_taskpool.exceptions.PoolIsClosed` if the pool has been closed and `ignore_closed` is `False`.
`asyncio_taskpool.exceptions.PoolIsLocked` if the pool has been locked and `ignore_lock` is `False`.
"""
if not iscoroutine(awaitable):
raise exceptions.NotCoroutine(f"Not awaitable: {awaitable}")
if not (self.is_open or ignore_closed):
raise exceptions.PoolIsClosed("Cannot start new tasks")
if self._locked and not ignore_lock:
raise exceptions.PoolIsLocked("Cannot start new tasks")
await self._enough_room.acquire()
task_id = self._counter
self._counter += 1
@ -303,16 +315,11 @@ class BaseTaskPool:
self._interrupt_flag.clear()
return results
def close(self) -> None:
"""Disallows any more tasks to be started in the pool."""
self._open = False
log.info("%s is closed!", str(self))
async def gather(self, return_exceptions: bool = False):
"""
Calls `asyncio.gather` on **all** tasks from the pool, returns their results, and forgets the tasks.
The `close()` method must have been called prior to this.
The `lock()` method must have been called prior to this.
Note that there may be an unknown number of coroutine functions "queued" to be run as tasks.
This can happen, if for example the `TaskPool.map` method was called with `num_tasks` set to a number smaller
@ -327,10 +334,10 @@ class BaseTaskPool:
return_exceptions (optional): Passed directly into `gather`.
Raises:
`asyncio_taskpool.exceptions.PoolStillOpen` if the pool has not been closed yet.
`asyncio_taskpool.exceptions.PoolStillUnlocked` if the pool has not been locked yet.
"""
if self._open:
raise exceptions.PoolStillOpen("Pool must be closed, before tasks can be gathered")
if not self._locked:
raise exceptions.PoolStillUnlocked("Pool must be locked, before tasks can be gathered")
await gather(*self._before_gathering)
results = await gather(*self._ended.values(), *self._cancelled.values(), *self._running.values(),
return_exceptions=return_exceptions)
@ -415,7 +422,7 @@ class TaskPool(BaseTaskPool):
Raises:
`NotCoroutine` if `func` is not a coroutine function.
`PoolIsClosed` if the pool has been closed already.
`PoolIsLocked` if the pool has been locked already.
"""
ids = await gather(*(self._apply_one(func, args, kwargs, end_callback, cancel_callback) for _ in range(num)))
# TODO: for some reason PyCharm wrongly claims that `gather` returns a tuple of exceptions
@ -438,7 +445,7 @@ class TaskPool(BaseTaskPool):
break
await q.put(arg) # This blocks as long as the queue is full.
async def _queue_consumer(self, q: Queue, func: CoroutineFunc, arg_stars: int = 0,
async def _queue_consumer(self, q: Queue, first_batch_started: Event, func: CoroutineFunc, arg_stars: int = 0,
end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
"""
Wrapper around the `_start_task()` taking the next element from the arguments queue set up in `_map()`.
@ -447,6 +454,9 @@ class TaskPool(BaseTaskPool):
Args:
q:
The queue of function arguments to consume for starting the next task.
first_batch_started:
The event flag to wait for, before launching the next consumer.
It can only set by the `_map()` method, which happens after the first batch of task has been started.
func:
The coroutine function to use for spawning the tasks within the task pool.
arg_stars (optional):
@ -465,16 +475,18 @@ class TaskPool(BaseTaskPool):
try:
await self._start_task(
star_function(func, arg, arg_stars=arg_stars),
ignore_closed=True,
end_callback=partial(TaskPool._queue_callback, self, q=q, func=func, arg_stars=arg_stars,
end_callback=end_callback, cancel_callback=cancel_callback),
ignore_lock=True,
end_callback=partial(TaskPool._queue_callback, self, q=q, first_batch_started=first_batch_started,
func=func, arg_stars=arg_stars, end_callback=end_callback,
cancel_callback=cancel_callback),
cancel_callback=cancel_callback
)
finally:
q.task_done()
async def _queue_callback(self, task_id: int, q: Queue, func: CoroutineFunc, arg_stars: int = 0,
end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
async def _queue_callback(self, task_id: int, q: Queue, first_batch_started: Event, func: CoroutineFunc,
arg_stars: int = 0, end_callback: EndCallbackT = None,
cancel_callback: CancelCallbackT = None) -> None:
"""
Wrapper around an end callback function passed into the `_map()` method.
Triggers the next `_queue_consumer` with the same arguments.
@ -484,6 +496,9 @@ class TaskPool(BaseTaskPool):
The ID of the ending task.
q:
The queue of function arguments to consume for starting the next task.
first_batch_started:
The event flag to wait for, before launching the next consumer.
It can only set by the `_map()` method, which happens after the first batch of task has been started.
func:
The coroutine function to use for spawning the tasks within the task pool.
arg_stars (optional):
@ -495,7 +510,9 @@ class TaskPool(BaseTaskPool):
The callback that was specified to execute after cancellation of the task (and the next one).
It is run with the `task_id` as its only positional argument.
"""
await self._queue_consumer(q, func, arg_stars, end_callback=end_callback, cancel_callback=cancel_callback)
await first_batch_started.wait()
await self._queue_consumer(q, first_batch_started, func, arg_stars,
end_callback=end_callback, cancel_callback=cancel_callback)
await execute_optional(end_callback, args=(task_id,))
def _set_up_args_queue(self, args_iter: ArgsT, num_tasks: int) -> Queue:
@ -569,15 +586,23 @@ class TaskPool(BaseTaskPool):
It is run with the task's ID as its only positional argument.
Raises:
`asyncio_taskpool.exceptions.PoolIsClosed` if the pool has been closed.
`asyncio_taskpool.exceptions.PoolIsLocked` if the pool has been locked.
"""
if not self.is_open:
raise exceptions.PoolIsClosed("Cannot start new tasks")
if not self._locked:
raise exceptions.PoolIsLocked("Cannot start new tasks")
args_queue = self._set_up_args_queue(args_iter, num_tasks)
# We need a flag to ensure that starting all tasks from the first batch here will not be blocked by the
# `_queue_callback` triggered by one or more of them.
# This could happen, e.g. if the pool has just enough room for one more task, but the queue here contains more
# than one element, and the pool remains full until after the first task of the first batch ends. Then the
# callback might trigger the next `_queue_consumer` before this method can, which will keep it blocked.
first_batch_started = Event()
for _ in range(args_queue.qsize()):
# This is where blocking can occur, if the pool is full.
await self._queue_consumer(args_queue, func,
await self._queue_consumer(args_queue, first_batch_started, func,
arg_stars=arg_stars, end_callback=end_callback, cancel_callback=cancel_callback)
# Now the callbacks can immediately trigger more tasks.
first_batch_started.set()
async def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_tasks: int = 1,
end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
@ -610,7 +635,7 @@ class TaskPool(BaseTaskPool):
It is run with the task's ID as its only positional argument.
Raises:
`PoolIsClosed` if the pool has been closed.
`PoolIsLocked` if the pool has been locked.
`NotCoroutine` if `func` is not a coroutine function.
"""
await self._map(func, arg_iter, arg_stars=0, num_tasks=num_tasks,

67
tests/test_helpers.py Normal file
View File

@ -0,0 +1,67 @@
from unittest import IsolatedAsyncioTestCase
from unittest.mock import MagicMock, AsyncMock, NonCallableMagicMock
from asyncio_taskpool import helpers
class HelpersTestCase(IsolatedAsyncioTestCase):
async def test_execute_optional(self):
f, args, kwargs = NonCallableMagicMock(), [1, 2], None
a = [f, args, kwargs] # to avoid IDE nagging
self.assertIsNone(await helpers.execute_optional(*a))
expected_output = 'foo'
f = MagicMock(return_value=expected_output)
output = await helpers.execute_optional(f, args, kwargs)
self.assertEqual(expected_output, output)
f.assert_called_once_with(*args)
f.reset_mock()
kwargs = {'a': 100, 'b': 200}
output = await helpers.execute_optional(f, args, kwargs)
self.assertEqual(expected_output, output)
f.assert_called_once_with(*args, **kwargs)
f = AsyncMock(return_value=expected_output)
output = await helpers.execute_optional(f, args, kwargs)
self.assertEqual(expected_output, output)
f.assert_awaited_once_with(*args, **kwargs)
def test_star_function(self):
expected_output = 'bar'
f = MagicMock(return_value=expected_output)
a = (1, 2, 3)
stars = 0
output = helpers.star_function(f, a, stars)
self.assertEqual(expected_output, output)
f.assert_called_once_with(a)
f.reset_mock()
stars = 1
output = helpers.star_function(f, a, stars)
self.assertEqual(expected_output, output)
f.assert_called_once_with(*a)
f.reset_mock()
a = {'a': 1, 'b': 2}
stars = 2
output = helpers.star_function(f, a, stars)
self.assertEqual(expected_output, output)
f.assert_called_once_with(**a)
with self.assertRaises(ValueError):
helpers.star_function(f, a, 3)
with self.assertRaises(ValueError):
helpers.star_function(f, a, -1)
with self.assertRaises(ValueError):
helpers.star_function(f, a, 123456789)
async def test_join_queue(self):
mock_join = AsyncMock()
mock_queue = MagicMock(join=mock_join)
self.assertIsNone(await helpers.join_queue(mock_queue))
mock_join.assert_awaited_once_with()

View File

@ -68,7 +68,7 @@ class BaseTaskPoolTestCase(CommonTestCase):
def test_init(self):
self.assertIsInstance(self.task_pool._enough_room, asyncio.locks.Semaphore)
self.assertTrue(self.task_pool._open)
self.assertFalse(self.task_pool._locked)
self.assertEqual(0, self.task_pool._counter)
self.assertDictEqual(EMPTY_DICT, self.task_pool._running)
self.assertDictEqual(EMPTY_DICT, self.task_pool._cancelled)
@ -103,9 +103,23 @@ class BaseTaskPoolTestCase(CommonTestCase):
self.task_pool.pool_size = new_size = 69
self.assertEqual(new_size, self.task_pool._pool_size)
def test_is_open(self):
self.task_pool._open = FOO
self.assertEqual(FOO, self.task_pool.is_open)
def test_is_locked(self):
self.task_pool._locked = FOO
self.assertEqual(FOO, self.task_pool.is_locked)
def test_lock(self):
assert not self.task_pool._locked
self.task_pool.lock()
self.assertTrue(self.task_pool._locked)
self.task_pool.lock()
self.assertTrue(self.task_pool._locked)
def test_unlock(self):
self.task_pool._locked = True
self.task_pool.unlock()
self.assertFalse(self.task_pool._locked)
self.task_pool.unlock()
self.assertFalse(self.task_pool._locked)
def test_num_running(self):
self.task_pool._running = ['foo', 'bar', 'baz']
@ -211,11 +225,9 @@ class BaseTaskPoolTestCase(CommonTestCase):
@patch.object(pool, 'create_task')
@patch.object(pool.BaseTaskPool, '_task_wrapper', new_callable=MagicMock)
@patch.object(pool.BaseTaskPool, '_task_name', return_value=FOO)
@patch.object(pool.BaseTaskPool, 'is_open', new_callable=PropertyMock)
async def test__start_task(self, mock_is_open: MagicMock, mock__task_name: MagicMock,
mock__task_wrapper: AsyncMock, mock_create_task: MagicMock):
async def test__start_task(self, mock__task_name: MagicMock, mock__task_wrapper: AsyncMock,
mock_create_task: MagicMock):
def reset_mocks() -> None:
mock_is_open.reset_mock()
mock__task_name.reset_mock()
mock__task_wrapper.reset_mock()
mock_create_task.reset_mock()
@ -226,31 +238,27 @@ class BaseTaskPoolTestCase(CommonTestCase):
self.task_pool._counter = count = 123
self.task_pool._enough_room._value = room = 123
with self.assertRaises(exceptions.NotCoroutine):
await self.task_pool._start_task(MagicMock(), end_callback=mock_end_cb, cancel_callback=mock_cancel_cb)
def check_nothing_changed() -> None:
self.assertEqual(count, self.task_pool._counter)
self.assertNotIn(count, self.task_pool._running)
self.assertEqual(room, self.task_pool._enough_room._value)
mock_is_open.assert_not_called()
mock__task_name.assert_not_called()
mock__task_wrapper.assert_not_called()
mock_create_task.assert_not_called()
reset_mocks()
mock_is_open.return_value = ignore_closed = False
with self.assertRaises(exceptions.NotCoroutine):
await self.task_pool._start_task(MagicMock(), end_callback=mock_end_cb, cancel_callback=mock_cancel_cb)
check_nothing_changed()
self.task_pool._locked = True
ignore_closed = False
mock_awaitable = mock_coroutine()
with self.assertRaises(exceptions.PoolIsClosed):
with self.assertRaises(exceptions.PoolIsLocked):
await self.task_pool._start_task(mock_awaitable, ignore_closed,
end_callback=mock_end_cb, cancel_callback=mock_cancel_cb)
await mock_awaitable
self.assertEqual(count, self.task_pool._counter)
self.assertNotIn(count, self.task_pool._running)
self.assertEqual(room, self.task_pool._enough_room._value)
mock_is_open.assert_called_once_with()
mock__task_name.assert_not_called()
mock__task_wrapper.assert_not_called()
mock_create_task.assert_not_called()
reset_mocks()
check_nothing_changed()
ignore_closed = True
mock_awaitable = mock_coroutine()
@ -261,7 +269,6 @@ class BaseTaskPoolTestCase(CommonTestCase):
self.assertEqual(count + 1, self.task_pool._counter)
self.assertEqual(mock_task, self.task_pool._running[count])
self.assertEqual(room - 1, self.task_pool._enough_room._value)
mock_is_open.assert_called_once_with()
mock__task_name.assert_called_once_with(count)
mock__task_wrapper.assert_called_once_with(mock_awaitable, count, mock_end_cb, mock_cancel_cb)
mock_create_task.assert_called_once_with(mock_wrapped, name=FOO)
@ -280,7 +287,6 @@ class BaseTaskPoolTestCase(CommonTestCase):
self.assertEqual(count + 1, self.task_pool._counter)
self.assertNotIn(count, self.task_pool._running)
self.assertEqual(room, self.task_pool._enough_room._value)
mock_is_open.assert_called_once_with()
mock__task_name.assert_called_once_with(count)
mock__task_wrapper.assert_called_once_with(mock_awaitable, count, mock_end_cb, mock_cancel_cb)
mock_create_task.assert_called_once_with(mock_wrapped, name=FOO)
@ -345,11 +351,6 @@ class BaseTaskPoolTestCase(CommonTestCase):
self.assertDictEqual(self.task_pool._ended, EMPTY_DICT)
self.assertDictEqual(self.task_pool._cancelled, EMPTY_DICT)
def test_close(self):
assert self.task_pool._open
self.task_pool.close()
self.assertFalse(self.task_pool._open)
async def test_gather(self):
test_exception = TestException()
mock_ended_func, mock_cancelled_func = AsyncMock(return_value=FOO), AsyncMock(side_effect=test_exception)
@ -361,8 +362,8 @@ class BaseTaskPoolTestCase(CommonTestCase):
self.task_pool._running = running = {789: mock_running_func()}
self.task_pool._interrupt_flag.set()
assert self.task_pool._open
with self.assertRaises(exceptions.PoolStillOpen):
assert not self.task_pool._locked
with self.assertRaises(exceptions.PoolStillUnlocked):
await self.task_pool.gather()
self.assertDictEqual(self.task_pool._ended, ended)
self.assertDictEqual(self.task_pool._cancelled, cancelled)
@ -370,7 +371,7 @@ class BaseTaskPoolTestCase(CommonTestCase):
self.assertListEqual(self.task_pool._before_gathering, before_gather)
self.assertTrue(self.task_pool._interrupt_flag.is_set())
self.task_pool._open = False
self.task_pool._locked = True
def check_assertions(output) -> None:
self.assertListEqual([FOO, test_exception, BAR], output)
@ -446,20 +447,20 @@ class TaskPoolTestCase(CommonTestCase):
q, arg = Queue(), 420.69
q.put_nowait(arg)
mock_func, stars = MagicMock(), 3
end_cb, cancel_cb = MagicMock(), MagicMock()
self.assertIsNone(await self.task_pool._queue_consumer(q, mock_func, stars, end_cb, cancel_cb))
mock_flag, end_cb, cancel_cb = MagicMock(), MagicMock(), MagicMock()
self.assertIsNone(await self.task_pool._queue_consumer(q, mock_flag, mock_func, stars, end_cb, cancel_cb))
self.assertTrue(q.empty())
mock__start_task.assert_awaited_once_with(awaitable, ignore_closed=True,
mock__start_task.assert_awaited_once_with(awaitable, ignore_lock=True,
end_callback=queue_callback, cancel_callback=cancel_cb)
mock_star_function.assert_called_once_with(mock_func, arg, arg_stars=stars)
mock_partial.assert_called_once_with(pool.TaskPool._queue_callback, self.task_pool,
q=q, func=mock_func, arg_stars=stars,
q=q, first_batch_started=mock_flag, func=mock_func, arg_stars=stars,
end_callback=end_cb, cancel_callback=cancel_cb)
mock__start_task.reset_mock()
mock_star_function.reset_mock()
mock_partial.reset_mock()
self.assertIsNone(await self.task_pool._queue_consumer(q, mock_func, stars, end_cb, cancel_cb))
self.assertIsNone(await self.task_pool._queue_consumer(q, mock_flag, mock_func, stars, end_cb, cancel_cb))
self.assertTrue(q.empty())
mock__start_task.assert_not_awaited()
mock_star_function.assert_not_called()
@ -470,9 +471,13 @@ class TaskPoolTestCase(CommonTestCase):
async def test__queue_callback(self, mock__queue_consumer: AsyncMock, mock_execute_optional: AsyncMock):
task_id, mock_q = 420, MagicMock()
mock_func, stars = MagicMock(), 3
mock_wait = AsyncMock()
mock_flag = MagicMock(wait=mock_wait)
end_cb, cancel_cb = MagicMock(), MagicMock()
self.assertIsNone(await self.task_pool._queue_callback(task_id, mock_q, mock_func, stars, end_cb, cancel_cb))
mock__queue_consumer.assert_awaited_once_with(mock_q, mock_func, stars,
self.assertIsNone(await self.task_pool._queue_callback(task_id, mock_q, mock_flag, mock_func, stars,
end_callback=end_cb, cancel_callback=cancel_cb))
mock_wait.assert_awaited_once_with()
mock__queue_consumer.assert_awaited_once_with(mock_q, mock_flag, mock_func, stars,
end_callback=end_cb, cancel_callback=cancel_cb)
mock_execute_optional.assert_awaited_once_with(end_cb, args=(task_id,))
@ -521,32 +526,33 @@ class TaskPoolTestCase(CommonTestCase):
mock__queue_producer.assert_not_called()
mock_create_task.assert_not_called()
@patch.object(pool, 'Event')
@patch.object(pool.TaskPool, '_queue_consumer')
@patch.object(pool.TaskPool, '_set_up_args_queue')
@patch.object(pool.TaskPool, 'is_open', new_callable=PropertyMock)
async def test__map(self, mock_is_open: MagicMock, mock__set_up_args_queue: MagicMock,
mock__queue_consumer: AsyncMock):
async def test__map(self, mock__set_up_args_queue: MagicMock, mock__queue_consumer: AsyncMock,
mock_event_cls: MagicMock):
qsize = 4
mock__set_up_args_queue.return_value = mock_q = MagicMock(qsize=MagicMock(return_value=qsize))
mock_flag_set = MagicMock()
mock_event_cls.return_value = mock_flag = MagicMock(set=mock_flag_set)
mock_func, stars = MagicMock(), 3
args_iter, num_tasks = (FOO, BAR, 1, 2, 3), 2
end_cb, cancel_cb = MagicMock(), MagicMock()
mock_is_open.return_value = False
with self.assertRaises(exceptions.PoolIsClosed):
self.task_pool._locked = False
with self.assertRaises(exceptions.PoolIsLocked):
await self.task_pool._map(mock_func, args_iter, stars, num_tasks, end_cb, cancel_cb)
mock_is_open.assert_called_once_with()
mock__set_up_args_queue.assert_not_called()
mock__queue_consumer.assert_not_awaited()
mock_flag_set.assert_not_called()
mock_is_open.reset_mock()
mock_is_open.return_value = True
self.task_pool._locked = True
self.assertIsNone(await self.task_pool._map(mock_func, args_iter, stars, num_tasks, end_cb, cancel_cb))
mock__set_up_args_queue.assert_called_once_with(args_iter, num_tasks)
mock__queue_consumer.assert_has_awaits(qsize * [call(mock_q, mock_func, arg_stars=stars,
mock__queue_consumer.assert_has_awaits(qsize * [call(mock_q, mock_flag, mock_func, arg_stars=stars,
end_callback=end_cb, cancel_callback=cancel_cb)])
mock_flag_set.assert_called_once_with()
@patch.object(pool.TaskPool, '_map')
async def test_map(self, mock__map: AsyncMock):

View File

@ -2,18 +2,18 @@
## Minimal example for `SimpleTaskPool`
The minimum required setup is a "worker" coroutine function that can do something asynchronously, a main coroutine function that sets up the `SimpleTaskPool` and starts/stops the tasks as desired, eventually awaiting them all.
The minimum required setup is a "worker" coroutine function that can do something asynchronously, and a main coroutine function that sets up the `SimpleTaskPool`, starts/stops the tasks as desired, and eventually awaits them all.
The following demo code enables full log output first for additional clarity. It is complete and should work as is.
### Code
```python
import logging
import asyncio
from asyncio_taskpool.pool import SimpleTaskPool
logging.getLogger().setLevel(logging.NOTSET)
logging.getLogger('asyncio_taskpool').addHandler(logging.StreamHandler())
@ -38,7 +38,7 @@ async def main() -> None:
await pool.start() # launches work task 3
await asyncio.sleep(1.5) # lets the tasks work for a bit
pool.stop(2) # cancels tasks 3 and 2
pool.close() # required for the last line
pool.lock() # required for the last line
await pool.gather() # awaits all tasks, then flushes the pool
@ -60,7 +60,7 @@ did 1
did 1
did 1
did 0
SimpleTaskPool-0 is closed!
SimpleTaskPool-0 is locked!
Cancelling SimpleTaskPool-0_Task-3 ...
Cancelled SimpleTaskPool-0_Task-3
Ended SimpleTaskPool-0_Task-3
@ -77,6 +77,155 @@ did 4
did 4
```
## Advanced example
## Advanced example for `TaskPool`
...
This time, we want to start tasks from _different_ coroutine functions **and** with _different_ arguments. For this we need an instance of the more generalized `TaskPool` class.
As with the simple example, we need "worker" coroutine functions that can do something asynchronously, as well as a main coroutine function that sets up the pool, starts the tasks, and eventually awaits them.
The following demo code enables full log output first for additional clarity. It is complete and should work as is.
### Code
```python
import logging
import asyncio
from asyncio_taskpool.pool import TaskPool
logging.getLogger().setLevel(logging.NOTSET)
logging.getLogger('asyncio_taskpool').addHandler(logging.StreamHandler())
async def work(start: int, stop: int, step: int = 1) -> None:
"""Pseudo-worker function counting through a range with a second of sleep in between each iteration."""
for i in range(start, stop, step):
await asyncio.sleep(1)
print("work with", i)
async def other_work(a: int, b: int) -> None:
"""Different pseudo-worker counting through a range with half a second of sleep in between each iteration."""
for i in range(a, b):
await asyncio.sleep(0.5)
print("other_work with", i)
async def main() -> None:
# Initialize a new task pool instance and limit its size to 3 tasks.
pool = TaskPool(3)
# Queue up two tasks (IDs 0 and 1) to run concurrently (with the same positional arguments).
print("Called `apply`")
await pool.apply(work, kwargs={'start': 100, 'stop': 200, 'step': 10}, num=2)
# Let the tasks work for a bit.
await asyncio.sleep(1.5)
# Now, let us enqueue four more tasks (which will receive IDs 2, 3, 4, and 5), each created with different
# positional arguments by using `starmap`, but have **no more than two of those** run concurrently.
# Since we set our pool size to 3, and already have two tasks working within the pool,
# only the first one of these will start immediately (and receive ID 2).
# The second one will start (with ID 3), only once there is room in the pool,
# which -- in this example -- will be the case after ID 2 ends;
# until then the `starmap` method call **will block**!
# Once there is room in the pool again, the third and fourth will each start (with IDs 4 and 5)
# **only** once there is room in the pool **and** no more than one of these last four tasks is running.
args_list = [(0, 10), (10, 20), (20, 30), (30, 40)]
print("Calling `starmap`...")
await pool.starmap(other_work, args_list, num_tasks=2)
print("`starmap` returned")
# Now we lock the pool, so that we can safely await all our tasks.
pool.lock()
# Finally, we block, until all tasks have ended.
print("Called `gather`")
await pool.gather()
print("Done.")
if __name__ == '__main__':
asyncio.run(main())
```
### Output
Additional comments for the output are provided with `<---` next to the output lines.
(Keep in mind that the logger and `print` asynchronously write to `stdout`.)
```
TaskPool-0 initialized
Started TaskPool-0_Task-0
Started TaskPool-0_Task-1
Called `apply`
work with 100
work with 100
Calling `starmap`... <--- notice that this blocks as expected
Started TaskPool-0_Task-2
work with 110
work with 110
other_work with 0
other_work with 1
work with 120
work with 120
other_work with 2
other_work with 3
work with 130
work with 130
other_work with 4
other_work with 5
work with 140
work with 140
other_work with 6
other_work with 7
work with 150
work with 150
other_work with 8
Ended TaskPool-0_Task-2 <--- here Task-2 makes room in the pool and unblocks `main()`
TaskPool-0 is locked!
Started TaskPool-0_Task-3
other_work with 9
`starmap` returned
Called `gather`
work with 160
work with 160
other_work with 10
other_work with 11
work with 170
work with 170
other_work with 12
other_work with 13
work with 180
work with 180
other_work with 14
other_work with 15
Ended TaskPool-0_Task-0
Ended TaskPool-0_Task-1 <--- even though there is room in the pool now, Task-5 will not start
Started TaskPool-0_Task-4
work with 190
work with 190
other_work with 16
other_work with 20
other_work with 17
other_work with 21
other_work with 18
other_work with 22
other_work with 19
Ended TaskPool-0_Task-3 <--- now that only Task-4 is left, Task-5 will start
Started TaskPool-0_Task-5
other_work with 23
other_work with 30
other_work with 24
other_work with 31
other_work with 25
other_work with 32
other_work with 26
other_work with 33
other_work with 27
other_work with 34
other_work with 28
other_work with 35
Ended TaskPool-0_Task-4
other_work with 29
other_work with 36
other_work with 37
other_work with 38
other_work with 39
Done.
Ended TaskPool-0_Task-5
```

View File

@ -48,12 +48,12 @@ async def main() -> None:
control_server_task = await UnixControlServer(pool, path='/tmp/py_asyncio_taskpool.sock').serve_forever()
# We block until `.task_done()` has been called once by our workers for every item placed into the queue.
await q.join()
# Since we don't need any "work" done anymore, we can close our control server by cancelling the task.
# Since we don't need any "work" done anymore, we can lock our control server by cancelling the task.
control_server_task.cancel()
# Since our workers should now be stuck waiting for more items to pick from the queue, but no items are left,
# we can now safely cancel their tasks.
pool.stop_all()
pool.close()
pool.lock()
# Finally we allow for all tasks to do do their cleanup, if they need to do any, upon being cancelled.
# We block until they all return or raise an exception, but since we are not interested in any of their exceptions,
# we just silently collect their exceptions along with their return values.