diff --git a/setup.cfg b/setup.cfg index b9b0a3e..934e13a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = asyncio-taskpool -version = 0.1.3 +version = 0.1.4 author = Daniil Fajnberg author_email = mail@daniil.fajnberg.de description = Dynamically manage pools of asyncio tasks diff --git a/src/asyncio_taskpool/pool.py b/src/asyncio_taskpool/pool.py index 2c08b39..de6a589 100644 --- a/src/asyncio_taskpool/pool.py +++ b/src/asyncio_taskpool/pool.py @@ -498,7 +498,22 @@ class TaskPool(BaseTaskPool): await self._queue_consumer(q, func, arg_stars, end_callback=end_callback, cancel_callback=cancel_callback) await execute_optional(end_callback, args=(task_id,)) - def _fill_args_queue(self, q: Queue, args_iter: ArgsT, num_tasks: int) -> int: + def _fill_args_queue(self, q: Queue, args_iter: ArgsT, num_tasks: int) -> None: + """ + Helper function for `_map()`. + Takes the iterable of function arguments `args_iter` and adds up to `num_tasks` to the arguments queue `q`. + + If the iterable contains less than `num_tasks` elements, nothing else happens. + Otherwise the `_queue_producer` is started with the arguments queue and and iterator of the remaining arguments. + + Args: + q: + The (empty) new `asyncio.Queue` to hold the function arguments passed as `args_iter`. + args_iter: + The iterable of function arguments passed into `_map()` to use for creating the new tasks. + num_tasks: + The maximum number of the new tasks to run concurrently that was passed into `_map()`. + """ args_iter = iter(args_iter) try: # Here we guarantee that the queue will contain as many arguments as needed for starting the first batch of @@ -509,14 +524,12 @@ class TaskPool(BaseTaskPool): # If we get here, this means that the number of elements in the arguments iterator was less than the # specified `num_tasks`. Thus, the number of tasks to start immediately will be the size of the queue. # The `_queue_producer` won't be necessary, since we already put all the elements in the queue. - num_tasks = q.qsize() - else: - # There may be more elements in the arguments iterator, so we need the `_queue_producer`. - # It will have exclusive access to the `args_iter` from now on. - # If the queue is full already, it will wait until one of the tasks in the first batch ends, before putting - # the next item in it. - create_task(self._queue_producer(q, args_iter)) - return num_tasks + return + # There may be more elements in the arguments iterator, so we need the `_queue_producer`. + # It will have exclusive access to the `args_iter` from now on. + # If the queue is full already, it will wait until one of the tasks in the first batch ends, before putting + # the next item in it. + create_task(self._queue_producer(q, args_iter)) async def _map(self, func: CoroutineFunc, args_iter: ArgsT, arg_stars: int = 0, num_tasks: int = 1, end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None: @@ -528,7 +541,7 @@ class TaskPool(BaseTaskPool): This method blocks, **only if** there is not enough room in the pool for the first batch of new tasks. - It sets up an internal queue which is filled while consuming the arguments iterable. + It sets up an internal arguments queue which is continuously filled while consuming the arguments iterable. The queue's `join()` method is added to the pool's `_before_gathering` list. Args: @@ -554,8 +567,8 @@ class TaskPool(BaseTaskPool): raise exceptions.PoolIsClosed("Cannot start new tasks") args_queue = Queue(maxsize=num_tasks) self._before_gathering.append(join_queue(args_queue)) - num_tasks = self._fill_args_queue(args_queue, args_iter, num_tasks) - for _ in range(num_tasks): + self._fill_args_queue(args_queue, args_iter, num_tasks) + for _ in range(args_queue.qsize()): # This is where blocking can occur, if the pool is full. await self._queue_consumer(args_queue, func, arg_stars=arg_stars, end_callback=end_callback, cancel_callback=cancel_callback)