small improvement to the TaskPool._map args queue; added docstring to helper method

This commit is contained in:
Daniil Fajnberg 2022-02-08 12:23:16 +01:00
parent 3c69740c8d
commit d48b20818f
2 changed files with 26 additions and 13 deletions

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = asyncio-taskpool name = asyncio-taskpool
version = 0.1.3 version = 0.1.4
author = Daniil Fajnberg author = Daniil Fajnberg
author_email = mail@daniil.fajnberg.de author_email = mail@daniil.fajnberg.de
description = Dynamically manage pools of asyncio tasks description = Dynamically manage pools of asyncio tasks

View File

@ -498,7 +498,22 @@ class TaskPool(BaseTaskPool):
await self._queue_consumer(q, func, arg_stars, end_callback=end_callback, cancel_callback=cancel_callback) await self._queue_consumer(q, func, arg_stars, end_callback=end_callback, cancel_callback=cancel_callback)
await execute_optional(end_callback, args=(task_id,)) await execute_optional(end_callback, args=(task_id,))
def _fill_args_queue(self, q: Queue, args_iter: ArgsT, num_tasks: int) -> int: def _fill_args_queue(self, q: Queue, args_iter: ArgsT, num_tasks: int) -> None:
"""
Helper function for `_map()`.
Takes the iterable of function arguments `args_iter` and adds up to `num_tasks` to the arguments queue `q`.
If the iterable contains less than `num_tasks` elements, nothing else happens.
Otherwise the `_queue_producer` is started with the arguments queue and and iterator of the remaining arguments.
Args:
q:
The (empty) new `asyncio.Queue` to hold the function arguments passed as `args_iter`.
args_iter:
The iterable of function arguments passed into `_map()` to use for creating the new tasks.
num_tasks:
The maximum number of the new tasks to run concurrently that was passed into `_map()`.
"""
args_iter = iter(args_iter) args_iter = iter(args_iter)
try: try:
# Here we guarantee that the queue will contain as many arguments as needed for starting the first batch of # Here we guarantee that the queue will contain as many arguments as needed for starting the first batch of
@ -509,14 +524,12 @@ class TaskPool(BaseTaskPool):
# If we get here, this means that the number of elements in the arguments iterator was less than the # If we get here, this means that the number of elements in the arguments iterator was less than the
# specified `num_tasks`. Thus, the number of tasks to start immediately will be the size of the queue. # specified `num_tasks`. Thus, the number of tasks to start immediately will be the size of the queue.
# The `_queue_producer` won't be necessary, since we already put all the elements in the queue. # The `_queue_producer` won't be necessary, since we already put all the elements in the queue.
num_tasks = q.qsize() return
else:
# There may be more elements in the arguments iterator, so we need the `_queue_producer`. # There may be more elements in the arguments iterator, so we need the `_queue_producer`.
# It will have exclusive access to the `args_iter` from now on. # It will have exclusive access to the `args_iter` from now on.
# If the queue is full already, it will wait until one of the tasks in the first batch ends, before putting # If the queue is full already, it will wait until one of the tasks in the first batch ends, before putting
# the next item in it. # the next item in it.
create_task(self._queue_producer(q, args_iter)) create_task(self._queue_producer(q, args_iter))
return num_tasks
async def _map(self, func: CoroutineFunc, args_iter: ArgsT, arg_stars: int = 0, num_tasks: int = 1, async def _map(self, func: CoroutineFunc, args_iter: ArgsT, arg_stars: int = 0, num_tasks: int = 1,
end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None: end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
@ -528,7 +541,7 @@ class TaskPool(BaseTaskPool):
This method blocks, **only if** there is not enough room in the pool for the first batch of new tasks. This method blocks, **only if** there is not enough room in the pool for the first batch of new tasks.
It sets up an internal queue which is filled while consuming the arguments iterable. It sets up an internal arguments queue which is continuously filled while consuming the arguments iterable.
The queue's `join()` method is added to the pool's `_before_gathering` list. The queue's `join()` method is added to the pool's `_before_gathering` list.
Args: Args:
@ -554,8 +567,8 @@ class TaskPool(BaseTaskPool):
raise exceptions.PoolIsClosed("Cannot start new tasks") raise exceptions.PoolIsClosed("Cannot start new tasks")
args_queue = Queue(maxsize=num_tasks) args_queue = Queue(maxsize=num_tasks)
self._before_gathering.append(join_queue(args_queue)) self._before_gathering.append(join_queue(args_queue))
num_tasks = self._fill_args_queue(args_queue, args_iter, num_tasks) self._fill_args_queue(args_queue, args_iter, num_tasks)
for _ in range(num_tasks): for _ in range(args_queue.qsize()):
# This is where blocking can occur, if the pool is full. # This is where blocking can occur, if the pool is full.
await self._queue_consumer(args_queue, func, await self._queue_consumer(args_queue, func,
arg_stars=arg_stars, end_callback=end_callback, cancel_callback=cancel_callback) arg_stars=arg_stars, end_callback=end_callback, cancel_callback=cancel_callback)