factored out the queue setup of TaskPool._map

Daniil Fajnberg 2022-02-08 12:09:21 +01:00
parent 586023f722
commit 3c69740c8d
3 changed files with 29 additions and 20 deletions

View File

@@ -1,6 +1,6 @@
 [metadata]
 name = asyncio-taskpool
-version = 0.1.2
+version = 0.1.3
 author = Daniil Fajnberg
 author_email = mail@daniil.fajnberg.de
 description = Dynamically manage pools of asyncio tasks

View File

@@ -1,4 +1,5 @@
 from asyncio.coroutines import iscoroutinefunction
+from asyncio.queues import Queue
 from typing import Any, Optional
 from .types import T, AnyCallableT, ArgsT, KwArgsT
@@ -22,3 +23,7 @@ def star_function(function: AnyCallableT, arg: Any, arg_stars: int = 0) -> T:
     if arg_stars == 2:
         return function(**arg)
     raise ValueError(f"Invalid argument arg_stars={arg_stars}; must be 0, 1, or 2.")
+
+
+async def join_queue(q: Queue) -> None:
+    await q.join()
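
For context, a minimal sketch (not part of the commit) of what awaiting the new `join_queue` helper does: it resumes only once every item put into the queue has been matched by a `task_done()` call from a consumer. The `consumer` and `main` names below are illustrative only.

import asyncio
from asyncio.queues import Queue


async def join_queue(q: Queue) -> None:
    await q.join()


async def consumer(q: Queue) -> None:
    while True:
        await q.get()
        q.task_done()  # each item must be marked done, or join() waits forever


async def main() -> None:
    q: Queue = Queue()
    for item in ("a", "b", "c"):
        q.put_nowait(item)
    worker = asyncio.create_task(consumer(q))
    await join_queue(q)  # returns once all three items are marked done
    worker.cancel()


asyncio.run(main())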

View File

@@ -10,7 +10,7 @@ from math import inf
 from typing import Any, Awaitable, Dict, Iterable, Iterator, List
 from . import exceptions
-from .helpers import execute_optional, star_function
+from .helpers import execute_optional, star_function, join_queue
 from .types import ArgsT, KwArgsT, CoroutineFunc, EndCallbackT, CancelCallbackT
@@ -498,6 +498,26 @@ class TaskPool(BaseTaskPool):
         await self._queue_consumer(q, func, arg_stars, end_callback=end_callback, cancel_callback=cancel_callback)
         await execute_optional(end_callback, args=(task_id,))
 
+    def _fill_args_queue(self, q: Queue, args_iter: ArgsT, num_tasks: int) -> int:
+        args_iter = iter(args_iter)
+        try:
+            # Here we guarantee that the queue will contain as many arguments as needed for starting the first batch of
+            # tasks, which will be at most `num_tasks` (meaning the queue will be full).
+            for i in range(num_tasks):
+                q.put_nowait(next(args_iter))
+        except StopIteration:
+            # If we get here, this means that the number of elements in the arguments iterator was less than the
+            # specified `num_tasks`. Thus, the number of tasks to start immediately will be the size of the queue.
+            # The `_queue_producer` won't be necessary, since we already put all the elements in the queue.
+            num_tasks = q.qsize()
+        else:
+            # There may be more elements in the arguments iterator, so we need the `_queue_producer`.
+            # It will have exclusive access to the `args_iter` from now on.
+            # If the queue is full already, it will wait until one of the tasks in the first batch ends, before putting
+            # the next item in it.
+            create_task(self._queue_producer(q, args_iter))
+        return num_tasks
+
     async def _map(self, func: CoroutineFunc, args_iter: ArgsT, arg_stars: int = 0, num_tasks: int = 1,
                    end_callback: EndCallbackT = None, cancel_callback: CancelCallbackT = None) -> None:
         """
@@ -533,24 +553,8 @@ class TaskPool(BaseTaskPool):
         if not self.is_open:
             raise exceptions.PoolIsClosed("Cannot start new tasks")
         args_queue = Queue(maxsize=num_tasks)
-        self._before_gathering.append(args_queue.join())
-        args_iter = iter(args_iter)
-        try:
-            # Here we guarantee that the queue will contain as many arguments as needed for starting the first batch of
-            # tasks, which will be at most `num_tasks` (meaning the queue will be full).
-            for i in range(num_tasks):
-                args_queue.put_nowait(next(args_iter))
-        except StopIteration:
-            # If we get here, this means that the number of elements in the arguments iterator was less than the
-            # specified `num_tasks`. Thus, the number of tasks to start immediately will be the size of the queue.
-            # The `_queue_producer` won't be necessary, since we already put all the elements in the queue.
-            num_tasks = args_queue.qsize()
-        else:
-            # There may be more elements in the arguments iterator, so we need the `_queue_producer`.
-            # It will have exclusive access to the `args_iter` from now on.
-            # If the queue is full already, it will wait until one of the tasks in the first batch ends, before putting
-            # the next item in it.
-            create_task(self._queue_producer(args_queue, args_iter))
+        self._before_gathering.append(join_queue(args_queue))
+        num_tasks = self._fill_args_queue(args_queue, args_iter, num_tasks)
         for _ in range(num_tasks):
            # This is where blocking can occur, if the pool is full.
            await self._queue_consumer(args_queue, func,
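
Taken together, the refactored `_map` now reads as a short pipeline: create a bounded queue, register `join_queue(args_queue)` to be awaited before gathering, pre-fill via `_fill_args_queue`, then start the first batch of consumers. A toy analogue of that flow (the `map_concurrently` name is hypothetical, not the library API), assuming each worker marks items done as it finishes:

import asyncio
from typing import Any, Awaitable, Callable, Iterable


async def map_concurrently(func: Callable[[Any], Awaitable[Any]],
                           args: Iterable, num_tasks: int) -> None:
    # A bounded queue throttles the producer; `num_tasks` workers consume it.
    q: asyncio.Queue = asyncio.Queue(maxsize=num_tasks)

    async def worker() -> None:
        while True:
            arg = await q.get()
            try:
                await func(arg)
            finally:
                q.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(num_tasks)]
    for arg in args:
        await q.put(arg)  # blocks while the queue is full
    await q.join()  # the `join_queue` step: wait until all items are done
    for w in workers:
        w.cancel()


async def demo() -> None:
    async def show(x: int) -> None:
        await asyncio.sleep(0.01)
        print(x)

    await map_concurrently(show, range(5), num_tasks=2)


asyncio.run(demo())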