From 4c6a5412cabf70216aae17aa570e8f0ca565f689 Mon Sep 17 00:00:00 2001 From: Daniil Fajnberg Date: Thu, 17 Mar 2022 13:52:02 +0100 Subject: [PATCH] removed `num_cancellations` and `num_finished` from the pool interface; added `num_cancelled`; made the `num` argument non-optional in the `start` and `stop` methods of the `SimpleTaskPool`; changed some internals; improved docstrings --- setup.cfg | 2 +- src/asyncio_taskpool/pool.py | 128 ++++++++++++++++++++--------------- tests/test_pool.py | 20 ++---- 3 files changed, 79 insertions(+), 71 deletions(-) diff --git a/setup.cfg b/setup.cfg index 34fc2f8..9fe975e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = asyncio-taskpool -version = 0.7.1 +version = 0.8.0 author = Daniil Fajnberg author_email = mail@daniil.fajnberg.de description = Dynamically manage pools of asyncio tasks diff --git a/src/asyncio_taskpool/pool.py b/src/asyncio_taskpool/pool.py index 2198697..0a69533 100644 --- a/src/asyncio_taskpool/pool.py +++ b/src/asyncio_taskpool/pool.py @@ -59,16 +59,14 @@ class BaseTaskPool: @classmethod def _add_pool(cls, pool: 'BaseTaskPool') -> int: - """Adds a `pool` (instance of any subclass) to the general list of pools and returns it's index in the list.""" + """Adds a `pool` to the general list of pools and returns it's index.""" cls._pools.append(pool) return len(cls._pools) - 1 def __init__(self, pool_size: int = inf, name: str = None) -> None: """Initializes the necessary internal attributes and adds the new pool to the general pools list.""" - # Initialize a counter for the total number of tasks started through the pool and one for the total number of - # tasks cancelled through the pool. + # Initialize a counter for the total number of tasks started through the pool. self._num_started: int = 0 - self._num_cancellations: int = 0 # Initialize flags; immutably set the name. self._locked: bool = False @@ -97,18 +95,18 @@ class BaseTaskPool: @property def pool_size(self) -> int: - """Returns the maximum number of concurrently running tasks currently set in the pool.""" - return self._pool_size + """Maximum number of concurrently running tasks allowed in the pool.""" + return getattr(self._enough_room, '_value') @pool_size.setter def pool_size(self, value: int) -> None: """ Sets the maximum number of concurrently running tasks in the pool. + NOTE: Increasing the pool size will immediately start tasks that are awaiting enough room to run. + Args: - value: - A non-negative integer. - NOTE: Increasing the pool size will immediately start tasks that are awaiting enough room to run. + value: A non-negative integer. Raises: `ValueError` if `value` is less than 0. @@ -116,11 +114,10 @@ class BaseTaskPool: if value < 0: raise ValueError("Pool size can not be less than 0") self._enough_room._value = value - self._pool_size = value @property def is_locked(self) -> bool: - """Returns `True` if the pool has been locked (see below).""" + """`True` if the pool has been locked (see below).""" return self._locked def lock(self) -> None: @@ -138,26 +135,26 @@ class BaseTaskPool: @property def num_running(self) -> int: """ - Returns the number of tasks in the pool that are (at that moment) still running. + Number of tasks in the pool that are still running. At the moment a task's `end_callback` or `cancel_callback` is fired, it is no longer considered running. """ return len(self._tasks_running) @property - def num_cancellations(self) -> int: + def num_cancelled(self) -> int: """ - Returns the number of tasks in the pool that have been cancelled through the pool (up until that moment). + Number of tasks in the pool that have been cancelled. - At the moment a task's `cancel_callback` is fired, this counts as a cancellation, and the task is then - considered cancelled (instead of running) until its `end_callback` is fired. + At the moment a task's `cancel_callback` is fired, it is considered to be cancelled and no longer running, + until its `end_callback` is fired, at which point it is considered ended (instead of cancelled). """ - return self._num_cancellations + return len(self._tasks_cancelled) @property def num_ended(self) -> int: """ - Returns the number of tasks started through the pool that have stopped running (up until that moment). + Number of tasks in the pool that have stopped running. At the moment a task's `end_callback` is fired, it is considered ended and no longer running (or cancelled). When a task is cancelled, it is not immediately considered ended; only after its `cancel_callback` has returned, @@ -165,16 +162,12 @@ class BaseTaskPool: """ return len(self._tasks_ended) - @property - def num_finished(self) -> int: - """Returns the number of tasks in the pool that have finished running (without having been cancelled).""" - return len(self._tasks_ended) - self._num_cancellations + len(self._tasks_cancelled) - @property def is_full(self) -> bool: """ - Returns `False` only if (at that moment) the number of running tasks is below the pool's specified size. - When the pool is full, any call to start a new task within it will block. + `False` if the number of running tasks is less than the `pool_size`. + + When the pool is full, any call to start a new task within it will block, until there is enough room for it. """ return self._enough_room.locked() @@ -247,7 +240,6 @@ class BaseTaskPool: """ log.debug("Cancelling %s ...", self._task_name(task_id)) self._tasks_cancelled[task_id] = self._tasks_running.pop(task_id) - self._num_cancellations += 1 log.debug("Cancelled %s", self._task_name(task_id)) await execute_optional(custom_callback, args=(task_id,)) @@ -276,7 +268,9 @@ class BaseTaskPool: async def _task_wrapper(self, awaitable: Awaitable, task_id: int, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> Any: """ - Universal wrapper around every task run in the pool that returns/raises whatever the wrapped coroutine does. + Universal wrapper around every task run in the pool. + + Returns/raises whatever the wrapped coroutine does. Responsible for catching cancellation and awaiting the `_task_cancellation` callback, as well as for awaiting the `_task_ending` callback, after the coroutine returns or raises an exception. @@ -381,7 +375,9 @@ class BaseTaskPool: def _cancel_and_remove_all_from_group(self, group_name: str, group_reg: TaskGroupRegister, msg: str = None) -> None: """ - Removes all tasks from the specified group and cancels them, if they are still running. + Removes all tasks from the specified group and cancels them. + + Does nothing to tasks, that are no longer running. Args: group_name: The name of the group of tasks that shall be cancelled. @@ -397,7 +393,9 @@ class BaseTaskPool: async def cancel_group(self, group_name: str, msg: str = None) -> None: """ - Cancels an entire group of tasks. The task group is subsequently forgotten by the pool. + Cancels an entire group of tasks. + + The task group is subsequently forgotten by the pool. Args: group_name: The name of the group of tasks that shall be cancelled. @@ -430,12 +428,13 @@ class BaseTaskPool: async def flush(self, return_exceptions: bool = False): """ - Calls `asyncio.gather` on all ended/cancelled tasks from the pool, and forgets the tasks. + Calls `asyncio.gather` on all ended/cancelled tasks in the pool. - This method exists mainly to free up memory of unneeded `Task` objects. + The tasks are subsequently forgotten by the pool. This method exists mainly to free up memory of unneeded + `Task` objects. - This method blocks, **only if** any of the tasks block while catching a `asyncio.CancelledError` or any of the - callbacks registered for the tasks block. + It blocks, **only if** any of the tasks block while catching a `asyncio.CancelledError` or any of the callbacks + registered for the tasks block. Args: return_exceptions (optional): Passed directly into `gather`. @@ -446,7 +445,9 @@ class BaseTaskPool: async def gather_and_close(self, return_exceptions: bool = False): """ - Calls `asyncio.gather` on **all** tasks in the pool, then permanently closes the pool. + Calls `asyncio.gather` on **all** tasks in the pool, then closes it. + + After this method returns, no more tasks can be started in the pool. The `lock()` method must have been called prior to this. @@ -473,7 +474,9 @@ class BaseTaskPool: class TaskPool(BaseTaskPool): """ - General task pool class. Attempts to emulate part of the interface of `multiprocessing.pool.Pool` from the stdlib. + General purpose task pool class. + + Attempts to emulate part of the interface of `multiprocessing.pool.Pool` from the stdlib. A `TaskPool` instance can manage an arbitrary number of concurrent tasks from any coroutine function. Tasks in the pool can all belong to the same coroutine function, @@ -506,12 +509,15 @@ class TaskPool(BaseTaskPool): log.debug("%s cancelled and forgot meta tasks from group %s", str(self), group_name) def _cancel_and_remove_all_from_group(self, group_name: str, group_reg: TaskGroupRegister, msg: str = None) -> None: + """See base class.""" self._cancel_group_meta_tasks(group_name) super()._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg) async def cancel_group(self, group_name: str, msg: str = None) -> None: """ - Cancels an entire group of tasks. The task group is subsequently forgotten by the pool. + Cancels an entire group of tasks. + + The task group is subsequently forgotten by the pool. If any methods such as `map()` launched meta tasks belonging to that group, these meta tasks are cancelled before the actual tasks are cancelled. This means that any tasks "queued" to be started by a meta task will @@ -529,7 +535,7 @@ class TaskPool(BaseTaskPool): async def cancel_all(self, msg: str = None) -> None: """ - Cancels all tasks still running within the pool. (This includes all meta tasks.) + Cancels all tasks still running within the pool (including meta tasks). If any methods such as `map()` launched meta tasks, these meta tasks are cancelled before the actual tasks are cancelled. This means that any tasks "queued" to be started by a meta task will **never even start**. In the @@ -569,12 +575,13 @@ class TaskPool(BaseTaskPool): async def flush(self, return_exceptions: bool = False): """ - Calls `asyncio.gather` on all ended/cancelled tasks from the pool, and forgets the tasks. + Calls `asyncio.gather` on all ended/cancelled tasks in the pool. - This method exists mainly to free up memory of unneeded `Task` objects. It also gets rid of unneeded meta tasks. + The tasks are subsequently forgotten by the pool. This method exists mainly to free up memory of unneeded + `Task` objects. It also gets rid of unneeded meta tasks. - This method blocks, **only if** any of the tasks block while catching a `asyncio.CancelledError` or any of the - callbacks registered for the tasks block. + It blocks, **only if** any of the tasks block while catching a `asyncio.CancelledError` or any of the callbacks + registered for the tasks block. Args: return_exceptions (optional): Passed directly into `gather`. @@ -587,7 +594,9 @@ class TaskPool(BaseTaskPool): async def gather_and_close(self, return_exceptions: bool = False): """ - Calls `asyncio.gather` on **all** tasks in the pool, then permanently closes the pool. + Calls `asyncio.gather` on **all** tasks in the pool, then closes it. + + After this method returns, no more tasks can be started in the pool. The `lock()` method must have been called prior to this. @@ -596,7 +605,6 @@ class TaskPool(BaseTaskPool): which may not even be possible (depending on what the iterable of arguments represents). If you want to avoid this, make sure to call `cancel_all()` prior to this. - This method may also block, if one of the tasks blocks while catching a `asyncio.CancelledError` or if any of the callbacks registered for a task blocks for whatever reason. @@ -662,9 +670,13 @@ class TaskPool(BaseTaskPool): async def apply(self, func: CoroutineFunc, args: ArgsT = (), kwargs: KwArgsT = None, num: int = 1, group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str: """ - Creates an arbitrary number of coroutines with the supplied arguments and runs them as new tasks in the pool. + Creates tasks with the supplied arguments to be run in the pool. + + Each coroutine looks like `func(*args, **kwargs)`, meaning the `args` and `kwargs` are unpacked and passed + into `func` before creating each task, and this is done `num` times. + + All the new tasks are added to the same task group. - Each coroutine looks like `func(*args, **kwargs)`. All the new tasks are added to the same task group. This method blocks, **only if** the pool has not enough room to accommodate `num` new tasks. Args: @@ -788,9 +800,10 @@ class TaskPool(BaseTaskPool): async def _map(self, group_name: str, group_size: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None: """ - Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool. + Creates tasks in the pool with arguments from the supplied iterable. Each coroutine looks like `func(arg)`, `func(*arg)`, or `func(**arg)`, `arg` being taken from `arg_iter`. + All the new tasks are added to the same task group. The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at @@ -847,10 +860,11 @@ class TaskPool(BaseTaskPool): async def map(self, func: CoroutineFunc, arg_iter: ArgsT, group_size: int = 1, group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str: """ - An asyncio-task-based equivalent of the `multiprocessing.pool.Pool.map` method. + A task-based equivalent of the `multiprocessing.pool.Pool.map` method. Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool. Each coroutine looks like `func(arg)`, `arg` being an element taken from `arg_iter`. + All the new tasks are added to the same task group. The `group_size` determines the maximum number of tasks spawned this way that shall be running concurrently at @@ -946,6 +960,7 @@ class SimpleTaskPool(BaseTaskPool): end_callback: EndCB = None, cancel_callback: CancelCB = None, pool_size: int = inf, name: str = None) -> None: """ + Initializes all required attributes. Args: func: @@ -964,6 +979,9 @@ class SimpleTaskPool(BaseTaskPool): The maximum number of tasks allowed to run concurrently in the pool name (optional): An optional name for the pool. + + Raises: + `NotCoroutine` if `func` is not a coroutine function. """ if not iscoroutinefunction(func): raise exceptions.NotCoroutine(f"Not a coroutine function: {func}") @@ -976,7 +994,7 @@ class SimpleTaskPool(BaseTaskPool): @property def func_name(self) -> str: - """Returns the name of the coroutine function used in the pool.""" + """Name of the coroutine function used in the pool.""" return self._func.__name__ async def _start_one(self) -> int: @@ -984,18 +1002,18 @@ class SimpleTaskPool(BaseTaskPool): return await self._start_task(self._func(*self._args, **self._kwargs), end_callback=self._end_callback, cancel_callback=self._cancel_callback) - async def start(self, num: int = 1) -> List[int]: - """Starts `num` new tasks within the pool and returns their IDs as a list.""" + async def start(self, num: int) -> List[int]: + """Starts `num` new tasks within the pool and returns their IDs.""" ids = await gather(*(self._start_one() for _ in range(num))) - assert isinstance(ids, list) # for PyCharm (see above to-do-item) + assert isinstance(ids, list) # for PyCharm return ids - def stop(self, num: int = 1) -> List[int]: + def stop(self, num: int) -> List[int]: """ - Cancels `num` running tasks within the pool and returns their IDs as a list. + Cancels `num` running tasks within the pool and returns their IDs. The tasks are canceled in LIFO order, meaning tasks started later will be stopped before those started earlier. - If `num` is greater than or equal to the number of currently running tasks, naturally all tasks are cancelled. + If `num` is greater than or equal to the number of currently running tasks, all tasks are cancelled. """ ids = [] for i, task_id in enumerate(reversed(self._tasks_running)): @@ -1006,5 +1024,5 @@ class SimpleTaskPool(BaseTaskPool): return ids def stop_all(self) -> List[int]: - """Cancels all running tasks and returns their IDs as a list.""" + """Cancels all running tasks and returns their IDs.""" return self.stop(self.num_running) diff --git a/tests/test_pool.py b/tests/test_pool.py index 7b62cc1..3c11d25 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -84,7 +84,6 @@ class BaseTaskPoolTestCase(CommonTestCase): def test_init(self): self.assertEqual(0, self.task_pool._num_started) - self.assertEqual(0, self.task_pool._num_cancellations) self.assertFalse(self.task_pool._locked) self.assertFalse(self.task_pool._closed) @@ -114,14 +113,14 @@ class BaseTaskPoolTestCase(CommonTestCase): def test_pool_size(self): self.pool_size_patcher.stop() - self.task_pool._pool_size = self.TEST_POOL_SIZE + self.task_pool._enough_room._value = self.TEST_POOL_SIZE self.assertEqual(self.TEST_POOL_SIZE, self.task_pool.pool_size) with self.assertRaises(ValueError): self.task_pool.pool_size = -1 self.task_pool.pool_size = new_size = 69 - self.assertEqual(new_size, self.task_pool._pool_size) + self.assertEqual(new_size, self.task_pool._enough_room._value) def test_is_locked(self): self.task_pool._locked = FOO @@ -145,21 +144,14 @@ class BaseTaskPoolTestCase(CommonTestCase): self.task_pool._tasks_running = {1: FOO, 2: BAR, 3: BAZ} self.assertEqual(3, self.task_pool.num_running) - def test_num_cancellations(self): - self.task_pool._num_cancellations = 3 - self.assertEqual(3, self.task_pool.num_cancellations) + def test_num_cancelled(self): + self.task_pool._tasks_cancelled = {1: FOO, 2: BAR, 3: BAZ} + self.assertEqual(3, self.task_pool.num_cancelled) def test_num_ended(self): self.task_pool._tasks_ended = {1: FOO, 2: BAR, 3: BAZ} self.assertEqual(3, self.task_pool.num_ended) - def test_num_finished(self): - self.task_pool._num_cancellations = num_cancellations = 69 - num_ended = 420 - self.task_pool._tasks_ended = {i: FOO for i in range(num_ended)} - self.task_pool._tasks_cancelled = mock_cancelled_dict = {1: FOO, 2: BAR, 3: BAZ} - self.assertEqual(num_ended - num_cancellations + len(mock_cancelled_dict), self.task_pool.num_finished) - def test_is_full(self): self.assertEqual(self.task_pool._enough_room.locked(), self.task_pool.is_full) @@ -200,12 +192,10 @@ class BaseTaskPoolTestCase(CommonTestCase): @patch.object(pool.BaseTaskPool, '_task_name', return_value=FOO) async def test__task_cancellation(self, mock__task_name: MagicMock, mock_execute_optional: AsyncMock): task_id, mock_task, mock_callback = 1, MagicMock(), MagicMock() - self.task_pool._num_cancellations = cancelled = 3 self.task_pool._tasks_running[task_id] = mock_task self.assertIsNone(await self.task_pool._task_cancellation(task_id, mock_callback)) self.assertNotIn(task_id, self.task_pool._tasks_running) self.assertEqual(mock_task, self.task_pool._tasks_cancelled[task_id]) - self.assertEqual(cancelled + 1, self.task_pool._num_cancellations) mock__task_name.assert_called_with(task_id) mock_execute_optional.assert_awaited_once_with(mock_callback, args=(task_id, ))