Compare commits

...

8 Commits

9 changed files with 470 additions and 446 deletions

View File

@@ -27,24 +27,16 @@ Generally speaking, a task is added to a pool by providing it with a coroutine function
```python
from asyncio_taskpool import SimpleTaskPool
...
async def work(_foo, _bar): ...
...
async def main():
pool = SimpleTaskPool(work, args=('xyz', 420))
await pool.start(5)
pool.start(5)
...
pool.stop(3)
...
await pool.gather_and_close()
...
```
Since one of the main goals of `asyncio-taskpool` is to be able to start/stop tasks dynamically or "on-the-fly", _most_ of the associated methods are non-blocking _most_ of the time. A notable exception is the `gather_and_close` method for awaiting the return of all tasks in the pool. (It is essentially a glorified wrapper around the [`asyncio.gather`](https://docs.python.org/3/library/asyncio-task.html#asyncio.gather) function.)
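To make the non-blocking claim concrete, here is a self-contained sketch (the `work` coroutine and the timings are made up for illustration): `start()` returns practically instantly, while the awaited `gather_and_close()` blocks for roughly as long as the slowest task runs.

```python
import asyncio
import time

from asyncio_taskpool import SimpleTaskPool


async def work(seconds: float) -> None:
    await asyncio.sleep(seconds)  # placeholder workload


async def main() -> None:
    pool = SimpleTaskPool(work, args=(1.0,))
    pool.start(100)                # non-blocking; a meta task spawns the tasks
    await asyncio.sleep(0.1)       # give the meta task a moment to spawn them
    t = time.perf_counter()
    await pool.gather_and_close()  # blocks until all 100 tasks have returned
    print(f"gathered in {time.perf_counter() - t:.2f}s")  # ~1 s, not ~100 s


asyncio.run(main())
```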

View File

@@ -81,7 +81,7 @@ By contrast, here is how you would do it with a task pool:
...
pool = TaskPool()
group_name = await pool.apply(queue_worker_function, args=(q_in, q_out), num=5)
group_name = pool.apply(queue_worker_function, args=(q_in, q_out), num=5)
...
pool.cancel_group(group_name)
...
@@ -141,15 +141,17 @@ Or we could use a task pool:
async def main():
...
pool = TaskPool()
await pool.map(another_worker_function, data_iterator, num_concurrent=5)
pool.map(another_worker_function, data_iterator, num_concurrent=5)
...
await pool.gather_and_close()
Calling the :py:meth:`.map() <asyncio_taskpool.pool.TaskPool.map>` method this way ensures that there will **always** -- i.e. at any given moment in time -- be exactly 5 tasks working concurrently on our data (assuming no other pool interaction).
The :py:meth:`.gather_and_close() <asyncio_taskpool.pool.BaseTaskPool.gather_and_close>` line will block until **all the data** has been consumed. (see :ref:`blocking-pool-methods`)
.. note::
The :py:meth:`.gather_and_close() <asyncio_taskpool.pool.BaseTaskPool.gather_and_close>` line will block until **all the data** has been consumed. (see :ref:`blocking-pool-methods`)
Neither :py:meth:`.apply() <asyncio_taskpool.pool.TaskPool.apply>` nor :py:meth:`.map() <asyncio_taskpool.pool.TaskPool.map>` returns a coroutine. When they are called, the task pool immediately begins scheduling new tasks to run. No :code:`await` needed.
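To see both methods in action, here is a minimal end-to-end sketch based on the queue worker example above (the queue plumbing and the numbers are illustrative only):

.. code-block:: python

    import asyncio

    from asyncio_taskpool import TaskPool

    async def queue_worker_function(q_in: asyncio.Queue, q_out: asyncio.Queue) -> None:
        # Hypothetical stand-in for the worker coroutine shown earlier.
        while True:
            item = await q_in.get()
            await q_out.put(item)
            q_in.task_done()

    async def main() -> None:
        q_in, q_out = asyncio.Queue(), asyncio.Queue()
        for i in range(10):
            q_in.put_nowait(i)
        pool = TaskPool()
        group_name = pool.apply(queue_worker_function, args=(q_in, q_out), num=5)  # no await
        await q_in.join()              # wait until every item has been processed
        pool.cancel_group(group_name)  # the workers loop forever, so cancel them
        await pool.gather_and_close()

    asyncio.run(main())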
It can't get any simpler than that, can it? So glad you asked...
@@ -162,13 +164,13 @@ Let's take the :ref:`queue worker example <queue-worker-function>` from before.
:caption: main.py
from asyncio_taskpool import SimpleTaskPool
from .work import another_worker_function
from .work import queue_worker_function
async def main():
...
pool = SimpleTaskPool(queue_worker_function, args=(q_in, q_out))
await pool.start(5)
pool.start(5)
...
pool.stop_all()
...
@@ -228,6 +230,4 @@ The only method of a pool that one should **always** assume to be blocking is :py:meth:`.gather_and_close() <asyncio_taskpool.pool.BaseTaskPool.gather_and_close>`
One method to be aware of is :py:meth:`.flush() <asyncio_taskpool.pool.BaseTaskPool.flush>`. Since it will await only those tasks that the pool considers **ended** or **cancelled**, it can only block if a task blocks while catching an `asyncio.CancelledError` or if one of the callbacks provided for either of those situations blocks.
In general, the act of adding tasks to a pool is non-blocking, no matter which particular methods are used. The only notable exception is when a limit on the pool size has been set and there is "not enough room" to add a task. In this case, :py:meth:`SimpleTaskPool.start() <asyncio_taskpool.pool.SimpleTaskPool.start>` will block until the desired number of new tasks have found room in the pool (either because other tasks have ended or because the pool size was increased).
:py:meth:`TaskPool.apply() <asyncio_taskpool.pool.TaskPool.apply>` and :py:meth:`TaskPool.map() <asyncio_taskpool.pool.TaskPool.map>` (and its variants) will **never** block. Since they make use of "meta-tasks" under the hood, they will always return immediately. However, if the pool was full when one of them was called, there is **no guarantee** that even a single task has started, when the method returns.
All methods that add tasks to a pool, i.e. :py:meth:`TaskPool.map() <asyncio_taskpool.pool.TaskPool.map>` (and its variants), :py:meth:`TaskPool.apply() <asyncio_taskpool.pool.TaskPool.apply>` and :py:meth:`SimpleTaskPool.start() <asyncio_taskpool.pool.SimpleTaskPool.start>`, are non-blocking by design. They all make use of "meta tasks" under the hood and return immediately. It is important to realize, however, that their returning does not mean that any actual tasks have been spawned. For example, if a pool size limit was set and there was "no more room" in the pool when :py:meth:`.map() <asyncio_taskpool.pool.TaskPool.map>` was called, there is **no guarantee** that even a single task has started by the time it returns.
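A short sketch of the "full pool" scenario just described (pool size and workload chosen arbitrarily): even with room for only one task at a time, :py:meth:`.map() <asyncio_taskpool.pool.TaskPool.map>` returns immediately, and it is the awaited :py:meth:`.gather_and_close() <asyncio_taskpool.pool.BaseTaskPool.gather_and_close>` call that actually waits for all tasks.

.. code-block:: python

    import asyncio

    from asyncio_taskpool import TaskPool

    async def worker(x: int) -> None:
        await asyncio.sleep(0.1)  # placeholder workload

    async def main() -> None:
        pool = TaskPool(pool_size=1)                  # deliberately tiny pool
        pool.map(worker, range(5), num_concurrent=3)  # returns immediately
        # No guarantee that even a single task has started at this point;
        # the meta task spawns them one by one as room becomes available.
        await pool.gather_and_close()                 # blocks until all 5 are done

    asyncio.run(main())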

View File

@@ -1,6 +1,6 @@
[metadata]
name = asyncio-taskpool
version = 1.0.0-beta
version = 1.0.0
author = Daniil Fajnberg
author_email = mail@daniil.fajnberg.de
description = Dynamically manage pools of asyncio tasks
@@ -11,7 +11,7 @@ url = https://git.fajnberg.de/daniil/asyncio-taskpool
project_urls =
Bug Tracker = https://github.com/daniil-berg/asyncio-taskpool/issues
classifiers =
Development Status :: 4 - Beta
Development Status :: 5 - Production/Stable
Programming Language :: Python :: 3
Operating System :: OS Independent
License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)

View File

@@ -25,8 +25,6 @@ PACKAGE_NAME = 'asyncio_taskpool'
DEFAULT_TASK_GROUP = 'default'
DATETIME_FORMAT = '%Y-%m-%d_%H-%M-%S'
SESSION_MSG_BYTES = 1024 * 100
STREAM_WRITER = 'stream_writer'

View File

@@ -23,7 +23,7 @@ This module should **not** be considered part of the public API.
from asyncio.streams import StreamReader, StreamWriter
from pathlib import Path
from typing import Any, Awaitable, Callable, Iterable, Mapping, Tuple, TypeVar, Union
from typing import Any, Awaitable, Callable, Coroutine, Iterable, Mapping, Tuple, TypeVar, Union
T = TypeVar('T')
@@ -32,7 +32,7 @@ ArgsT = Iterable[Any]
KwArgsT = Mapping[str, Any]
AnyCallableT = Callable[[...], Union[T, Awaitable[T]]]
CoroutineFunc = Callable[[...], Awaitable[Any]]
CoroutineFunc = Callable[[...], Coroutine]
EndCB = Callable
CancelCB = Callable
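The narrowing from `Awaitable` to `Coroutine` is deliberate: the pool now calls `close()` on coroutines that were created but will never be scheduled (see the cancellation handling in `_apply_spawner` and `_arg_consumer` below), and `close()` is part of the coroutine protocol but not of the generic `Awaitable` interface. A small sketch of the distinction (the names here are illustrative):

```python
from typing import Any, Callable, Coroutine

async def work(x: int) -> int:
    return x * 2

# Calling a function that matches Callable[..., Coroutine] yields a coroutine object...
func: Callable[..., Coroutine[Any, Any, int]] = work
coro = func(21)
# ...which can be closed without ever being awaited (avoiding the
# "coroutine was never awaited" warning); a generic Awaitable, e.g. a
# Future, offers no such method.
coro.close()
```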

View File

@@ -33,12 +33,11 @@ from asyncio.exceptions import CancelledError
from asyncio.locks import Semaphore
from asyncio.tasks import Task, create_task, gather
from contextlib import suppress
from datetime import datetime
from math import inf
from typing import Any, Awaitable, Dict, Iterable, List, Set, Union
from . import exceptions
from .internals.constants import DEFAULT_TASK_GROUP, DATETIME_FORMAT
from .internals.constants import DEFAULT_TASK_GROUP
from .internals.group_register import TaskGroupRegister
from .internals.helpers import execute_optional, star_function
from .internals.types import ArgsT, KwArgsT, CoroutineFunc, EndCB, CancelCB
@@ -84,6 +83,10 @@ class BaseTaskPool:
self._enough_room: Semaphore = Semaphore()
self._task_groups: Dict[str, TaskGroupRegister[int]] = {}
# Mapping task group names to sets of meta tasks, and a bucket for cancelled meta tasks.
self._group_meta_tasks_running: Dict[str, Set[Task]] = {}
self._meta_tasks_cancelled: Set[Task] = set()
# Finish with method/functions calls that add the pool to the internal list of pools, set its initial size,
# and issue a log message.
self._idx: int = self._add_pool(self)
@@ -323,6 +326,8 @@ class BaseTaskPool:
"""
self._check_start(awaitable=awaitable, ignore_lock=ignore_lock)
await self._enough_room.acquire()
# TODO: Make sure that cancellation (group or pool) interrupts this method after context switching!
# Possibly make use of the task group register for that.
group_reg = self._task_groups.setdefault(group_name, TaskGroupRegister())
async with group_reg:
task_id = self._num_started
@@ -376,123 +381,6 @@ class BaseTaskPool:
for task in tasks:
task.cancel(msg=msg)
def _cancel_and_remove_all_from_group(self, group_name: str, group_reg: TaskGroupRegister, msg: str = None) -> None:
"""
Removes all tasks from the specified group and cancels them.
Does nothing to tasks that are no longer running.
Args:
group_name: The name of the group of tasks that shall be cancelled.
group_reg: The task group register object containing the task IDs.
msg (optional): Passed to the `Task.cancel()` method of every task specified by the `task_ids`.
"""
while group_reg:
try:
self._tasks_running[group_reg.pop()].cancel(msg=msg)
except KeyError:
continue
log.debug("%s cancelled tasks from group %s", str(self), group_name)
async def cancel_group(self, group_name: str, msg: str = None) -> None:
"""
Cancels an entire group of tasks.
The task group is subsequently forgotten by the pool.
Args:
group_name: The name of the group of tasks that shall be cancelled.
msg (optional): Passed to the `Task.cancel()` method of every task specified by the `task_ids`.
Raises:
`InvalidGroupName`: if no task group named `group_name` exists in the pool.
"""
log.debug("%s cancelling tasks in group %s", str(self), group_name)
try:
group_reg = self._task_groups.pop(group_name)
except KeyError:
raise exceptions.InvalidGroupName(f"No task group named {group_name} exists in this pool.")
async with group_reg:
self._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg)
log.debug("%s forgot task group %s", str(self), group_name)
async def cancel_all(self, msg: str = None) -> None:
"""
Cancels all tasks still running within the pool.
Args:
msg (optional): Passed to the `Task.cancel()` method of every task specified by the `task_ids`.
"""
log.warning("%s cancelling all tasks!", str(self))
while self._task_groups:
group_name, group_reg = self._task_groups.popitem()
async with group_reg:
self._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg)
async def flush(self, return_exceptions: bool = False):
"""
Gathers (i.e. awaits) all ended/cancelled tasks in the pool.
The tasks are subsequently forgotten by the pool. This method exists mainly to free up memory of unneeded
`Task` objects.
It blocks **only if** any of the tasks block while catching an `asyncio.CancelledError` or any of the callbacks
registered for the tasks block.
Args:
return_exceptions (optional): Passed directly into `gather`.
"""
await gather(*self._tasks_ended.values(), *self._tasks_cancelled.values(), return_exceptions=return_exceptions)
self._tasks_ended.clear()
self._tasks_cancelled.clear()
async def gather_and_close(self, return_exceptions: bool = False):
"""
Gathers (i.e. awaits) **all** tasks in the pool, then closes it.
Once this method is called, no more tasks can be started in the pool.
This method may block if one of the tasks blocks while catching an `asyncio.CancelledError` or if any of the
callbacks registered for a task blocks for whatever reason.
Args:
return_exceptions (optional): Passed directly into `gather`.
Raises:
`PoolStillUnlocked`: The pool has not been locked yet.
"""
self.lock()
await gather(*self._tasks_ended.values(), *self._tasks_cancelled.values(), *self._tasks_running.values(),
return_exceptions=return_exceptions)
self._tasks_ended.clear()
self._tasks_cancelled.clear()
self._tasks_running.clear()
self._closed = True
class TaskPool(BaseTaskPool):
"""
General purpose task pool class.
Attempts to emulate part of the interface of `multiprocessing.pool.Pool` from the stdlib.
A `TaskPool` instance can manage an arbitrary number of concurrent tasks from any coroutine function.
Tasks in the pool can all belong to the same coroutine function,
but they can also come from any number of different and unrelated coroutine functions.
As long as there is room in the pool, more tasks can be added. (By default, there is no pool size limit.)
Each task started in the pool receives a unique ID, which can be used to cancel specific tasks at any moment.
Adding tasks blocks **only if** the pool is full at that moment.
"""
def __init__(self, pool_size: int = inf, name: str = None) -> None:
super().__init__(pool_size=pool_size, name=name)
# In addition to all the attributes of the base class, we need a dictionary mapping task group names to sets of
# meta tasks that are/were running in the context of that group, and a bucket for cancelled meta tasks.
self._group_meta_tasks_running: Dict[str, Set[Task]] = {}
self._meta_tasks_cancelled: Set[Task] = set()
def _cancel_group_meta_tasks(self, group_name: str) -> None:
"""Cancels and forgets all meta tasks associated with the task group named `group_name`."""
try:
@@ -505,43 +393,64 @@ class TaskPool(BaseTaskPool):
log.debug("%s cancelled and forgot meta tasks from group %s", str(self), group_name)
def _cancel_and_remove_all_from_group(self, group_name: str, group_reg: TaskGroupRegister, msg: str = None) -> None:
"""See base class."""
self._cancel_group_meta_tasks(group_name)
super()._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg)
"""
Removes all tasks from the specified group and cancels them.
async def cancel_group(self, group_name: str, msg: str = None) -> None:
Does nothing to tasks that are no longer running.
Args:
group_name: The name of the group of tasks that shall be cancelled.
group_reg: The task group register object containing the task IDs.
msg (optional): Passed to the `Task.cancel()` method of every task specified by the `task_ids`.
"""
self._cancel_group_meta_tasks(group_name)
while group_reg:
try:
self._tasks_running[group_reg.pop()].cancel(msg=msg)
except KeyError:
continue
log.debug("%s cancelled tasks from group %s", str(self), group_name)
def cancel_group(self, group_name: str, msg: str = None) -> None:
"""
Cancels an entire group of tasks.
The task group is subsequently forgotten by the pool.
If any methods such as :meth:`map` launched meta tasks belonging to that group, these meta tasks are cancelled
before the actual tasks are cancelled. This means that any tasks "queued" to be started by a meta task will
**never even start**. In the case of :meth:`map` this would mean that its `arg_iter` may be abandoned before it
was fully consumed (if that is even possible).
If any methods (such as :meth:`map`) launched meta tasks belonging to that group, these meta tasks are
cancelled before the actual tasks are cancelled. This means that any tasks "queued" to be started by a
meta task will **never even start**.
Args:
group_name: The name of the group of tasks (and meta tasks) that shall be cancelled.
msg (optional): Passed to the `Task.cancel()` method of every task specified by the `task_ids`.
msg (optional): Passed to the `Task.cancel()` method of every task in the group.
Raises:
`InvalidGroupName`: No task group named `group_name` exists in the pool.
"""
await super().cancel_group(group_name=group_name, msg=msg)
log.debug("%s cancelling tasks in group %s", str(self), group_name)
try:
group_reg = self._task_groups.pop(group_name)
except KeyError:
raise exceptions.InvalidGroupName(f"No task group named {group_name} exists in this pool.")
self._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg)
log.debug("%s forgot task group %s", str(self), group_name)
async def cancel_all(self, msg: str = None) -> None:
def cancel_all(self, msg: str = None) -> None:
"""
Cancels all tasks still running within the pool (including meta tasks).
If any methods such as :meth:`map` launched meta tasks, these meta tasks are cancelled before the actual tasks
are cancelled. This means that any tasks "queued" to be started by a meta task will **never even start**. In the
case of :meth:`map` this would mean that its `arg_iter` may be abandoned before it was fully consumed (if that
is even possible).
If any methods (such as :meth:`map`) launched meta tasks, these meta tasks are cancelled before the
actual tasks are cancelled. This means that any tasks "queued" to be started by a meta task will
**never even start**.
Args:
msg (optional): Passed to the `Task.cancel()` method of every task specified by the `task_ids`.
msg (optional): Passed to the `Task.cancel()` method of every task.
"""
await super().cancel_all(msg=msg)
log.warning("%s cancelling all tasks!", str(self))
while self._task_groups:
group_name, group_reg = self._task_groups.popitem()
self._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg)
def _pop_ended_meta_tasks(self) -> Set[Task]:
"""
@@ -574,7 +483,7 @@ class TaskPool(BaseTaskPool):
Gathers (i.e. awaits) all ended/cancelled tasks in the pool.
The tasks are subsequently forgotten by the pool. This method exists mainly to free up memory of unneeded
`Task` objects. It also gets rid of unneeded meta tasks.
`Task` objects. It also gets rid of unneeded (ended/cancelled) meta tasks.
It blocks **only if** any of the tasks block while catching an `asyncio.CancelledError` or any of the callbacks
registered for the tasks block.
@@ -586,7 +495,9 @@ class TaskPool(BaseTaskPool):
await gather(*self._meta_tasks_cancelled, *self._pop_ended_meta_tasks(),
return_exceptions=return_exceptions)
self._meta_tasks_cancelled.clear()
await super().flush(return_exceptions=return_exceptions)
await gather(*self._tasks_ended.values(), *self._tasks_cancelled.values(), return_exceptions=return_exceptions)
self._tasks_ended.clear()
self._tasks_cancelled.clear()
async def gather_and_close(self, return_exceptions: bool = False):
"""
@@ -595,8 +506,8 @@ class TaskPool(BaseTaskPool):
Once this method is called, no more tasks can be started in the pool.
Note that this method may block indefinitely as long as any task in the pool is not done. This includes meta
tasks launched by methods such as :meth:`map`, which end by themselves, only once the arguments iterator is
fully consumed (which may not even be possible). To avoid this, make sure to call :meth:`cancel_all` first.
tasks launched by other methods, which may or may not even end by themselves. To avoid this, make sure to call
:meth:`cancel_all` first.
This method may also block if one of the tasks blocks while catching an `asyncio.CancelledError` or if any of
the callbacks registered for a task blocks for whatever reason.
@@ -607,59 +518,105 @@ class TaskPool(BaseTaskPool):
Raises:
`PoolStillUnlocked`: The pool has not been locked yet.
"""
self.lock()
not_cancelled_meta_tasks = (task for task_set in self._group_meta_tasks_running.values() for task in task_set)
with suppress(CancelledError):
await gather(*self._meta_tasks_cancelled, *not_cancelled_meta_tasks, return_exceptions=return_exceptions)
self._meta_tasks_cancelled.clear()
self._group_meta_tasks_running.clear()
await super().gather_and_close(return_exceptions=return_exceptions)
await gather(*self._tasks_ended.values(), *self._tasks_cancelled.values(), *self._tasks_running.values(),
return_exceptions=return_exceptions)
self._tasks_ended.clear()
self._tasks_cancelled.clear()
self._tasks_running.clear()
self._closed = True
# TODO: Turn the `_closed` attribute into an `Event` and add something like a `until_closed` method that will
# await it to allow blocking until a closing command comes from a server.
@staticmethod
def _generate_group_name(prefix: str, coroutine_function: CoroutineFunc) -> str:
class TaskPool(BaseTaskPool):
"""
General purpose task pool class.
Attempts to emulate part of the interface of `multiprocessing.pool.Pool` from the stdlib.
A `TaskPool` instance can manage an arbitrary number of concurrent tasks from any coroutine function.
Tasks in the pool can all belong to the same coroutine function,
but they can also come from any number of different and unrelated coroutine functions.
As long as there is room in the pool, more tasks can be added. (By default, there is no pool size limit.)
Each task started in the pool receives a unique ID, which can be used to cancel specific tasks at any moment.
Adding tasks is non-blocking; if the pool is full at that moment, newly added tasks simply do not start until there is room again.
"""
def _generate_group_name(self, prefix: str, coroutine_function: CoroutineFunc) -> str:
"""
Creates a task group identifier that includes the current datetime.
Creates a unique task group identifier.
Args:
prefix: The start of the name; will be followed by a hyphen.
coroutine_function: The function representing the task group.
Returns:
The constructed 'prefix_function_datetime' string to name a task group.
The constructed '{prefix}-{name}-group-{idx}' string to name a task group.
(With `name` being the name of the `coroutine_function` and `idx` being an incrementing index.)
"""
return f'{prefix}_{coroutine_function.__name__}_{datetime.now().strftime(DATETIME_FORMAT)}'
base_name = f'{prefix}-{coroutine_function.__name__}-group'
i = 0
while True:
name = f'{base_name}-{i}'
if name not in self._task_groups.keys():
return name
i += 1
async def _apply_num(self, group_name: str, func: CoroutineFunc, args: ArgsT = (), kwargs: KwArgsT = None,
num: int = 1, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
async def _apply_spawner(self, group_name: str, func: CoroutineFunc, args: ArgsT = (), kwargs: KwArgsT = None,
num: int = 1, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
"""
Creates a coroutine with the supplied arguments and runs it as a new task in the pool.
Creates coroutines with the supplied arguments and runs them as new tasks in the pool.
This method blocks, **only if** the pool has not enough room to accommodate `num` new tasks.
Args:
group_name:
Name of the task group to add the new task to.
Name of the task group to add the new tasks to.
func:
The coroutine function to be run as a task within the task pool.
The coroutine function to be run in `num` tasks within the task pool.
args (optional):
The positional arguments to pass into the function call.
The positional arguments to pass into each function call.
kwargs (optional):
The keyword-arguments to pass into the function call.
The keyword-arguments to pass into each function call.
num (optional):
The number of tasks to spawn with the specified parameters.
end_callback (optional):
A callback to execute after the task has ended.
A callback to execute after each task has ended.
It is run with the task's ID as its only positional argument.
cancel_callback (optional):
A callback to execute after cancellation of the task.
A callback to execute after cancellation of each task.
It is run with the task's ID as its only positional argument.
"""
if kwargs is None:
kwargs = {}
await gather(*(self._start_task(func(*args, **kwargs), group_name=group_name, end_callback=end_callback,
cancel_callback=cancel_callback) for _ in range(num)))
for i in range(num):
try:
coroutine = func(*args, **kwargs)
except Exception as e:
# This means there was probably something wrong with the function arguments.
log.exception("%s occurred in group '%s' while trying to create coroutine: %s(*%s, **%s)",
str(e.__class__.__name__), group_name, func.__name__, repr(args), repr(kwargs))
continue
try:
await self._start_task(coroutine, group_name=group_name, end_callback=end_callback,
cancel_callback=cancel_callback)
except CancelledError:
# Either the task group or all tasks were cancelled, so this meta task is not supposed to spawn any
# more tasks and can return immediately.
log.debug("Cancelled group '%s' after %s out of %s tasks have been spawned", group_name, i, num)
coroutine.close()
return
async def apply(self, func: CoroutineFunc, args: ArgsT = (), kwargs: KwArgsT = None, num: int = 1,
group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
def apply(self, func: CoroutineFunc, args: ArgsT = (), kwargs: KwArgsT = None, num: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
Creates tasks with the supplied arguments to be run in the pool.
@@ -672,6 +629,9 @@ class TaskPool(BaseTaskPool):
because this method returns immediately, this does not mean that any task was started or that any number of
tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num`.
If the entire task group is cancelled before `num` tasks have spawned, since the meta task is cancelled first,
the number of tasks spawned will end up being less than `num`.
Args:
func:
The coroutine function to use for spawning the new tasks within the task pool.
@@ -682,7 +642,9 @@ class TaskPool(BaseTaskPool):
num (optional):
The number of tasks to spawn with the specified parameters.
group_name (optional):
Name of the task group to add the new tasks to.
Name of the task group to add the new tasks to. By default, a unique name is constructed in the form
:code:`'apply-{name}-group-{idx}'` (with `name` being the name of the `func` and `idx` being an
incrementing index).
end_callback (optional):
A callback to execute after a task has ended.
It is run with the task's ID as its only positional argument.
@@ -691,26 +653,28 @@ class TaskPool(BaseTaskPool):
It is run with the task's ID as its only positional argument.
Returns:
The name of the task group that the newly spawned tasks have been added to.
The name of the newly created group (see the `group_name` parameter).
Raises:
`PoolIsClosed`: The pool is closed.
`NotCoroutine`: `func` is not a coroutine function.
`PoolIsLocked`: The pool is currently locked.
`InvalidGroupName`: A group named `group_name` exists in the pool.
"""
self._check_start(function=func)
if group_name is None:
group_name = self._generate_group_name('apply', func)
group_reg = self._task_groups.setdefault(group_name, TaskGroupRegister())
async with group_reg:
meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
meta_tasks.add(create_task(self._apply_num(group_name, func, args, kwargs, num,
if group_name in self._task_groups.keys():
raise exceptions.InvalidGroupName(f"Group named {group_name} already exists!")
self._task_groups.setdefault(group_name, TaskGroupRegister())
meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
meta_tasks.add(create_task(self._apply_spawner(group_name, func, args, kwargs, num,
end_callback=end_callback, cancel_callback=cancel_callback)))
return group_name
@staticmethod
def _get_map_end_callback(map_semaphore: Semaphore, actual_end_callback: EndCB) -> EndCB:
"""Returns a wrapped `end_callback` for each :meth:`_queue_consumer` task that releases the `map_semaphore`."""
"""Returns a wrapped `end_callback` for each :meth:`_arg_consumer` task that releases the `map_semaphore`."""
async def release_callback(task_id: int) -> None:
map_semaphore.release()
await execute_optional(actual_end_callback, args=(task_id,))
@@ -744,30 +708,34 @@ class TaskPool(BaseTaskPool):
The callback that was specified to execute after cancellation of the task (and the next one).
It is run with the task's ID as its only positional argument.
"""
map_semaphore = Semaphore(num_concurrent)
release_cb = self._get_map_end_callback(map_semaphore, actual_end_callback=end_callback)
for next_arg in arg_iter:
# When the number of running tasks spawned by this method reaches the specified maximum,
# this next line will block, until one of them ends and releases the semaphore.
await map_semaphore.acquire()
semaphore = Semaphore(num_concurrent)
release_cb = self._get_map_end_callback(semaphore, actual_end_callback=end_callback)
for i, next_arg in enumerate(arg_iter):
semaphore_acquired = False
try:
await self._start_task(star_function(func, next_arg, arg_stars=arg_stars), group_name=group_name,
ignore_lock=True, end_callback=release_cb, cancel_callback=cancel_callback)
except CancelledError:
# This means that no more tasks are supposed to be created from this `arg_iter`;
# thus, we can forget about the rest of the arguments.
log.debug("Cancelled consumption of argument iterable in task group '%s'", group_name)
map_semaphore.release()
return
coroutine = star_function(func, next_arg, arg_stars=arg_stars)
except Exception as e:
# This means an exception occurred during task **creation**, meaning no task has been created.
# It does not imply an error within the task itself.
log.exception("%s occurred while trying to create task: %s(%s%s)",
str(e.__class__.__name__), func.__name__, '*' * arg_stars, str(next_arg))
map_semaphore.release()
# This means there was probably something wrong with the function arguments.
log.exception("%s occurred in group '%s' while trying to create coroutine: %s(%s%s)",
str(e.__class__.__name__), group_name, func.__name__, '*' * arg_stars, str(next_arg))
continue
try:
# When the number of running tasks spawned by this method reaches the specified maximum,
# this next line will block, until one of them ends and releases the semaphore.
semaphore_acquired = await semaphore.acquire()
await self._start_task(coroutine, group_name=group_name, ignore_lock=True,
end_callback=release_cb, cancel_callback=cancel_callback)
except CancelledError:
# Either the task group or all tasks were cancelled, so this meta task is not supposed to spawn any
# more tasks and can return immediately. (This means we drop `arg_iter` without consuming it fully.)
log.debug("Cancelled group '%s' after %s tasks have been spawned", group_name, i)
coroutine.close()
if semaphore_acquired:
semaphore.release()
return
async def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
"""
Creates tasks in the pool with arguments from the supplied iterable.
@@ -785,6 +753,9 @@ class TaskPool(BaseTaskPool):
because this method returns immediately, this does not mean that any task was started or that any number of
tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num_concurrent`.
If the entire task group is cancelled, the meta task is cancelled first, which means that `arg_iter` may be
abandoned before being fully consumed (if that is even possible).
Args:
group_name:
Name of the task group to add the new tasks to. It must be a name that doesn't exist yet.
@@ -812,14 +783,13 @@ class TaskPool(BaseTaskPool):
raise ValueError("`num_concurrent` must be a positive integer.")
if group_name in self._task_groups.keys():
raise exceptions.InvalidGroupName(f"Group named {group_name} already exists!")
self._task_groups[group_name] = group_reg = TaskGroupRegister()
async with group_reg:
meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
meta_tasks.add(create_task(self._arg_consumer(group_name, num_concurrent, func, arg_iter, arg_stars,
end_callback=end_callback, cancel_callback=cancel_callback)))
self._task_groups[group_name] = TaskGroupRegister()
meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
meta_tasks.add(create_task(self._arg_consumer(group_name, num_concurrent, func, arg_iter, arg_stars,
end_callback=end_callback, cancel_callback=cancel_callback)))
async def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
A task-based equivalent of the `multiprocessing.pool.Pool.map` method.
@@ -838,6 +808,9 @@ class TaskPool(BaseTaskPool):
because this method returns immediately, this does not mean that any task was started or that any number of
tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num_concurrent`.
If the entire task group is cancelled, the meta task is cancelled first, which means that `arg_iter` may be
abandoned before being fully consumed (if that is even possible).
Args:
func:
The coroutine function to use for spawning the new tasks within the task pool.
@@ -847,6 +820,8 @@ class TaskPool(BaseTaskPool):
The number of new tasks spawned by this method to run concurrently. Defaults to 1.
group_name (optional):
Name of the task group to add the new tasks to. If provided, it must be a name that doesn't exist yet.
By default, a unique name is constructed in the form :code:`'map-{name}-group-{idx}'`
(with `name` being the name of the `func` and `idx` being an incrementing index).
end_callback (optional):
A callback to execute after a task has ended.
It is run with the task's ID as its only positional argument.
@@ -855,7 +830,7 @@ class TaskPool(BaseTaskPool):
It is run with the task's ID as its only positional argument.
Returns:
The name of the task group that the newly spawned tasks will be added to.
The name of the newly created group (see the `group_name` parameter).
Raises:
`PoolIsClosed`: The pool is closed.
@@ -866,35 +841,42 @@ class TaskPool(BaseTaskPool):
"""
if group_name is None:
group_name = self._generate_group_name('map', func)
await self._map(group_name, num_concurrent, func, arg_iter, 0,
end_callback=end_callback, cancel_callback=cancel_callback)
self._map(group_name, num_concurrent, func, arg_iter, 0,
end_callback=end_callback, cancel_callback=cancel_callback)
return group_name
async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked
as positional arguments to the function.
Each coroutine then looks like `func(*args)`, `args` being an element from `args_iter`.
Returns:
The name of the newly created group in the form :code:`'starmap-{name}-group-{idx}'`
(with `name` being the name of the `func` and `idx` being an incrementing index).
"""
if group_name is None:
group_name = self._generate_group_name('starmap', func)
await self._map(group_name, num_concurrent, func, args_iter, 1,
end_callback=end_callback, cancel_callback=cancel_callback)
self._map(group_name, num_concurrent, func, args_iter, 1,
end_callback=end_callback, cancel_callback=cancel_callback)
return group_name
async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None,
cancel_callback: CancelCB = None) -> str:
def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
Like :meth:`map` except that the elements of `kwargs_iter` are expected to be mappings themselves to be
unpacked as keyword-arguments to the function.
Each coroutine then looks like `func(**kwargs)`, `kwargs` being an element from `kwargs_iter`.
Returns:
The name of the newly created group in the form :code:`'doublestarmap-{name}-group-{idx}'`
(with `name` being the name of the `func` and `idx` being an incrementing index).
"""
if group_name is None:
group_name = self._generate_group_name('doublestarmap', func)
await self._map(group_name, num_concurrent, func, kwargs_iter, 2,
end_callback=end_callback, cancel_callback=cancel_callback)
self._map(group_name, num_concurrent, func, kwargs_iter, 2,
end_callback=end_callback, cancel_callback=cancel_callback)
return group_name
@@ -949,6 +931,7 @@ class SimpleTaskPool(BaseTaskPool):
self._kwargs: KwArgsT = kwargs if kwargs is not None else {}
self._end_callback: EndCB = end_callback
self._cancel_callback: CancelCB = cancel_callback
self._start_calls: int = 0
super().__init__(pool_size=pool_size, name=name)
@property
@@ -956,26 +939,41 @@ class SimpleTaskPool(BaseTaskPool):
"""Name of the coroutine function used in the pool."""
return self._func.__name__
async def _start_one(self) -> int:
"""Starts a single new task within the pool and returns its ID."""
return await self._start_task(self._func(*self._args, **self._kwargs),
end_callback=self._end_callback, cancel_callback=self._cancel_callback)
async def _start_num(self, num: int, group_name: str) -> None:
"""Starts `num` new tasks in group `group_name`."""
start_coroutines = (
self._start_task(self._func(*self._args, **self._kwargs), group_name=group_name,
end_callback=self._end_callback, cancel_callback=self._cancel_callback)
for _ in range(num)
)
# TODO: Same deal as with the other meta tasks, provide proper cancellation handling!
await gather(*start_coroutines)
async def start(self, num: int) -> List[int]:
def start(self, num: int) -> str:
"""
Starts the specified number of new tasks in the pool and returns their IDs.
Starts the specified number of new tasks in the pool as a new group.
This method may block if there is less room in the pool than the desired number of new tasks.
Because this method delegates the spawning of the tasks to a meta task, it **never blocks**. However, just
because this method returns immediately, this does not mean that any task was started or that any number of
tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num`.
If the entire task group is cancelled before `num` tasks have spawned, since the meta task is cancelled first,
the number of tasks spawned will end up being less than `num`.
Args:
num: The number of new tasks to start.
Returns:
List of IDs of the new tasks that have been started (not necessarily in the order they were started).
The name of the newly created task group in the form :code:`'start-group-{idx}'`
(with `idx` being an incrementing index).
"""
ids = await gather(*(self._start_one() for _ in range(num)))
assert isinstance(ids, list) # for PyCharm
return ids
self._check_start(function=self._func)
group_name = f'start-group-{self._start_calls}'
self._start_calls += 1
self._task_groups.setdefault(group_name, TaskGroupRegister())
meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
meta_tasks.add(create_task(self._start_num(num, group_name)))
return group_name
def stop(self, num: int) -> List[int]:
"""

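A practical consequence of the reworked `SimpleTaskPool.start()` shown above: since it now returns a group name instead of a list of task IDs, an entire batch of tasks can be managed through that group. A usage sketch (the `work` coroutine is made up):

```python
import asyncio

from asyncio_taskpool import SimpleTaskPool


async def work(foo: str, bar: int) -> None:
    await asyncio.sleep(bar)  # placeholder workload


async def main() -> None:
    pool = SimpleTaskPool(work, args=('xyz', 1))
    group = pool.start(5)      # e.g. 'start-group-0'; returns immediately
    await asyncio.sleep(0.1)   # let the meta task spawn the tasks
    pool.cancel_group(group)   # cancel the whole batch via its group name
    await pool.gather_and_close()


asyncio.run(main())
```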
View File

@@ -20,13 +20,11 @@ Unittests for the `asyncio_taskpool.pool` module.
from asyncio.exceptions import CancelledError
from asyncio.locks import Semaphore
from datetime import datetime
from unittest import IsolatedAsyncioTestCase
from unittest.mock import PropertyMock, MagicMock, AsyncMock, patch, call
from typing import Type
from asyncio_taskpool import pool, exceptions
from asyncio_taskpool.internals.constants import DATETIME_FORMAT
EMPTY_LIST, EMPTY_DICT, EMPTY_SET = [], {}, set()
@@ -95,6 +93,9 @@ class BaseTaskPoolTestCase(CommonTestCase):
self.assertIsInstance(self.task_pool._enough_room, Semaphore)
self.assertDictEqual(EMPTY_DICT, self.task_pool._task_groups)
self.assertDictEqual(EMPTY_DICT, self.task_pool._group_meta_tasks_running)
self.assertSetEqual(EMPTY_SET, self.task_pool._meta_tasks_cancelled)
self.assertEqual(self.mock_idx, self.task_pool._idx)
self.mock__add_pool.assert_called_once_with(self.task_pool)
@@ -318,86 +319,6 @@ class BaseTaskPoolTestCase(CommonTestCase):
mock__get_running_task.assert_has_calls([call(task_id1), call(task_id2), call(task_id3)])
mock_cancel.assert_has_calls([call(msg=FOO), call(msg=FOO), call(msg=FOO)])
def test__cancel_and_remove_all_from_group(self):
task_id = 555
mock_cancel = MagicMock()
self.task_pool._tasks_running[task_id] = MagicMock(cancel=mock_cancel)
class MockRegister(set, MagicMock):
pass
self.assertIsNone(self.task_pool._cancel_and_remove_all_from_group(' ', MockRegister({task_id, 'x'}), msg=FOO))
mock_cancel.assert_called_once_with(msg=FOO)
@patch.object(pool.BaseTaskPool, '_cancel_and_remove_all_from_group')
async def test_cancel_group(self, mock__cancel_and_remove_all_from_group: MagicMock):
mock_grp_aenter, mock_grp_aexit = AsyncMock(), AsyncMock()
mock_group_reg = MagicMock(__aenter__=mock_grp_aenter, __aexit__=mock_grp_aexit)
self.task_pool._task_groups[FOO] = mock_group_reg
with self.assertRaises(exceptions.InvalidGroupName):
await self.task_pool.cancel_group(BAR)
mock__cancel_and_remove_all_from_group.assert_not_called()
mock_grp_aenter.assert_not_called()
mock_grp_aexit.assert_not_called()
self.assertIsNone(await self.task_pool.cancel_group(FOO, msg=BAR))
mock__cancel_and_remove_all_from_group.assert_called_once_with(FOO, mock_group_reg, msg=BAR)
mock_grp_aenter.assert_awaited_once_with()
mock_grp_aexit.assert_awaited_once()
@patch.object(pool.BaseTaskPool, '_cancel_and_remove_all_from_group')
async def test_cancel_all(self, mock__cancel_and_remove_all_from_group: MagicMock):
mock_grp_aenter, mock_grp_aexit = AsyncMock(), AsyncMock()
mock_group_reg = MagicMock(__aenter__=mock_grp_aenter, __aexit__=mock_grp_aexit)
self.task_pool._task_groups[BAR] = mock_group_reg
self.assertIsNone(await self.task_pool.cancel_all(FOO))
mock__cancel_and_remove_all_from_group.assert_called_once_with(BAR, mock_group_reg, msg=FOO)
mock_grp_aenter.assert_awaited_once_with()
mock_grp_aexit.assert_awaited_once()
async def test_flush(self):
mock_ended_func, mock_cancelled_func = AsyncMock(), AsyncMock(side_effect=Exception)
self.task_pool._tasks_ended = {123: mock_ended_func()}
self.task_pool._tasks_cancelled = {456: mock_cancelled_func()}
self.assertIsNone(await self.task_pool.flush(return_exceptions=True))
mock_ended_func.assert_awaited_once_with()
mock_cancelled_func.assert_awaited_once_with()
self.assertDictEqual(EMPTY_DICT, self.task_pool._tasks_ended)
self.assertDictEqual(EMPTY_DICT, self.task_pool._tasks_cancelled)
async def test_gather_and_close(self):
mock_running_func = AsyncMock()
mock_ended_func, mock_cancelled_func = AsyncMock(), AsyncMock(side_effect=Exception)
self.task_pool._tasks_ended = {123: mock_ended_func()}
self.task_pool._tasks_cancelled = {456: mock_cancelled_func()}
self.task_pool._tasks_running = {789: mock_running_func()}
self.task_pool._locked = True
self.assertIsNone(await self.task_pool.gather_and_close(return_exceptions=True))
mock_ended_func.assert_awaited_once_with()
mock_cancelled_func.assert_awaited_once_with()
mock_running_func.assert_awaited_once_with()
self.assertDictEqual(EMPTY_DICT, self.task_pool._tasks_ended)
self.assertDictEqual(EMPTY_DICT, self.task_pool._tasks_cancelled)
self.assertDictEqual(EMPTY_DICT, self.task_pool._tasks_running)
self.assertTrue(self.task_pool._closed)
class TaskPoolTestCase(CommonTestCase):
TEST_CLASS = pool.TaskPool
task_pool: pool.TaskPool
def setUp(self) -> None:
self.base_class_init_patcher = patch.object(pool.BaseTaskPool, '__init__')
self.base_class_init = self.base_class_init_patcher.start()
super().setUp()
def tearDown(self) -> None:
self.base_class_init_patcher.stop()
super().tearDown()
def test_init(self):
self.assertDictEqual(EMPTY_DICT, self.task_pool._group_meta_tasks_running)
self.base_class_init.assert_called_once_with(pool_size=self.TEST_POOL_SIZE, name=self.TEST_POOL_NAME)
def test__cancel_group_meta_tasks(self):
mock_task1, mock_task2 = MagicMock(), MagicMock()
self.task_pool._group_meta_tasks_running[BAR] = {mock_task1, mock_task2}
@@ -413,26 +334,41 @@ class TaskPoolTestCase(CommonTestCase):
mock_task1.cancel.assert_called_once_with()
mock_task2.cancel.assert_called_once_with()
@patch.object(pool.BaseTaskPool, '_cancel_group_meta_tasks')
def test__cancel_and_remove_all_from_group(self, mock__cancel_group_meta_tasks: MagicMock):
task_id = 555
mock_cancel = MagicMock()
def add_mock_task_to_running(_):
self.task_pool._tasks_running[task_id] = MagicMock(cancel=mock_cancel)
# We add the fake task to the `_tasks_running` dictionary as a side effect of calling the mocked method,
# to verify that it is called first, before the cancellation loop starts.
mock__cancel_group_meta_tasks.side_effect = add_mock_task_to_running
class MockRegister(set, MagicMock):
pass
self.assertIsNone(self.task_pool._cancel_and_remove_all_from_group(' ', MockRegister({task_id, 'x'}), msg=FOO))
mock_cancel.assert_called_once_with(msg=FOO)
@patch.object(pool.BaseTaskPool, '_cancel_and_remove_all_from_group')
@patch.object(pool.TaskPool, '_cancel_group_meta_tasks')
def test__cancel_and_remove_all_from_group(self, mock__cancel_group_meta_tasks: MagicMock,
mock_base__cancel_and_remove_all_from_group: MagicMock):
group_name, group_reg, msg = 'xyz', MagicMock(), FOO
self.assertIsNone(self.task_pool._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg))
mock__cancel_group_meta_tasks.assert_called_once_with(group_name)
mock_base__cancel_and_remove_all_from_group.assert_called_once_with(group_name, group_reg, msg=msg)
def test_cancel_group(self, mock__cancel_and_remove_all_from_group: MagicMock):
self.task_pool._task_groups[FOO] = mock_group_reg = MagicMock()
with self.assertRaises(exceptions.InvalidGroupName):
self.task_pool.cancel_group(BAR)
mock__cancel_and_remove_all_from_group.assert_not_called()
self.assertIsNone(self.task_pool.cancel_group(FOO, msg=BAR))
self.assertDictEqual(EMPTY_DICT, self.task_pool._task_groups)
mock__cancel_and_remove_all_from_group.assert_called_once_with(FOO, mock_group_reg, msg=BAR)
@patch.object(pool.BaseTaskPool, 'cancel_group')
async def test_cancel_group(self, mock_base_cancel_group: AsyncMock):
group_name, msg = 'abc', 'xyz'
await self.task_pool.cancel_group(group_name, msg=msg)
mock_base_cancel_group.assert_awaited_once_with(group_name=group_name, msg=msg)
@patch.object(pool.BaseTaskPool, 'cancel_all')
async def test_cancel_all(self, mock_base_cancel_all: AsyncMock):
msg = 'xyz'
await self.task_pool.cancel_all(msg=msg)
mock_base_cancel_all.assert_awaited_once_with(msg=msg)
@patch.object(pool.BaseTaskPool, '_cancel_and_remove_all_from_group')
def test_cancel_all(self, mock__cancel_and_remove_all_from_group: MagicMock):
mock_group_reg = MagicMock()
self.task_pool._task_groups = {FOO: mock_group_reg, BAR: mock_group_reg}
self.assertIsNone(self.task_pool.cancel_all('msg'))
mock__cancel_and_remove_all_from_group.assert_has_calls([
call(BAR, mock_group_reg, msg='msg'),
call(FOO, mock_group_reg, msg='msg')
])
def test__pop_ended_meta_tasks(self):
mock_task, mock_done_task1 = MagicMock(done=lambda: False), MagicMock(done=lambda: True)
@@ -444,104 +380,153 @@ class TaskPoolTestCase(CommonTestCase):
self.assertSetEqual(expected_output, output)
self.assertDictEqual({FOO: {mock_task}}, self.task_pool._group_meta_tasks_running)
@patch.object(pool.TaskPool, '_pop_ended_meta_tasks')
@patch.object(pool.BaseTaskPool, 'flush')
async def test_flush(self, mock_base_flush: AsyncMock, mock__pop_ended_meta_tasks: MagicMock):
@patch.object(pool.BaseTaskPool, '_pop_ended_meta_tasks')
async def test_flush(self, mock__pop_ended_meta_tasks: MagicMock):
# Meta tasks:
mock_ended_meta_task = AsyncMock()
mock__pop_ended_meta_tasks.return_value = {mock_ended_meta_task()}
mock_cancelled_meta_task = AsyncMock(side_effect=CancelledError)
self.task_pool._meta_tasks_cancelled = {mock_cancelled_meta_task()}
self.assertIsNone(await self.task_pool.flush(return_exceptions=False))
mock_base_flush.assert_awaited_once_with(return_exceptions=False)
# Actual tasks:
mock_ended_func, mock_cancelled_func = AsyncMock(), AsyncMock(side_effect=Exception)
self.task_pool._tasks_ended = {123: mock_ended_func()}
self.task_pool._tasks_cancelled = {456: mock_cancelled_func()}
self.assertIsNone(await self.task_pool.flush(return_exceptions=True))
# Meta tasks:
mock__pop_ended_meta_tasks.assert_called_once_with()
mock_ended_meta_task.assert_awaited_once_with()
mock_cancelled_meta_task.assert_awaited_once_with()
self.assertSetEqual(EMPTY_SET, self.task_pool._meta_tasks_cancelled)
# Actual tasks:
mock_ended_func.assert_awaited_once_with()
mock_cancelled_func.assert_awaited_once_with()
self.assertDictEqual(EMPTY_DICT, self.task_pool._tasks_ended)
self.assertDictEqual(EMPTY_DICT, self.task_pool._tasks_cancelled)
@patch.object(pool.BaseTaskPool, 'gather_and_close')
async def test_gather_and_close(self, mock_base_gather_and_close: AsyncMock):
@patch.object(pool.BaseTaskPool, 'lock')
async def test_gather_and_close(self, mock_lock: MagicMock):
# Meta tasks:
mock_meta_task1, mock_meta_task2 = AsyncMock(), AsyncMock()
self.task_pool._group_meta_tasks_running = {FOO: {mock_meta_task1()}, BAR: {mock_meta_task2()}}
mock_cancelled_meta_task = AsyncMock(side_effect=CancelledError)
self.task_pool._meta_tasks_cancelled = {mock_cancelled_meta_task()}
# Actual tasks:
mock_running_func = AsyncMock()
mock_ended_func, mock_cancelled_func = AsyncMock(), AsyncMock(side_effect=Exception)
self.task_pool._tasks_ended = {123: mock_ended_func()}
self.task_pool._tasks_cancelled = {456: mock_cancelled_func()}
self.task_pool._tasks_running = {789: mock_running_func()}
self.assertIsNone(await self.task_pool.gather_and_close(return_exceptions=True))
mock_base_gather_and_close.assert_awaited_once_with(return_exceptions=True)
mock_lock.assert_called_once_with()
# Meta tasks:
mock_meta_task1.assert_awaited_once_with()
mock_meta_task2.assert_awaited_once_with()
mock_cancelled_meta_task.assert_awaited_once_with()
self.assertDictEqual(EMPTY_DICT, self.task_pool._group_meta_tasks_running)
self.assertSetEqual(EMPTY_SET, self.task_pool._meta_tasks_cancelled)
# Actual tasks:
mock_ended_func.assert_awaited_once_with()
mock_cancelled_func.assert_awaited_once_with()
mock_running_func.assert_awaited_once_with()
self.assertDictEqual(EMPTY_DICT, self.task_pool._tasks_ended)
self.assertDictEqual(EMPTY_DICT, self.task_pool._tasks_cancelled)
self.assertDictEqual(EMPTY_DICT, self.task_pool._tasks_running)
self.assertTrue(self.task_pool._closed)
@patch.object(pool, 'datetime')
def test__generate_group_name(self, mock_datetime: MagicMock):
class TaskPoolTestCase(CommonTestCase):
TEST_CLASS = pool.TaskPool
task_pool: pool.TaskPool
def test__generate_group_name(self):
prefix, func = 'x y z', AsyncMock(__name__=BAR)
dt = datetime(1776, 7, 4, 0, 0, 1)
mock_datetime.now = MagicMock(return_value=dt)
expected_output = f'{prefix}_{BAR}_{dt.strftime(DATETIME_FORMAT)}'
output = pool.TaskPool._generate_group_name(prefix, func)
base_name = f'{prefix}-{BAR}-group'
self.task_pool._task_groups = {
f'{base_name}-0': MagicMock(),
f'{base_name}-1': MagicMock(),
f'{base_name}-100': MagicMock(),
}
expected_output = f'{base_name}-2'
output = self.task_pool._generate_group_name(prefix, func)
self.assertEqual(expected_output, output)
@patch.object(pool.TaskPool, '_start_task')
async def test__apply_num(self, mock__start_task: AsyncMock):
group_name = FOO + BAR
mock_awaitable = object()
mock_func = MagicMock(return_value=mock_awaitable)
args, kwargs, num = (FOO, BAR), {'a': 1, 'b': 2}, 3
async def test__apply_spawner(self, mock__start_task: AsyncMock):
grp_name = FOO + BAR
mock_awaitable1, mock_awaitable2 = object(), object()
mock_func = MagicMock(side_effect=[mock_awaitable1, Exception(), mock_awaitable2], __name__='func')
args, kw, num = (FOO, BAR), {'a': 1, 'b': 2}, 3
end_cb, cancel_cb = MagicMock(), MagicMock()
self.assertIsNone(await self.task_pool._apply_num(group_name, mock_func, args, kwargs, num, end_cb, cancel_cb))
mock_func.assert_has_calls(3 * [call(*args, **kwargs)])
mock__start_task.assert_has_awaits(3 * [
call(mock_awaitable, group_name=group_name, end_callback=end_cb, cancel_callback=cancel_cb)
self.assertIsNone(await self.task_pool._apply_spawner(grp_name, mock_func, args, kw, num, end_cb, cancel_cb))
mock_func.assert_has_calls(num * [call(*args, **kw)])
mock__start_task.assert_has_awaits([
call(mock_awaitable1, group_name=grp_name, end_callback=end_cb, cancel_callback=cancel_cb),
call(mock_awaitable2, group_name=grp_name, end_callback=end_cb, cancel_callback=cancel_cb),
])
mock_func.reset_mock()
mock_func.reset_mock(side_effect=True)
mock__start_task.reset_mock()
self.assertIsNone(await self.task_pool._apply_num(group_name, mock_func, args, None, num, end_cb, cancel_cb))
mock_func.assert_has_calls(num * [call(*args)])
mock__start_task.assert_has_awaits(num * [
call(mock_awaitable, group_name=group_name, end_callback=end_cb, cancel_callback=cancel_cb)
# Simulate cancellation while the second task is being started.
mock__start_task.side_effect = [None, CancelledError, None]
mock_coroutine_to_close = MagicMock()
mock_func.side_effect = [mock_awaitable1, mock_coroutine_to_close, 'never called']
self.assertIsNone(await self.task_pool._apply_spawner(grp_name, mock_func, args, None, num, end_cb, cancel_cb))
mock_func.assert_has_calls(2 * [call(*args)])
mock__start_task.assert_has_awaits([
call(mock_awaitable1, group_name=grp_name, end_callback=end_cb, cancel_callback=cancel_cb),
call(mock_coroutine_to_close, group_name=grp_name, end_callback=end_cb, cancel_callback=cancel_cb),
])
mock_coroutine_to_close.close.assert_called_once_with()
@patch.object(pool, 'create_task')
@patch.object(pool.TaskPool, '_apply_num', new_callable=MagicMock())
@patch.object(pool.TaskPool, '_apply_spawner', new_callable=MagicMock())
@patch.object(pool, 'TaskGroupRegister')
@patch.object(pool.TaskPool, '_generate_group_name')
@patch.object(pool.BaseTaskPool, '_check_start')
async def test_apply(self, mock__check_start: MagicMock, mock__generate_group_name: MagicMock,
mock_reg_cls: MagicMock, mock__apply_num: MagicMock, mock_create_task: MagicMock):
def test_apply(self, mock__check_start: MagicMock, mock__generate_group_name: MagicMock,
mock_reg_cls: MagicMock, mock__apply_spawner: MagicMock, mock_create_task: MagicMock):
mock__generate_group_name.return_value = generated_name = 'name 123'
mock_group_reg = set_up_mock_group_register(mock_reg_cls)
mock__apply_num.return_value = mock_apply_coroutine = object()
mock__apply_spawner.return_value = mock_apply_coroutine = object()
mock_create_task.return_value = fake_task = object()
mock_func, num, group_name = MagicMock(), 3, FOO + BAR
args, kwargs = (FOO, BAR), {'a': 1, 'b': 2}
end_cb, cancel_cb = MagicMock(), MagicMock()
self.task_pool._task_groups = {group_name: 'causes error'}
with self.assertRaises(exceptions.InvalidGroupName):
self.task_pool.apply(mock_func, args, kwargs, num, group_name, end_cb, cancel_cb)
mock__check_start.assert_called_once_with(function=mock_func)
mock__apply_spawner.assert_not_called()
mock_create_task.assert_not_called()
mock__check_start.reset_mock()
self.task_pool._task_groups = {}
def check_assertions(_group_name, _output):
self.assertEqual(_group_name, _output)
mock__check_start.assert_called_once_with(function=mock_func)
self.assertEqual(mock_group_reg, self.task_pool._task_groups[_group_name])
mock_group_reg.__aenter__.assert_awaited_once_with()
mock__apply_num.assert_called_once_with(_group_name, mock_func, args, kwargs, num,
end_callback=end_cb, cancel_callback=cancel_cb)
mock__apply_spawner.assert_called_once_with(_group_name, mock_func, args, kwargs, num,
end_callback=end_cb, cancel_callback=cancel_cb)
mock_create_task.assert_called_once_with(mock_apply_coroutine)
mock_group_reg.__aexit__.assert_awaited_once()
self.assertSetEqual({fake_task}, self.task_pool._group_meta_tasks_running[group_name])
output = await self.task_pool.apply(mock_func, args, kwargs, num, group_name, end_cb, cancel_cb)
output = self.task_pool.apply(mock_func, args, kwargs, num, group_name, end_cb, cancel_cb)
check_assertions(group_name, output)
mock__generate_group_name.assert_not_called()
mock__check_start.reset_mock()
self.task_pool._task_groups.clear()
mock_group_reg.__aenter__.reset_mock()
mock__apply_num.reset_mock()
mock__apply_spawner.reset_mock()
mock_create_task.reset_mock()
mock_group_reg.__aexit__.reset_mock()
output = await self.task_pool.apply(mock_func, args, kwargs, num, None, end_cb, cancel_cb)
output = self.task_pool.apply(mock_func, args, kwargs, num, None, end_cb, cancel_cb)
check_assertions(generated_name, output)
mock__generate_group_name.assert_called_once_with('apply', mock_func)
@@ -563,20 +548,20 @@ class TaskPoolTestCase(CommonTestCase):
n = 2
mock_semaphore_cls.return_value = semaphore = Semaphore(n)
mock__get_map_end_callback.return_value = map_cb = MagicMock()
awaitable = 'totally an awaitable'
mock_star_function.side_effect = [awaitable, Exception(), awaitable]
awaitable1, awaitable2 = 'totally an awaitable', object()
mock_star_function.side_effect = [awaitable1, Exception(), awaitable2]
arg1, arg2, bad = 123456789, 'function argument', None
args = [arg1, bad, arg2]
group_name, mock_func, stars = 'whatever', MagicMock(__name__="mock"), 3
grp_name, mock_func, stars = 'whatever', MagicMock(__name__="mock"), 3
end_cb, cancel_cb = MagicMock(), MagicMock()
self.assertIsNone(await self.task_pool._arg_consumer(group_name, n, mock_func, args, stars, end_cb, cancel_cb))
# We expect the semaphore to be acquired 2 times, then be released once after the exception occurs, then
# acquired once more when the last argument is reached. Since we initialized it with a value of 2, we expect it to be locked.
self.assertIsNone(await self.task_pool._arg_consumer(grp_name, n, mock_func, args, stars, end_cb, cancel_cb))
# We initialized the semaphore with a value of 2. It should have been acquired twice. We expect it to be locked.
self.assertTrue(semaphore.locked())
mock_semaphore_cls.assert_called_once_with(n)
mock__get_map_end_callback.assert_called_once_with(semaphore, actual_end_callback=end_cb)
mock__start_task.assert_has_awaits([
call(awaitable1, group_name=grp_name, ignore_lock=True, end_callback=map_cb, cancel_callback=cancel_cb),
call(awaitable2, group_name=grp_name, ignore_lock=True, end_callback=map_cb, cancel_callback=cancel_cb),
])
mock_star_function.assert_has_calls([
call(mock_func, arg1, arg_stars=stars),
@ -587,24 +572,57 @@ class TaskPoolTestCase(CommonTestCase):
mock_semaphore_cls.reset_mock()
mock__get_map_end_callback.reset_mock()
mock__start_task.reset_mock()
mock_star_function.reset_mock(side_effect=True)
# With a CancelledError thrown while acquiring the semaphore:
mock_acquire = AsyncMock(side_effect=[True, CancelledError])
mock_semaphore_cls.return_value = mock_semaphore = MagicMock(acquire=mock_acquire)
mock_star_function.return_value = mock_coroutine = MagicMock()
arg_it = iter(arg for arg in (arg1, arg2, FOO))
self.assertIsNone(await self.task_pool._arg_consumer(grp_name, n, mock_func, arg_it, stars, end_cb, cancel_cb))
mock_semaphore_cls.assert_called_once_with(n)
mock__get_map_end_callback.assert_called_once_with(mock_semaphore, actual_end_callback=end_cb)
mock_star_function.assert_has_calls([
call(mock_func, arg1, arg_stars=stars),
call(mock_func, arg2, arg_stars=stars)
])
mock_acquire.assert_has_awaits([call(), call()])
mock__start_task.assert_awaited_once_with(mock_coroutine, group_name=grp_name, ignore_lock=True,
end_callback=map_cb, cancel_callback=cancel_cb)
mock_coroutine.close.assert_called_once_with()
mock_semaphore.release.assert_not_called()
self.assertEqual(FOO, next(arg_it))
mock_acquire.reset_mock(side_effect=True)
mock_semaphore_cls.reset_mock()
mock__get_map_end_callback.reset_mock()
mock__start_task.reset_mock()
mock_star_function.reset_mock(side_effect=True)
# With a CancelledError thrown while starting the task:
mock__start_task.side_effect = [None, CancelledError]
arg_it = iter(arg for arg in (arg1, arg2, FOO))
self.assertIsNone(await self.task_pool._arg_consumer(grp_name, n, mock_func, arg_it, stars, end_cb, cancel_cb))
mock_semaphore_cls.assert_called_once_with(n)
mock__get_map_end_callback.assert_called_once_with(mock_semaphore, actual_end_callback=end_cb)
mock_star_function.assert_has_calls([
call(mock_func, arg1, arg_stars=stars),
call(mock_func, arg2, arg_stars=stars)
])
mock_acquire.assert_has_awaits([call(), call()])
mock__start_task.assert_has_awaits(2 * [
call(mock_coroutine, group_name=grp_name, ignore_lock=True, end_callback=map_cb, cancel_callback=cancel_cb)
])
mock_coroutine.close.assert_called_once_with()
mock_semaphore.release.assert_called_once_with()
self.assertEqual(FOO, next(arg_it))
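The three scenarios above pin down the consumer's shape: the coroutine is created before a semaphore slot is acquired, a failing argument is skipped without consuming a slot, and on cancellation the never-started coroutine is closed, with the slot released only if one was actually acquired. A condensed sketch of logic consistent with these assertions follows; it is an assumed reconstruction (with `star_function`, `_get_map_end_callback`, and `_start_task` standing in for the mocked collaborators), not the actual implementation:

```python
from asyncio import CancelledError, Semaphore

async def _arg_consumer(self, group_name, num_concurrent, func, arg_iter, arg_stars,
                        end_callback=None, cancel_callback=None):
    semaphore = Semaphore(num_concurrent)
    # The wrapped end callback releases the semaphore slot when a task finishes.
    release_cb = self._get_map_end_callback(semaphore, actual_end_callback=end_callback)
    for next_arg in arg_iter:
        try:
            coroutine = star_function(func, next_arg, arg_stars=arg_stars)
        except Exception:
            continue  # a bad argument is skipped; no semaphore slot was taken
        try:
            await semaphore.acquire()
        except CancelledError:
            coroutine.close()  # no slot was acquired, so there is nothing to release
            return
        try:
            await self._start_task(coroutine, group_name=group_name, ignore_lock=True,
                                   end_callback=release_cb, cancel_callback=cancel_callback)
        except CancelledError:
            coroutine.close()
            semaphore.release()  # a slot was acquired, but the task never started
            return
```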
@patch.object(pool, 'create_task')
@patch.object(pool.TaskPool, '_arg_consumer', new_callable=MagicMock)
@patch.object(pool, 'TaskGroupRegister')
@patch.object(pool.BaseTaskPool, '_check_start')
def test__map(self, mock__check_start: MagicMock, mock_reg_cls: MagicMock, mock__arg_consumer: MagicMock,
mock_create_task: MagicMock):
mock_group_reg = set_up_mock_group_register(mock_reg_cls)
mock__arg_consumer.return_value = fake_consumer = object()
mock_create_task.return_value = fake_task = object()
@ -614,7 +632,7 @@ class TaskPoolTestCase(CommonTestCase):
end_cb, cancel_cb = MagicMock(), MagicMock()
with self.assertRaises(ValueError):
self.task_pool._map(group_name, n, func, arg_iter, stars, end_cb, cancel_cb)
mock__check_start.assert_called_once_with(function=func)
mock__check_start.reset_mock()
@ -623,82 +641,80 @@ class TaskPoolTestCase(CommonTestCase):
self.task_pool._task_groups = {group_name: MagicMock()}
with self.assertRaises(exceptions.InvalidGroupName):
self.task_pool._map(group_name, n, func, arg_iter, stars, end_cb, cancel_cb)
mock__check_start.assert_called_once_with(function=func)
mock__check_start.reset_mock()
self.task_pool._task_groups.clear()
self.assertIsNone(self.task_pool._map(group_name, n, func, arg_iter, stars, end_cb, cancel_cb))
mock__check_start.assert_called_once_with(function=func)
mock_reg_cls.assert_called_once_with()
self.task_pool._task_groups[group_name] = mock_group_reg
mock_group_reg.__aenter__.assert_awaited_once_with()
mock__arg_consumer.assert_called_once_with(group_name, n, func, arg_iter, stars,
end_callback=end_cb, cancel_callback=cancel_cb)
mock_create_task.assert_called_once_with(fake_consumer)
self.assertSetEqual({fake_task}, self.task_pool._group_meta_tasks_running[group_name])
mock_group_reg.__aexit__.assert_awaited_once()
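`_map` follows the same synchronous pattern as `apply` above: validate, register the group, then schedule the argument consumer as a meta task. Roughly along these lines (an assumed sketch; that the `ValueError` guards against a non-positive `num_concurrent` is a presumption, not confirmed by the visible test lines):

```python
def _map(self, group_name, num_concurrent, func, arg_iter, arg_stars,
         end_callback=None, cancel_callback=None):
    self._check_start(function=func)
    if num_concurrent < 1:
        raise ValueError('num_concurrent must be a positive integer')
    if group_name in self._task_groups:
        raise InvalidGroupName(f'Group named {group_name} already exists!')
    self._task_groups[group_name] = TaskGroupRegister()
    consumer_coroutine = self._arg_consumer(group_name, num_concurrent, func, arg_iter, arg_stars,
                                            end_callback=end_callback, cancel_callback=cancel_callback)
    self._group_meta_tasks_running.setdefault(group_name, set()).add(create_task(consumer_coroutine))
```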
@patch.object(pool.TaskPool, '_map')
@patch.object(pool.TaskPool, '_generate_group_name')
def test_map(self, mock__generate_group_name: MagicMock, mock__map: MagicMock):
mock__generate_group_name.return_value = generated_name = 'name 1 2 3'
mock_func = MagicMock()
arg_iter, num_concurrent, group_name = (FOO, BAR, 1, 2, 3), 2, FOO + BAR
end_cb, cancel_cb = MagicMock(), MagicMock()
output = self.task_pool.map(mock_func, arg_iter, num_concurrent, group_name, end_cb, cancel_cb)
self.assertEqual(group_name, output)
mock__map.assert_called_once_with(group_name, num_concurrent, mock_func, arg_iter, 0,
end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_not_called()
mock__map.reset_mock()
output = self.task_pool.map(mock_func, arg_iter, num_concurrent, None, end_cb, cancel_cb)
self.assertEqual(generated_name, output)
mock__map.assert_called_once_with(generated_name, num_concurrent, mock_func, arg_iter, 0,
end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_called_once_with('map', mock_func)
@patch.object(pool.TaskPool, '_map')
@patch.object(pool.TaskPool, '_generate_group_name')
def test_starmap(self, mock__generate_group_name: MagicMock, mock__map: MagicMock):
mock__generate_group_name.return_value = generated_name = 'name 1 2 3'
mock_func = MagicMock()
args_iter, num_concurrent, group_name = ([FOO], [BAR]), 2, FOO + BAR
end_cb, cancel_cb = MagicMock(), MagicMock()
output = self.task_pool.starmap(mock_func, args_iter, num_concurrent, group_name, end_cb, cancel_cb)
self.assertEqual(group_name, output)
mock__map.assert_called_once_with(group_name, num_concurrent, mock_func, args_iter, 1,
end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_not_called()
mock__map.reset_mock()
output = self.task_pool.starmap(mock_func, args_iter, num_concurrent, None, end_cb, cancel_cb)
self.assertEqual(generated_name, output)
mock__map.assert_called_once_with(generated_name, num_concurrent, mock_func, args_iter, 1,
end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_called_once_with('starmap', mock_func)
@patch.object(pool.TaskPool, '_map')
@patch.object(pool.TaskPool, '_generate_group_name')
def test_doublestarmap(self, mock__generate_group_name: MagicMock, mock__map: MagicMock):
mock__generate_group_name.return_value = generated_name = 'name 1 2 3'
mock_func = MagicMock()
kw_iter, num_concurrent, group_name = [{'a': FOO}, {'a': BAR}], 2, FOO + BAR
end_cb, cancel_cb = MagicMock(), MagicMock()
output = self.task_pool.doublestarmap(mock_func, kw_iter, num_concurrent, group_name, end_cb, cancel_cb)
self.assertEqual(group_name, output)
mock__map.assert_called_once_with(group_name, num_concurrent, mock_func, kw_iter, 2,
end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_not_called()
mock__map.reset_mock()
output = self.task_pool.doublestarmap(mock_func, kw_iter, num_concurrent, None, end_cb, cancel_cb)
self.assertEqual(generated_name, output)
mock__map.assert_called_once_with(generated_name, num_concurrent, mock_func, kw_iter, 2,
end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_called_once_with('doublestarmap', mock_func)
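All three wrappers delegate to `_map` and differ only in the `arg_stars` value (0, 1, or 2), which presumably selects the argument-unpacking style. The following helper is an illustrative stand-in for that convention, not the package's actual `star_function`:

```python
def star_call(func, arg, arg_stars: int):
    if arg_stars == 0:
        return func(arg)    # map: each item is a single positional argument
    if arg_stars == 1:
        return func(*arg)   # starmap: each item is a tuple of positional arguments
    if arg_stars == 2:
        return func(**arg)  # doublestarmap: each item is a dict of keyword arguments
    raise ValueError(f'Invalid arg_stars: {arg_stars}')
```

The naming mirrors `itertools.starmap`, extended once more for keyword-argument dicts.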
@ -728,6 +744,7 @@ class SimpleTaskPoolTestCase(CommonTestCase):
def tearDown(self) -> None:
self.base_class_init_patcher.stop()
super().tearDown()
def test_init(self):
self.assertEqual(self.TEST_POOL_FUNC, self.task_pool._func)
@ -744,23 +761,42 @@ class SimpleTaskPoolTestCase(CommonTestCase):
self.assertEqual(self.TEST_POOL_FUNC.__name__, self.task_pool.func_name)
@patch.object(pool.SimpleTaskPool, '_start_task')
async def test__start_num(self, mock__start_task: AsyncMock):
fake_coroutine = object()
self.task_pool._func = MagicMock(return_value=fake_coroutine)
num = 3
group_name = FOO + BAR + 'abc'
self.assertIsNone(await self.task_pool._start_num(num, group_name))
self.task_pool._func.assert_has_calls(num * [
call(*self.task_pool._args, **self.task_pool._kwargs)
])
mock__start_task.assert_has_awaits(num * [
call(fake_coroutine, group_name=group_name, end_callback=self.task_pool._end_callback,
cancel_callback=self.task_pool._cancel_callback)
])
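The assertions above amount to a simple loop: call the pool's fixed coroutine function `num` times and start each resulting coroutine in the given group. As a sketch (an assumed reconstruction, not the package's verbatim source):

```python
async def _start_num(self, num: int, group_name: str) -> None:
    for _ in range(num):
        coroutine = self._func(*self._args, **self._kwargs)
        await self._start_task(coroutine, group_name=group_name,
                               end_callback=self._end_callback,
                               cancel_callback=self._cancel_callback)
```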
@patch.object(pool, 'create_task')
@patch.object(pool.SimpleTaskPool, '_start_num', new_callable=MagicMock)
@patch.object(pool, 'TaskGroupRegister')
@patch.object(pool.BaseTaskPool, '_check_start')
def test_start(self, mock__check_start: MagicMock, mock_reg_cls: MagicMock, mock__start_num: MagicMock,
mock_create_task: MagicMock):
mock_group_reg = set_up_mock_group_register(mock_reg_cls)
mock__start_num.return_value = mock_start_num_coroutine = object()
mock_create_task.return_value = fake_task = object()
self.task_pool._task_groups = {}
self.task_pool._group_meta_tasks_running = {}
num = 5
self.task_pool._start_calls = 42
expected_group_name = 'start-group-42'
output = self.task_pool.start(num)
self.assertEqual(expected_group_name, output)
mock__check_start.assert_called_once_with(function=self.TEST_POOL_FUNC)
self.assertEqual(43, self.task_pool._start_calls)
self.assertEqual(mock_group_reg, self.task_pool._task_groups[expected_group_name])
mock__start_num.assert_called_once_with(num, expected_group_name)
mock_create_task.assert_called_once_with(mock_start_num_coroutine)
self.assertSetEqual({fake_task}, self.task_pool._group_meta_tasks_running[expected_group_name])
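As with `apply` and `_map`, `start` is now synchronous: it derives a group name from a running counter, registers the group, and schedules `_start_num` as a meta task. A minimal sketch consistent with these assertions (again an assumed reconstruction, with the group-register locking omitted):

```python
def start(self, num: int) -> str:
    self._check_start(function=self._func)
    group_name = f'start-group-{self._start_calls}'
    self._start_calls += 1
    self._task_groups[group_name] = TaskGroupRegister()
    meta_coroutine = self._start_num(num, group_name)
    self._group_meta_tasks_running.setdefault(group_name, set()).add(create_task(meta_coroutine))
    return group_name
```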
@patch.object(pool.SimpleTaskPool, 'cancel')
def test_stop(self, mock_cancel: MagicMock):

View File

@ -39,9 +39,9 @@ async def work(n: int) -> None:
async def main() -> None:
pool = SimpleTaskPool(work, args=(5,)) # initializes the pool; no work is being done yet
pool.start(3) # launches work tasks 0, 1, and 2
await asyncio.sleep(1.5) # lets the tasks work for a bit
pool.start(1) # launches work task 3
await asyncio.sleep(1.5) # lets the tasks work for a bit
pool.stop(2) # cancels tasks 3 and 2 (LIFO order)
await pool.gather_and_close() # awaits all tasks, then flushes the pool
@ -122,7 +122,7 @@ async def main() -> None:
pool = TaskPool(3)
# Queue up two tasks (IDs 0 and 1) to run concurrently (with the same keyword arguments).
print("> Called `apply`")
pool.apply(work, kwargs={'start': 100, 'stop': 200, 'step': 10}, num=2)
# Let the tasks work for a bit.
await asyncio.sleep(1.5)
# Now, let us enqueue four more tasks (which will receive IDs 2, 3, 4, and 5), each created with different
@ -134,7 +134,7 @@ async def main() -> None:
# The third and fourth (with IDs 4 and 5) will each start only once there is room in the pool
# and no more than one other task from this new batch is running.
args_list = [(0, 10), (10, 20), (20, 30), (30, 40)]
pool.starmap(other_work, args_list, num_concurrent=2)
print("> Called `starmap`")
# We block, until all tasks have ended.
print("> Calling `gather_and_close`...")

View File

@ -67,7 +67,7 @@ async def main() -> None:
for item in range(100):
q.put_nowait(item)
pool = SimpleTaskPool(worker, args=(q,)) # initializes the pool
pool.start(3) # launches three worker tasks
control_server_task = await TCPControlServer(pool, host='127.0.0.1', port=9999).serve_forever()
# We block until `.task_done()` has been called once by our workers for every item placed into the queue.
await q.join()