Compare commits

..

2 Commits

Author SHA1 Message Date
44c03cc493 catching exceptions in _queue_consumer meta task 2022-03-16 16:57:03 +01:00
689a74c678 control interface now supports TaskPool instances:
dotted paths to coroutine functions can be passed to the parser as arguments for methods like `map`;
parser supports literal evaluation for the argument iterables in methods like `map`;
minor fixes
2022-03-16 11:27:27 +01:00
9 changed files with 106 additions and 32 deletions

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = asyncio-taskpool name = asyncio-taskpool
version = 0.6.4 version = 0.7.1
author = Daniil Fajnberg author = Daniil Fajnberg
author_email = mail@daniil.fajnberg.de author_email = mail@daniil.fajnberg.de
description = Dynamically manage pools of asyncio tasks description = Dynamically manage pools of asyncio tasks
@ -9,7 +9,7 @@ long_description_content_type = text/markdown
keywords = asyncio, concurrency, tasks, coroutines, asynchronous, server keywords = asyncio, concurrency, tasks, coroutines, asynchronous, server
url = https://git.fajnberg.de/daniil/asyncio-taskpool url = https://git.fajnberg.de/daniil/asyncio-taskpool
project_urls = project_urls =
Bug Tracker = https://git.fajnberg.de/daniil/asyncio-taskpool/issues Bug Tracker = https://github.com/daniil-berg/asyncio-taskpool/issues
classifiers = classifiers =
Development Status :: 3 - Alpha Development Status :: 3 - Alpha
Programming Language :: Python :: 3 Programming Language :: Python :: 3

View File

@ -20,14 +20,16 @@ This module contains the the definition of the `ControlParser` class used by a c
from argparse import Action, ArgumentParser, ArgumentDefaultsHelpFormatter, HelpFormatter, SUPPRESS from argparse import Action, ArgumentParser, ArgumentDefaultsHelpFormatter, HelpFormatter, SUPPRESS
from ast import literal_eval
from asyncio.streams import StreamWriter from asyncio.streams import StreamWriter
from inspect import Parameter, getmembers, isfunction, signature from inspect import Parameter, getmembers, isfunction, signature
from shutil import get_terminal_size from shutil import get_terminal_size
from typing import Any, Callable, Container, Dict, Set, Type, TypeVar from typing import Any, Callable, Container, Dict, Iterable, Set, Type, TypeVar
from ..constants import CLIENT_INFO, CMD, STREAM_WRITER from ..constants import CLIENT_INFO, CMD, STREAM_WRITER
from ..exceptions import HelpRequested, ParserError from ..exceptions import HelpRequested, ParserError
from ..helpers import get_first_doc_line from ..helpers import get_first_doc_line, resolve_dotted_path
from ..types import ArgsT, CancelCB, CoroutineFunc, EndCB, KwArgsT
FmtCls = TypeVar('FmtCls', bound=Type[HelpFormatter]) FmtCls = TypeVar('FmtCls', bound=Type[HelpFormatter])
@ -266,7 +268,7 @@ class ControlParser(ArgumentParser):
kwargs.setdefault('nargs', '*') kwargs.setdefault('nargs', '*')
if not kwargs.get('action') == 'store_true': if not kwargs.get('action') == 'store_true':
# Set the type from the parameter annotation. # Set the type from the parameter annotation.
kwargs.setdefault('type', _get_arg_type_wrapper(parameter.annotation)) kwargs.setdefault('type', _get_type_from_annotation(parameter.annotation))
return self.add_argument(*name_or_flags, **kwargs) return self.add_argument(*name_or_flags, **kwargs)
def add_function_args(self, function: Callable, omit: Container[str] = OMIT_PARAMS_DEFAULT) -> None: def add_function_args(self, function: Callable, omit: Container[str] = OMIT_PARAMS_DEFAULT) -> None:
@ -300,3 +302,11 @@ def _get_arg_type_wrapper(cls: Type) -> Callable[[Any], Any]:
# Copy the name of the class to maintain useful help messages when incorrect arguments are passed. # Copy the name of the class to maintain useful help messages when incorrect arguments are passed.
wrapper.__name__ = cls.__name__ wrapper.__name__ = cls.__name__
return wrapper return wrapper
def _get_type_from_annotation(annotation: Type) -> Callable[[Any], Any]:
if any(annotation is t for t in {CoroutineFunc, EndCB, CancelCB}):
annotation = resolve_dotted_path
if any(annotation is t for t in {ArgsT, KwArgsT, Iterable[ArgsT], Iterable[KwArgsT]}):
annotation = literal_eval
return _get_arg_type_wrapper(annotation)

