start method now returns group name

This commit is contained in:
Daniil Fajnberg 2022-03-30 15:31:38 +02:00
parent 82e6ca7b1a
commit 5b3ac52bf6
Signed by: daniil-berg
GPG Key ID: BE187C50903BEE97
4 changed files with 45 additions and 37 deletions

View File

@ -27,16 +27,9 @@ Generally speaking, a task is added to a pool by providing it with a coroutine f
```python ```python
from asyncio_taskpool import SimpleTaskPool from asyncio_taskpool import SimpleTaskPool
... ...
async def work(_foo, _bar): ... async def work(_foo, _bar): ...
...
async def main(): async def main():
pool = SimpleTaskPool(work, args=('xyz', 420)) pool = SimpleTaskPool(work, args=('xyz', 420))
await pool.start(5) await pool.start(5)
@ -44,7 +37,6 @@ async def main():
pool.stop(3) pool.stop(3)
... ...
await pool.gather_and_close() await pool.gather_and_close()
...
``` ```
Since one of the main goals of `asyncio-taskpool` is to be able to start/stop tasks dynamically or "on-the-fly", _most_ of the associated methods are non-blocking _most_ of the time. A notable exception is the `gather_and_close` method for awaiting the return of all tasks in the pool. (It is essentially a glorified wrapper around the [`asyncio.gather`](https://docs.python.org/3/library/asyncio-task.html#asyncio.gather) function.) Since one of the main goals of `asyncio-taskpool` is to be able to start/stop tasks dynamically or "on-the-fly", _most_ of the associated methods are non-blocking _most_ of the time. A notable exception is the `gather_and_close` method for awaiting the return of all tasks in the pool. (It is essentially a glorified wrapper around the [`asyncio.gather`](https://docs.python.org/3/library/asyncio-task.html#asyncio.gather) function.)

View File

@ -162,7 +162,7 @@ Let's take the :ref:`queue worker example <queue-worker-function>` from before.
:caption: main.py :caption: main.py
from asyncio_taskpool import SimpleTaskPool from asyncio_taskpool import SimpleTaskPool
from .work import another_worker_function from .work import queue_worker_function
async def main(): async def main():
@ -193,9 +193,9 @@ This may, at first glance, not seem like much of a difference, aside from differ
if some_condition and pool.num_running > 10: if some_condition and pool.num_running > 10:
pool.stop(3) pool.stop(3)
elif some_other_condition and pool.num_running < 5: elif some_other_condition and pool.num_running < 5:
pool.start(5) await pool.start(5)
else: else:
pool.start(1) await pool.start(1)
... ...
await pool.gather_and_close() await pool.gather_and_close()

View File

@ -623,12 +623,13 @@ class TaskPool(BaseTaskPool):
coroutine_function: The function representing the task group. coroutine_function: The function representing the task group.
Returns: Returns:
The constructed 'prefix_function_index' string to name a task group. The constructed '{prefix}-{name}-group-{idx}' string to name a task group.
(With `name` being the name of the `coroutine_function` and `idx` being an incrementing index.)
""" """
base_name = f'{prefix}_{coroutine_function.__name__}' base_name = f'{prefix}-{coroutine_function.__name__}-group'
i = 0 i = 0
while True: while True:
name = f'{base_name}_{i}' name = f'{base_name}-{i}'
if name not in self._task_groups.keys(): if name not in self._task_groups.keys():
return name return name
i += 1 i += 1
@ -687,8 +688,9 @@ class TaskPool(BaseTaskPool):
num (optional): num (optional):
The number of tasks to spawn with the specified parameters. The number of tasks to spawn with the specified parameters.
group_name (optional): group_name (optional):
Name of the task group to add the new tasks to. By default, a unique name is constructed using the Name of the task group to add the new tasks to. By default, a unique name is constructed in the form
name of the provided `func` and an incrementing index as 'apply_func_index'. :code:`'apply-{name}-group-{idx}'` (with `name` being the name of the `func` and `idx` being an
incrementing index).
end_callback (optional): end_callback (optional):
A callback to execute after a task has ended. A callback to execute after a task has ended.
It is run with the task's ID as its only positional argument. It is run with the task's ID as its only positional argument.
@ -697,7 +699,7 @@ class TaskPool(BaseTaskPool):
It is run with the task's ID as its only positional argument. It is run with the task's ID as its only positional argument.
Returns: Returns:
The name of the newly created task group (see the `group_name` parameter). The name of the newly created group (see the `group_name` parameter).
Raises: Raises:
`PoolIsClosed`: The pool is closed. `PoolIsClosed`: The pool is closed.
@ -853,8 +855,8 @@ class TaskPool(BaseTaskPool):
The number new tasks spawned by this method to run concurrently. Defaults to 1. The number new tasks spawned by this method to run concurrently. Defaults to 1.
group_name (optional): group_name (optional):
Name of the task group to add the new tasks to. If provided, it must be a name that doesn't exist yet. Name of the task group to add the new tasks to. If provided, it must be a name that doesn't exist yet.
By default, a unique name is constructed using the name of the provided `func` and an incrementing By default, a unique name is constructed in the form :code:`'map-{name}-group-{idx}'`
index as 'apply_func_index'. (with `name` being the name of the `func` and `idx` being an incrementing index).
end_callback (optional): end_callback (optional):
A callback to execute after a task has ended. A callback to execute after a task has ended.
It is run with the task's ID as its only positional argument. It is run with the task's ID as its only positional argument.
@ -863,7 +865,7 @@ class TaskPool(BaseTaskPool):
It is run with the task's ID as its only positional argument. It is run with the task's ID as its only positional argument.
Returns: Returns:
The name of the newly created task group (see the `group_name` parameter). The name of the newly created group (see the `group_name` parameter).
Raises: Raises:
`PoolIsClosed`: The pool is closed. `PoolIsClosed`: The pool is closed.
@ -884,6 +886,10 @@ class TaskPool(BaseTaskPool):
Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked
as positional arguments to the function. as positional arguments to the function.
Each coroutine then looks like `func(*args)`, `args` being an element from `args_iter`. Each coroutine then looks like `func(*args)`, `args` being an element from `args_iter`.
Returns:
The name of the newly created group in the form :code:`'starmap-{name}-group-{index}'`
(with `name` being the name of the `func` and `idx` being an incrementing index).
""" """
if group_name is None: if group_name is None:
group_name = self._generate_group_name('starmap', func) group_name = self._generate_group_name('starmap', func)
@ -898,6 +904,10 @@ class TaskPool(BaseTaskPool):
Like :meth:`map` except that the elements of `kwargs_iter` are expected to be iterables themselves to be Like :meth:`map` except that the elements of `kwargs_iter` are expected to be iterables themselves to be
unpacked as keyword-arguments to the function. unpacked as keyword-arguments to the function.
Each coroutine then looks like `func(**kwargs)`, `kwargs` being an element from `kwargs_iter`. Each coroutine then looks like `func(**kwargs)`, `kwargs` being an element from `kwargs_iter`.
Returns:
The name of the newly created group in the form :code:`'doublestarmap-{name}-group-{index}'`
(with `name` being the name of the `func` and `idx` being an incrementing index).
""" """
if group_name is None: if group_name is None:
group_name = self._generate_group_name('doublestarmap', func) group_name = self._generate_group_name('doublestarmap', func)
@ -957,6 +967,7 @@ class SimpleTaskPool(BaseTaskPool):
self._kwargs: KwArgsT = kwargs if kwargs is not None else {} self._kwargs: KwArgsT = kwargs if kwargs is not None else {}
self._end_callback: EndCB = end_callback self._end_callback: EndCB = end_callback
self._cancel_callback: CancelCB = cancel_callback self._cancel_callback: CancelCB = cancel_callback
self._start_calls: int = 0
super().__init__(pool_size=pool_size, name=name) super().__init__(pool_size=pool_size, name=name)
@property @property
@ -964,14 +975,14 @@ class SimpleTaskPool(BaseTaskPool):
"""Name of the coroutine function used in the pool.""" """Name of the coroutine function used in the pool."""
return self._func.__name__ return self._func.__name__
async def _start_one(self) -> int: async def _start_one(self, group_name: str) -> int:
"""Starts a single new task within the pool and returns its ID.""" """Starts a single new task within the pool and returns its ID."""
return await self._start_task(self._func(*self._args, **self._kwargs), return await self._start_task(self._func(*self._args, **self._kwargs), group_name=group_name,
end_callback=self._end_callback, cancel_callback=self._cancel_callback) end_callback=self._end_callback, cancel_callback=self._cancel_callback)
async def start(self, num: int) -> List[int]: async def start(self, num: int) -> str:
""" """
Starts specified number of new tasks in the pool and returns their IDs. Starts specified number of new tasks in the pool as a new group.
This method may block if there is less room in the pool than the desired number of new tasks. This method may block if there is less room in the pool than the desired number of new tasks.
@ -979,11 +990,13 @@ class SimpleTaskPool(BaseTaskPool):
num: The number of new tasks to start. num: The number of new tasks to start.
Returns: Returns:
List of IDs of the new tasks that have been started (not necessarily in the order they were started). The name of the newly created task group in the form :code:`'start-group-{idx}'`
(with `idx` being an incrementing index).
""" """
ids = await gather(*(self._start_one() for _ in range(num))) group_name = f'start-group-{self._start_calls}'
assert isinstance(ids, list) # for PyCharm self._start_calls += 1
return ids await gather(*(self._start_one(group_name) for _ in range(num)))
return group_name
def stop(self, num: int) -> List[int]: def stop(self, num: int) -> List[int]:
""" """

View File

@ -476,12 +476,13 @@ class TaskPoolTestCase(CommonTestCase):
def test__generate_group_name(self): def test__generate_group_name(self):
prefix, func = 'x y z', AsyncMock(__name__=BAR) prefix, func = 'x y z', AsyncMock(__name__=BAR)
base_name = f'{prefix}-{BAR}-group'
self.task_pool._task_groups = { self.task_pool._task_groups = {
f'{prefix}_{BAR}_0': MagicMock(), f'{base_name}-0': MagicMock(),
f'{prefix}_{BAR}_1': MagicMock(), f'{base_name}-1': MagicMock(),
f'{prefix}_{BAR}_100': MagicMock(), f'{base_name}-100': MagicMock(),
} }
expected_output = f'{prefix}_{BAR}_2' expected_output = f'{base_name}-2'
output = self.task_pool._generate_group_name(prefix, func) output = self.task_pool._generate_group_name(prefix, func)
self.assertEqual(expected_output, output) self.assertEqual(expected_output, output)
@ -751,20 +752,22 @@ class SimpleTaskPoolTestCase(CommonTestCase):
async def test__start_one(self, mock__start_task: AsyncMock): async def test__start_one(self, mock__start_task: AsyncMock):
mock__start_task.return_value = expected_output = 99 mock__start_task.return_value = expected_output = 99
self.task_pool._func = MagicMock(return_value=BAR) self.task_pool._func = MagicMock(return_value=BAR)
output = await self.task_pool._start_one() group_name = FOO + BAR + 'abc'
output = await self.task_pool._start_one(group_name)
self.assertEqual(expected_output, output) self.assertEqual(expected_output, output)
self.task_pool._func.assert_called_once_with(*self.task_pool._args, **self.task_pool._kwargs) self.task_pool._func.assert_called_once_with(*self.task_pool._args, **self.task_pool._kwargs)
mock__start_task.assert_awaited_once_with(BAR, end_callback=self.task_pool._end_callback, mock__start_task.assert_awaited_once_with(BAR, group_name=group_name, end_callback=self.task_pool._end_callback,
cancel_callback=self.task_pool._cancel_callback) cancel_callback=self.task_pool._cancel_callback)
@patch.object(pool.SimpleTaskPool, '_start_one') @patch.object(pool.SimpleTaskPool, '_start_one')
async def test_start(self, mock__start_one: AsyncMock): async def test_start(self, mock__start_one: AsyncMock):
mock__start_one.return_value = FOO mock__start_one.return_value = FOO
num = 5 num = 5
self.task_pool._start_calls = 42
output = await self.task_pool.start(num) output = await self.task_pool.start(num)
expected_output = num * [FOO] expected_output = 'start-group-42'
self.assertListEqual(expected_output, output) self.assertEqual(expected_output, output)
mock__start_one.assert_has_awaits(num * [call()]) mock__start_one.assert_has_awaits(num * [call(expected_output)])
@patch.object(pool.SimpleTaskPool, 'cancel') @patch.object(pool.SimpleTaskPool, 'cancel')
def test_stop(self, mock_cancel: MagicMock): def test_stop(self, mock_cancel: MagicMock):