renamed num_tasks in the map-methods to group_size; reworded/extended the docstrings

Daniil Fajnberg 2022-02-19 16:02:50 +01:00
parent d0c0177681
commit 4994135062
4 changed files with 67 additions and 52 deletions
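In practice, the rename only changes the keyword used when calling the map-style methods. A minimal before/after sketch; the import path and the pool construction are assumptions for illustration, not taken from this diff:

import asyncio
from asyncio_taskpool import TaskPool  # assumed import path

async def work(n: int) -> None:
    await asyncio.sleep(1)
    print('done with', n)

async def main() -> None:
    pool = TaskPool()  # constructor arguments omitted for brevity
    # before this commit: await pool.map(work, range(20), num_tasks=5)
    # after this commit:
    await pool.map(work, range(20), group_size=5)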

View File

@@ -1,6 +1,6 @@
 [metadata]
 name = asyncio-taskpool
-version = 0.3.4
+version = 0.3.5
 author = Daniil Fajnberg
 author_email = mail@daniil.fajnberg.de
 description = Dynamically manage pools of asyncio tasks

View File

@@ -319,8 +319,8 @@ class BaseTaskPool:
         """
         Cancels all tasks still running within the pool.

-        Note that there may be an unknown number of coroutine functions "queued" to be run as tasks.
-        This can happen, if for example the `TaskPool.map` method was called with `num_tasks` set to a number smaller
+        Note that there may be an unknown number of coroutine functions already "queued" to be run as tasks.
+        This can happen, if for example the `TaskPool.map` method was called with `group_size` set to a number smaller
         than the number of arguments from `args_iter`.
         In this case, those already running will be cancelled, while the following will **never even start**.
@@ -355,8 +355,8 @@ class BaseTaskPool:
         The `lock()` method must have been called prior to this.

-        Note that there may be an unknown number of coroutine functions "queued" to be run as tasks.
-        This can happen, if for example the `TaskPool.map` method was called with `num_tasks` set to a number smaller
+        Note that there may be an unknown number of coroutine functions already "queued" to be run as tasks.
+        This can happen, if for example the `TaskPool.map` method was called with `group_size` set to a number smaller
         than the number of arguments from `args_iter`.
         In this case, calling `cancel_all()` prior to this, will prevent those tasks from starting and potentially
         blocking this method. Otherwise it will wait until they all have started.
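Taken together, the two docstrings above suggest a shutdown sequence along these lines. This is a hedged sketch: the name of the gathering method being documented is not visible in this hunk and `gather()` is assumed here.

async def shutdown(pool) -> None:
    # Sketch based on the docstrings above; `gather` is an assumed name.
    pool.lock()          # gathering requires the pool to be locked first
    pool.cancel_all()    # also prevents "queued" coroutines from ever starting
    await pool.gather()  # would otherwise wait for queued tasks to start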
@@ -552,10 +552,10 @@ class TaskPool(BaseTaskPool):
     def _set_up_args_queue(self, args_iter: ArgsT, num_tasks: int) -> Queue:
         """
         Helper function for `_map()`.

-        Takes the iterable of function arguments `args_iter` and adds up to `num_tasks` to a new `asyncio.Queue`.
+        Takes the iterable of function arguments `args_iter` and adds up to `group_size` to a new `asyncio.Queue`.
         The queue's `join()` method is added to the pool's `_before_gathering` list and the queue is returned.
-        If the iterable contains less than `num_tasks` elements, nothing else happens; otherwise the `_queue_producer`
+        If the iterable contains less than `group_size` elements, nothing else happens; otherwise the `_queue_producer`
         is started as a separate task with the arguments queue and an iterator of the remaining arguments.

         Args:
@@ -567,7 +567,7 @@ class TaskPool(BaseTaskPool):
         Returns:
             The newly created and filled arguments queue for spawning new tasks.
         """
-        # Setting the `maxsize` of the queue to `num_tasks` will ensure that no more than `num_tasks` tasks will run
+        # Setting the `maxsize` of the queue to `group_size` will ensure that no more than `group_size` tasks will run
         # concurrently because the size of the queue is what will determine the number of immediately started tasks in
         # the `_map()` method and each of those will only ever start (at most) one other task upon ending.
         args_queue = Queue(maxsize=num_tasks)
@@ -575,12 +575,12 @@ class TaskPool(BaseTaskPool):
         args_iter = iter(args_iter)
         try:
             # Here we guarantee that the queue will contain as many arguments as needed for starting the first batch of
-            # tasks, which will be at most `num_tasks` (meaning the queue will be full).
+            # tasks, which will be at most `group_size` (meaning the queue will be full).
             for i in range(num_tasks):
                 args_queue.put_nowait(next(args_iter))
         except StopIteration:
             # If we get here, this means that the number of elements in the arguments iterator was less than the
-            # specified `num_tasks`. Still, the number of tasks to start immediately will be the size of the queue.
+            # specified `group_size`. Still, the number of tasks to start immediately will be the size of the queue.
             # The `_queue_producer` won't be necessary, since we already put all the elements in the queue.
             pass
         else:
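The queue set-up changed in this hunk follows a general prefill pattern: fill a bounded asyncio.Queue synchronously, and only spawn a producer task if the iterable is not yet exhausted. A standalone sketch of that pattern; function and variable names are illustrative, not the library's:

import asyncio
from typing import Any, Iterable

def prefill_queue(args: Iterable[Any], group_size: int) -> asyncio.Queue:
    # The `maxsize` caps how many tasks can be started immediately. Must be
    # called while an event loop is running (as `_map()` does in the library).
    queue: asyncio.Queue = asyncio.Queue(maxsize=group_size)
    args = iter(args)
    try:
        for _ in range(group_size):
            queue.put_nowait(next(args))
    except StopIteration:
        pass  # fewer elements than `group_size`; no producer task needed
    else:
        async def producer() -> None:
            for arg in args:
                await queue.put(arg)  # blocks whenever the queue is full
        asyncio.create_task(producer())
    return queue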
@@ -591,17 +591,27 @@ class TaskPool(BaseTaskPool):
             create_task(self._queue_producer(args_queue, args_iter))
         return args_queue

-    async def _map(self, func: CoroutineFunc, args_iter: ArgsT, arg_stars: int = 0, num_tasks: int = 1,
+    async def _map(self, func: CoroutineFunc, args_iter: ArgsT, arg_stars: int = 0, group_size: int = 1,
                    end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
         """
-        Creates coroutines with arguments from a supplied iterable and runs them as new tasks in the pool in batches.
-
-        TODO: If task groups are implemented, consider adding all tasks from one call of this method to the same group
-              and referring to "group size" rather than chunk/batch size.
-
-        Each coroutine looks like `func(arg)`, `func(*arg)`, or `func(**arg)`, `arg` being an element from the iterable.
-        This method blocks, **only if** there is not enough room in the pool for the first batch of new tasks.
-        It sets up an internal arguments queue which is continuously filled while consuming the arguments iterable.
+        Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool.
+
+        Each coroutine looks like `func(arg)`, `func(*arg)`, or `func(**arg)`, `arg` being taken from `args_iter`.
+        The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at
+        any given moment in time. Assuming the number of elements produced by `args_iter` is greater than `group_size`,
+        this method will block **only** until the first `group_size` tasks have been **started**, before returning.
+        (If the number of elements from `args_iter` is smaller than `group_size`, this method will return as soon as
+        all of them have been started.)
+
+        As soon as one task from this first batch ends, it triggers the start of a new task (assuming there is room in
+        the pool), which consumes the next element from the arguments iterable. If the size of the pool never imposes a
+        limit, this ensures that the number of tasks running concurrently as a result of this method call is always
+        equal to `group_size` (except for when `args_iter` is exhausted of course).
+
+        Thus, this method blocks, **only if** there is not enough room in the pool for the first batch of new tasks.
+        This method sets up an internal arguments queue which is continuously filled while consuming the `args_iter`.

         Args:
             func:
@@ -610,8 +620,8 @@ class TaskPool(BaseTaskPool):
                 The iterable of arguments; each element is to be passed into a `func` call when spawning a new task.
             arg_stars (optional):
                 Whether or not to unpack an element from `args_iter` using stars; must be 0, 1, or 2.
-            num_tasks (optional):
-                The maximum number of the new tasks to run concurrently.
+            group_size (optional):
+                The maximum number of new tasks spawned by this method to run concurrently. Defaults to 1.
             end_callback (optional):
                 A callback to execute after a task has ended.
                 It is run with the task's ID as its only positional argument.
@@ -624,7 +634,7 @@ class TaskPool(BaseTaskPool):
         """
         if not self._locked:
             raise exceptions.PoolIsLocked("Cannot start new tasks")
-        args_queue = self._set_up_args_queue(args_iter, num_tasks)
+        args_queue = self._set_up_args_queue(args_iter, group_size)
         # We need a flag to ensure that starting all tasks from the first batch here will not be blocked by the
         # `_queue_callback` triggered by one or more of them.
         # This could happen, e.g. if the pool has just enough room for one more task, but the queue here contains more
@@ -638,29 +648,34 @@ class TaskPool(BaseTaskPool):
         # Now the callbacks can immediately trigger more tasks.
         first_batch_started.set()

-    async def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_tasks: int = 1,
+    async def map(self, func: CoroutineFunc, arg_iter: ArgsT, group_size: int = 1,
                   end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
         """
         An asyncio-task-based equivalent of the `multiprocessing.pool.Pool.map` method.

-        Creates coroutines with arguments from a supplied iterable and runs them as new tasks in the pool in batches.
-        Each coroutine looks like `func(arg)`, `arg` being an element from the iterable.
-        Once the first batch of tasks has started to run, this method returns.
-        As soon as on of them finishes, it triggers the start of a new task (assuming there is room in the pool)
-        consuming the next element from the arguments iterable.
-        If the size of the pool never imposes a limit, this ensures that there is almost continuously the desired number
-        of tasks from this call concurrently running within the pool.
-        This method blocks, **only if** there is not enough room in the pool for the first batch of new tasks.
+        Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool.
+        Each coroutine looks like `func(arg)`, `arg` being an element taken from `arg_iter`.
+        The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at
+        any given moment in time. Assuming the number of elements produced by `arg_iter` is greater than `group_size`,
+        this method will block **only** until the first `group_size` tasks have been **started**, before returning.
+        (If the number of elements from `arg_iter` is smaller than `group_size`, this method will return as soon as
+        all of them have been started.)
+        As soon as one task from this first batch ends, it triggers the start of a new task (assuming there is room in
+        the pool), which consumes the next element from the arguments iterable. If the size of the pool never imposes a
+        limit, this ensures that the number of tasks running concurrently as a result of this method call is always
+        equal to `group_size` (except for when `arg_iter` is exhausted of course).
+        Thus, this method blocks, **only if** there is not enough room in the pool for the first batch of new tasks.

         Args:
             func:
                 The coroutine function to use for spawning the new tasks within the task pool.
             arg_iter:
                 The iterable of arguments; each argument is to be passed into a `func` call when spawning a new task.
-            num_tasks (optional):
-                The maximum number of the new tasks to run concurrently.
+            group_size (optional):
+                The maximum number of new tasks spawned by this method to run concurrently. Defaults to 1.
             end_callback (optional):
                 A callback to execute after a task has ended.
                 It is run with the task's ID as its only positional argument.
@@ -672,27 +687,27 @@ class TaskPool(BaseTaskPool):
             `PoolIsLocked` if the pool has been locked.
             `NotCoroutine` if `func` is not a coroutine function.
         """
-        await self._map(func, arg_iter, arg_stars=0, num_tasks=num_tasks,
+        await self._map(func, arg_iter, arg_stars=0, group_size=group_size,
                         end_callback=end_callback, cancel_callback=cancel_callback)

-    async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_tasks: int = 1,
+    async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], group_size: int = 1,
                       end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
         """
         Like `map()` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked as
         positional arguments to the function.
-        Each coroutine then looks like `func(*arg)`, `arg` being an element from `args_iter`.
+        Each coroutine then looks like `func(*args)`, `args` being an element from `args_iter`.
         """
-        await self._map(func, args_iter, arg_stars=1, num_tasks=num_tasks,
+        await self._map(func, args_iter, arg_stars=1, group_size=group_size,
                         end_callback=end_callback, cancel_callback=cancel_callback)

-    async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_tasks: int = 1,
+    async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], group_size: int = 1,
                             end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
         """
         Like `map()` except that the elements of `kwargs_iter` are expected to be iterables themselves to be unpacked as
         keyword-arguments to the function.
-        Each coroutine then looks like `func(**arg)`, `arg` being an element from `kwargs_iter`.
+        Each coroutine then looks like `func(**kwargs)`, `kwargs` being an element from `kwargs_iter`.
         """
-        await self._map(func, kwargs_iter, arg_stars=2, num_tasks=num_tasks,
+        await self._map(func, kwargs_iter, arg_stars=2, group_size=group_size,
                         end_callback=end_callback, cancel_callback=cancel_callback)
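The invariant promised by the reworded docstrings, that at most `group_size` of these tasks run at any moment and that each finishing task pulls in the next argument, can be observed with plain asyncio. This sketch involves no library code; all names here are illustrative:

import asyncio

async def demo(group_size: int = 2) -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for n in range(10):
        queue.put_nowait(n)
    running = 0

    async def func(arg: int) -> None:
        nonlocal running
        running += 1
        assert running <= group_size  # the documented concurrency cap
        await asyncio.sleep(0.01)
        running -= 1

    async def consumer() -> None:
        # Each consumer starts the next coroutine as soon as its current one
        # ends, mirroring the "one ends, one starts" behaviour described above.
        while not queue.empty():
            await func(queue.get_nowait())

    await asyncio.gather(*(consumer() for _ in range(group_size)))

asyncio.run(demo())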

View File

@@ -558,19 +558,19 @@ class TaskPoolTestCase(CommonTestCase):
         mock_event_cls.return_value = mock_flag = MagicMock(set=mock_flag_set)
         mock_func, stars = MagicMock(), 3
-        args_iter, num_tasks = (FOO, BAR, 1, 2, 3), 2
+        args_iter, group_size = (FOO, BAR, 1, 2, 3), 2
         end_cb, cancel_cb = MagicMock(), MagicMock()

         self.task_pool._locked = False
         with self.assertRaises(exceptions.PoolIsLocked):
-            await self.task_pool._map(mock_func, args_iter, stars, num_tasks, end_cb, cancel_cb)
+            await self.task_pool._map(mock_func, args_iter, stars, group_size, end_cb, cancel_cb)
         mock__set_up_args_queue.assert_not_called()
         mock__queue_consumer.assert_not_awaited()
         mock_flag_set.assert_not_called()

         self.task_pool._locked = True
-        self.assertIsNone(await self.task_pool._map(mock_func, args_iter, stars, num_tasks, end_cb, cancel_cb))
-        mock__set_up_args_queue.assert_called_once_with(args_iter, num_tasks)
+        self.assertIsNone(await self.task_pool._map(mock_func, args_iter, stars, group_size, end_cb, cancel_cb))
+        mock__set_up_args_queue.assert_called_once_with(args_iter, group_size)
         mock__queue_consumer.assert_has_awaits(qsize * [call(mock_q, mock_flag, mock_func, arg_stars=stars,
                                                              end_callback=end_cb, cancel_callback=cancel_cb)])
         mock_flag_set.assert_called_once_with()
@@ -578,28 +578,28 @@ class TaskPoolTestCase(CommonTestCase):
     @patch.object(pool.TaskPool, '_map')
     async def test_map(self, mock__map: AsyncMock):
         mock_func = MagicMock()
-        arg_iter, num_tasks = (FOO, BAR, 1, 2, 3), 2
+        arg_iter, group_size = (FOO, BAR, 1, 2, 3), 2
         end_cb, cancel_cb = MagicMock(), MagicMock()
-        self.assertIsNone(await self.task_pool.map(mock_func, arg_iter, num_tasks, end_cb, cancel_cb))
-        mock__map.assert_awaited_once_with(mock_func, arg_iter, arg_stars=0, num_tasks=num_tasks,
+        self.assertIsNone(await self.task_pool.map(mock_func, arg_iter, group_size, end_cb, cancel_cb))
+        mock__map.assert_awaited_once_with(mock_func, arg_iter, arg_stars=0, group_size=group_size,
                                            end_callback=end_cb, cancel_callback=cancel_cb)

     @patch.object(pool.TaskPool, '_map')
     async def test_starmap(self, mock__map: AsyncMock):
         mock_func = MagicMock()
-        args_iter, num_tasks = ([FOO], [BAR]), 2
+        args_iter, group_size = ([FOO], [BAR]), 2
         end_cb, cancel_cb = MagicMock(), MagicMock()
-        self.assertIsNone(await self.task_pool.starmap(mock_func, args_iter, num_tasks, end_cb, cancel_cb))
-        mock__map.assert_awaited_once_with(mock_func, args_iter, arg_stars=1, num_tasks=num_tasks,
+        self.assertIsNone(await self.task_pool.starmap(mock_func, args_iter, group_size, end_cb, cancel_cb))
+        mock__map.assert_awaited_once_with(mock_func, args_iter, arg_stars=1, group_size=group_size,
                                            end_callback=end_cb, cancel_callback=cancel_cb)

     @patch.object(pool.TaskPool, '_map')
     async def test_doublestarmap(self, mock__map: AsyncMock):
         mock_func = MagicMock()
-        kwargs_iter, num_tasks = [{'a': FOO}, {'a': BAR}], 2
+        kwargs_iter, group_size = [{'a': FOO}, {'a': BAR}], 2
         end_cb, cancel_cb = MagicMock(), MagicMock()
-        self.assertIsNone(await self.task_pool.doublestarmap(mock_func, kwargs_iter, num_tasks, end_cb, cancel_cb))
-        mock__map.assert_awaited_once_with(mock_func, kwargs_iter, arg_stars=2, num_tasks=num_tasks,
+        self.assertIsNone(await self.task_pool.doublestarmap(mock_func, kwargs_iter, group_size, end_cb, cancel_cb))
+        mock__map.assert_awaited_once_with(mock_func, kwargs_iter, arg_stars=2, group_size=group_size,
                                            end_callback=end_cb, cancel_callback=cancel_cb)

View File

@@ -130,7 +130,7 @@ async def main() -> None:
     # **only** once there is room in the pool **and** no more than one of these last four tasks is running.
     args_list = [(0, 10), (10, 20), (20, 30), (30, 40)]
     print("Calling `starmap`...")
-    await pool.starmap(other_work, args_list, num_tasks=2)
+    await pool.starmap(other_work, args_list, group_size=2)
     print("`starmap` returned")
     # Now we lock the pool, so that we can safely await all our tasks.
    pool.lock()
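The `other_work` coroutine and its `(start, stop)` argument tuples are defined earlier in this example file and are not part of the diff; a hypothetical stand-in compatible with the `starmap` call above could look like this:

import asyncio

async def other_work(start: int, stop: int) -> None:
    # Hypothetical placeholder; the real coroutine lives earlier in the file.
    for i in range(start, stop):
        await asyncio.sleep(0.1)
        print('doing other work:', i)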