diff --git a/README.md b/README.md index 76ee5e3..9b67b16 100644 --- a/README.md +++ b/README.md @@ -27,16 +27,9 @@ Generally speaking, a task is added to a pool by providing it with a coroutine f ```python from asyncio_taskpool import SimpleTaskPool - ... - - async def work(_foo, _bar): ... - -... - - async def main(): pool = SimpleTaskPool(work, args=('xyz', 420)) await pool.start(5) @@ -44,7 +37,6 @@ async def main(): pool.stop(3) ... await pool.gather_and_close() - ... ``` Since one of the main goals of `asyncio-taskpool` is to be able to start/stop tasks dynamically or "on-the-fly", _most_ of the associated methods are non-blocking _most_ of the time. A notable exception is the `gather_and_close` method for awaiting the return of all tasks in the pool. (It is essentially a glorified wrapper around the [`asyncio.gather`](https://docs.python.org/3/library/asyncio-task.html#asyncio.gather) function.) diff --git a/docs/source/pages/pool.rst b/docs/source/pages/pool.rst index 2dce091..5034f74 100644 --- a/docs/source/pages/pool.rst +++ b/docs/source/pages/pool.rst @@ -162,7 +162,7 @@ Let's take the :ref:`queue worker example ` from before. :caption: main.py from asyncio_taskpool import SimpleTaskPool - from .work import another_worker_function + from .work import queue_worker_function async def main(): @@ -193,9 +193,9 @@ This may, at first glance, not seem like much of a difference, aside from differ if some_condition and pool.num_running > 10: pool.stop(3) elif some_other_condition and pool.num_running < 5: - pool.start(5) + await pool.start(5) else: - pool.start(1) + await pool.start(1) ... await pool.gather_and_close() diff --git a/src/asyncio_taskpool/pool.py b/src/asyncio_taskpool/pool.py index 6fbc819..ef10b05 100644 --- a/src/asyncio_taskpool/pool.py +++ b/src/asyncio_taskpool/pool.py @@ -623,12 +623,13 @@ class TaskPool(BaseTaskPool): coroutine_function: The function representing the task group. Returns: - The constructed 'prefix_function_index' string to name a task group. + The constructed '{prefix}-{name}-group-{idx}' string to name a task group. + (With `name` being the name of the `coroutine_function` and `idx` being an incrementing index.) """ - base_name = f'{prefix}_{coroutine_function.__name__}' + base_name = f'{prefix}-{coroutine_function.__name__}-group' i = 0 while True: - name = f'{base_name}_{i}' + name = f'{base_name}-{i}' if name not in self._task_groups.keys(): return name i += 1 @@ -687,8 +688,9 @@ class TaskPool(BaseTaskPool): num (optional): The number of tasks to spawn with the specified parameters. group_name (optional): - Name of the task group to add the new tasks to. By default, a unique name is constructed using the - name of the provided `func` and an incrementing index as 'apply_func_index'. + Name of the task group to add the new tasks to. By default, a unique name is constructed in the form + :code:`'apply-{name}-group-{idx}'` (with `name` being the name of the `func` and `idx` being an + incrementing index). end_callback (optional): A callback to execute after a task has ended. It is run with the task's ID as its only positional argument. @@ -697,7 +699,7 @@ class TaskPool(BaseTaskPool): It is run with the task's ID as its only positional argument. Returns: - The name of the newly created task group (see the `group_name` parameter). + The name of the newly created group (see the `group_name` parameter). Raises: `PoolIsClosed`: The pool is closed. @@ -853,8 +855,8 @@ class TaskPool(BaseTaskPool): The number new tasks spawned by this method to run concurrently. Defaults to 1. group_name (optional): Name of the task group to add the new tasks to. If provided, it must be a name that doesn't exist yet. - By default, a unique name is constructed using the name of the provided `func` and an incrementing - index as 'apply_func_index'. + By default, a unique name is constructed in the form :code:`'map-{name}-group-{idx}'` + (with `name` being the name of the `func` and `idx` being an incrementing index). end_callback (optional): A callback to execute after a task has ended. It is run with the task's ID as its only positional argument. @@ -863,7 +865,7 @@ class TaskPool(BaseTaskPool): It is run with the task's ID as its only positional argument. Returns: - The name of the newly created task group (see the `group_name` parameter). + The name of the newly created group (see the `group_name` parameter). Raises: `PoolIsClosed`: The pool is closed. @@ -884,6 +886,10 @@ class TaskPool(BaseTaskPool): Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked as positional arguments to the function. Each coroutine then looks like `func(*args)`, `args` being an element from `args_iter`. + + Returns: + The name of the newly created group in the form :code:`'starmap-{name}-group-{index}'` + (with `name` being the name of the `func` and `idx` being an incrementing index). """ if group_name is None: group_name = self._generate_group_name('starmap', func) @@ -898,6 +904,10 @@ class TaskPool(BaseTaskPool): Like :meth:`map` except that the elements of `kwargs_iter` are expected to be iterables themselves to be unpacked as keyword-arguments to the function. Each coroutine then looks like `func(**kwargs)`, `kwargs` being an element from `kwargs_iter`. + + Returns: + The name of the newly created group in the form :code:`'doublestarmap-{name}-group-{index}'` + (with `name` being the name of the `func` and `idx` being an incrementing index). """ if group_name is None: group_name = self._generate_group_name('doublestarmap', func) @@ -957,6 +967,7 @@ class SimpleTaskPool(BaseTaskPool): self._kwargs: KwArgsT = kwargs if kwargs is not None else {} self._end_callback: EndCB = end_callback self._cancel_callback: CancelCB = cancel_callback + self._start_calls: int = 0 super().__init__(pool_size=pool_size, name=name) @property @@ -964,14 +975,14 @@ class SimpleTaskPool(BaseTaskPool): """Name of the coroutine function used in the pool.""" return self._func.__name__ - async def _start_one(self) -> int: + async def _start_one(self, group_name: str) -> int: """Starts a single new task within the pool and returns its ID.""" - return await self._start_task(self._func(*self._args, **self._kwargs), + return await self._start_task(self._func(*self._args, **self._kwargs), group_name=group_name, end_callback=self._end_callback, cancel_callback=self._cancel_callback) - async def start(self, num: int) -> List[int]: + async def start(self, num: int) -> str: """ - Starts specified number of new tasks in the pool and returns their IDs. + Starts specified number of new tasks in the pool as a new group. This method may block if there is less room in the pool than the desired number of new tasks. @@ -979,11 +990,13 @@ class SimpleTaskPool(BaseTaskPool): num: The number of new tasks to start. Returns: - List of IDs of the new tasks that have been started (not necessarily in the order they were started). + The name of the newly created task group in the form :code:`'start-group-{idx}'` + (with `idx` being an incrementing index). """ - ids = await gather(*(self._start_one() for _ in range(num))) - assert isinstance(ids, list) # for PyCharm - return ids + group_name = f'start-group-{self._start_calls}' + self._start_calls += 1 + await gather(*(self._start_one(group_name) for _ in range(num))) + return group_name def stop(self, num: int) -> List[int]: """ diff --git a/tests/test_pool.py b/tests/test_pool.py index 57444f8..d2b43d0 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -476,12 +476,13 @@ class TaskPoolTestCase(CommonTestCase): def test__generate_group_name(self): prefix, func = 'x y z', AsyncMock(__name__=BAR) + base_name = f'{prefix}-{BAR}-group' self.task_pool._task_groups = { - f'{prefix}_{BAR}_0': MagicMock(), - f'{prefix}_{BAR}_1': MagicMock(), - f'{prefix}_{BAR}_100': MagicMock(), + f'{base_name}-0': MagicMock(), + f'{base_name}-1': MagicMock(), + f'{base_name}-100': MagicMock(), } - expected_output = f'{prefix}_{BAR}_2' + expected_output = f'{base_name}-2' output = self.task_pool._generate_group_name(prefix, func) self.assertEqual(expected_output, output) @@ -751,20 +752,22 @@ class SimpleTaskPoolTestCase(CommonTestCase): async def test__start_one(self, mock__start_task: AsyncMock): mock__start_task.return_value = expected_output = 99 self.task_pool._func = MagicMock(return_value=BAR) - output = await self.task_pool._start_one() + group_name = FOO + BAR + 'abc' + output = await self.task_pool._start_one(group_name) self.assertEqual(expected_output, output) self.task_pool._func.assert_called_once_with(*self.task_pool._args, **self.task_pool._kwargs) - mock__start_task.assert_awaited_once_with(BAR, end_callback=self.task_pool._end_callback, + mock__start_task.assert_awaited_once_with(BAR, group_name=group_name, end_callback=self.task_pool._end_callback, cancel_callback=self.task_pool._cancel_callback) @patch.object(pool.SimpleTaskPool, '_start_one') async def test_start(self, mock__start_one: AsyncMock): mock__start_one.return_value = FOO num = 5 + self.task_pool._start_calls = 42 output = await self.task_pool.start(num) - expected_output = num * [FOO] - self.assertListEqual(expected_output, output) - mock__start_one.assert_has_awaits(num * [call()]) + expected_output = 'start-group-42' + self.assertEqual(expected_output, output) + mock__start_one.assert_has_awaits(num * [call(expected_output)]) @patch.object(pool.SimpleTaskPool, 'cancel') def test_stop(self, mock_cancel: MagicMock):