Compare commits

...

2 Commits

5 changed files with 110 additions and 186 deletions

View File

@ -96,9 +96,9 @@ When you are dealing with a regular :py:class:`TaskPool <asyncio_taskpool.pool.T
.. code-block:: none .. code-block:: none
> map mypackage.mymodule.worker ['x','y','z'] -g 3 > map mypackage.mymodule.worker ['x','y','z'] -n 3
The :code:`-g` is a shorthand for :code:`--group-size` in this case. In general, all (public) pool methods will have a corresponding command in the control session. The :code:`-n` is a shorthand for :code:`--num-concurrent` in this case. In general, all (public) pool methods will have a corresponding command in the control session.
.. note:: .. note::

View File

@ -46,7 +46,7 @@ Let's take a look at an example. Say you have a coroutine function that takes tw
async def queue_worker_function(in_queue: Queue, out_queue: Queue) -> None: async def queue_worker_function(in_queue: Queue, out_queue: Queue) -> None:
while True: while True:
item = await in_queue.get() item = await in_queue.get()
... # Do some work on the item amd arrive at a result. ... # Do some work on the item and arrive at a result.
await out_queue.put(result) await out_queue.put(result)
How would we go about concurrently executing this function, say 5 times? There are (as always) a number of ways to do this with :code:`asyncio`. If we want to use tasks and be clean about it, we can do it like this: How would we go about concurrently executing this function, say 5 times? There are (as always) a number of ways to do this with :code:`asyncio`. If we want to use tasks and be clean about it, we can do it like this:
@ -141,7 +141,7 @@ Or we could use a task pool:
async def main(): async def main():
... ...
pool = TaskPool() pool = TaskPool()
await pool.map(another_worker_function, data_iterator, group_size=5) await pool.map(another_worker_function, data_iterator, num_concurrent=5)
... ...
pool.lock() pool.lock()
await pool.gather_and_close() await pool.gather_and_close()
@ -231,5 +231,6 @@ One method to be aware of is :py:meth:`.flush() <asyncio_taskpool.pool.BaseTaskP
In general, the act of adding tasks to a pool is non-blocking, no matter which particular methods are used. The only notable exception is when a limit on the pool size has been set and there is "not enough room" to add a task. In this case, both :py:meth:`SimpleTaskPool.start() <asyncio_taskpool.pool.SimpleTaskPool.start>` and :py:meth:`TaskPool.apply() <asyncio_taskpool.pool.TaskPool.apply>` will block until the desired number of new tasks found room in the pool (either because other tasks have ended or because the pool size was increased). In general, the act of adding tasks to a pool is non-blocking, no matter which particular methods are used. The only notable exception is when a limit on the pool size has been set and there is "not enough room" to add a task. In this case, both :py:meth:`SimpleTaskPool.start() <asyncio_taskpool.pool.SimpleTaskPool.start>` and :py:meth:`TaskPool.apply() <asyncio_taskpool.pool.TaskPool.apply>` will block until the desired number of new tasks found room in the pool (either because other tasks have ended or because the pool size was increased).
:py:meth:`TaskPool.map() <asyncio_taskpool.pool.TaskPool.map>` (and its variants) will **never** block. Since it makes use of "meta-tasks" under the hood, it will always return immediately. However, if the pool was full when it was called, there is **no guarantee** that even a single task has started, when the method returns. :py:meth:`TaskPool.map() <asyncio_taskpool.pool.TaskPool.map>` (and its variants) will **never** block. Since it makes use of a "meta-task" under the hood, it will always return immediately. However, if the pool was full when it was called, there is **no guarantee** that even a single task has started, when the method returns.
:py:meth:`TaskPool.map() <asyncio_taskpool.pool.TaskPool.map>` (and its variants) will **never** block. Since it makes use of a "meta-task" under the hood, it will always return immediately. However, if the pool was full when it was called, there is **no guarantee** that even a single task has started, when the method returns.

View File

