From 4994135062a2008d9aee79958f9258a2c876687d Mon Sep 17 00:00:00 2001 From: Daniil Fajnberg Date: Sat, 19 Feb 2022 16:02:50 +0100 Subject: [PATCH] renamed `num_tasks` in the map-methods to `group_size`; reworded/extended the docstrings --- setup.cfg | 2 +- src/asyncio_taskpool/pool.py | 89 +++++++++++++++++++++--------------- tests/test_pool.py | 26 +++++------ usage/USAGE.md | 2 +- 4 files changed, 67 insertions(+), 52 deletions(-) diff --git a/setup.cfg b/setup.cfg index 3be8096..0e7eb76 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = asyncio-taskpool -version = 0.3.4 +version = 0.3.5 author = Daniil Fajnberg author_email = mail@daniil.fajnberg.de description = Dynamically manage pools of asyncio tasks diff --git a/src/asyncio_taskpool/pool.py b/src/asyncio_taskpool/pool.py index 81d579e..ed3bfb0 100644 --- a/src/asyncio_taskpool/pool.py +++ b/src/asyncio_taskpool/pool.py @@ -319,8 +319,8 @@ class BaseTaskPool: """ Cancels all tasks still running within the pool. - Note that there may be an unknown number of coroutine functions "queued" to be run as tasks. - This can happen, if for example the `TaskPool.map` method was called with `num_tasks` set to a number smaller + Note that there may be an unknown number of coroutine functions already "queued" to be run as tasks. + This can happen, if for example the `TaskPool.map` method was called with `group_size` set to a number smaller than the number of arguments from `args_iter`. In this case, those already running will be cancelled, while the following will **never even start**. @@ -355,8 +355,8 @@ class BaseTaskPool: The `lock()` method must have been called prior to this. - Note that there may be an unknown number of coroutine functions "queued" to be run as tasks. - This can happen, if for example the `TaskPool.map` method was called with `num_tasks` set to a number smaller + Note that there may be an unknown number of coroutine functions already "queued" to be run as tasks. + This can happen, if for example the `TaskPool.map` method was called with `group_size` set to a number smaller than the number of arguments from `args_iter`. In this case, calling `cancel_all()` prior to this, will prevent those tasks from starting and potentially blocking this method. Otherwise it will wait until they all have started. @@ -552,10 +552,10 @@ class TaskPool(BaseTaskPool): def _set_up_args_queue(self, args_iter: ArgsT, num_tasks: int) -> Queue: """ Helper function for `_map()`. - Takes the iterable of function arguments `args_iter` and adds up to `num_tasks` to a new `asyncio.Queue`. + Takes the iterable of function arguments `args_iter` and adds up to `group_size` to a new `asyncio.Queue`. The queue's `join()` method is added to the pool's `_before_gathering` list and the queue is returned. - If the iterable contains less than `num_tasks` elements, nothing else happens; otherwise the `_queue_producer` + If the iterable contains less than `group_size` elements, nothing else happens; otherwise the `_queue_producer` is started as a separate task with the arguments queue and and iterator of the remaining arguments. Args: @@ -567,7 +567,7 @@ class TaskPool(BaseTaskPool): Returns: The newly created and filled arguments queue for spawning new tasks. """ - # Setting the `maxsize` of the queue to `num_tasks` will ensure that no more than `num_tasks` tasks will run + # Setting the `maxsize` of the queue to `group_size` will ensure that no more than `group_size` tasks will run # concurrently because the size of the queue is what will determine the number of immediately started tasks in # the `_map()` method and each of those will only ever start (at most) one other task upon ending. args_queue = Queue(maxsize=num_tasks) @@ -575,12 +575,12 @@ class TaskPool(BaseTaskPool): args_iter = iter(args_iter) try: # Here we guarantee that the queue will contain as many arguments as needed for starting the first batch of - # tasks, which will be at most `num_tasks` (meaning the queue will be full). + # tasks, which will be at most `group_size` (meaning the queue will be full). for i in range(num_tasks): args_queue.put_nowait(next(args_iter)) except StopIteration: # If we get here, this means that the number of elements in the arguments iterator was less than the - # specified `num_tasks`. Still, the number of tasks to start immediately will be the size of the queue. + # specified `group_size`. Still, the number of tasks to start immediately will be the size of the queue. # The `_queue_producer` won't be necessary, since we already put all the elements in the queue. pass else: @@ -591,17 +591,27 @@ class TaskPool(BaseTaskPool): create_task(self._queue_producer(args_queue, args_iter)) return args_queue - async def _map(self, func: CoroutineFunc, args_iter: ArgsT, arg_stars: int = 0, num_tasks: int = 1, + async def _map(self, func: CoroutineFunc, args_iter: ArgsT, arg_stars: int = 0, group_size: int = 1, end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None: """ - Creates coroutines with arguments from a supplied iterable and runs them as new tasks in the pool in batches. - TODO: If task groups are implemented, consider adding all tasks from one call of this method to the same group - and referring to "group size" rather than chunk/batch size. - Each coroutine looks like `func(arg)`, `func(*arg)`, or `func(**arg)`, `arg` being an element from the iterable. + Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool. - This method blocks, **only if** there is not enough room in the pool for the first batch of new tasks. + Each coroutine looks like `func(arg)`, `func(*arg)`, or `func(**arg)`, `arg` being taken from `args_iter`. - It sets up an internal arguments queue which is continuously filled while consuming the arguments iterable. + The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at + any given moment in time. Assuming the number of elements produced by `args_iter` is greater than `group_size`, + this method will block **only** until the first `group_size` tasks have been **started**, before returning. + (If the number of elements from `args_iter` is smaller than `group_size`, this method will return as soon as + all of them have been started.) + + As soon as one task from this first batch ends, it triggers the start of a new task (assuming there is room in + the pool), which consumes the next element from the arguments iterable. If the size of the pool never imposes a + limit, this ensures that the number of tasks running concurrently as a result of this method call is always + equal to `group_size` (except for when `args_iter` is exhausted of course). + + Thus, this method blocks, **only if** there is not enough room in the pool for the first batch of new tasks. + + This method sets up an internal arguments queue which is continuously filled while consuming the `args_iter`. Args: func: @@ -610,8 +620,8 @@ class TaskPool(BaseTaskPool): The iterable of arguments; each element is to be passed into a `func` call when spawning a new task. arg_stars (optional): Whether or not to unpack an element from `args_iter` using stars; must be 0, 1, or 2. - num_tasks (optional): - The maximum number of the new tasks to run concurrently. + group_size (optional): + The maximum number new tasks spawned by this method to run concurrently. Defaults to 1. end_callback (optional): A callback to execute after a task has ended. It is run with the task's ID as its only positional argument. @@ -624,7 +634,7 @@ class TaskPool(BaseTaskPool): """ if not self._locked: raise exceptions.PoolIsLocked("Cannot start new tasks") - args_queue = self._set_up_args_queue(args_iter, num_tasks) + args_queue = self._set_up_args_queue(args_iter, group_size) # We need a flag to ensure that starting all tasks from the first batch here will not be blocked by the # `_queue_callback` triggered by one or more of them. # This could happen, e.g. if the pool has just enough room for one more task, but the queue here contains more @@ -638,29 +648,34 @@ class TaskPool(BaseTaskPool): # Now the callbacks can immediately trigger more tasks. first_batch_started.set() - async def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_tasks: int = 1, + async def map(self, func: CoroutineFunc, arg_iter: ArgsT, group_size: int = 1, end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None: """ An asyncio-task-based equivalent of the `multiprocessing.pool.Pool.map` method. - Creates coroutines with arguments from a supplied iterable and runs them as new tasks in the pool in batches. - Each coroutine looks like `func(arg)`, `arg` being an element from the iterable. + Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool. + Each coroutine looks like `func(arg)`, `arg` being an element taken from `arg_iter`. - Once the first batch of tasks has started to run, this method returns. - As soon as on of them finishes, it triggers the start of a new task (assuming there is room in the pool) - consuming the next element from the arguments iterable. - If the size of the pool never imposes a limit, this ensures that there is almost continuously the desired number - of tasks from this call concurrently running within the pool. + The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at + any given moment in time. Assuming the number of elements produced by `arg_iter` is greater than `group_size`, + this method will block **only** until the first `group_size` tasks have been **started**, before returning. + (If the number of elements from `arg_iter` is smaller than `group_size`, this method will return as soon as + all of them have been started.) - This method blocks, **only if** there is not enough room in the pool for the first batch of new tasks. + As soon as one task from this first batch ends, it triggers the start of a new task (assuming there is room in + the pool), which consumes the next element from the arguments iterable. If the size of the pool never imposes a + limit, this ensures that the number of tasks running concurrently as a result of this method call is always + equal to `group_size` (except for when `arg_iter` is exhausted of course). + + Thus, this method blocks, **only if** there is not enough room in the pool for the first batch of new tasks. Args: func: The coroutine function to use for spawning the new tasks within the task pool. arg_iter: The iterable of arguments; each argument is to be passed into a `func` call when spawning a new task. - num_tasks (optional): - The maximum number of the new tasks to run concurrently. + group_size (optional): + The maximum number new tasks spawned by this method to run concurrently. Defaults to 1. end_callback (optional): A callback to execute after a task has ended. It is run with the task's ID as its only positional argument. @@ -672,27 +687,27 @@ class TaskPool(BaseTaskPool): `PoolIsLocked` if the pool has been locked. `NotCoroutine` if `func` is not a coroutine function. """ - await self._map(func, arg_iter, arg_stars=0, num_tasks=num_tasks, + await self._map(func, arg_iter, arg_stars=0, group_size=group_size, end_callback=end_callback, cancel_callback=cancel_callback) - async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_tasks: int = 1, + async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], group_size: int = 1, end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None: """ Like `map()` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked as positional arguments to the function. - Each coroutine then looks like `func(*arg)`, `arg` being an element from `args_iter`. + Each coroutine then looks like `func(*args)`, `args` being an element from `args_iter`. """ - await self._map(func, args_iter, arg_stars=1, num_tasks=num_tasks, + await self._map(func, args_iter, arg_stars=1, group_size=group_size, end_callback=end_callback, cancel_callback=cancel_callback) - async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_tasks: int = 1, + async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], group_size: int = 1, end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None: """ Like `map()` except that the elements of `kwargs_iter` are expected to be iterables themselves to be unpacked as keyword-arguments to the function. - Each coroutine then looks like `func(**arg)`, `arg` being an element from `kwargs_iter`. + Each coroutine then looks like `func(**kwargs)`, `kwargs` being an element from `kwargs_iter`. """ - await self._map(func, kwargs_iter, arg_stars=2, num_tasks=num_tasks, + await self._map(func, kwargs_iter, arg_stars=2, group_size=group_size, end_callback=end_callback, cancel_callback=cancel_callback) diff --git a/tests/test_pool.py b/tests/test_pool.py index 69ff6d0..bbbc64f 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -558,19 +558,19 @@ class TaskPoolTestCase(CommonTestCase): mock_event_cls.return_value = mock_flag = MagicMock(set=mock_flag_set) mock_func, stars = MagicMock(), 3 - args_iter, num_tasks = (FOO, BAR, 1, 2, 3), 2 + args_iter, group_size = (FOO, BAR, 1, 2, 3), 2 end_cb, cancel_cb = MagicMock(), MagicMock() self.task_pool._locked = False with self.assertRaises(exceptions.PoolIsLocked): - await self.task_pool._map(mock_func, args_iter, stars, num_tasks, end_cb, cancel_cb) + await self.task_pool._map(mock_func, args_iter, stars, group_size, end_cb, cancel_cb) mock__set_up_args_queue.assert_not_called() mock__queue_consumer.assert_not_awaited() mock_flag_set.assert_not_called() self.task_pool._locked = True - self.assertIsNone(await self.task_pool._map(mock_func, args_iter, stars, num_tasks, end_cb, cancel_cb)) - mock__set_up_args_queue.assert_called_once_with(args_iter, num_tasks) + self.assertIsNone(await self.task_pool._map(mock_func, args_iter, stars, group_size, end_cb, cancel_cb)) + mock__set_up_args_queue.assert_called_once_with(args_iter, group_size) mock__queue_consumer.assert_has_awaits(qsize * [call(mock_q, mock_flag, mock_func, arg_stars=stars, end_callback=end_cb, cancel_callback=cancel_cb)]) mock_flag_set.assert_called_once_with() @@ -578,28 +578,28 @@ class TaskPoolTestCase(CommonTestCase): @patch.object(pool.TaskPool, '_map') async def test_map(self, mock__map: AsyncMock): mock_func = MagicMock() - arg_iter, num_tasks = (FOO, BAR, 1, 2, 3), 2 + arg_iter, group_size = (FOO, BAR, 1, 2, 3), 2 end_cb, cancel_cb = MagicMock(), MagicMock() - self.assertIsNone(await self.task_pool.map(mock_func, arg_iter, num_tasks, end_cb, cancel_cb)) - mock__map.assert_awaited_once_with(mock_func, arg_iter, arg_stars=0, num_tasks=num_tasks, + self.assertIsNone(await self.task_pool.map(mock_func, arg_iter, group_size, end_cb, cancel_cb)) + mock__map.assert_awaited_once_with(mock_func, arg_iter, arg_stars=0, group_size=group_size, end_callback=end_cb, cancel_callback=cancel_cb) @patch.object(pool.TaskPool, '_map') async def test_starmap(self, mock__map: AsyncMock): mock_func = MagicMock() - args_iter, num_tasks = ([FOO], [BAR]), 2 + args_iter, group_size = ([FOO], [BAR]), 2 end_cb, cancel_cb = MagicMock(), MagicMock() - self.assertIsNone(await self.task_pool.starmap(mock_func, args_iter, num_tasks, end_cb, cancel_cb)) - mock__map.assert_awaited_once_with(mock_func, args_iter, arg_stars=1, num_tasks=num_tasks, + self.assertIsNone(await self.task_pool.starmap(mock_func, args_iter, group_size, end_cb, cancel_cb)) + mock__map.assert_awaited_once_with(mock_func, args_iter, arg_stars=1, group_size=group_size, end_callback=end_cb, cancel_callback=cancel_cb) @patch.object(pool.TaskPool, '_map') async def test_doublestarmap(self, mock__map: AsyncMock): mock_func = MagicMock() - kwargs_iter, num_tasks = [{'a': FOO}, {'a': BAR}], 2 + kwargs_iter, group_size = [{'a': FOO}, {'a': BAR}], 2 end_cb, cancel_cb = MagicMock(), MagicMock() - self.assertIsNone(await self.task_pool.doublestarmap(mock_func, kwargs_iter, num_tasks, end_cb, cancel_cb)) - mock__map.assert_awaited_once_with(mock_func, kwargs_iter, arg_stars=2, num_tasks=num_tasks, + self.assertIsNone(await self.task_pool.doublestarmap(mock_func, kwargs_iter, group_size, end_cb, cancel_cb)) + mock__map.assert_awaited_once_with(mock_func, kwargs_iter, arg_stars=2, group_size=group_size, end_callback=end_cb, cancel_callback=cancel_cb) diff --git a/usage/USAGE.md b/usage/USAGE.md index e5c3314..33272b4 100644 --- a/usage/USAGE.md +++ b/usage/USAGE.md @@ -130,7 +130,7 @@ async def main() -> None: # **only** once there is room in the pool **and** no more than one of these last four tasks is running. args_list = [(0, 10), (10, 20), (20, 30), (30, 40)] print("Calling `starmap`...") - await pool.starmap(other_work, args_list, num_tasks=2) + await pool.starmap(other_work, args_list, group_size=2) print("`starmap` returned") # Now we lock the pool, so that we can safely await all our tasks. pool.lock()