diff --git a/docs/source/pages/control.rst b/docs/source/pages/control.rst index b185cc1..0411405 100644 --- a/docs/source/pages/control.rst +++ b/docs/source/pages/control.rst @@ -96,9 +96,9 @@ When you are dealing with a regular :py:class:`TaskPool map mypackage.mymodule.worker ['x','y','z'] -g 3 + > map mypackage.mymodule.worker ['x','y','z'] -n 3 -The :code:`-g` is a shorthand for :code:`--group-size` in this case. In general, all (public) pool methods will have a corresponding command in the control session. +The :code:`-n` is a shorthand for :code:`--num-concurrent` in this case. In general, all (public) pool methods will have a corresponding command in the control session. .. note:: diff --git a/docs/source/pages/pool.rst b/docs/source/pages/pool.rst index c30ba76..0452dd7 100644 --- a/docs/source/pages/pool.rst +++ b/docs/source/pages/pool.rst @@ -46,7 +46,7 @@ Let's take a look at an example. Say you have a coroutine function that takes tw async def queue_worker_function(in_queue: Queue, out_queue: Queue) -> None: while True: item = await in_queue.get() - ... # Do some work on the item amd arrive at a result. + ... # Do some work on the item and arrive at a result. await out_queue.put(result) How would we go about concurrently executing this function, say 5 times? There are (as always) a number of ways to do this with :code:`asyncio`. If we want to use tasks and be clean about it, we can do it like this: @@ -141,7 +141,7 @@ Or we could use a task pool: async def main(): ... pool = TaskPool() - await pool.map(another_worker_function, data_iterator, group_size=5) + await pool.map(another_worker_function, data_iterator, num_concurrent=5) ... pool.lock() await pool.gather_and_close() @@ -231,5 +231,6 @@ One method to be aware of is :py:meth:`.flush() ` and :py:meth:`TaskPool.apply() ` will block until the desired number of new tasks found room in the pool (either because other tasks have ended or because the pool size was increased). -:py:meth:`TaskPool.map() ` (and its variants) will **never** block. Since it makes use of "meta-tasks" under the hood, it will always return immediately. However, if the pool was full when it was called, there is **no guarantee** that even a single task has started, when the method returns. +:py:meth:`TaskPool.map() ` (and its variants) will **never** block. Since it makes use of a "meta-task" under the hood, it will always return immediately. However, if the pool was full when it was called, there is **no guarantee** that even a single task has started, when the method returns. +:py:meth:`TaskPool.map() ` (and its variants) will **never** block. Since it makes use of a "meta-task" under the hood, it will always return immediately. However, if the pool was full when it was called, there is **no guarantee** that even a single task has started, when the method returns. diff --git a/src/asyncio_taskpool/pool.py b/src/asyncio_taskpool/pool.py index 5f7a122..d8007d3 100644 --- a/src/asyncio_taskpool/pool.py +++ b/src/asyncio_taskpool/pool.py @@ -718,12 +718,12 @@ class TaskPool(BaseTaskPool): await execute_optional(actual_end_callback, args=(task_id,)) return release_callback - async def _arg_consumer(self, group_name: str, group_size: int, func: CoroutineFunc, arg_iter: ArgsT, + async def _arg_consumer(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None: """ Consumes arguments from :meth:`_map` and keeps a limited number of tasks working on them. - The `group_size` acts as the limiting value of an internal semaphore, which must be acquired before a new task + `num_concurrent` acts as the limiting value of an internal semaphore, which must be acquired before a new task can be started, and which must be released when one of these tasks ends. Intended to be run as a meta task of a specific group. @@ -731,7 +731,7 @@ class TaskPool(BaseTaskPool): Args: group_name: Name of the associated task group; passed into :meth:`_start_task`. - group_size: + num_concurrent: The maximum number new tasks spawned by this method to run concurrently. func: The coroutine function to use for spawning the new tasks within the task pool. @@ -746,7 +746,7 @@ class TaskPool(BaseTaskPool): The callback that was specified to execute after cancellation of the task (and the next one). It is run with the task's ID as its only positional argument. """ - map_semaphore = Semaphore(group_size) + map_semaphore = Semaphore(num_concurrent) release_cb = self._get_map_end_callback(map_semaphore, actual_end_callback=end_callback) for next_arg in arg_iter: # When the number of running tasks spawned by this method reaches the specified maximum, @@ -768,7 +768,7 @@ class TaskPool(BaseTaskPool): str(e.__class__.__name__), func.__name__, '*' * arg_stars, str(next_arg)) map_semaphore.release() - async def _map(self, group_name: str, group_size: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int, + async def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None: """ Creates tasks in the pool with arguments from the supplied iterable. @@ -777,21 +777,21 @@ class TaskPool(BaseTaskPool): All the new tasks are added to the same task group. - The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at - any given moment in time. As soon as one task from this group ends, it triggers the start of a new task + `num_concurrent` determines the (maximum) number of tasks spawned this way that shall be running concurrently at + any given moment in time. As soon as one task from this method call ends, it triggers the start of a new task (assuming there is room in the pool), which consumes the next element from the arguments iterable. If the size - of the pool never imposes a limit, this ensures that the number of tasks belonging to this group and running - concurrently is always equal to `group_size` (except for when `arg_iter` is exhausted of course). + of the pool never imposes a limit, this ensures that the number of tasks spawned and running concurrently is + always equal to `num_concurrent` (except for when `arg_iter` is exhausted of course). Because this method delegates the spawning of the tasks to a meta task, it **never blocks**. However, just because this method returns immediately, this does not mean that any task was started or that any number of - tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and the `group_size`. + tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num_concurrent`. Args: group_name: Name of the task group to add the new tasks to. It must be a name that doesn't exist yet. - group_size: - The maximum number new tasks spawned by this method to run concurrently. + num_concurrent: + The number new tasks spawned by this method to run concurrently. func: The coroutine function to use for spawning the new tasks within the task pool. arg_iter: @@ -806,21 +806,21 @@ class TaskPool(BaseTaskPool): It is run with the task's ID as its only positional argument. Raises: - `ValueError`: `group_size` is less than 1. + `ValueError`: `num_concurrent` is less than 1. `asyncio_taskpool.exceptions.InvalidGroupName`: A group named `group_name` exists in the pool. """ self._check_start(function=func) - if group_size < 1: - raise ValueError(f"Group size must be a positive integer.") + if num_concurrent < 1: + raise ValueError("`num_concurrent` must be a positive integer.") if group_name in self._task_groups.keys(): raise exceptions.InvalidGroupName(f"Group named {group_name} already exists!") self._task_groups[group_name] = group_reg = TaskGroupRegister() async with group_reg: meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set()) - meta_tasks.add(create_task(self._arg_consumer(group_name, group_size, func, arg_iter, arg_stars, + meta_tasks.add(create_task(self._arg_consumer(group_name, num_concurrent, func, arg_iter, arg_stars, end_callback=end_callback, cancel_callback=cancel_callback))) - async def map(self, func: CoroutineFunc, arg_iter: ArgsT, group_size: int = 1, group_name: str = None, + async def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int = 1, group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str: """ A task-based equivalent of the `multiprocessing.pool.Pool.map` method. @@ -830,25 +830,23 @@ class TaskPool(BaseTaskPool): All the new tasks are added to the same task group. - The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at - any given moment in time. As soon as one task from this group ends, it triggers the start of a new task + `num_concurrent` determines the (maximum) number of tasks spawned this way that shall be running concurrently at + any given moment in time. As soon as one task from this method call ends, it triggers the start of a new task (assuming there is room in the pool), which consumes the next element from the arguments iterable. If the size - of the pool never imposes a limit, this ensures that the number of tasks belonging to this group and running - concurrently is always equal to `group_size` (except for when `arg_iter` is exhausted of course). + of the pool never imposes a limit, this ensures that the number of tasks spawned and running concurrently is + always equal to `num_concurrent` (except for when `arg_iter` is exhausted of course). - This method sets up an internal arguments queue which is continuously filled while consuming the `arg_iter`. - Because this method delegates the spawning of the tasks to two meta tasks (a producer and a consumer of the - aforementioned queue), it **never blocks**. However, just because this method returns immediately, this does - not mean that any task was started or that any number of tasks will start soon, as this is solely determined by - the :attr:`BaseTaskPool.pool_size` and the `group_size`. + Because this method delegates the spawning of the tasks to a meta task, it **never blocks**. However, just + because this method returns immediately, this does not mean that any task was started or that any number of + tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num_concurrent`. Args: func: The coroutine function to use for spawning the new tasks within the task pool. arg_iter: The iterable of arguments; each argument is to be passed into a `func` call when spawning a new task. - group_size (optional): - The maximum number new tasks spawned by this method to run concurrently. Defaults to 1. + num_concurrent (optional): + The number new tasks spawned by this method to run concurrently. Defaults to 1. group_name (optional): Name of the task group to add the new tasks to. If provided, it must be a name that doesn't exist yet. end_callback (optional): @@ -865,16 +863,16 @@ class TaskPool(BaseTaskPool): `PoolIsClosed`: The pool is closed. `NotCoroutine`: `func` is not a coroutine function. `PoolIsLocked`: The pool is currently locked. - `ValueError`: `group_size` is less than 1. + `ValueError`: `num_concurrent` is less than 1. `InvalidGroupName`: A group named `group_name` exists in the pool. """ if group_name is None: group_name = self._generate_group_name('map', func) - await self._map(group_name, group_size, func, arg_iter, 0, + await self._map(group_name, num_concurrent, func, arg_iter, 0, end_callback=end_callback, cancel_callback=cancel_callback) return group_name - async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], group_size: int = 1, + async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_concurrent: int = 1, group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str: """ Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked @@ -883,11 +881,11 @@ class TaskPool(BaseTaskPool): """ if group_name is None: group_name = self._generate_group_name('starmap', func) - await self._map(group_name, group_size, func, args_iter, 1, + await self._map(group_name, num_concurrent, func, args_iter, 1, end_callback=end_callback, cancel_callback=cancel_callback) return group_name - async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], group_size: int = 1, + async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_concurrent: int = 1, group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str: """ @@ -897,7 +895,7 @@ class TaskPool(BaseTaskPool): """ if group_name is None: group_name = self._generate_group_name('doublestarmap', func) - await self._map(group_name, group_size, func, kwargs_iter, 2, + await self._map(group_name, num_concurrent, func, kwargs_iter, 2, end_callback=end_callback, cancel_callback=cancel_callback) return group_name diff --git a/tests/test_pool.py b/tests/test_pool.py index 1e0b94c..6d89f0a 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -655,18 +655,18 @@ class TaskPoolTestCase(CommonTestCase): async def test_map(self, mock__generate_group_name: MagicMock, mock__map: AsyncMock): mock__generate_group_name.return_value = generated_name = 'name 1 2 3' mock_func = MagicMock() - arg_iter, group_size, group_name = (FOO, BAR, 1, 2, 3), 2, FOO + BAR + arg_iter, num_concurrent, group_name = (FOO, BAR, 1, 2, 3), 2, FOO + BAR end_cb, cancel_cb = MagicMock(), MagicMock() - output = await self.task_pool.map(mock_func, arg_iter, group_size, group_name, end_cb, cancel_cb) + output = await self.task_pool.map(mock_func, arg_iter, num_concurrent, group_name, end_cb, cancel_cb) self.assertEqual(group_name, output) - mock__map.assert_awaited_once_with(group_name, group_size, mock_func, arg_iter, 0, + mock__map.assert_awaited_once_with(group_name, num_concurrent, mock_func, arg_iter, 0, end_callback=end_cb, cancel_callback=cancel_cb) mock__generate_group_name.assert_not_called() mock__map.reset_mock() - output = await self.task_pool.map(mock_func, arg_iter, group_size, None, end_cb, cancel_cb) + output = await self.task_pool.map(mock_func, arg_iter, num_concurrent, None, end_cb, cancel_cb) self.assertEqual(generated_name, output) - mock__map.assert_awaited_once_with(generated_name, group_size, mock_func, arg_iter, 0, + mock__map.assert_awaited_once_with(generated_name, num_concurrent, mock_func, arg_iter, 0, end_callback=end_cb, cancel_callback=cancel_cb) mock__generate_group_name.assert_called_once_with('map', mock_func) @@ -675,18 +675,18 @@ class TaskPoolTestCase(CommonTestCase): async def test_starmap(self, mock__generate_group_name: MagicMock, mock__map: AsyncMock): mock__generate_group_name.return_value = generated_name = 'name 1 2 3' mock_func = MagicMock() - args_iter, group_size, group_name = ([FOO], [BAR]), 2, FOO + BAR + args_iter, num_concurrent, group_name = ([FOO], [BAR]), 2, FOO + BAR end_cb, cancel_cb = MagicMock(), MagicMock() - output = await self.task_pool.starmap(mock_func, args_iter, group_size, group_name, end_cb, cancel_cb) + output = await self.task_pool.starmap(mock_func, args_iter, num_concurrent, group_name, end_cb, cancel_cb) self.assertEqual(group_name, output) - mock__map.assert_awaited_once_with(group_name, group_size, mock_func, args_iter, 1, + mock__map.assert_awaited_once_with(group_name, num_concurrent, mock_func, args_iter, 1, end_callback=end_cb, cancel_callback=cancel_cb) mock__generate_group_name.assert_not_called() mock__map.reset_mock() - output = await self.task_pool.starmap(mock_func, args_iter, group_size, None, end_cb, cancel_cb) + output = await self.task_pool.starmap(mock_func, args_iter, num_concurrent, None, end_cb, cancel_cb) self.assertEqual(generated_name, output) - mock__map.assert_awaited_once_with(generated_name, group_size, mock_func, args_iter, 1, + mock__map.assert_awaited_once_with(generated_name, num_concurrent, mock_func, args_iter, 1, end_callback=end_cb, cancel_callback=cancel_cb) mock__generate_group_name.assert_called_once_with('starmap', mock_func) @@ -695,18 +695,18 @@ class TaskPoolTestCase(CommonTestCase): async def test_doublestarmap(self, mock__generate_group_name: MagicMock, mock__map: AsyncMock): mock__generate_group_name.return_value = generated_name = 'name 1 2 3' mock_func = MagicMock() - kwargs_iter, group_size, group_name = [{'a': FOO}, {'a': BAR}], 2, FOO + BAR + kw_iter, num_concurrent, group_name = [{'a': FOO}, {'a': BAR}], 2, FOO + BAR end_cb, cancel_cb = MagicMock(), MagicMock() - output = await self.task_pool.doublestarmap(mock_func, kwargs_iter, group_size, group_name, end_cb, cancel_cb) + output = await self.task_pool.doublestarmap(mock_func, kw_iter, num_concurrent, group_name, end_cb, cancel_cb) self.assertEqual(group_name, output) - mock__map.assert_awaited_once_with(group_name, group_size, mock_func, kwargs_iter, 2, + mock__map.assert_awaited_once_with(group_name, num_concurrent, mock_func, kw_iter, 2, end_callback=end_cb, cancel_callback=cancel_cb) mock__generate_group_name.assert_not_called() mock__map.reset_mock() - output = await self.task_pool.doublestarmap(mock_func, kwargs_iter, group_size, None, end_cb, cancel_cb) + output = await self.task_pool.doublestarmap(mock_func, kw_iter, num_concurrent, None, end_cb, cancel_cb) self.assertEqual(generated_name, output) - mock__map.assert_awaited_once_with(generated_name, group_size, mock_func, kwargs_iter, 2, + mock__map.assert_awaited_once_with(generated_name, num_concurrent, mock_func, kw_iter, 2, end_callback=end_cb, cancel_callback=cancel_cb) mock__generate_group_name.assert_called_once_with('doublestarmap', mock_func) diff --git a/usage/USAGE.md b/usage/USAGE.md index 4ca1b91..f982b1f 100644 --- a/usage/USAGE.md +++ b/usage/USAGE.md @@ -41,7 +41,7 @@ async def main() -> None: pool = SimpleTaskPool(work, args=(5,)) # initializes the pool; no work is being done yet await pool.start(3) # launches work tasks 0, 1, and 2 await asyncio.sleep(1.5) # lets the tasks work for a bit - await pool.start() # launches work task 3 + await pool.start(1) # launches work task 3 await asyncio.sleep(1.5) # lets the tasks work for a bit pool.stop(2) # cancels tasks 3 and 2 (LIFO order) pool.lock() # required for the last line @@ -135,7 +135,7 @@ async def main() -> None: # Once there is room in the pool again, the third and fourth will each start (with IDs 4 and 5) # only once there is room in the pool and no more than one other task of these new ones is running. args_list = [(0, 10), (10, 20), (20, 30), (30, 40)] - await pool.starmap(other_work, args_list, group_size=2) + await pool.starmap(other_work, args_list, num_concurrent=2) print("> Called `starmap`") # Now we lock the pool, so that we can safely await all our tasks. pool.lock() @@ -199,7 +199,7 @@ Started TaskPool-0_Task-3 > other_work with 15 Ended TaskPool-0_Task-0 Ended TaskPool-0_Task-1 <--- these two end and free up two more slots in the pool -Started TaskPool-0_Task-4 <--- since the group size is set to 2, Task-5 will not start +Started TaskPool-0_Task-4 <--- since `num_concurrent` is set to 2, Task-5 will not start > work with 190 > work with 190 > other_work with 16