@ -31,15 +31,13 @@ import logging
from asyncio.coroutines import iscoroutine, iscoroutinefunction from asyncio.coroutines import iscoroutine, iscoroutinefunction
from asyncio.exceptions import CancelledError from asyncio.exceptions import CancelledError
from asyncio.locks import Semaphore from asyncio.locks import Semaphore
from asyncio.queues import QueueEmpty
from asyncio.tasks import Task, create_task, gather from asyncio.tasks import Task, create_task, gather
from contextlib import suppress from contextlib import suppress
from datetime import datetime from datetime import datetime
from math import inf from math import inf
from typing import Any, Awaitable, Dict, Iterable, Iterator, List, Set, Union from typing import Any, Awaitable, Dict, Iterable, List, Set, Union
from . import exceptions from . import exceptions
from .queue_context import Queue
from .internals.constants import DEFAULT_TASK_GROUP, DATETIME_FORMAT from .internals.constants import DEFAULT_TASK_GROUP, DATETIME_FORMAT
from .internals.group_register import TaskGroupRegister from .internals.group_register import TaskGroupRegister
from .internals.helpers import execute_optional, star_function from .internals.helpers import execute_optional, star_function
@ -491,8 +489,6 @@ class TaskPool(BaseTaskPool):
Adding tasks blocks **only if** the pool is full at that moment. Adding tasks blocks **only if** the pool is full at that moment.
""" """
_QUEUE_END_SENTINEL = object()
def __init__(self, pool_size: int = inf, name: str = None) -> None: def __init__(self, pool_size: int = inf, name: str = None) -> None:
super().__init__(pool_size=pool_size, name=name) super().__init__(pool_size=pool_size, name=name)
# In addition to all the attributes of the base class, we need a dictionary mapping task group names to sets of # In addition to all the attributes of the base class, we need a dictionary mapping task group names to sets of
@ -714,34 +710,6 @@ class TaskPool(BaseTaskPool):
await task await task
return group_name return group_name
@classmethod
async def _queue_producer(cls, arg_queue: Queue, arg_iter: Iterator[Any], group_name: str) -> None:
"""
Keeps the arguments queue from :meth:`_map` full as long as the iterator has elements.
Intended to be run as a meta task of a specific group.
Args:
arg_queue: The queue of function arguments to consume for starting a new task.
arg_iter: The iterator of function arguments to put into the queue.
group_name: Name of the task group associated with this producer.
"""
try:
for arg in arg_iter:
await arg_queue.put(arg) # This blocks as long as the queue is full.
except CancelledError:
# This means that no more tasks are supposed to be created from this `_map()` call;
# thus, we can immediately drain the entire queue and forget about the rest of the arguments.
log.debug("Cancelled consumption of argument iterable in task group '%s'", group_name)
while True:
try:
arg_queue.get_nowait()
arg_queue.item_processed()
except QueueEmpty:
return
finally:
await arg_queue.put(cls._QUEUE_END_SENTINEL)
@staticmethod @staticmethod
def _get_map_end_callback(map_semaphore: Semaphore, actual_end_callback: EndCB) -> EndCB: def _get_map_end_callback(map_semaphore: Semaphore, actual_end_callback: EndCB) -> EndCB:
"""Returns a wrapped `end_callback` for each :meth:`_queue_consumer` task that releases the `map_semaphore`.""" """Returns a wrapped `end_callback` for each :meth:`_queue_consumer` task that releases the `map_semaphore`."""
@ -750,23 +718,25 @@ class TaskPool(BaseTaskPool):
await execute_optional(actual_end_callback, args=(task_id,)) await execute_optional(actual_end_callback, args=(task_id,))
return release_callback return release_callback
async def _queue_consumer(self, arg_queue: Queue, group_name: str, func: CoroutineFunc, arg_stars: int = 0, async def _arg_consumer(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None: arg_stars: int, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
""" """
Consumes arguments from the queue from :meth:`_map` and keeps a limited number of tasks working on them. Consumes arguments from :meth:`_map` and keeps a limited number of tasks working on them.
The queue's maximum size is taken as the limiting value of an internal semaphore, which must be acquired before `num_concurrent` acts as the limiting value of an internal semaphore, which must be acquired before a new task
a new task can be started, and which must be released when one of these tasks ends. can be started, and which must be released when one of these tasks ends.
Intended to be run as a meta task of a specific group. Intended to be run as a meta task of a specific group.
Args: Args:
arg_queue:
The queue of function arguments to consume for starting a new task.
group_name: group_name:
Name of the associated task group; passed into :meth:`_start_task`. Name of the associated task group; passed into :meth:`_start_task`.
num_concurrent:
The maximum number of new tasks spawned by this method to run concurrently.
func: func:
The coroutine function to use for spawning the new tasks within the task pool. The coroutine function to use for spawning the new tasks within the task pool.
arg_iter:
The iterable of arguments; each element is to be passed into a `func` call when spawning a new task.
arg_stars (optional): arg_stars (optional):
Whether or not to unpack an element from `arg_queue` using stars; must be 0, 1, or 2. Whether or not to unpack an element from `arg_queue` using stars; must be 0, 1, or 2.
end_callback (optional): end_callback (optional):
@ -776,31 +746,29 @@ class TaskPool(BaseTaskPool):
The callback that was specified to execute after cancellation of the task (and the next one). The callback that was specified to execute after cancellation of the task (and the next one).
It is run with the task's ID as its only positional argument. It is run with the task's ID as its only positional argument.
""" """
map_semaphore = Semaphore(arg_queue.maxsize) # value determined by `group_size` in :meth:`_map` map_semaphore = Semaphore(num_concurrent)
release_cb = self._get_map_end_callback(map_semaphore, actual_end_callback=end_callback) release_cb = self._get_map_end_callback(map_semaphore, actual_end_callback=end_callback)
while True: for next_arg in arg_iter:
# The following line blocks **only if** the number of running tasks spawned by this method has reached the # When the number of running tasks spawned by this method reaches the specified maximum,
# specified maximum as determined in :meth:`_map`. # this next line will block, until one of them ends and releases the semaphore.
await map_semaphore.acquire() await map_semaphore.acquire()
# We await the queue's `get()` coroutine and ensure that its `item_processed()` method is called. try:
async with arg_queue as next_arg: await self._start_task(star_function(func, next_arg, arg_stars=arg_stars), group_name=group_name,
if next_arg is self._QUEUE_END_SENTINEL: ignore_lock=True, end_callback=release_cb, cancel_callback=cancel_callback)
# The :meth:`_queue_producer` either reached the last argument or was cancelled. except CancelledError:
return # This means that no more tasks are supposed to be created from this `arg_iter`;
try: # thus, we can forget about the rest of the arguments.
await self._start_task(star_function(func, next_arg, arg_stars=arg_stars), group_name=group_name, log.debug("Cancelled consumption of argument iterable in task group '%s'", group_name)
ignore_lock=True, end_callback=release_cb, cancel_callback=cancel_callback) map_semaphore.release()
except CancelledError: return
map_semaphore.release() except Exception as e:
return # This means an exception occurred during task **creation**, meaning no task has been created.
except Exception as e: # It does not imply an error within the task itself.
# This means an exception occurred during task **creation**, meaning no task has been created. log.exception("%s occurred while trying to create task: %s(%s%s)",
# It does not imply an error within the task itself. str(e.__class__.__name__), func.__name__, '*' * arg_stars, str(next_arg))
log.exception("%s occurred while trying to create task: %s(%s%s)", map_semaphore.release()
str(e.__class__.__name__), func.__name__, '*' * arg_stars, str(next_arg))
map_semaphore.release()
async def _map(self, group_name: str, group_size: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int, async def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None: end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
""" """
Creates tasks in the pool with arguments from the supplied iterable. Creates tasks in the pool with arguments from the supplied iterable.
@ -809,23 +777,21 @@ class TaskPool(BaseTaskPool):
All the new tasks are added to the same task group. All the new tasks are added to the same task group.
The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at `num_concurrent` determines the (maximum) number of tasks spawned this way that shall be running concurrently at
any given moment in time. As soon as one task from this group ends, it triggers the start of a new task any given moment in time. As soon as one task from this method call ends, it triggers the start of a new task
(assuming there is room in the pool), which consumes the next element from the arguments iterable. If the size (assuming there is room in the pool), which consumes the next element from the arguments iterable. If the size
of the pool never imposes a limit, this ensures that the number of tasks belonging to this group and running of the pool never imposes a limit, this ensures that the number of tasks spawned and running concurrently is
concurrently is always equal to `group_size` (except for when `arg_iter` is exhausted of course). always equal to `num_concurrent` (except for when `arg_iter` is exhausted of course).
This method sets up an internal arguments queue which is continuously filled while consuming the `arg_iter`. Because this method delegates the spawning of the tasks to a meta task, it **never blocks**. However, just
Because this method delegates the spawning of the tasks to two meta tasks (a producer and a consumer of the because this method returns immediately, this does not mean that any task was started or that any number of
aforementioned queue), it **never blocks**. However, just because this method returns immediately, this does tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num_concurrent`.
not mean that any task was started or that any number of tasks will start soon, as this is solely determined by
the :attr:`BaseTaskPool.pool_size` and the `group_size`.
Args: Args:
group_name: group_name:
Name of the task group to add the new tasks to. It must be a name that doesn't exist yet. Name of the task group to add the new tasks to. It must be a name that doesn't exist yet.
group_size: num_concurrent:
The maximum number new tasks spawned by this method to run concurrently. The number of new tasks spawned by this method to run concurrently.
func: func:
The coroutine function to use for spawning the new tasks within the task pool. The coroutine function to use for spawning the new tasks within the task pool.
arg_iter: arg_iter:
@ -840,29 +806,21 @@ class TaskPool(BaseTaskPool):
It is run with the task's ID as its only positional argument. It is run with the task's ID as its only positional argument.
Raises: Raises:
`ValueError`: `group_size` is less than 1. `ValueError`: `num_concurrent` is less than 1.
`asyncio_taskpool.exceptions.InvalidGroupName`: A group named `group_name` exists in the pool. `asyncio_taskpool.exceptions.InvalidGroupName`: A group named `group_name` exists in the pool.
""" """
self._check_start(function=func) self._check_start(function=func)
if group_size < 1: if num_concurrent < 1:
raise ValueError(f"Group size must be a positive integer.") raise ValueError("`num_concurrent` must be a positive integer.")
if group_name in self._task_groups.keys(): if group_name in self._task_groups.keys():
raise exceptions.InvalidGroupName(f"Group named {group_name} already exists!") raise exceptions.InvalidGroupName(f"Group named {group_name} already exists!")
self._task_groups[group_name] = group_reg = TaskGroupRegister() self._task_groups[group_name] = group_reg = TaskGroupRegister()
async with group_reg: async with group_reg:
# Set up internal arguments queue. We limit its maximum size to enable lazy consumption of `arg_iter` by the
# `_queue_producer()`; that way an argument
# TODO: Perhaps this can be simplified to just one meta-task with no need for a queue.
# The limiting factor honoring the group size is already the semaphore in the queue consumer;
# Try to write this without a producer, instead consuming the `arg_iter` directly.
arg_queue = Queue(maxsize=group_size)
meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set()) meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
# Start the producer and consumer meta tasks. meta_tasks.add(create_task(self._arg_consumer(group_name, num_concurrent, func, arg_iter, arg_stars,
meta_tasks.add(create_task(self._queue_producer(arg_queue, iter(arg_iter), group_name))) end_callback=end_callback, cancel_callback=cancel_callback)))
meta_tasks.add(create_task(self._queue_consumer(arg_queue, group_name, func, arg_stars,
end_callback, cancel_callback)))
async def map(self, func: CoroutineFunc, arg_iter: ArgsT, group_size: int = 1, group_name: str = None, async def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str: end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
""" """
A task-based equivalent of the `multiprocessing.pool.Pool.map` method. A task-based equivalent of the `multiprocessing.pool.Pool.map` method.
@ -872,25 +830,23 @@ class TaskPool(BaseTaskPool):
All the new tasks are added to the same task group. All the new tasks are added to the same task group.
The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at `num_concurrent` determines the (maximum) number of tasks spawned this way that shall be running concurrently at
any given moment in time. As soon as one task from this group ends, it triggers the start of a new task any given moment in time. As soon as one task from this method call ends, it triggers the start of a new task
(assuming there is room in the pool), which consumes the next element from the arguments iterable. If the size (assuming there is room in the pool), which consumes the next element from the arguments iterable. If the size
of the pool never imposes a limit, this ensures that the number of tasks belonging to this group and running of the pool never imposes a limit, this ensures that the number of tasks spawned and running concurrently is
concurrently is always equal to `group_size` (except for when `arg_iter` is exhausted of course). always equal to `num_concurrent` (except for when `arg_iter` is exhausted of course).
This method sets up an internal arguments queue which is continuously filled while consuming the `arg_iter`. Because this method delegates the spawning of the tasks to a meta task, it **never blocks**. However, just
Because this method delegates the spawning of the tasks to two meta tasks (a producer and a consumer of the because this method returns immediately, this does not mean that any task was started or that any number of
aforementioned queue), it **never blocks**. However, just because this method returns immediately, this does tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num_concurrent`.
not mean that any task was started or that any number of tasks will start soon, as this is solely determined by
the :attr:`BaseTaskPool.pool_size` and the `group_size`.
Args: Args:
func: func:
The coroutine function to use for spawning the new tasks within the task pool. The coroutine function to use for spawning the new tasks within the task pool.
arg_iter: arg_iter:
The iterable of arguments; each argument is to be passed into a `func` call when spawning a new task. The iterable of arguments; each argument is to be passed into a `func` call when spawning a new task.
group_size (optional): num_concurrent (optional):
The maximum number new tasks spawned by this method to run concurrently. Defaults to 1. The number of new tasks spawned by this method to run concurrently. Defaults to 1.
group_name (optional): group_name (optional):
Name of the task group to add the new tasks to. If provided, it must be a name that doesn't exist yet. Name of the task group to add the new tasks to. If provided, it must be a name that doesn't exist yet.
end_callback (optional): end_callback (optional):
@ -907,16 +863,16 @@ class TaskPool(BaseTaskPool):
`PoolIsClosed`: The pool is closed. `PoolIsClosed`: The pool is closed.
`NotCoroutine`: `func` is not a coroutine function. `NotCoroutine`: `func` is not a coroutine function.
`PoolIsLocked`: The pool is currently locked. `PoolIsLocked`: The pool is currently locked.
`ValueError`: `group_size` is less than 1. `ValueError`: `num_concurrent` is less than 1.
`InvalidGroupName`: A group named `group_name` exists in the pool. `InvalidGroupName`: A group named `group_name` exists in the pool.
""" """
if group_name is None: if group_name is None:
group_name = self._generate_group_name('map', func) group_name = self._generate_group_name('map', func)
await self._map(group_name, group_size, func, arg_iter, 0, await self._map(group_name, num_concurrent, func, arg_iter, 0,
end_callback=end_callback, cancel_callback=cancel_callback) end_callback=end_callback, cancel_callback=cancel_callback)
return group_name return group_name
async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], group_size: int = 1, async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str: group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
""" """
Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked
@ -925,11 +881,11 @@ class TaskPool(BaseTaskPool):
""" """
if group_name is None: if group_name is None:
group_name = self._generate_group_name('starmap', func) group_name = self._generate_group_name('starmap', func)
await self._map(group_name, group_size, func, args_iter, 1, await self._map(group_name, num_concurrent, func, args_iter, 1,
end_callback=end_callback, cancel_callback=cancel_callback) end_callback=end_callback, cancel_callback=cancel_callback)
return group_name return group_name
async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], group_size: int = 1, async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None, group_name: str = None, end_callback: EndCB = None,
cancel_callback: CancelCB = None) -> str: cancel_callback: CancelCB = None) -> str:
""" """
@ -939,7 +895,7 @@ class TaskPool(BaseTaskPool):
""" """
if group_name is None: if group_name is None:
group_name = self._generate_group_name('doublestarmap', func) group_name = self._generate_group_name('doublestarmap', func)
await self._map(group_name, group_size, func, kwargs_iter, 2, await self._map(group_name, num_concurrent, func, kwargs_iter, 2,
end_callback=end_callback, cancel_callback=cancel_callback) end_callback=end_callback, cancel_callback=cancel_callback)
return group_name return group_name

