renamed group_size to num_concurrent in _map

This commit is contained in:
2022-03-29 19:28:58 +02:00
parent 54e5bfa8a0
commit 23a4cb028a
5 changed files with 56 additions and 57 deletions

View File

@ -718,12 +718,12 @@ class TaskPool(BaseTaskPool):
await execute_optional(actual_end_callback, args=(task_id,))
return release_callback
async def _arg_consumer(self, group_name: str, group_size: int, func: CoroutineFunc, arg_iter: ArgsT,
async def _arg_consumer(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT,
arg_stars: int, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
"""
Consumes arguments from :meth:`_map` and keeps a limited number of tasks working on them.
The `group_size` acts as the limiting value of an internal semaphore, which must be acquired before a new task
`num_concurrent` acts as the limiting value of an internal semaphore, which must be acquired before a new task
can be started, and which must be released when one of these tasks ends.
Intended to be run as a meta task of a specific group.
@ -731,7 +731,7 @@ class TaskPool(BaseTaskPool):
Args:
group_name:
Name of the associated task group; passed into :meth:`_start_task`.
group_size:
num_concurrent:
The maximum number new tasks spawned by this method to run concurrently.
func:
The coroutine function to use for spawning the new tasks within the task pool.
@ -746,7 +746,7 @@ class TaskPool(BaseTaskPool):
The callback that was specified to execute after cancellation of the task (and the next one).
It is run with the task's ID as its only positional argument.
"""
map_semaphore = Semaphore(group_size)
map_semaphore = Semaphore(num_concurrent)
release_cb = self._get_map_end_callback(map_semaphore, actual_end_callback=end_callback)
for next_arg in arg_iter:
# When the number of running tasks spawned by this method reaches the specified maximum,
@ -768,7 +768,7 @@ class TaskPool(BaseTaskPool):
str(e.__class__.__name__), func.__name__, '*' * arg_stars, str(next_arg))
map_semaphore.release()
async def _map(self, group_name: str, group_size: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int,
async def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
"""
Creates tasks in the pool with arguments from the supplied iterable.
@ -777,21 +777,21 @@ class TaskPool(BaseTaskPool):
All the new tasks are added to the same task group.
The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at
any given moment in time. As soon as one task from this group ends, it triggers the start of a new task
`num_concurrent` determines the (maximum) number of tasks spawned this way that shall be running concurrently at
any given moment in time. As soon as one task from this method call ends, it triggers the start of a new task
(assuming there is room in the pool), which consumes the next element from the arguments iterable. If the size
of the pool never imposes a limit, this ensures that the number of tasks belonging to this group and running
concurrently is always equal to `group_size` (except for when `arg_iter` is exhausted of course).
of the pool never imposes a limit, this ensures that the number of tasks spawned and running concurrently is
always equal to `num_concurrent` (except for when `arg_iter` is exhausted of course).
Because this method delegates the spawning of the tasks to a meta task, it **never blocks**. However, just
because this method returns immediately, this does not mean that any task was started or that any number of
tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and the `group_size`.
tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num_concurrent`.
Args:
group_name:
Name of the task group to add the new tasks to. It must be a name that doesn't exist yet.
group_size:
The maximum number new tasks spawned by this method to run concurrently.
num_concurrent:
The number new tasks spawned by this method to run concurrently.
func:
The coroutine function to use for spawning the new tasks within the task pool.
arg_iter:
@ -806,21 +806,21 @@ class TaskPool(BaseTaskPool):
It is run with the task's ID as its only positional argument.
Raises:
`ValueError`: `group_size` is less than 1.
`ValueError`: `num_concurrent` is less than 1.
`asyncio_taskpool.exceptions.InvalidGroupName`: A group named `group_name` exists in the pool.
"""
self._check_start(function=func)
if group_size < 1:
raise ValueError(f"Group size must be a positive integer.")
if num_concurrent < 1:
raise ValueError("`num_concurrent` must be a positive integer.")
if group_name in self._task_groups.keys():
raise exceptions.InvalidGroupName(f"Group named {group_name} already exists!")
self._task_groups[group_name] = group_reg = TaskGroupRegister()
async with group_reg:
meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
meta_tasks.add(create_task(self._arg_consumer(group_name, group_size, func, arg_iter, arg_stars,
meta_tasks.add(create_task(self._arg_consumer(group_name, num_concurrent, func, arg_iter, arg_stars,
end_callback=end_callback, cancel_callback=cancel_callback)))
async def map(self, func: CoroutineFunc, arg_iter: ArgsT, group_size: int = 1, group_name: str = None,
async def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
A task-based equivalent of the `multiprocessing.pool.Pool.map` method.
@ -830,25 +830,23 @@ class TaskPool(BaseTaskPool):
All the new tasks are added to the same task group.
The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at
any given moment in time. As soon as one task from this group ends, it triggers the start of a new task
`num_concurrent` determines the (maximum) number of tasks spawned this way that shall be running concurrently at
any given moment in time. As soon as one task from this method call ends, it triggers the start of a new task
(assuming there is room in the pool), which consumes the next element from the arguments iterable. If the size
of the pool never imposes a limit, this ensures that the number of tasks belonging to this group and running
concurrently is always equal to `group_size` (except for when `arg_iter` is exhausted of course).
of the pool never imposes a limit, this ensures that the number of tasks spawned and running concurrently is
always equal to `num_concurrent` (except for when `arg_iter` is exhausted of course).
This method sets up an internal arguments queue which is continuously filled while consuming the `arg_iter`.
Because this method delegates the spawning of the tasks to two meta tasks (a producer and a consumer of the
aforementioned queue), it **never blocks**. However, just because this method returns immediately, this does
not mean that any task was started or that any number of tasks will start soon, as this is solely determined by
the :attr:`BaseTaskPool.pool_size` and the `group_size`.
Because this method delegates the spawning of the tasks to a meta task, it **never blocks**. However, just
because this method returns immediately, this does not mean that any task was started or that any number of
tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num_concurrent`.
Args:
func:
The coroutine function to use for spawning the new tasks within the task pool.
arg_iter:
The iterable of arguments; each argument is to be passed into a `func` call when spawning a new task.
group_size (optional):
The maximum number new tasks spawned by this method to run concurrently. Defaults to 1.
num_concurrent (optional):
The number new tasks spawned by this method to run concurrently. Defaults to 1.
group_name (optional):
Name of the task group to add the new tasks to. If provided, it must be a name that doesn't exist yet.
end_callback (optional):
@ -865,16 +863,16 @@ class TaskPool(BaseTaskPool):
`PoolIsClosed`: The pool is closed.
`NotCoroutine`: `func` is not a coroutine function.
`PoolIsLocked`: The pool is currently locked.
`ValueError`: `group_size` is less than 1.
`ValueError`: `num_concurrent` is less than 1.
`InvalidGroupName`: A group named `group_name` exists in the pool.
"""
if group_name is None:
group_name = self._generate_group_name('map', func)
await self._map(group_name, group_size, func, arg_iter, 0,
await self._map(group_name, num_concurrent, func, arg_iter, 0,
end_callback=end_callback, cancel_callback=cancel_callback)
return group_name
async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], group_size: int = 1,
async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked
@ -883,11 +881,11 @@ class TaskPool(BaseTaskPool):
"""
if group_name is None:
group_name = self._generate_group_name('starmap', func)
await self._map(group_name, group_size, func, args_iter, 1,
await self._map(group_name, num_concurrent, func, args_iter, 1,
end_callback=end_callback, cancel_callback=cancel_callback)
return group_name
async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], group_size: int = 1,
async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None,
cancel_callback: CancelCB = None) -> str:
"""
@ -897,7 +895,7 @@ class TaskPool(BaseTaskPool):
"""
if group_name is None:
group_name = self._generate_group_name('doublestarmap', func)
await self._map(group_name, group_size, func, kwargs_iter, 2,
await self._map(group_name, num_concurrent, func, kwargs_iter, 2,
end_callback=end_callback, cancel_callback=cancel_callback)
return group_name