Compare commits

...

5 Commits

19 changed files with 278 additions and 74 deletions

View File

@ -2,9 +2,18 @@
**Dynamically manage pools of asyncio tasks** **Dynamically manage pools of asyncio tasks**
## Contents
- [Contents](#contents)
- [Summary](#summary)
- [Usage](#usage)
- [Installation](#installation)
- [Dependencies](#dependencies)
- [Testing](#testing)
- [License](#license)
## Summary ## Summary
A task pool is an object with a simple interface for aggregating and dynamically managing asynchronous tasks. A **task pool** is an object with a simple interface for aggregating and dynamically managing asynchronous tasks.
With an interface that is intentionally similar to the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) class from the standard library, the `TaskPool` provides you such methods as `apply`, `map`, and `starmap` to execute coroutines concurrently as [`asyncio.Task`](https://docs.python.org/3/library/asyncio-task.html#task-object) objects. There is no limitation imposed on what kind of tasks can be run or in what combination, when new ones can be added, or when they can be cancelled. With an interface that is intentionally similar to the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) class from the standard library, the `TaskPool` provides you such methods as `apply`, `map`, and `starmap` to execute coroutines concurrently as [`asyncio.Task`](https://docs.python.org/3/library/asyncio-task.html#task-object) objects. There is no limitation imposed on what kind of tasks can be run or in what combination, when new ones can be added, or when they can be cancelled.
@ -22,7 +31,7 @@ from asyncio_taskpool import SimpleTaskPool
... ...
async def work(foo, bar): ... async def work(_foo, _bar): ...
... ...
@ -55,7 +64,7 @@ Python Version 3.8+, tested on Linux
## Testing ## Testing
Install `asyncio-taskpool[dev]` dependencies or just manually install `coverage` with `pip`. Install `asyncio-taskpool[dev]` dependencies or just manually install [`coverage`](https://coverage.readthedocs.io/en/latest/) with `pip`.
Execute the [`./coverage.sh`](coverage.sh) shell script to run all unit tests and receive the coverage report. Execute the [`./coverage.sh`](coverage.sh) shell script to run all unit tests and receive the coverage report.
## License ## License
@ -64,6 +73,6 @@ Execute the [`./coverage.sh`](coverage.sh) shell script to run all unit tests an
The full license texts for the [GNU GPLv3.0](COPYING) and the [GNU LGPLv3.0](COPYING.LESSER) are included in this repository. If not, see https://www.gnu.org/licenses/. The full license texts for the [GNU GPLv3.0](COPYING) and the [GNU LGPLv3.0](COPYING.LESSER) are included in this repository. If not, see https://www.gnu.org/licenses/.
## Copyright ---
© 2022 Daniil Fajnberg © 2022 Daniil Fajnberg

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = asyncio-taskpool name = asyncio-taskpool
version = 0.6.0 version = 0.6.3
author = Daniil Fajnberg author = Daniil Fajnberg
author_email = mail@daniil.fajnberg.de author_email = mail@daniil.fajnberg.de
description = Dynamically manage pools of asyncio tasks description = Dynamically manage pools of asyncio tasks

View File

@ -19,5 +19,5 @@ Brings the main classes up to package level for import convenience.
""" """
from .control.server import TCPControlServer, UnixControlServer
from .pool import TaskPool, SimpleTaskPool from .pool import TaskPool, SimpleTaskPool
from .server import TCPControlServer, UnixControlServer

View File

View File

@ -19,37 +19,30 @@ CLI client entry point.
""" """
import sys
from argparse import ArgumentParser from argparse import ArgumentParser
from asyncio import run from asyncio import run
from pathlib import Path from pathlib import Path
from typing import Dict, Any from typing import Any, Dict, Sequence
from ..constants import PACKAGE_NAME
from ..pool import TaskPool
from .client import ControlClient, TCPControlClient, UnixControlClient from .client import ControlClient, TCPControlClient, UnixControlClient
from .constants import PACKAGE_NAME
from .pool import TaskPool
from .server import TCPControlServer, UnixControlServer from .server import TCPControlServer, UnixControlServer
CONN_TYPE = 'conn_type' CLIENT_CLASS = 'client_class'
UNIX, TCP = 'unix', 'tcp' UNIX, TCP = 'unix', 'tcp'
SOCKET_PATH = 'path' SOCKET_PATH = 'path'
HOST, PORT = 'host', 'port' HOST, PORT = 'host', 'port'
def parse_cli() -> Dict[str, Any]: def parse_cli(args: Sequence[str] = None) -> Dict[str, Any]:
parser = ArgumentParser( parser = ArgumentParser(
prog=PACKAGE_NAME, prog=f'{PACKAGE_NAME}.control',
description=f"CLI based {ControlClient.__name__} for {PACKAGE_NAME}" description=f"Simple CLI based {ControlClient.__name__} for {PACKAGE_NAME}"
)
subparsers = parser.add_subparsers(title="Connection types", dest=CONN_TYPE)
unix_parser = subparsers.add_parser(UNIX, help="Connect via unix socket")
unix_parser.add_argument(
SOCKET_PATH,
type=Path,
help=f"Path to the unix socket on which the {UnixControlServer.__name__} for the {TaskPool.__name__} is "
f"listening."
) )
subparsers = parser.add_subparsers(title="Connection types")
tcp_parser = subparsers.add_parser(TCP, help="Connect via TCP socket") tcp_parser = subparsers.add_parser(TCP, help="Connect via TCP socket")
tcp_parser.add_argument( tcp_parser.add_argument(
HOST, HOST,
@ -60,19 +53,25 @@ def parse_cli() -> Dict[str, Any]:
type=int, type=int,
help=f"Port that the {TCPControlServer.__name__} for the {TaskPool.__name__} is listening on." help=f"Port that the {TCPControlServer.__name__} for the {TaskPool.__name__} is listening on."
) )
return vars(parser.parse_args()) tcp_parser.set_defaults(**{CLIENT_CLASS: TCPControlClient})
unix_parser = subparsers.add_parser(UNIX, help="Connect via unix socket")
unix_parser.add_argument(
SOCKET_PATH,
type=Path,
help=f"Path to the unix socket on which the {UnixControlServer.__name__} for the {TaskPool.__name__} is "
f"listening."
)
unix_parser.set_defaults(**{CLIENT_CLASS: UnixControlClient})
return vars(parser.parse_args(args))
async def main(): async def main():
kwargs = parse_cli() kwargs = parse_cli()
if kwargs[CONN_TYPE] == UNIX: client_cls = kwargs.pop(CLIENT_CLASS)
client = UnixControlClient(socket_path=kwargs[SOCKET_PATH]) await client_cls(**kwargs).start()
elif kwargs[CONN_TYPE] == TCP:
client = TCPControlClient(host=kwargs[HOST], port=kwargs[PORT])
else:
print("Invalid connection type", file=sys.stderr)
sys.exit(2)
await client.start()
if __name__ == '__main__': if __name__ == '__main__':
run(main()) run(main())

View File

@ -27,8 +27,8 @@ from asyncio.streams import StreamReader, StreamWriter, open_connection
from pathlib import Path from pathlib import Path
from typing import Optional, Union from typing import Optional, Union
from .constants import CLIENT_EXIT, CLIENT_INFO, SESSION_MSG_BYTES from ..constants import CLIENT_EXIT, CLIENT_INFO, SESSION_MSG_BYTES
from .types import ClientConnT, PathT from ..types import ClientConnT, PathT
class ControlClient(ABC): class ControlClient(ABC):
@ -76,6 +76,7 @@ class ControlClient(ABC):
writer.write(json.dumps(self.client_info()).encode()) writer.write(json.dumps(self.client_info()).encode())
await writer.drain() await writer.drain()
print("Connected to", (await reader.read(SESSION_MSG_BYTES)).decode()) print("Connected to", (await reader.read(SESSION_MSG_BYTES)).decode())
print("Type '-h' to get help and usage instructions for all available commands.\n")
def _get_command(self, writer: StreamWriter) -> Optional[str]: def _get_command(self, writer: StreamWriter) -> Optional[str]:
""" """

View File

@ -25,9 +25,9 @@ from inspect import Parameter, getmembers, isfunction, signature
from shutil import get_terminal_size from shutil import get_terminal_size
from typing import Callable, Container, Dict, Set, Type, TypeVar from typing import Callable, Container, Dict, Set, Type, TypeVar
from .constants import CLIENT_INFO, CMD, STREAM_WRITER from ..constants import CLIENT_INFO, CMD, STREAM_WRITER
from .exceptions import HelpRequested from ..exceptions import HelpRequested
from .helpers import get_first_doc_line from ..helpers import get_first_doc_line
FmtCls = TypeVar('FmtCls', bound=Type[HelpFormatter]) FmtCls = TypeVar('FmtCls', bound=Type[HelpFormatter])

View File

@ -28,10 +28,10 @@ from asyncio.tasks import Task, create_task
from pathlib import Path from pathlib import Path
from typing import Optional, Union from typing import Optional, Union
from ..pool import TaskPool, SimpleTaskPool
from ..types import ConnectedCallbackT
from .client import ControlClient, TCPControlClient, UnixControlClient from .client import ControlClient, TCPControlClient, UnixControlClient
from .pool import TaskPool, SimpleTaskPool
from .session import ControlSession from .session import ControlSession
from .types import ConnectedCallbackT
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -136,7 +136,7 @@ class TCPControlServer(ControlServer):
"""Task pool control server class that exposes a TCP socket for control clients to connect to.""" """Task pool control server class that exposes a TCP socket for control clients to connect to."""
_client_class = TCPControlClient _client_class = TCPControlClient
def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None: def __init__(self, pool: Union[TaskPool, SimpleTaskPool], **server_kwargs) -> None:
self._host = server_kwargs.pop('host') self._host = server_kwargs.pop('host')
self._port = server_kwargs.pop('port') self._port = server_kwargs.pop('port')
super().__init__(pool, **server_kwargs) super().__init__(pool, **server_kwargs)
@ -154,7 +154,7 @@ class UnixControlServer(ControlServer):
"""Task pool control server class that exposes a unix socket for control clients to connect to.""" """Task pool control server class that exposes a unix socket for control clients to connect to."""
_client_class = UnixControlClient _client_class = UnixControlClient
def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None: def __init__(self, pool: Union[TaskPool, SimpleTaskPool], **server_kwargs) -> None:
from asyncio.streams import start_unix_server from asyncio.streams import start_unix_server
self._start_unix_server = start_unix_server self._start_unix_server = start_unix_server
self._socket_path = Path(server_kwargs.pop('path')) self._socket_path = Path(server_kwargs.pop('path'))

View File

@ -26,10 +26,10 @@ from asyncio.streams import StreamReader, StreamWriter
from inspect import isfunction, signature from inspect import isfunction, signature
from typing import Callable, Optional, Union, TYPE_CHECKING from typing import Callable, Optional, Union, TYPE_CHECKING
from .constants import CLIENT_INFO, CMD, CMD_OK, SESSION_MSG_BYTES, STREAM_WRITER from ..constants import CLIENT_INFO, CMD, CMD_OK, SESSION_MSG_BYTES, STREAM_WRITER
from .exceptions import HelpRequested from ..exceptions import CommandError, HelpRequested
from .helpers import return_or_exception from ..helpers import return_or_exception
from .pool import TaskPool, SimpleTaskPool from ..pool import TaskPool, SimpleTaskPool
from .parser import ControlParser from .parser import ControlParser
if TYPE_CHECKING: if TYPE_CHECKING:
@ -85,7 +85,7 @@ class ControlSession:
Must correspond to the arguments expected by the `method`. Must correspond to the arguments expected by the `method`.
Correctly unpacks arbitrary-length positional and keyword-arguments. Correctly unpacks arbitrary-length positional and keyword-arguments.
""" """
log.warning("%s calls %s.%s", self._client_class_name, self._pool.__class__.__name__, method.__name__) log.debug("%s calls %s.%s", self._client_class_name, self._pool.__class__.__name__, method.__name__)
normal_pos, var_pos = [], [] normal_pos, var_pos = [], []
for param in signature(method).parameters.values(): for param in signature(method).parameters.values():
if param.name == 'self': if param.name == 'self':
@ -112,11 +112,11 @@ class ControlSession:
executed and the response written to the stream will be its return value (as an encoded string). executed and the response written to the stream will be its return value (as an encoded string).
""" """
if kwargs: if kwargs:
log.warning("%s sets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fset.__name__) log.debug("%s sets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fset.__name__)
await return_or_exception(prop.fset, self._pool, **kwargs) await return_or_exception(prop.fset, self._pool, **kwargs)
self._writer.write(CMD_OK) self._writer.write(CMD_OK)
else: else:
log.warning("%s gets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fget.__name__) log.debug("%s gets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fget.__name__)
self._writer.write(str(await return_or_exception(prop.fget, self._pool)).encode()) self._writer.write(str(await return_or_exception(prop.fget, self._pool)).encode())
async def client_handshake(self) -> None: async def client_handshake(self) -> None:
@ -131,7 +131,7 @@ class ControlSession:
STREAM_WRITER: self._writer, STREAM_WRITER: self._writer,
CLIENT_INFO.TERMINAL_WIDTH: client_info[CLIENT_INFO.TERMINAL_WIDTH], CLIENT_INFO.TERMINAL_WIDTH: client_info[CLIENT_INFO.TERMINAL_WIDTH],
'prog': '', 'prog': '',
'usage': f'%(prog)s [-h] [{CMD}] ...' 'usage': f'[-h] [{CMD}] ...'
} }
self._parser = ControlParser(**parser_kwargs) self._parser = ControlParser(**parser_kwargs)
self._parser.add_subparsers(title="Commands", self._parser.add_subparsers(title="Commands",
@ -154,15 +154,19 @@ class ControlSession:
try: try:
kwargs = vars(self._parser.parse_args(msg.split(' '))) kwargs = vars(self._parser.parse_args(msg.split(' ')))
except ArgumentError as e: except ArgumentError as e:
log.debug("%s got an ArgumentError", self._client_class_name)
self._writer.write(str(e).encode()) self._writer.write(str(e).encode())
return return
except HelpRequested: except HelpRequested:
log.debug("%s received usage help", self._client_class_name)
return return
command = kwargs.pop(CMD) command = kwargs.pop(CMD)
if isfunction(command): if isfunction(command):
await self._exec_method_and_respond(command, **kwargs) await self._exec_method_and_respond(command, **kwargs)
elif isinstance(command, property): elif isinstance(command, property):
await self._exec_property_and_respond(command, **kwargs) await self._exec_property_and_respond(command, **kwargs)
else:
self._writer.write(str(CommandError(f"Unknown command object: {command}")).encode())
async def listen(self) -> None: async def listen(self) -> None:
""" """

View File

@ -65,3 +65,7 @@ class ServerException(Exception):
class HelpRequested(ServerException): class HelpRequested(ServerException):
pass pass
class CommandError(ServerException):
pass

View File

@ -120,7 +120,7 @@ class BaseTaskPool:
@property @property
def is_locked(self) -> bool: def is_locked(self) -> bool:
"""Returns `True` if more the pool has been locked (see below).""" """Returns `True` if the pool has been locked (see below)."""
return self._locked return self._locked
def lock(self) -> None: def lock(self) -> None:

View File

View File

@ -0,0 +1,45 @@
from pathlib import Path
from unittest import IsolatedAsyncioTestCase
from unittest.mock import AsyncMock, MagicMock, patch
from asyncio_taskpool.control.client import TCPControlClient, UnixControlClient
from asyncio_taskpool.control import __main__ as module
class CLITestCase(IsolatedAsyncioTestCase):
def test_parse_cli(self):
socket_path = '/some/path/to.sock'
args = [module.UNIX, socket_path]
expected_kwargs = {
module.CLIENT_CLASS: UnixControlClient,
module.SOCKET_PATH: Path(socket_path)
}
parsed_kwargs = module.parse_cli(args)
self.assertDictEqual(expected_kwargs, parsed_kwargs)
host, port = '1.2.3.4', '1234'
args = [module.TCP, host, port]
expected_kwargs = {
module.CLIENT_CLASS: TCPControlClient,
module.HOST: host,
module.PORT: int(port)
}
parsed_kwargs = module.parse_cli(args)
self.assertDictEqual(expected_kwargs, parsed_kwargs)
with patch('sys.stderr'):
with self.assertRaises(SystemExit):
module.parse_cli(['invalid', 'foo', 'bar'])
@patch.object(module, 'parse_cli')
async def test_main(self, mock_parse_cli: MagicMock):
mock_client_start = AsyncMock()
mock_client = MagicMock(start=mock_client_start)
mock_client_cls = MagicMock(return_value=mock_client)
mock_client_kwargs = {'foo': 123, 'bar': 456, 'baz': 789}
mock_parse_cli.return_value = {module.CLIENT_CLASS: mock_client_cls} | mock_client_kwargs
self.assertIsNone(await module.main())
mock_parse_cli.assert_called_once_with()
mock_client_cls.assert_called_once_with(**mock_client_kwargs)
mock_client_start.assert_awaited_once_with()

View File

@ -25,9 +25,9 @@ import shutil
import sys import sys
from pathlib import Path from pathlib import Path
from unittest import IsolatedAsyncioTestCase, skipIf from unittest import IsolatedAsyncioTestCase, skipIf
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, call, patch
from asyncio_taskpool import client from asyncio_taskpool.control import client
from asyncio_taskpool.constants import CLIENT_INFO, SESSION_MSG_BYTES from asyncio_taskpool.constants import CLIENT_INFO, SESSION_MSG_BYTES
@ -37,7 +37,7 @@ FOO, BAR = 'foo', 'bar'
class ControlClientTestCase(IsolatedAsyncioTestCase): class ControlClientTestCase(IsolatedAsyncioTestCase):
def setUp(self) -> None: def setUp(self) -> None:
self.abstract_patcher = patch('asyncio_taskpool.client.ControlClient.__abstractmethods__', set()) self.abstract_patcher = patch('asyncio_taskpool.control.client.ControlClient.__abstractmethods__', set())
self.print_patcher = patch.object(client, 'print') self.print_patcher = patch.object(client, 'print')
self.mock_abstract_methods = self.abstract_patcher.start() self.mock_abstract_methods = self.abstract_patcher.start()
self.mock_print = self.print_patcher.start() self.mock_print = self.print_patcher.start()
@ -74,7 +74,10 @@ class ControlClientTestCase(IsolatedAsyncioTestCase):
self.mock_write.assert_called_once_with(json.dumps(mock_info).encode()) self.mock_write.assert_called_once_with(json.dumps(mock_info).encode())
self.mock_drain.assert_awaited_once_with() self.mock_drain.assert_awaited_once_with()
self.mock_read.assert_awaited_once_with(SESSION_MSG_BYTES) self.mock_read.assert_awaited_once_with(SESSION_MSG_BYTES)
self.mock_print.assert_called_once_with("Connected to", self.mock_read.return_value.decode()) self.mock_print.assert_has_calls([
call("Connected to", self.mock_read.return_value.decode()),
call("Type '-h' to get help and usage instructions for all available commands.\n")
])
@patch.object(client, 'input') @patch.object(client, 'input')
def test__get_command(self, mock_input: MagicMock): def test__get_command(self, mock_input: MagicMock):
@ -172,6 +175,43 @@ class ControlClientTestCase(IsolatedAsyncioTestCase):
self.mock_print.assert_called_once_with("Disconnected from control server.") self.mock_print.assert_called_once_with("Disconnected from control server.")
class TCPControlClientTestCase(IsolatedAsyncioTestCase):
def setUp(self) -> None:
self.base_init_patcher = patch.object(client.ControlClient, '__init__')
self.mock_base_init = self.base_init_patcher.start()
self.host, self.port = 'localhost', 12345
self.kwargs = {FOO: 123, BAR: 456}
self.client = client.TCPControlClient(host=self.host, port=self.port, **self.kwargs)
def tearDown(self) -> None:
self.base_init_patcher.stop()
def test_init(self):
self.assertEqual(self.host, self.client._host)
self.assertEqual(self.port, self.client._port)
self.mock_base_init.assert_called_once_with(**self.kwargs)
@patch.object(client, 'print')
@patch.object(client, 'open_connection')
async def test__open_connection(self, mock_open_connection: AsyncMock, mock_print: MagicMock):
mock_open_connection.return_value = expected_output = 'something'
kwargs = {'a': 1, 'b': 2}
output = await self.client._open_connection(**kwargs)
self.assertEqual(expected_output, output)
mock_open_connection.assert_awaited_once_with(self.host, self.port, **kwargs)
mock_print.assert_not_called()
mock_open_connection.reset_mock()
mock_open_connection.side_effect = e = ConnectionError()
output1, output2 = await self.client._open_connection(**kwargs)
self.assertIsNone(output1)
self.assertIsNone(output2)
mock_open_connection.assert_awaited_once_with(self.host, self.port, **kwargs)
mock_print.assert_called_once_with(str(e), file=sys.stderr)
@skipIf(os.name == 'nt', "No Unix sockets on Windows :(") @skipIf(os.name == 'nt', "No Unix sockets on Windows :(")
class UnixControlClientTestCase(IsolatedAsyncioTestCase): class UnixControlClientTestCase(IsolatedAsyncioTestCase):

View File

@ -24,7 +24,7 @@ from inspect import signature
from unittest import TestCase from unittest import TestCase
from unittest.mock import MagicMock, call, patch from unittest.mock import MagicMock, call, patch
from asyncio_taskpool import parser from asyncio_taskpool.control import parser
from asyncio_taskpool.exceptions import HelpRequested from asyncio_taskpool.exceptions import HelpRequested
@ -157,6 +157,14 @@ class ControlServerTestCase(TestCase):
mock_add_property_command.assert_called_once_with(FooBar.prop, FooBar.__name__, **common_kwargs) mock_add_property_command.assert_called_once_with(FooBar.prop, FooBar.__name__, **common_kwargs)
mock_set_defaults.assert_has_calls([call(**{x: FooBar.method}), call(**{x: FooBar.prop})]) mock_set_defaults.assert_has_calls([call(**{x: FooBar.method}), call(**{x: FooBar.prop})])
@patch.object(parser.ArgumentParser, 'add_subparsers')
def test_add_subparsers(self, mock_base_add_subparsers: MagicMock):
args, kwargs = [1, 2, 42], {FOO: 123, BAR: 456}
mock_base_add_subparsers.return_value = mock_action = MagicMock()
output = self.parser.add_subparsers(*args, **kwargs)
self.assertEqual(mock_action, output)
mock_base_add_subparsers.assert_called_once_with(*args, **kwargs)
def test__print_message(self): def test__print_message(self):
self.stream_writer.write = MagicMock() self.stream_writer.write = MagicMock()
self.assertIsNone(self.parser._print_message('')) self.assertIsNone(self.parser._print_message(''))

View File

@ -26,8 +26,8 @@ from pathlib import Path
from unittest import IsolatedAsyncioTestCase, skipIf from unittest import IsolatedAsyncioTestCase, skipIf
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
from asyncio_taskpool import server from asyncio_taskpool.control import server
from asyncio_taskpool.client import ControlClient, UnixControlClient from asyncio_taskpool.control.client import ControlClient, TCPControlClient, UnixControlClient
FOO, BAR = 'foo', 'bar' FOO, BAR = 'foo', 'bar'
@ -46,7 +46,7 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
server.log.setLevel(cls.log_lvl) server.log.setLevel(cls.log_lvl)
def setUp(self) -> None: def setUp(self) -> None:
self.abstract_patcher = patch('asyncio_taskpool.server.ControlServer.__abstractmethods__', set()) self.abstract_patcher = patch('asyncio_taskpool.control.server.ControlServer.__abstractmethods__', set())
self.mock_abstract_methods = self.abstract_patcher.start() self.mock_abstract_methods = self.abstract_patcher.start()
self.mock_pool = MagicMock() self.mock_pool = MagicMock()
self.kwargs = {FOO: 123, BAR: 456} self.kwargs = {FOO: 123, BAR: 456}
@ -120,6 +120,50 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
mock_create_task.assert_called_once_with(mock_awaitable) mock_create_task.assert_called_once_with(mock_awaitable)
class TCPControlServerTestCase(IsolatedAsyncioTestCase):
log_lvl: int
@classmethod
def setUpClass(cls) -> None:
cls.log_lvl = server.log.level
server.log.setLevel(999)
@classmethod
def tearDownClass(cls) -> None:
server.log.setLevel(cls.log_lvl)
def setUp(self) -> None:
self.base_init_patcher = patch.object(server.ControlServer, '__init__')
self.mock_base_init = self.base_init_patcher.start()
self.mock_pool = MagicMock()
self.host, self.port = 'localhost', 12345
self.kwargs = {FOO: 123, BAR: 456}
self.server = server.TCPControlServer(pool=self.mock_pool, host=self.host, port=self.port, **self.kwargs)
def tearDown(self) -> None:
self.base_init_patcher.stop()
def test__client_class(self):
self.assertEqual(TCPControlClient, self.server._client_class)
def test_init(self):
self.assertEqual(self.host, self.server._host)
self.assertEqual(self.port, self.server._port)
self.mock_base_init.assert_called_once_with(self.mock_pool, **self.kwargs)
@patch.object(server, 'start_server')
async def test__get_server_instance(self, mock_start_server: AsyncMock):
mock_start_server.return_value = expected_output = 'totally_a_server'
mock_callback, mock_kwargs = MagicMock(), {'a': 1, 'b': 2}
args = [mock_callback]
output = await self.server._get_server_instance(*args, **mock_kwargs)
self.assertEqual(expected_output, output)
mock_start_server.assert_called_once_with(mock_callback, self.host, self.port, **mock_kwargs)
def test__final_callback(self):
self.assertIsNone(self.server._final_callback())
@skipIf(os.name == 'nt', "No Unix sockets on Windows :(") @skipIf(os.name == 'nt', "No Unix sockets on Windows :(")
class UnixControlServerTestCase(IsolatedAsyncioTestCase): class UnixControlServerTestCase(IsolatedAsyncioTestCase):
log_lvl: int log_lvl: int

View File

@ -24,7 +24,7 @@ from argparse import ArgumentError, Namespace
from unittest import IsolatedAsyncioTestCase from unittest import IsolatedAsyncioTestCase
from unittest.mock import AsyncMock, MagicMock, patch, call from unittest.mock import AsyncMock, MagicMock, patch, call
from asyncio_taskpool import session from asyncio_taskpool.control import session
from asyncio_taskpool.constants import CLIENT_INFO, CMD, SESSION_MSG_BYTES, STREAM_WRITER from asyncio_taskpool.constants import CLIENT_INFO, CMD, SESSION_MSG_BYTES, STREAM_WRITER
from asyncio_taskpool.exceptions import HelpRequested from asyncio_taskpool.exceptions import HelpRequested
from asyncio_taskpool.pool import SimpleTaskPool from asyncio_taskpool.pool import SimpleTaskPool
@ -107,7 +107,7 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
STREAM_WRITER: self.mock_writer, STREAM_WRITER: self.mock_writer,
CLIENT_INFO.TERMINAL_WIDTH: width, CLIENT_INFO.TERMINAL_WIDTH: width,
'prog': '', 'prog': '',
'usage': f'%(prog)s [-h] [{CMD}] ...' 'usage': f'[-h] [{CMD}] ...'
} }
expected_subparsers_kwargs = { expected_subparsers_kwargs = {
'title': "Commands", 'title': "Commands",
@ -142,9 +142,7 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
mock__exec_method_and_respond.reset_mock() mock__exec_method_and_respond.reset_mock()
mock_parse_args.reset_mock() mock_parse_args.reset_mock()
mock_parse_args = MagicMock(return_value=Namespace(**{CMD: prop}, **kwargs)) mock_parse_args.return_value = Namespace(**{CMD: prop}, **kwargs)
self.session._parser = MagicMock(parse_args=mock_parse_args)
self.mock_writer.write = MagicMock()
self.assertIsNone(await self.session._parse_command(msg)) self.assertIsNone(await self.session._parse_command(msg))
mock_parse_args.assert_called_once_with(msg.split(' ')) mock_parse_args.assert_called_once_with(msg.split(' '))
self.mock_writer.write.assert_not_called() self.mock_writer.write.assert_not_called()
@ -154,6 +152,21 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
mock__exec_property_and_respond.reset_mock() mock__exec_property_and_respond.reset_mock()
mock_parse_args.reset_mock() mock_parse_args.reset_mock()
bad_command = 'definitely not a function or property'
mock_parse_args.return_value = Namespace(**{CMD: bad_command}, **kwargs)
with patch.object(session, 'CommandError') as cmd_err_cls:
cmd_err_cls.return_value = exc = MagicMock()
self.assertIsNone(await self.session._parse_command(msg))
cmd_err_cls.assert_called_once_with(f"Unknown command object: {bad_command}")
mock_parse_args.assert_called_once_with(msg.split(' '))
mock__exec_method_and_respond.assert_not_called()
mock__exec_property_and_respond.assert_not_called()
self.mock_writer.write.assert_called_once_with(str(exc).encode())
mock__exec_property_and_respond.reset_mock()
mock_parse_args.reset_mock()
self.mock_writer.write.reset_mock()
mock_parse_args.side_effect = exc = ArgumentError(MagicMock(), "oops") mock_parse_args.side_effect = exc = ArgumentError(MagicMock(), "oops")
self.assertIsNone(await self.session._parse_command(msg)) self.assertIsNone(await self.session._parse_command(msg))
mock_parse_args.assert_called_once_with(msg.split(' ')) mock_parse_args.assert_called_once_with(msg.split(' '))

View File

@ -1,14 +1,18 @@
# Using `asyncio-taskpool` # Using `asyncio-taskpool`
## Contents
- [Contents](#contents)
- [Minimal example for `SimpleTaskPool`](#minimal-example-for-simpletaskpool)
- [Advanced example for `TaskPool`](#advanced-example-for-taskpool)
- [Control server example](#control-server-example)
## Minimal example for `SimpleTaskPool` ## Minimal example for `SimpleTaskPool`
With a `SimpleTaskPool` the function to execute as well as the arguments with which to execute it must be defined during its initialization (and they cannot be changed later). The only control you have after initialization is how many of such tasks are being run. With a `SimpleTaskPool` the function to execute as well as the arguments with which to execute it must be defined during its initialization (and they cannot be changed later). The only control you have after initialization is how many of such tasks are being run.
The minimum required setup is a "worker" coroutine function that can do something asynchronously, and a main coroutine function that sets up the `SimpleTaskPool`, starts/stops the tasks as desired, and eventually awaits them all. The minimum required setup is a "worker" coroutine function that can do something asynchronously, and a main coroutine function that sets up the `SimpleTaskPool`, starts/stops the tasks as desired, and eventually awaits them all.
The following demo code enables full log output first for additional clarity. It is complete and should work as is. The following demo script enables full log output first for additional clarity. It is complete and should work as is.
### Code
```python ```python
import logging import logging
@ -48,7 +52,9 @@ if __name__ == '__main__':
asyncio.run(main()) asyncio.run(main())
``` ```
### Output <details>
<summary>Output: (Click to expand)</summary>
``` ```
SimpleTaskPool-0 initialized SimpleTaskPool-0 initialized
Started SimpleTaskPool-0_Task-0 Started SimpleTaskPool-0_Task-0
@ -78,6 +84,7 @@ Ended SimpleTaskPool-0_Task-1
> did 4 > did 4
> did 4 > did 4
``` ```
</details>
## Advanced example for `TaskPool` ## Advanced example for `TaskPool`
@ -85,9 +92,7 @@ This time, we want to start tasks from _different_ coroutine functions **and** w
As with the simple example, we need "worker" coroutine functions that can do something asynchronously, as well as a main coroutine function that sets up the pool, starts the tasks, and eventually awaits them. As with the simple example, we need "worker" coroutine functions that can do something asynchronously, as well as a main coroutine function that sets up the pool, starts the tasks, and eventually awaits them.
The following demo code enables full log output first for additional clarity. It is complete and should work as is. The following demo script enables full log output first for additional clarity. It is complete and should work as is.
### Code
```python ```python
import logging import logging
@ -144,10 +149,9 @@ if __name__ == '__main__':
asyncio.run(main()) asyncio.run(main())
``` ```
### Output <details>
Additional comments for the output are provided with `<---` next to the output lines. <summary>Output: (Click to expand)</summary>
(Keep in mind that the logger and `print` asynchronously write to `stdout`.)
``` ```
TaskPool-0 initialized TaskPool-0 initialized
Started TaskPool-0_Task-0 Started TaskPool-0_Task-0
@ -229,4 +233,37 @@ Ended TaskPool-0_Task-5
> Done. > Done.
``` ```
(Added comments with `<---` next to the output lines.)
Keep in mind that the logger and `print` asynchronously write to `stdout`, so the order of lines in your output may be slightly different.
</details>
## Control server example
One of the main features of `asyncio_taskpool` is the ability to control a task pool "from the outside" at runtime.
The [example_server.py](./example_server.py) script launches a couple of worker tasks within a `SimpleTaskPool` instance and then starts a `TCPControlServer` instance for that task pool. The server is configured to locally bind to port `9999` and is stopped automatically after the "work" is done.
To run the script:
```shell
python usage/example_server.py
```
You can then connect to the server via the command line interface:
```shell
python -m asyncio_taskpool.control tcp localhost 9999
```
The CLI starts a `TCPControlClient` that connects to our example server. Once the connection is established, it gives you an input prompt allowing you to issue commands to the task pool:
```
Connected to SimpleTaskPool-0
Type '-h' to get help and usage instructions for all available commands.
>
```
It may be useful to run the server script and the client interface in two separate terminal windows side by side. The server script is configured with a verbose logger and will react to any commands issued by the client with detailed log messages in the terminal.
---
© 2022 Daniil Fajnberg © 2022 Daniil Fajnberg

View File

@ -65,12 +65,12 @@ async def main() -> None:
# We just put some integers into our queue, since all our workers actually do, is print an item and sleep for a bit. # We just put some integers into our queue, since all our workers actually do, is print an item and sleep for a bit.
for item in range(100): for item in range(100):
q.put_nowait(item) q.put_nowait(item)
pool = SimpleTaskPool(worker, (q,)) # initializes the pool pool = SimpleTaskPool(worker, args=(q,)) # initializes the pool
await pool.start(3) # launches three worker tasks await pool.start(3) # launches three worker tasks
control_server_task = await TCPControlServer(pool, host='127.0.0.1', port=9999).serve_forever() control_server_task = await TCPControlServer(pool, host='127.0.0.1', port=9999).serve_forever()
# We block until `.task_done()` has been called once by our workers for every item placed into the queue. # We block until `.task_done()` has been called once by our workers for every item placed into the queue.
await q.join() await q.join()
# Since we don't need any "work" done anymore, we can lock our control server by cancelling the task. # Since we don't need any "work" done anymore, we can get rid of our control server by cancelling the task.
control_server_task.cancel() control_server_task.cancel()
# Since our workers should now be stuck waiting for more items to pick from the queue, but no items are left, # Since our workers should now be stuck waiting for more items to pick from the queue, but no items are left,
# we can now safely cancel their tasks. # we can now safely cancel their tasks.