View File

@ -20,7 +20,6 @@ Unittests for the `asyncio_taskpool.pool` module.
from asyncio.exceptions import CancelledError from asyncio.exceptions import CancelledError
from asyncio.locks import Semaphore from asyncio.locks import Semaphore
from asyncio.queues import QueueEmpty
from datetime import datetime from datetime import datetime
from unittest import IsolatedAsyncioTestCase from unittest import IsolatedAsyncioTestCase
from unittest.mock import PropertyMock, MagicMock, AsyncMock, patch, call from unittest.mock import PropertyMock, MagicMock, AsyncMock, patch, call
@ -555,28 +554,6 @@ class TaskPoolTestCase(CommonTestCase):
check_assertions(generated_name, output) check_assertions(generated_name, output)
mock__generate_group_name.assert_called_once_with('apply', mock_func) mock__generate_group_name.assert_called_once_with('apply', mock_func)
@patch.object(pool, 'Queue')
async def test__queue_producer(self, mock_queue_cls: MagicMock):
mock_put = AsyncMock()
mock_queue_cls.return_value = mock_queue = MagicMock(put=mock_put)
item1, item2, item3 = FOO, 420, 69
arg_iter = iter([item1, item2, item3])
self.assertIsNone(await self.task_pool._queue_producer(mock_queue, arg_iter, FOO + BAR))
mock_put.assert_has_awaits([call(item1), call(item2), call(item3), call(pool.TaskPool._QUEUE_END_SENTINEL)])
with self.assertRaises(StopIteration):
next(arg_iter)
mock_put.reset_mock()
mock_put.side_effect = [CancelledError, None]
arg_iter = iter([item1, item2, item3])
mock_queue.get_nowait.side_effect = [item2, item3, QueueEmpty]
self.assertIsNone(await self.task_pool._queue_producer(mock_queue, arg_iter, FOO + BAR))
mock_put.assert_has_awaits([call(item1), call(pool.TaskPool._QUEUE_END_SENTINEL)])
mock_queue.get_nowait.assert_has_calls([call(), call(), call()])
mock_queue.item_processed.assert_has_calls([call(), call()])
self.assertListEqual([item2, item3], list(arg_iter))
@patch.object(pool, 'execute_optional') @patch.object(pool, 'execute_optional')
async def test__get_map_end_callback(self, mock_execute_optional: AsyncMock): async def test__get_map_end_callback(self, mock_execute_optional: AsyncMock):
semaphore, mock_end_cb = Semaphore(1), MagicMock() semaphore, mock_end_cb = Semaphore(1), MagicMock()
@ -592,30 +569,28 @@ class TaskPoolTestCase(CommonTestCase):
@patch.object(pool, 'Semaphore') @patch.object(pool, 'Semaphore')
async def test__queue_consumer(self, mock_semaphore_cls: MagicMock, mock__get_map_end_callback: MagicMock, async def test__queue_consumer(self, mock_semaphore_cls: MagicMock, mock__get_map_end_callback: MagicMock,
mock__start_task: AsyncMock, mock_star_function: MagicMock): mock__start_task: AsyncMock, mock_star_function: MagicMock):
mock_semaphore_cls.return_value = semaphore = Semaphore(3) n = 2
mock_semaphore_cls.return_value = semaphore = Semaphore(n)
mock__get_map_end_callback.return_value = map_cb = MagicMock() mock__get_map_end_callback.return_value = map_cb = MagicMock()
awaitable = 'totally an awaitable' awaitable = 'totally an awaitable'
mock_star_function.side_effect = [awaitable, awaitable, Exception()] mock_star_function.side_effect = [awaitable, Exception(), awaitable]
arg1, arg2, bad = 123456789, 'function argument', None arg1, arg2, bad = 123456789, 'function argument', None
mock_q_maxsize = 3 args = [arg1, bad, arg2]
mock_q = MagicMock(__aenter__=AsyncMock(side_effect=[arg1, arg2, bad, pool.TaskPool._QUEUE_END_SENTINEL]),
__aexit__=AsyncMock(), maxsize=mock_q_maxsize)
group_name, mock_func, stars = 'whatever', MagicMock(__name__="mock"), 3 group_name, mock_func, stars = 'whatever', MagicMock(__name__="mock"), 3
end_cb, cancel_cb = MagicMock(), MagicMock() end_cb, cancel_cb = MagicMock(), MagicMock()
self.assertIsNone(await self.task_pool._queue_consumer(mock_q, group_name, mock_func, stars, end_cb, cancel_cb)) self.assertIsNone(await self.task_pool._arg_consumer(group_name, n, mock_func, args, stars, end_cb, cancel_cb))
# We expect the semaphore to be acquired 3 times, then be released once after the exception occurs, then # We expect the semaphore to be acquired 2 times, then be released once after the exception occurs, then
# acquired once more when the `_QUEUE_END_SENTINEL` is reached. Since we initialized it with a value of 3, # acquired once more for the last argument. Since we initialized it with a value of 2, we expect it to be locked.
# at the end of the loop, we expect it be locked.
self.assertTrue(semaphore.locked()) self.assertTrue(semaphore.locked())
mock_semaphore_cls.assert_called_once_with(mock_q_maxsize) mock_semaphore_cls.assert_called_once_with(n)
mock__get_map_end_callback.assert_called_once_with(semaphore, actual_end_callback=end_cb) mock__get_map_end_callback.assert_called_once_with(semaphore, actual_end_callback=end_cb)
mock__start_task.assert_has_awaits(2 * [ mock__start_task.assert_has_awaits(2 * [
call(awaitable, group_name=group_name, ignore_lock=True, end_callback=map_cb, cancel_callback=cancel_cb) call(awaitable, group_name=group_name, ignore_lock=True, end_callback=map_cb, cancel_callback=cancel_cb)
]) ])
mock_star_function.assert_has_calls([ mock_star_function.assert_has_calls([
call(mock_func, arg1, arg_stars=stars), call(mock_func, arg1, arg_stars=stars),
call(mock_func, arg2, arg_stars=stars), call(mock_func, bad, arg_stars=stars),
call(mock_func, bad, arg_stars=stars) call(mock_func, arg2, arg_stars=stars)
]) ])
mock_semaphore_cls.reset_mock() mock_semaphore_cls.reset_mock()
@ -626,61 +601,53 @@ class TaskPoolTestCase(CommonTestCase):
# With a CancelledError thrown while starting a task: # With a CancelledError thrown while starting a task:
mock_semaphore_cls.return_value = semaphore = Semaphore(1) mock_semaphore_cls.return_value = semaphore = Semaphore(1)
mock_star_function.side_effect = CancelledError() mock_star_function.side_effect = CancelledError()
mock_q = MagicMock(__aenter__=AsyncMock(return_value=arg1), __aexit__=AsyncMock(), maxsize=mock_q_maxsize) self.assertIsNone(await self.task_pool._arg_consumer(group_name, n, mock_func, args, stars, end_cb, cancel_cb))
self.assertIsNone(await self.task_pool._queue_consumer(mock_q, group_name, mock_func, stars, end_cb, cancel_cb))
self.assertFalse(semaphore.locked()) self.assertFalse(semaphore.locked())
mock_semaphore_cls.assert_called_once_with(mock_q_maxsize) mock_semaphore_cls.assert_called_once_with(n)
mock__get_map_end_callback.assert_called_once_with(semaphore, actual_end_callback=end_cb) mock__get_map_end_callback.assert_called_once_with(semaphore, actual_end_callback=end_cb)
mock__start_task.assert_not_called() mock__start_task.assert_not_called()
mock_star_function.assert_called_once_with(mock_func, arg1, arg_stars=stars) mock_star_function.assert_called_once_with(mock_func, arg1, arg_stars=stars)
@patch.object(pool, 'create_task') @patch.object(pool, 'create_task')
@patch.object(pool.TaskPool, '_queue_consumer', new_callable=MagicMock) @patch.object(pool.TaskPool, '_arg_consumer', new_callable=MagicMock)
@patch.object(pool.TaskPool, '_queue_producer', new_callable=MagicMock)
@patch.object(pool, 'Queue')
@patch.object(pool, 'TaskGroupRegister') @patch.object(pool, 'TaskGroupRegister')
@patch.object(pool.BaseTaskPool, '_check_start') @patch.object(pool.BaseTaskPool, '_check_start')
async def test__map(self, mock__check_start: MagicMock, mock_reg_cls: MagicMock, mock_queue_cls: MagicMock, async def test__map(self, mock__check_start: MagicMock, mock_reg_cls: MagicMock, mock__arg_consumer: MagicMock,
mock__queue_producer: MagicMock, mock__queue_consumer: MagicMock, mock_create_task: MagicMock): mock_create_task: MagicMock):
mock_group_reg = set_up_mock_group_register(mock_reg_cls) mock_group_reg = set_up_mock_group_register(mock_reg_cls)
mock_queue_cls.return_value = mock_q = MagicMock() mock__arg_consumer.return_value = fake_consumer = object()
mock__queue_producer.return_value = fake_producer = object() mock_create_task.return_value = fake_task = object()
mock__queue_consumer.return_value = fake_consumer = object()
fake_task1, fake_task2 = object(), object()
mock_create_task.side_effect = [fake_task1, fake_task2]
group_name, group_size = 'onetwothree', 0 group_name, n = 'onetwothree', 0
func, arg_iter, stars = AsyncMock(), [55, 66, 77], 3 func, arg_iter, stars = AsyncMock(), [55, 66, 77], 3
end_cb, cancel_cb = MagicMock(), MagicMock() end_cb, cancel_cb = MagicMock(), MagicMock()
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
await self.task_pool._map(group_name, group_size, func, arg_iter, stars, end_cb, cancel_cb) await self.task_pool._map(group_name, n, func, arg_iter, stars, end_cb, cancel_cb)
mock__check_start.assert_called_once_with(function=func) mock__check_start.assert_called_once_with(function=func)
mock__check_start.reset_mock() mock__check_start.reset_mock()
group_size = 1234 n = 1234
self.task_pool._task_groups = {group_name: MagicMock()} self.task_pool._task_groups = {group_name: MagicMock()}
with self.assertRaises(exceptions.InvalidGroupName): with self.assertRaises(exceptions.InvalidGroupName):
await self.task_pool._map(group_name, group_size, func, arg_iter, stars, end_cb, cancel_cb) await self.task_pool._map(group_name, n, func, arg_iter, stars, end_cb, cancel_cb)
mock__check_start.assert_called_once_with(function=func) mock__check_start.assert_called_once_with(function=func)
mock__check_start.reset_mock() mock__check_start.reset_mock()
self.task_pool._task_groups.clear() self.task_pool._task_groups.clear()
self.task_pool._before_gathering = []
self.assertIsNone(await self.task_pool._map(group_name, group_size, func, arg_iter, stars, end_cb, cancel_cb)) self.assertIsNone(await self.task_pool._map(group_name, n, func, arg_iter, stars, end_cb, cancel_cb))
mock__check_start.assert_called_once_with(function=func) mock__check_start.assert_called_once_with(function=func)
mock_reg_cls.assert_called_once_with() mock_reg_cls.assert_called_once_with()
self.task_pool._task_groups[group_name] = mock_group_reg self.task_pool._task_groups[group_name] = mock_group_reg
mock_group_reg.__aenter__.assert_awaited_once_with() mock_group_reg.__aenter__.assert_awaited_once_with()
mock_queue_cls.assert_called_once_with(maxsize=group_size) mock__arg_consumer.assert_called_once_with(group_name, n, func, arg_iter, stars,
mock__queue_producer.assert_called_once() end_callback=end_cb, cancel_callback=cancel_cb)
mock__queue_consumer.assert_called_once_with(mock_q, group_name, func, stars, end_cb, cancel_cb) mock_create_task.assert_called_once_with(fake_consumer)
mock_create_task.assert_has_calls([call(fake_producer), call(fake_consumer)]) self.assertSetEqual({fake_task}, self.task_pool._group_meta_tasks_running[group_name])
self.assertSetEqual({fake_task1, fake_task2}, self.task_pool._group_meta_tasks_running[group_name])
mock_group_reg.__aexit__.assert_awaited_once() mock_group_reg.__aexit__.assert_awaited_once()
@patch.object(pool.TaskPool, '_map') @patch.object(pool.TaskPool, '_map')
@ -688,18 +655,18 @@ class TaskPoolTestCase(CommonTestCase):
async def test_map(self, mock__generate_group_name: MagicMock, mock__map: AsyncMock): async def test_map(self, mock__generate_group_name: MagicMock, mock__map: AsyncMock):
mock__generate_group_name.return_value = generated_name = 'name 1 2 3' mock__generate_group_name.return_value = generated_name = 'name 1 2 3'
mock_func = MagicMock() mock_func = MagicMock()
arg_iter, group_size, group_name = (FOO, BAR, 1, 2, 3), 2, FOO + BAR arg_iter, num_concurrent, group_name = (FOO, BAR, 1, 2, 3), 2, FOO + BAR
end_cb, cancel_cb = MagicMock(), MagicMock() end_cb, cancel_cb = MagicMock(), MagicMock()
output = await self.task_pool.map(mock_func, arg_iter, group_size, group_name, end_cb, cancel_cb) output = await self.task_pool.map(mock_func, arg_iter, num_concurrent, group_name, end_cb, cancel_cb)
self.assertEqual(group_name, output) self.assertEqual(group_name, output)
mock__map.assert_awaited_once_with(group_name, group_size, mock_func, arg_iter, 0, mock__map.assert_awaited_once_with(group_name, num_concurrent, mock_func, arg_iter, 0,
end_callback=end_cb, cancel_callback=cancel_cb) end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_not_called() mock__generate_group_name.assert_not_called()
mock__map.reset_mock() mock__map.reset_mock()
output = await self.task_pool.map(mock_func, arg_iter, group_size, None, end_cb, cancel_cb) output = await self.task_pool.map(mock_func, arg_iter, num_concurrent, None, end_cb, cancel_cb)
self.assertEqual(generated_name, output) self.assertEqual(generated_name, output)
mock__map.assert_awaited_once_with(generated_name, group_size, mock_func, arg_iter, 0, mock__map.assert_awaited_once_with(generated_name, num_concurrent, mock_func, arg_iter, 0,
end_callback=end_cb, cancel_callback=cancel_cb) end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_called_once_with('map', mock_func) mock__generate_group_name.assert_called_once_with('map', mock_func)
@ -708,18 +675,18 @@ class TaskPoolTestCase(CommonTestCase):
async def test_starmap(self, mock__generate_group_name: MagicMock, mock__map: AsyncMock): async def test_starmap(self, mock__generate_group_name: MagicMock, mock__map: AsyncMock):
mock__generate_group_name.return_value = generated_name = 'name 1 2 3' mock__generate_group_name.return_value = generated_name = 'name 1 2 3'
mock_func = MagicMock() mock_func = MagicMock()
args_iter, group_size, group_name = ([FOO], [BAR]), 2, FOO + BAR args_iter, num_concurrent, group_name = ([FOO], [BAR]), 2, FOO + BAR
end_cb, cancel_cb = MagicMock(), MagicMock() end_cb, cancel_cb = MagicMock(), MagicMock()
output = await self.task_pool.starmap(mock_func, args_iter, group_size, group_name, end_cb, cancel_cb) output = await self.task_pool.starmap(mock_func, args_iter, num_concurrent, group_name, end_cb, cancel_cb)
self.assertEqual(group_name, output) self.assertEqual(group_name, output)
mock__map.assert_awaited_once_with(group_name, group_size, mock_func, args_iter, 1, mock__map.assert_awaited_once_with(group_name, num_concurrent, mock_func, args_iter, 1,
end_callback=end_cb, cancel_callback=cancel_cb) end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_not_called() mock__generate_group_name.assert_not_called()
mock__map.reset_mock() mock__map.reset_mock()
output = await self.task_pool.starmap(mock_func, args_iter, group_size, None, end_cb, cancel_cb) output = await self.task_pool.starmap(mock_func, args_iter, num_concurrent, None, end_cb, cancel_cb)
self.assertEqual(generated_name, output) self.assertEqual(generated_name, output)
mock__map.assert_awaited_once_with(generated_name, group_size, mock_func, args_iter, 1, mock__map.assert_awaited_once_with(generated_name, num_concurrent, mock_func, args_iter, 1,
end_callback=end_cb, cancel_callback=cancel_cb) end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_called_once_with('starmap', mock_func) mock__generate_group_name.assert_called_once_with('starmap', mock_func)
@ -728,18 +695,18 @@ class TaskPoolTestCase(CommonTestCase):
async def test_doublestarmap(self, mock__generate_group_name: MagicMock, mock__map: AsyncMock): async def test_doublestarmap(self, mock__generate_group_name: MagicMock, mock__map: AsyncMock):
mock__generate_group_name.return_value = generated_name = 'name 1 2 3' mock__generate_group_name.return_value = generated_name = 'name 1 2 3'
mock_func = MagicMock() mock_func = MagicMock()
kwargs_iter, group_size, group_name = [{'a': FOO}, {'a': BAR}], 2, FOO + BAR kw_iter, num_concurrent, group_name = [{'a': FOO}, {'a': BAR}], 2, FOO + BAR
end_cb, cancel_cb = MagicMock(), MagicMock() end_cb, cancel_cb = MagicMock(), MagicMock()
output = await self.task_pool.doublestarmap(mock_func, kwargs_iter, group_size, group_name, end_cb, cancel_cb) output = await self.task_pool.doublestarmap(mock_func, kw_iter, num_concurrent, group_name, end_cb, cancel_cb)
self.assertEqual(group_name, output) self.assertEqual(group_name, output)
mock__map.assert_awaited_once_with(group_name, group_size, mock_func, kwargs_iter, 2, mock__map.assert_awaited_once_with(group_name, num_concurrent, mock_func, kw_iter, 2,
end_callback=end_cb, cancel_callback=cancel_cb) end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_not_called() mock__generate_group_name.assert_not_called()
mock__map.reset_mock() mock__map.reset_mock()
output = await self.task_pool.doublestarmap(mock_func, kwargs_iter, group_size, None, end_cb, cancel_cb) output = await self.task_pool.doublestarmap(mock_func, kw_iter, num_concurrent, None, end_cb, cancel_cb)
self.assertEqual(generated_name, output) self.assertEqual(generated_name, output)
mock__map.assert_awaited_once_with(generated_name, group_size, mock_func, kwargs_iter, 2, mock__map.assert_awaited_once_with(generated_name, num_concurrent, mock_func, kw_iter, 2,
end_callback=end_cb, cancel_callback=cancel_cb) end_callback=end_cb, cancel_callback=cancel_cb)
mock__generate_group_name.assert_called_once_with('doublestarmap', mock_func) mock__generate_group_name.assert_called_once_with('doublestarmap', mock_func)

View File

@ -41,7 +41,7 @@ async def main() -> None:
pool = SimpleTaskPool(work, args=(5,)) # initializes the pool; no work is being done yet pool = SimpleTaskPool(work, args=(5,)) # initializes the pool; no work is being done yet
await pool.start(3) # launches work tasks 0, 1, and 2 await pool.start(3) # launches work tasks 0, 1, and 2
await asyncio.sleep(1.5) # lets the tasks work for a bit await asyncio.sleep(1.5) # lets the tasks work for a bit
await pool.start() # launches work task 3 await pool.start(1) # launches work task 3
await asyncio.sleep(1.5) # lets the tasks work for a bit await asyncio.sleep(1.5) # lets the tasks work for a bit
pool.stop(2) # cancels tasks 3 and 2 (LIFO order) pool.stop(2) # cancels tasks 3 and 2 (LIFO order)
pool.lock() # required for the last line pool.lock() # required for the last line
@ -135,7 +135,7 @@ async def main() -> None:
# Once there is room in the pool again, the third and fourth will each start (with IDs 4 and 5) # Once there is room in the pool again, the third and fourth will each start (with IDs 4 and 5)
# only once there is room in the pool and no more than one other task of these new ones is running. # only once there is room in the pool and no more than one other task of these new ones is running.
args_list = [(0, 10), (10, 20), (20, 30), (30, 40)] args_list = [(0, 10), (10, 20), (20, 30), (30, 40)]
await pool.starmap(other_work, args_list, group_size=2) await pool.starmap(other_work, args_list, num_concurrent=2)
print("> Called `starmap`") print("> Called `starmap`")
# Now we lock the pool, so that we can safely await all our tasks. # Now we lock the pool, so that we can safely await all our tasks.
pool.lock() pool.lock()
@ -199,7 +199,7 @@ Started TaskPool-0_Task-3
> other_work with 15 > other_work with 15
Ended TaskPool-0_Task-0 Ended TaskPool-0_Task-0
Ended TaskPool-0_Task-1 <--- these two end and free up two more slots in the pool Ended TaskPool-0_Task-1 <--- these two end and free up two more slots in the pool
Started TaskPool-0_Task-4 <--- since the group size is set to 2, Task-5 will not start Started TaskPool-0_Task-4 <--- since `num_concurrent` is set to 2, Task-5 will not start
> work with 190 > work with 190
> work with 190 > work with 190
> other_work with 16 > other_work with 16