made apply non-blocking by using a meta-task

This commit is contained in:
Daniil Fajnberg 2022-03-29 20:01:44 +02:00
parent 1beb9fc9b0
commit 17539e9c27
Signed by: daniil-berg
GPG Key ID: BE187C50903BEE97
3 changed files with 12 additions and 13 deletions

View File

@ -228,8 +228,6 @@ The only method of a pool that one should **always** assume to be blocking is :p
One method to be aware of is :py:meth:`.flush() <asyncio_taskpool.pool.BaseTaskPool.flush>`. Since it will await only those tasks that the pool considers **ended** or **cancelled**, the blocking can only come from any callbacks that were provided for either of those situations. One method to be aware of is :py:meth:`.flush() <asyncio_taskpool.pool.BaseTaskPool.flush>`. Since it will await only those tasks that the pool considers **ended** or **cancelled**, the blocking can only come from any callbacks that were provided for either of those situations.
In general, the act of adding tasks to a pool is non-blocking, no matter which particular methods are used. The only notable exception is when a limit on the pool size has been set and there is "not enough room" to add a task. In this case, both :py:meth:`SimpleTaskPool.start() <asyncio_taskpool.pool.SimpleTaskPool.start>` and :py:meth:`TaskPool.apply() <asyncio_taskpool.pool.TaskPool.apply>` will block until the desired number of new tasks found room in the pool (either because other tasks have ended or because the pool size was increased). In general, the act of adding tasks to a pool is non-blocking, no matter which particular methods are used. The only notable exception is when a limit on the pool size has been set and there is "not enough room" to add a task. In this case, :py:meth:`SimpleTaskPool.start() <asyncio_taskpool.pool.SimpleTaskPool.start>` will block until the desired number of new tasks found room in the pool (either because other tasks have ended or because the pool size was increased).
:py:meth:`TaskPool.map() <asyncio_taskpool.pool.TaskPool.map>` (and its variants) will **never** block. Since it makes use of a "meta-task" under the hood, it will always return immediately. However, if the pool was full when it was called, there is **no guarantee** that even a single task has started, when the method returns.
:py:meth:`TaskPool.map() <asyncio_taskpool.pool.TaskPool.map>` (and its variants) will **never** block. Since it makes use of a "meta-task" under the hood, it will always return immediately. However, if the pool was full when it was called, there is **no guarantee** that even a single task has started, when the method returns.
:py:meth:`TaskPool.apply() <asyncio_taskpool.pool.TaskPool.apply>` and :py:meth:`TaskPool.map() <asyncio_taskpool.pool.TaskPool.map>` (and its variants) will **never** block. Since they make use of "meta-tasks" under the hood, they will always return immediately. However, if the pool was full when one of them was called, there is **no guarantee** that even a single task has started, when the method returns.

View File

@ -668,7 +668,9 @@ class TaskPool(BaseTaskPool):
All the new tasks are added to the same task group. All the new tasks are added to the same task group.
This method blocks, **only if** the pool has not enough room to accommodate `num` new tasks. Because this method delegates the spawning of the tasks to a meta task, it **never blocks**. However, just
because this method returns immediately, this does not mean that any task was started or that any number of
tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num`.
Args: Args:
func: func:
@ -701,8 +703,9 @@ class TaskPool(BaseTaskPool):
group_name = self._generate_group_name('apply', func) group_name = self._generate_group_name('apply', func)
group_reg = self._task_groups.setdefault(group_name, TaskGroupRegister()) group_reg = self._task_groups.setdefault(group_name, TaskGroupRegister())
async with group_reg: async with group_reg:
task = create_task(self._apply_num(group_name, func, args, kwargs, num, end_callback, cancel_callback)) meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
await task meta_tasks.add(create_task(self._apply_num(group_name, func, args, kwargs, num,
end_callback=end_callback, cancel_callback=cancel_callback)))
return group_name return group_name
@staticmethod @staticmethod

View File

@ -513,8 +513,7 @@ class TaskPoolTestCase(CommonTestCase):
mock__generate_group_name.return_value = generated_name = 'name 123' mock__generate_group_name.return_value = generated_name = 'name 123'
mock_group_reg = set_up_mock_group_register(mock_reg_cls) mock_group_reg = set_up_mock_group_register(mock_reg_cls)
mock__apply_num.return_value = mock_apply_coroutine = object() mock__apply_num.return_value = mock_apply_coroutine = object()
mock_task_future = AsyncMock() mock_create_task.return_value = fake_task = object()
mock_create_task.return_value = mock_task_future()
mock_func, num, group_name = MagicMock(), 3, FOO + BAR mock_func, num, group_name = MagicMock(), 3, FOO + BAR
args, kwargs = (FOO, BAR), {'a': 1, 'b': 2} args, kwargs = (FOO, BAR), {'a': 1, 'b': 2}
end_cb, cancel_cb = MagicMock(), MagicMock() end_cb, cancel_cb = MagicMock(), MagicMock()
@ -525,10 +524,11 @@ class TaskPoolTestCase(CommonTestCase):
mock__check_start.assert_called_once_with(function=mock_func) mock__check_start.assert_called_once_with(function=mock_func)
self.assertEqual(mock_group_reg, self.task_pool._task_groups[_group_name]) self.assertEqual(mock_group_reg, self.task_pool._task_groups[_group_name])
mock_group_reg.__aenter__.assert_awaited_once_with() mock_group_reg.__aenter__.assert_awaited_once_with()
mock__apply_num.assert_called_once_with(_group_name, mock_func, args, kwargs, num, end_cb, cancel_cb) mock__apply_num.assert_called_once_with(_group_name, mock_func, args, kwargs, num,
end_callback=end_cb, cancel_callback=cancel_cb)
mock_create_task.assert_called_once_with(mock_apply_coroutine) mock_create_task.assert_called_once_with(mock_apply_coroutine)
mock_group_reg.__aexit__.assert_awaited_once() mock_group_reg.__aexit__.assert_awaited_once()
mock_task_future.assert_awaited_once_with() self.assertSetEqual({fake_task}, self.task_pool._group_meta_tasks_running[group_name])
output = await self.task_pool.apply(mock_func, args, kwargs, num, group_name, end_cb, cancel_cb) output = await self.task_pool.apply(mock_func, args, kwargs, num, group_name, end_cb, cancel_cb)
check_assertions(group_name, output) check_assertions(group_name, output)
@ -540,8 +540,6 @@ class TaskPoolTestCase(CommonTestCase):
mock__apply_num.reset_mock() mock__apply_num.reset_mock()
mock_create_task.reset_mock() mock_create_task.reset_mock()
mock_group_reg.__aexit__.reset_mock() mock_group_reg.__aexit__.reset_mock()
mock_task_future = AsyncMock()
mock_create_task.return_value = mock_task_future()
output = await self.task_pool.apply(mock_func, args, kwargs, num, None, end_cb, cancel_cb) output = await self.task_pool.apply(mock_func, args, kwargs, num, None, end_cb, cancel_cb)
check_assertions(generated_name, output) check_assertions(generated_name, output)