diff --git a/src/asyncio_taskpool/exceptions.py b/src/asyncio_taskpool/exceptions.py index 9171b26..0b25408 100644 --- a/src/asyncio_taskpool/exceptions.py +++ b/src/asyncio_taskpool/exceptions.py @@ -14,7 +14,7 @@ class AlreadyCancelled(TaskEnded): pass -class AlreadyFinished(TaskEnded): +class AlreadyEnded(TaskEnded): pass diff --git a/src/asyncio_taskpool/pool.py b/src/asyncio_taskpool/pool.py index 232d058..e40e632 100644 --- a/src/asyncio_taskpool/pool.py +++ b/src/asyncio_taskpool/pool.py @@ -46,10 +46,22 @@ class BaseTaskPool: @property def pool_size(self) -> int: + """Returns the maximum number of concurrently running tasks currently set in the pool.""" return self._pool_size @pool_size.setter def pool_size(self, value: int) -> None: + """ + Sets the maximum number of concurrently running tasks in the pool. + + Args: + value: + A non-negative integer. + NOTE: Increasing the pool size will immediately start tasks that are awaiting enough room to run. + + Raises: + `ValueError` if `value` is less than 0. + """ if value < 0: raise ValueError("Pool size can not be less than 0") self._enough_room._value = value @@ -107,6 +119,17 @@ class BaseTaskPool: return f'{self}_Task-{task_id}' async def _task_cancellation(self, task_id: int, custom_callback: CancelCallbackT = None) -> None: + """ + Universal callback to be run upon any task in the pool being cancelled. + Required for keeping track of running/cancelled tasks and proper logging. + + Args: + task_id: + The ID of the task that has been cancelled. + custom_callback (optional): + A callback to execute after cancellation of the task. + It is run at the end of this function with the `task_id` as its only positional argument. + """ log.debug("Cancelling %s ...", self._task_name(task_id)) self._cancelled[task_id] = self._running.pop(task_id) self._num_cancelled += 1 @@ -114,6 +137,18 @@ class BaseTaskPool: await _execute_function(custom_callback, args=(task_id, )) async def _task_ending(self, task_id: int, custom_callback: EndCallbackT = None) -> None: + """ + Universal callback to be run upon any task in the pool ending its work. + Required for keeping track of running/cancelled/ended tasks and proper logging. + Also releases room in the task pool for potentially waiting tasks. + + Args: + task_id: + The ID of the task that has reached its end. + custom_callback (optional): + A callback to execute after the task has ended. + It is run at the end of this function with the `task_id` as its only positional argument. + """ try: self._ended[task_id] = self._running.pop(task_id) except KeyError: @@ -125,6 +160,22 @@ class BaseTaskPool: async def _task_wrapper(self, awaitable: Awaitable, task_id: int, end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> Any: + """ + Universal wrapper around every task to be run in the pool. + Returns/raises whatever the wrapped coroutine does. + + Args: + awaitable: + The actual coroutine to be run within the task pool. + task_id: + The ID of the newly created task. + end_callback (optional): + A callback to execute after the task has ended. + It is run with the `task_id` as its only positional argument. + cancel_callback (optional): + A callback to execute after cancellation of the task. + It is run with the `task_id` as its only positional argument. + """ log.info("Started %s", self._task_name(task_id)) try: return await awaitable @@ -135,6 +186,26 @@ class BaseTaskPool: async def _start_task(self, awaitable: Awaitable, ignore_closed: bool = False, end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> int: + """ + Starts a coroutine as a new task in the pool. + This method blocks, **only if** there the pool is full. + Returns/raises whatever the wrapped coroutine does. + + Args: + awaitable: + The actual coroutine to be run within the task pool. + ignore_closed (optional): + If `True`, even if the pool is closed, the task will still be started. + end_callback (optional): + A callback to execute after the task has ended. + It is run with the `task_id` as its only positional argument. + cancel_callback (optional): + A callback to execute after cancellation of the task. + It is run with the `task_id` as its only positional argument. + + Raises: + `asyncio_taskpool.exceptions.PoolIsClosed` if the pool has been closed and `ignore_closed` is `False`. + """ if not (self.is_open or ignore_closed): raise exceptions.PoolIsClosed("Cannot start new tasks") await self._enough_room.acquire() @@ -151,38 +222,110 @@ class BaseTaskPool: return task_id def _get_running_task(self, task_id: int) -> Task: + """ + Gets a running task by its task ID. + + Args: + task_id: The ID of a task still running within the pool. + + Raises: + `asyncio_taskpool.exceptions.AlreadyCancelled` if the task with `task_id` has been (recently) cancelled. + `asyncio_taskpool.exceptions.AlreadyEnded` if the task with `task_id` has ended (recently). + `asyncio_taskpool.exceptions.InvalidTaskID` if no task with `task_id` is known to the pool. + """ try: return self._running[task_id] except KeyError: if self._cancelled.get(task_id): raise exceptions.AlreadyCancelled(f"{self._task_name(task_id)} has already been cancelled") if self._ended.get(task_id): - raise exceptions.AlreadyFinished(f"{self._task_name(task_id)} has finished running") + raise exceptions.AlreadyEnded(f"{self._task_name(task_id)} has finished running") raise exceptions.InvalidTaskID(f"No task with ID {task_id} found in {self}") def _cancel_task(self, task_id: int, msg: str = None) -> None: + """ + Cancels the running task with the specified ID. + + Args: + task_id: The ID of a task running within the pool that should be cancelled. + msg (optional): Passed to the `Task.cancel()` method. + """ self._get_running_task(task_id).cancel(msg=msg) def cancel(self, *task_ids: int, msg: str = None) -> None: + """ + Cancels the tasks with the specified IDs. + + Each task ID must belong to a task still running within the pool. Otherwise one of the following exceptions will + be raised: + - `AlreadyCancelled` if one of the `task_ids` belongs to a task that has been (recently) cancelled. + - `AlreadyEnded` if one of the `task_ids` belongs to a task that has ended (recently). + - `InvalidTaskID` if any of the `task_ids` is not known to the pool. + Note that once a pool has been flushed, any IDs of tasks that have ended previously will be forgotten. + + Args: + task_ids: + Arbitrary number of integers. Each must be an ID of a task still running within the pool. + msg (optional): + Passed to the `Task.cancel()` method of every task specified by the `task_ids`. + """ tasks = [self._get_running_task(task_id) for task_id in task_ids] for task in tasks: task.cancel(msg=msg) async def cancel_all(self, msg: str = None) -> None: + """ + Cancels all tasks still running within the pool. + This method blocks, **only if** a currently unknown number of coroutine functions have been registered to be + run as tasks. This can happen, if for example the `TaskPool.map` method was called with `num_tasks` set to a + number smaller than the number of arguments from `args_iter`. + TODO: Consider changing this behaviour. + + Args: + msg (optional): + Passed to the `Task.cancel()` method of every task specified by the `task_ids`. + """ await self._all_tasks_known_flag.wait() for task in self._running.values(): task.cancel(msg=msg) async def flush(self, return_exceptions: bool = False): + """ + Calls `asyncio.gather` on all ended/cancelled tasks from the task pool, returns their results, and forgets them. + This method blocks, **only if** any of the tasks block while catching a `asyncio.CancelledError` or any of the + callbacks registered for the tasks block. + + Args: + return_exceptions (optional): Passed directly into `gather`. + """ results = await gather(*self._ended.values(), *self._cancelled.values(), return_exceptions=return_exceptions) self._ended = self._cancelled = {} return results def close(self) -> None: + """Disallows any more tasks to be started in the pool.""" self._open = False log.info("%s is closed!", str(self)) async def gather(self, return_exceptions: bool = False): + """ + Calls `asyncio.gather` on all tasks from the task pool, returns their results, and forgets them. + The `close()` method must have been called prior to this. + + This method blocks, if a currently unknown number of coroutine functions have been registered to be run as + tasks. This can happen, if for example the `TaskPool.map` method was called with `num_tasks` set to a number + smaller than the number of arguments from `args_iter`. + TODO: Consider changing this behaviour. + + This method also blocks, if any of the tasks block while catching a `asyncio.CancelledError` or any of the + callbacks registered for the tasks block. + + Args: + return_exceptions (optional): Passed directly into `gather`. + + Raises: + `asyncio_taskpool.exceptions.PoolStillOpen` if the pool has not been closed yet. + """ if self._open: raise exceptions.PoolStillOpen("Pool must be closed, before tasks can be gathered") await self._all_tasks_known_flag.wait()