View File

@ -37,7 +37,7 @@ from .session import ControlSession
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class ControlServer(ABC): # TODO: Implement interface for normal TaskPool instances, not just SimpleTaskPool class ControlServer(ABC):
""" """
Abstract base class for a task pool control server. Abstract base class for a task pool control server.

View File

@ -21,6 +21,7 @@ Miscellaneous helper functions. None of these should be considered part of the p
from asyncio.coroutines import iscoroutinefunction from asyncio.coroutines import iscoroutinefunction
from asyncio.queues import Queue from asyncio.queues import Queue
from importlib import import_module
from inspect import getdoc from inspect import getdoc
from typing import Any, Optional, Union from typing import Any, Optional, Union
@ -63,3 +64,22 @@ async def return_or_exception(_function_to_execute: AnyCallableT, *args, **kwarg
return _function_to_execute(*args, **kwargs) return _function_to_execute(*args, **kwargs)
except Exception as e: except Exception as e:
return e return e
def resolve_dotted_path(dotted_path: str) -> object:
"""
Resolves a dotted path to a global object and returns that object.
Algorithm shamelessly stolen from the `logging.config` module from the standard library.
"""
names = dotted_path.split('.')
module_name = names.pop(0)
found = import_module(module_name)
for name in names:
try:
found = getattr(found, name)
except AttributeError:
module_name += f'.{name}'
import_module(module_name)
found = getattr(found, name)
return found

View File

@ -775,8 +775,15 @@ class TaskPool(BaseTaskPool):
if next_arg is self._QUEUE_END_SENTINEL: if next_arg is self._QUEUE_END_SENTINEL:
# The `_queue_producer()` either reached the last argument or was cancelled. # The `_queue_producer()` either reached the last argument or was cancelled.
return return
await self._start_task(star_function(func, next_arg, arg_stars=arg_stars), group_name=group_name, try:
ignore_lock=True, end_callback=release_cb, cancel_callback=cancel_callback) await self._start_task(star_function(func, next_arg, arg_stars=arg_stars), group_name=group_name,
ignore_lock=True, end_callback=release_cb, cancel_callback=cancel_callback)
except Exception as e:
# This means an exception occurred during task **creation**, meaning no task has been created.
# It does not imply an error within the task itself.
log.exception("%s occurred while trying to create task: %s(%s%s)",
str(e.__class__.__name__), func.__name__, '*' * arg_stars, str(next_arg))
map_semaphore.release()
async def _map(self, group_name: str, group_size: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int, async def _map(self, group_name: str, group_size: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None: end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:

View File

@ -20,12 +20,16 @@ Unittests for the `asyncio_taskpool.control.parser` module.
from argparse import ArgumentParser, HelpFormatter, ArgumentDefaultsHelpFormatter, RawTextHelpFormatter, SUPPRESS from argparse import ArgumentParser, HelpFormatter, ArgumentDefaultsHelpFormatter, RawTextHelpFormatter, SUPPRESS
from ast import literal_eval
from inspect import signature from inspect import signature
from unittest import TestCase from unittest import TestCase
from unittest.mock import MagicMock, call, patch from unittest.mock import MagicMock, call, patch
from typing import Iterable
from asyncio_taskpool.control import parser from asyncio_taskpool.control import parser
from asyncio_taskpool.exceptions import HelpRequested, ParserError from asyncio_taskpool.exceptions import HelpRequested, ParserError
from asyncio_taskpool.helpers import resolve_dotted_path
from asyncio_taskpool.types import ArgsT, CancelCB, CoroutineFunc, EndCB, KwArgsT
FOO, BAR = 'foo', 'bar' FOO, BAR = 'foo', 'bar'
@ -194,11 +198,11 @@ class ControlServerTestCase(TestCase):
self.parser.print_help(arg) self.parser.print_help(arg)
mock_print_help.assert_called_once_with(arg) mock_print_help.assert_called_once_with(arg)
@patch.object(parser, '_get_arg_type_wrapper') @patch.object(parser, '_get_type_from_annotation')
@patch.object(parser.ArgumentParser, 'add_argument') @patch.object(parser.ArgumentParser, 'add_argument')
def test_add_function_arg(self, mock_add_argument: MagicMock, mock__get_arg_type_wrapper: MagicMock): def test_add_function_arg(self, mock_add_argument: MagicMock, mock__get_type_from_annotation: MagicMock):
mock_add_argument.return_value = expected_output = 'action' mock_add_argument.return_value = expected_output = 'action'
mock__get_arg_type_wrapper.return_value = mock_type = 'fake' mock__get_type_from_annotation.return_value = mock_type = 'fake'
foo_type, args_type, bar_type, baz_type, boo_type = tuple, str, int, float, complex foo_type, args_type, bar_type, baz_type, boo_type = tuple, str, int, float, complex
bar_default, baz_default, boo_default = 1, 0.1, 1j bar_default, baz_default, boo_default = 1, 0.1, 1j
@ -211,42 +215,42 @@ class ControlServerTestCase(TestCase):
kwargs = {FOO + BAR: 'xyz'} kwargs = {FOO + BAR: 'xyz'}
self.assertEqual(expected_output, self.parser.add_function_arg(param_foo, **kwargs)) self.assertEqual(expected_output, self.parser.add_function_arg(param_foo, **kwargs))
mock_add_argument.assert_called_once_with('foo', type=mock_type, **kwargs) mock_add_argument.assert_called_once_with('foo', type=mock_type, **kwargs)
mock__get_arg_type_wrapper.assert_called_once_with(foo_type) mock__get_type_from_annotation.assert_called_once_with(foo_type)
mock_add_argument.reset_mock() mock_add_argument.reset_mock()
mock__get_arg_type_wrapper.reset_mock() mock__get_type_from_annotation.reset_mock()
self.assertEqual(expected_output, self.parser.add_function_arg(param_args, **kwargs)) self.assertEqual(expected_output, self.parser.add_function_arg(param_args, **kwargs))
mock_add_argument.assert_called_once_with('args', nargs='*', type=mock_type, **kwargs) mock_add_argument.assert_called_once_with('args', nargs='*', type=mock_type, **kwargs)
mock__get_arg_type_wrapper.assert_called_once_with(args_type) mock__get_type_from_annotation.assert_called_once_with(args_type)
mock_add_argument.reset_mock() mock_add_argument.reset_mock()
mock__get_arg_type_wrapper.reset_mock() mock__get_type_from_annotation.reset_mock()
self.assertEqual(expected_output, self.parser.add_function_arg(param_bar, **kwargs)) self.assertEqual(expected_output, self.parser.add_function_arg(param_bar, **kwargs))
mock_add_argument.assert_called_once_with('-b', '--bar', default=bar_default, type=mock_type, **kwargs) mock_add_argument.assert_called_once_with('-b', '--bar', default=bar_default, type=mock_type, **kwargs)
mock__get_arg_type_wrapper.assert_called_once_with(bar_type) mock__get_type_from_annotation.assert_called_once_with(bar_type)
mock_add_argument.reset_mock() mock_add_argument.reset_mock()
mock__get_arg_type_wrapper.reset_mock() mock__get_type_from_annotation.reset_mock()
self.assertEqual(expected_output, self.parser.add_function_arg(param_baz, **kwargs)) self.assertEqual(expected_output, self.parser.add_function_arg(param_baz, **kwargs))
mock_add_argument.assert_called_once_with('-B', '--baz', default=baz_default, type=mock_type, **kwargs) mock_add_argument.assert_called_once_with('-B', '--baz', default=baz_default, type=mock_type, **kwargs)
mock__get_arg_type_wrapper.assert_called_once_with(baz_type) mock__get_type_from_annotation.assert_called_once_with(baz_type)
mock_add_argument.reset_mock() mock_add_argument.reset_mock()
mock__get_arg_type_wrapper.reset_mock() mock__get_type_from_annotation.reset_mock()
self.assertEqual(expected_output, self.parser.add_function_arg(param_boo, **kwargs)) self.assertEqual(expected_output, self.parser.add_function_arg(param_boo, **kwargs))
mock_add_argument.assert_called_once_with('--boo', default=boo_default, type=mock_type, **kwargs) mock_add_argument.assert_called_once_with('--boo', default=boo_default, type=mock_type, **kwargs)
mock__get_arg_type_wrapper.assert_called_once_with(boo_type) mock__get_type_from_annotation.assert_called_once_with(boo_type)
mock_add_argument.reset_mock() mock_add_argument.reset_mock()
mock__get_arg_type_wrapper.reset_mock() mock__get_type_from_annotation.reset_mock()
self.assertEqual(expected_output, self.parser.add_function_arg(param_flag, **kwargs)) self.assertEqual(expected_output, self.parser.add_function_arg(param_flag, **kwargs))
mock_add_argument.assert_called_once_with('-f', '--flag', action='store_true', **kwargs) mock_add_argument.assert_called_once_with('-f', '--flag', action='store_true', **kwargs)
mock__get_arg_type_wrapper.assert_not_called() mock__get_type_from_annotation.assert_not_called()
@patch.object(parser.ControlParser, 'add_function_arg') @patch.object(parser.ControlParser, 'add_function_arg')
def test_add_function_args(self, mock_add_function_arg: MagicMock): def test_add_function_args(self, mock_add_function_arg: MagicMock):
@ -266,3 +270,20 @@ class RestTestCase(TestCase):
self.assertEqual('int', type_wrap.__name__) self.assertEqual('int', type_wrap.__name__)
self.assertEqual(SUPPRESS, type_wrap(SUPPRESS)) self.assertEqual(SUPPRESS, type_wrap(SUPPRESS))
self.assertEqual(13, type_wrap('13')) self.assertEqual(13, type_wrap('13'))
@patch.object(parser, '_get_arg_type_wrapper')
def test__get_type_from_annotation(self, mock__get_arg_type_wrapper: MagicMock):
mock__get_arg_type_wrapper.return_value = expected_output = FOO + BAR
dotted_path_ann = [CoroutineFunc, EndCB, CancelCB]
literal_eval_ann = [ArgsT, KwArgsT, Iterable[ArgsT], Iterable[KwArgsT]]
any_other_ann = MagicMock()
for a in dotted_path_ann:
self.assertEqual(expected_output, parser._get_type_from_annotation(a))
mock__get_arg_type_wrapper.assert_has_calls(len(dotted_path_ann) * [call(resolve_dotted_path)])
mock__get_arg_type_wrapper.reset_mock()
for a in literal_eval_ann:
self.assertEqual(expected_output, parser._get_type_from_annotation(a))
mock__get_arg_type_wrapper.assert_has_calls(len(literal_eval_ann) * [call(literal_eval)])
mock__get_arg_type_wrapper.reset_mock()
self.assertEqual(expected_output, parser._get_type_from_annotation(any_other_ann))
mock__get_arg_type_wrapper.assert_called_once_with(any_other_ann)

View File

@ -20,7 +20,7 @@ Unittests for the `asyncio_taskpool.helpers` module.
from unittest import IsolatedAsyncioTestCase from unittest import IsolatedAsyncioTestCase
from unittest.mock import MagicMock, AsyncMock, NonCallableMagicMock from unittest.mock import MagicMock, AsyncMock, NonCallableMagicMock, call, patch
from asyncio_taskpool import helpers from asyncio_taskpool import helpers
@ -118,3 +118,13 @@ class HelpersTestCase(IsolatedAsyncioTestCase):
output = await helpers.return_or_exception(mock_func, *args, **kwargs) output = await helpers.return_or_exception(mock_func, *args, **kwargs)
self.assertEqual(test_exception, output) self.assertEqual(test_exception, output)
mock_func.assert_called_once_with(*args, **kwargs) mock_func.assert_called_once_with(*args, **kwargs)
def test_resolve_dotted_path(self):
from logging import WARNING
from urllib.request import urlopen
self.assertEqual(WARNING, helpers.resolve_dotted_path('logging.WARNING'))
self.assertEqual(urlopen, helpers.resolve_dotted_path('urllib.request.urlopen'))
with patch.object(helpers, 'import_module', return_value=object) as mock_import_module:
with self.assertRaises(AttributeError):
helpers.resolve_dotted_path('foo.bar.baz')
mock_import_module.assert_has_calls([call('foo'), call('foo.bar')])

View File

@ -603,28 +603,34 @@ class TaskPoolTestCase(CommonTestCase):
@patch.object(pool, 'star_function') @patch.object(pool, 'star_function')
@patch.object(pool.TaskPool, '_start_task') @patch.object(pool.TaskPool, '_start_task')
@patch.object(pool, 'Semaphore')
@patch.object(pool.TaskPool, '_get_map_end_callback') @patch.object(pool.TaskPool, '_get_map_end_callback')
async def test__queue_consumer(self, mock__get_map_end_callback: MagicMock, mock_semaphore_cls: MagicMock, @patch.object(pool, 'Semaphore')
async def test__queue_consumer(self, mock_semaphore_cls: MagicMock, mock__get_map_end_callback: MagicMock,
mock__start_task: AsyncMock, mock_star_function: MagicMock): mock__start_task: AsyncMock, mock_star_function: MagicMock):
mock__get_map_end_callback.return_value = map_cb = MagicMock()
mock_semaphore_cls.return_value = semaphore = Semaphore(3) mock_semaphore_cls.return_value = semaphore = Semaphore(3)
mock_star_function.return_value = awaitable = 'totally an awaitable' mock__get_map_end_callback.return_value = map_cb = MagicMock()
arg1, arg2 = 123456789, 'function argument' awaitable = 'totally an awaitable'
mock_star_function.side_effect = [awaitable, awaitable, Exception()]
arg1, arg2, bad = 123456789, 'function argument', None
mock_q_maxsize = 3 mock_q_maxsize = 3
mock_q = MagicMock(__aenter__=AsyncMock(side_effect=[arg1, arg2, pool.TaskPool._QUEUE_END_SENTINEL]), mock_q = MagicMock(__aenter__=AsyncMock(side_effect=[arg1, arg2, bad, pool.TaskPool._QUEUE_END_SENTINEL]),
__aexit__=AsyncMock(), maxsize=mock_q_maxsize) __aexit__=AsyncMock(), maxsize=mock_q_maxsize)
group_name, mock_func, stars = 'whatever', MagicMock(), 3 group_name, mock_func, stars = 'whatever', MagicMock(__name__="mock"), 3
end_cb, cancel_cb = MagicMock(), MagicMock() end_cb, cancel_cb = MagicMock(), MagicMock()
self.assertIsNone(await self.task_pool._queue_consumer(mock_q, group_name, mock_func, stars, end_cb, cancel_cb)) self.assertIsNone(await self.task_pool._queue_consumer(mock_q, group_name, mock_func, stars, end_cb, cancel_cb))
# We expect the semaphore to be acquired 3 times, then be released once after the exception occurs, then
# acquired once more when the `_QUEUE_END_SENTINEL` is reached. Since we initialized it with a value of 3,
# at the end of the loop, we expect it be locked.
self.assertTrue(semaphore.locked()) self.assertTrue(semaphore.locked())
mock_semaphore_cls.assert_called_once_with(mock_q_maxsize)
mock__get_map_end_callback.assert_called_once_with(semaphore, actual_end_callback=end_cb) mock__get_map_end_callback.assert_called_once_with(semaphore, actual_end_callback=end_cb)
mock__start_task.assert_has_awaits(2 * [ mock__start_task.assert_has_awaits(2 * [
call(awaitable, group_name=group_name, ignore_lock=True, end_callback=map_cb, cancel_callback=cancel_cb) call(awaitable, group_name=group_name, ignore_lock=True, end_callback=map_cb, cancel_callback=cancel_cb)
]) ])
mock_star_function.assert_has_calls([ mock_star_function.assert_has_calls([
call(mock_func, arg1, arg_stars=stars), call(mock_func, arg1, arg_stars=stars),
call(mock_func, arg2, arg_stars=stars) call(mock_func, arg2, arg_stars=stars),
call(mock_func, bad, arg_stars=stars)
]) ])
@patch.object(pool, 'create_task') @patch.object(pool, 'create_task')

View File

@ -15,7 +15,7 @@ You should have received a copy of the GNU Lesser General Public License along w
If not, see <https://www.gnu.org/licenses/>.""" If not, see <https://www.gnu.org/licenses/>."""
__doc__ = """ __doc__ = """
Working example of a UnixControlServer in combination with the SimpleTaskPool. Working example of a TCPControlServer in combination with the SimpleTaskPool.
Use the main CLI client to interface at the socket. Use the main CLI client to interface at the socket.
""" """