generated from daniil-berg/boilerplate-py
Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
3fb451a00e | |||
be03097bf4 |
@ -1,6 +1,6 @@
|
|||||||
[metadata]
|
[metadata]
|
||||||
name = asyncio-taskpool
|
name = asyncio-taskpool
|
||||||
version = 0.2.1
|
version = 0.3.1
|
||||||
author = Daniil Fajnberg
|
author = Daniil Fajnberg
|
||||||
author_email = mail@daniil.fajnberg.de
|
author_email = mail@daniil.fajnberg.de
|
||||||
description = Dynamically manage pools of asyncio tasks
|
description = Dynamically manage pools of asyncio tasks
|
||||||
|
@ -19,6 +19,8 @@ Classes of control clients for a simply interface to a task pool control server.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
import sys
|
import sys
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from asyncio.streams import StreamReader, StreamWriter, open_unix_connection
|
from asyncio.streams import StreamReader, StreamWriter, open_unix_connection
|
||||||
@ -34,10 +36,20 @@ class ControlClient(ABC):
|
|||||||
async def open_connection(self, **kwargs) -> ClientConnT:
|
async def open_connection(self, **kwargs) -> ClientConnT:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def client_info() -> dict:
|
||||||
|
return {'width': shutil.get_terminal_size().columns}
|
||||||
|
|
||||||
def __init__(self, **conn_kwargs) -> None:
|
def __init__(self, **conn_kwargs) -> None:
|
||||||
self._conn_kwargs = conn_kwargs
|
self._conn_kwargs = conn_kwargs
|
||||||
self._connected: bool = False
|
self._connected: bool = False
|
||||||
|
|
||||||
|
async def _server_handshake(self, reader: StreamReader, writer: StreamWriter) -> None:
|
||||||
|
self._connected = True
|
||||||
|
writer.write(json.dumps(self.client_info()).encode())
|
||||||
|
await writer.drain()
|
||||||
|
print("Connected to", (await reader.read(constants.MSG_BYTES)).decode())
|
||||||
|
|
||||||
async def _interact(self, reader: StreamReader, writer: StreamWriter) -> None:
|
async def _interact(self, reader: StreamReader, writer: StreamWriter) -> None:
|
||||||
try:
|
try:
|
||||||
msg = input("> ").strip().lower()
|
msg = input("> ").strip().lower()
|
||||||
@ -64,8 +76,7 @@ class ControlClient(ABC):
|
|||||||
if reader is None:
|
if reader is None:
|
||||||
print("Failed to connect.", file=sys.stderr)
|
print("Failed to connect.", file=sys.stderr)
|
||||||
return
|
return
|
||||||
self._connected = True
|
await self._server_handshake(reader, writer)
|
||||||
print("Connected to", (await reader.read(constants.MSG_BYTES)).decode())
|
|
||||||
while self._connected:
|
while self._connected:
|
||||||
await self._interact(reader, writer)
|
await self._interact(reader, writer)
|
||||||
print("Disconnected from control server.")
|
print("Disconnected from control server.")
|
||||||
|
@ -20,10 +20,13 @@ Constants used by more than one module in the package.
|
|||||||
|
|
||||||
|
|
||||||
PACKAGE_NAME = 'asyncio_taskpool'
|
PACKAGE_NAME = 'asyncio_taskpool'
|
||||||
MSG_BYTES = 1024
|
MSG_BYTES = 1024000
|
||||||
|
CMD = 'command'
|
||||||
|
CMD_NAME = 'name'
|
||||||
|
CMD_POOL_SIZE = 'pool-size'
|
||||||
|
CMD_NUM_RUNNING = 'num-running'
|
||||||
CMD_START = 'start'
|
CMD_START = 'start'
|
||||||
CMD_STOP = 'stop'
|
CMD_STOP = 'stop'
|
||||||
CMD_STOP_ALL = 'stop_all'
|
CMD_STOP_ALL = 'stop-all'
|
||||||
CMD_NUM_RUNNING = 'num_running'
|
CMD_FUNC_NAME = 'func-name'
|
||||||
CMD_FUNC = 'func'
|
|
||||||
CLIENT_EXIT = 'exit'
|
CLIENT_EXIT = 'exit'
|
||||||
|
@ -49,3 +49,11 @@ class PoolStillUnlocked(PoolException):
|
|||||||
|
|
||||||
class NotCoroutine(PoolException):
|
class NotCoroutine(PoolException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ServerException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class HelpRequested(ServerException):
|
||||||
|
pass
|
||||||
|
@ -19,9 +19,11 @@ Miscellaneous helper functions.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
import re
|
||||||
from asyncio.coroutines import iscoroutinefunction
|
from asyncio.coroutines import iscoroutinefunction
|
||||||
from asyncio.queues import Queue
|
from asyncio.queues import Queue
|
||||||
from typing import Any, Optional
|
from inspect import getdoc
|
||||||
|
from typing import Any, Optional, Union
|
||||||
|
|
||||||
from .types import T, AnyCallableT, ArgsT, KwArgsT
|
from .types import T, AnyCallableT, ArgsT, KwArgsT
|
||||||
|
|
||||||
@ -48,3 +50,21 @@ def star_function(function: AnyCallableT, arg: Any, arg_stars: int = 0) -> T:
|
|||||||
|
|
||||||
async def join_queue(q: Queue) -> None:
|
async def join_queue(q: Queue) -> None:
|
||||||
await q.join()
|
await q.join()
|
||||||
|
|
||||||
|
|
||||||
|
def tasks_str(num: int) -> str:
|
||||||
|
return "tasks" if num != 1 else "task"
|
||||||
|
|
||||||
|
|
||||||
|
def get_first_doc_line(obj: object) -> str:
|
||||||
|
return getdoc(obj).strip().split("\n", 1)[0]
|
||||||
|
|
||||||
|
|
||||||
|
async def return_or_exception(_function_to_execute: AnyCallableT, *args, **kwargs) -> Union[T, Exception]:
|
||||||
|
try:
|
||||||
|
if iscoroutinefunction(_function_to_execute):
|
||||||
|
return await _function_to_execute(*args, **kwargs)
|
||||||
|
else:
|
||||||
|
return _function_to_execute(*args, **kwargs)
|
||||||
|
except Exception as e:
|
||||||
|
return e
|
||||||
|
@ -78,6 +78,7 @@ class BaseTaskPool:
|
|||||||
log.debug("%s initialized", str(self))
|
log.debug("%s initialized", str(self))
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
|
"""Returns the name of the task pool."""
|
||||||
return f'{self.__class__.__name__}-{self._name or self._idx}'
|
return f'{self.__class__.__name__}-{self._name or self._idx}'
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -26,126 +26,126 @@ from asyncio.exceptions import CancelledError
|
|||||||
from asyncio.streams import StreamReader, StreamWriter, start_unix_server
|
from asyncio.streams import StreamReader, StreamWriter, start_unix_server
|
||||||
from asyncio.tasks import Task, create_task
|
from asyncio.tasks import Task, create_task
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Tuple, Union, Optional
|
from typing import Optional, Union
|
||||||
|
|
||||||
from . import constants
|
|
||||||
from .pool import SimpleTaskPool
|
|
||||||
from .client import ControlClient, UnixControlClient
|
from .client import ControlClient, UnixControlClient
|
||||||
|
from .pool import TaskPool, SimpleTaskPool
|
||||||
|
from .session import ControlSession
|
||||||
|
from .types import ConnectedCallbackT
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def tasks_str(num: int) -> str:
|
|
||||||
return "tasks" if num != 1 else "task"
|
|
||||||
|
|
||||||
|
|
||||||
def get_cmd_arg(msg: str) -> Union[Tuple[str, Optional[int]], Tuple[None, None]]:
|
|
||||||
cmd = msg.strip().split(' ', 1)
|
|
||||||
if len(cmd) > 1:
|
|
||||||
try:
|
|
||||||
return cmd[0], int(cmd[1])
|
|
||||||
except ValueError:
|
|
||||||
return None, None
|
|
||||||
return cmd[0], None
|
|
||||||
|
|
||||||
|
|
||||||
class ControlServer(ABC): # TODO: Implement interface for normal TaskPool instances, not just SimpleTaskPool
|
class ControlServer(ABC): # TODO: Implement interface for normal TaskPool instances, not just SimpleTaskPool
|
||||||
client_class = ControlClient
|
"""
|
||||||
|
Abstract base class for a task pool control server.
|
||||||
|
|
||||||
|
This class acts as a wrapper around an async server instance and initializes a `ControlSession` upon a client
|
||||||
|
connecting to it. The entire interface is defined within that session class.
|
||||||
|
"""
|
||||||
|
_client_class = ControlClient
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@property
|
||||||
|
def client_class_name(cls) -> str:
|
||||||
|
"""Returns the name of the control client class matching the server class."""
|
||||||
|
return cls._client_class.__name__
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def get_server_instance(self, client_connected_cb, **kwargs) -> AbstractServer:
|
async def _get_server_instance(self, client_connected_cb: ConnectedCallbackT, **kwargs) -> AbstractServer:
|
||||||
|
"""
|
||||||
|
Initializes, starts, and returns an async server instance (Unix or TCP type).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
client_connected_cb:
|
||||||
|
The callback for when a client connects to the server (as per `asyncio.start_server` or
|
||||||
|
`asyncio.start_unix_server`); will always be the internal `_client_connected_cb` method.
|
||||||
|
**kwargs (optional):
|
||||||
|
Keyword arguments to pass into the function that starts the server.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The running server object (a type of `asyncio.Server`).
|
||||||
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def final_callback(self) -> None:
|
def _final_callback(self) -> None:
|
||||||
|
"""The method to run after the server's `serve_forever` methods ends for whatever reason."""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None:
|
def __init__(self, pool: Union[TaskPool, SimpleTaskPool], **server_kwargs) -> None:
|
||||||
self._pool: SimpleTaskPool = pool
|
"""
|
||||||
|
Initializes by merely saving the internal attributes, but without starting the server yet.
|
||||||
|
The task pool must be passed here and can not be set/changed afterwards. This means a control server is always
|
||||||
|
tied to one specific task pool.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pool:
|
||||||
|
An instance of a `BaseTaskPool` subclass to tie the server to.
|
||||||
|
**server_kwargs (optional):
|
||||||
|
Keyword arguments that will be passed into the function that starts the server.
|
||||||
|
"""
|
||||||
|
self._pool: Union[TaskPool, SimpleTaskPool] = pool
|
||||||
self._server_kwargs = server_kwargs
|
self._server_kwargs = server_kwargs
|
||||||
self._server: Optional[AbstractServer] = None
|
self._server: Optional[AbstractServer] = None
|
||||||
|
|
||||||
async def _start_tasks(self, writer: StreamWriter, num: int = None) -> None:
|
@property
|
||||||
if num is None:
|
def pool(self) -> Union[TaskPool, SimpleTaskPool]:
|
||||||
num = 1
|
"""Read-only property for accessing the task pool instance controlled by the server."""
|
||||||
log.debug("%s requests starting %s %s", self.client_class.__name__, num, tasks_str(num))
|
return self._pool
|
||||||
writer.write(str(await self._pool.start(num)).encode())
|
|
||||||
|
|
||||||
def _stop_tasks(self, writer: StreamWriter, num: int = None) -> None:
|
def is_serving(self) -> bool:
|
||||||
if num is None:
|
"""Wrapper around the `asyncio.Server.is_serving` method."""
|
||||||
num = 1
|
return self._server.is_serving()
|
||||||
log.debug("%s requests stopping %s %s", self.client_class.__name__, num, tasks_str(num))
|
|
||||||
# the requested number may be greater than the total number of running tasks
|
|
||||||
writer.write(str(self._pool.stop(num)).encode())
|
|
||||||
|
|
||||||
def _stop_all_tasks(self, writer: StreamWriter) -> None:
|
|
||||||
log.debug("%s requests stopping all tasks", self.client_class.__name__)
|
|
||||||
writer.write(str(self._pool.stop_all()).encode())
|
|
||||||
|
|
||||||
def _pool_size(self, writer: StreamWriter) -> None:
|
|
||||||
log.debug("%s requests number of running tasks", self.client_class.__name__)
|
|
||||||
writer.write(str(self._pool.num_running).encode())
|
|
||||||
|
|
||||||
def _pool_func(self, writer: StreamWriter) -> None:
|
|
||||||
log.debug("%s requests pool function", self.client_class.__name__)
|
|
||||||
writer.write(self._pool.func_name.encode())
|
|
||||||
|
|
||||||
async def _listen(self, reader: StreamReader, writer: StreamWriter) -> None:
|
|
||||||
while self._server.is_serving():
|
|
||||||
msg = (await reader.read(constants.MSG_BYTES)).decode().strip()
|
|
||||||
if not msg:
|
|
||||||
log.debug("%s disconnected", self.client_class.__name__)
|
|
||||||
break
|
|
||||||
cmd, arg = get_cmd_arg(msg)
|
|
||||||
if cmd == constants.CMD_START:
|
|
||||||
await self._start_tasks(writer, arg)
|
|
||||||
elif cmd == constants.CMD_STOP:
|
|
||||||
self._stop_tasks(writer, arg)
|
|
||||||
elif cmd == constants.CMD_STOP_ALL:
|
|
||||||
self._stop_all_tasks(writer)
|
|
||||||
elif cmd == constants.CMD_NUM_RUNNING:
|
|
||||||
self._pool_size(writer)
|
|
||||||
elif cmd == constants.CMD_FUNC:
|
|
||||||
self._pool_func(writer)
|
|
||||||
else:
|
|
||||||
log.debug("%s sent invalid command: %s", self.client_class.__name__, msg)
|
|
||||||
writer.write(b"Invalid command!")
|
|
||||||
await writer.drain()
|
|
||||||
|
|
||||||
async def _client_connected_cb(self, reader: StreamReader, writer: StreamWriter) -> None:
|
async def _client_connected_cb(self, reader: StreamReader, writer: StreamWriter) -> None:
|
||||||
log.debug("%s connected", self.client_class.__name__)
|
"""
|
||||||
writer.write(str(self._pool).encode())
|
The universal client callback that will be passed into the `_get_server_instance` method.
|
||||||
await writer.drain()
|
Instantiates a control session, performs the client handshake, and enters the session's `listen` loop.
|
||||||
await self._listen(reader, writer)
|
"""
|
||||||
|
session = ControlSession(self, reader, writer)
|
||||||
|
await session.client_handshake()
|
||||||
|
await session.listen()
|
||||||
|
|
||||||
async def _serve_forever(self) -> None:
|
async def _serve_forever(self) -> None:
|
||||||
|
"""
|
||||||
|
To be run as an `asyncio.Task` by the following method.
|
||||||
|
Serves as a wrapper around the the `asyncio.Server.serve_forever` method that ensures that the `_final_callback`
|
||||||
|
method is called, when the former method ends for whatever reason.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
async with self._server:
|
async with self._server:
|
||||||
await self._server.serve_forever()
|
await self._server.serve_forever()
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
log.debug("%s stopped", self.__class__.__name__)
|
log.debug("%s stopped", self.__class__.__name__)
|
||||||
finally:
|
finally:
|
||||||
self.final_callback()
|
self._final_callback()
|
||||||
|
|
||||||
async def serve_forever(self) -> Task:
|
async def serve_forever(self) -> Task:
|
||||||
|
"""
|
||||||
|
This method actually starts the server and begins listening to client connections on the specified interface.
|
||||||
|
It should never block because the serving will be performed in a separate task.
|
||||||
|
"""
|
||||||
log.debug("Starting %s...", self.__class__.__name__)
|
log.debug("Starting %s...", self.__class__.__name__)
|
||||||
self._server = await self.get_server_instance(self._client_connected_cb, **self._server_kwargs)
|
self._server = await self._get_server_instance(self._client_connected_cb, **self._server_kwargs)
|
||||||
return create_task(self._serve_forever())
|
return create_task(self._serve_forever())
|
||||||
|
|
||||||
|
|
||||||
class UnixControlServer(ControlServer):
|
class UnixControlServer(ControlServer):
|
||||||
client_class = UnixControlClient
|
"""Task pool control server class that exposes a unix socket for control clients to connect to."""
|
||||||
|
_client_class = UnixControlClient
|
||||||
|
|
||||||
def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None:
|
def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None:
|
||||||
self._socket_path = Path(server_kwargs.pop('path'))
|
self._socket_path = Path(server_kwargs.pop('path'))
|
||||||
super().__init__(pool, **server_kwargs)
|
super().__init__(pool, **server_kwargs)
|
||||||
|
|
||||||
async def get_server_instance(self, client_connected_cb, **kwargs) -> AbstractServer:
|
async def _get_server_instance(self, client_connected_cb: ConnectedCallbackT, **kwargs) -> AbstractServer:
|
||||||
srv = await start_unix_server(client_connected_cb, self._socket_path, **kwargs)
|
server = await start_unix_server(client_connected_cb, self._socket_path, **kwargs)
|
||||||
log.debug("Opened socket '%s'", str(self._socket_path))
|
log.debug("Opened socket '%s'", str(self._socket_path))
|
||||||
return srv
|
return server
|
||||||
|
|
||||||
def final_callback(self) -> None:
|
def _final_callback(self) -> None:
|
||||||
|
"""Removes the unix socket on which the server was listening."""
|
||||||
self._socket_path.unlink()
|
self._socket_path.unlink()
|
||||||
log.debug("Removed socket '%s'", str(self._socket_path))
|
log.debug("Removed socket '%s'", str(self._socket_path))
|
||||||
|
218
src/asyncio_taskpool/session.py
Normal file
218
src/asyncio_taskpool/session.py
Normal file
@ -0,0 +1,218 @@
|
|||||||
|
import logging
|
||||||
|
import json
|
||||||
|
from argparse import ArgumentError, ArgumentParser, HelpFormatter, Namespace
|
||||||
|
from asyncio.streams import StreamReader, StreamWriter
|
||||||
|
from typing import Callable, Optional, Type, Union, TYPE_CHECKING
|
||||||
|
|
||||||
|
from . import constants
|
||||||
|
from .exceptions import HelpRequested
|
||||||
|
from .helpers import get_first_doc_line, return_or_exception, tasks_str
|
||||||
|
from .pool import TaskPool, SimpleTaskPool
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from .server import ControlServer
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
NUM = 'num'
|
||||||
|
WIDTH = 'width'
|
||||||
|
|
||||||
|
|
||||||
|
class CommandParser(ArgumentParser):
|
||||||
|
@staticmethod
|
||||||
|
def help_formatter_factory(terminal_width: int) -> Type[HelpFormatter]:
|
||||||
|
class ClientHelpFormatter(HelpFormatter):
|
||||||
|
def __init__(self, *args, **kwargs) -> None:
|
||||||
|
kwargs[WIDTH] = terminal_width
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
return ClientHelpFormatter
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs) -> None:
|
||||||
|
parent: CommandParser = kwargs.pop('parent', None)
|
||||||
|
self._stream_writer: StreamWriter = parent.stream_writer if parent else kwargs.pop('writer')
|
||||||
|
self._terminal_width: int = parent.terminal_width if parent else kwargs.pop(WIDTH)
|
||||||
|
kwargs.setdefault('formatter_class', self.help_formatter_factory(self._terminal_width))
|
||||||
|
kwargs.setdefault('exit_on_error', False)
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stream_writer(self) -> StreamWriter:
|
||||||
|
return self._stream_writer
|
||||||
|
|
||||||
|
@property
|
||||||
|
def terminal_width(self) -> int:
|
||||||
|
return self._terminal_width
|
||||||
|
|
||||||
|
def _print_message(self, message: str, *args, **kwargs) -> None:
|
||||||
|
if message:
|
||||||
|
self.stream_writer.write(message.encode())
|
||||||
|
|
||||||
|
def exit(self, status: int = 0, message: str = None) -> None:
|
||||||
|
if message:
|
||||||
|
self._print_message(message)
|
||||||
|
|
||||||
|
def print_help(self, file=None) -> None:
|
||||||
|
super().print_help(file)
|
||||||
|
raise HelpRequested
|
||||||
|
|
||||||
|
|
||||||
|
class ControlSession:
|
||||||
|
def __init__(self, server: 'ControlServer', reader: StreamReader, writer: StreamWriter) -> None:
|
||||||
|
self._control_server: 'ControlServer' = server
|
||||||
|
self._pool: Union[TaskPool, SimpleTaskPool] = server.pool
|
||||||
|
self._client_class_name = server.client_class_name
|
||||||
|
self._reader: StreamReader = reader
|
||||||
|
self._writer: StreamWriter = writer
|
||||||
|
self._parser: Optional[CommandParser] = None
|
||||||
|
|
||||||
|
def _add_base_commands(self):
|
||||||
|
subparsers = self._parser.add_subparsers(title="Commands", dest=constants.CMD)
|
||||||
|
subparsers.add_parser(
|
||||||
|
constants.CMD_NAME,
|
||||||
|
prog=constants.CMD_NAME,
|
||||||
|
help=get_first_doc_line(self._pool.__class__.__str__),
|
||||||
|
parent=self._parser,
|
||||||
|
)
|
||||||
|
subparser_pool_size = subparsers.add_parser(
|
||||||
|
constants.CMD_POOL_SIZE,
|
||||||
|
prog=constants.CMD_POOL_SIZE,
|
||||||
|
help="Get/set the maximum number of tasks in the pool",
|
||||||
|
parent=self._parser,
|
||||||
|
)
|
||||||
|
subparser_pool_size.add_argument(
|
||||||
|
NUM,
|
||||||
|
nargs='?',
|
||||||
|
help=f"If passed a number: {get_first_doc_line(self._pool.__class__.pool_size.fset)} "
|
||||||
|
f"If omitted: {get_first_doc_line(self._pool.__class__.pool_size.fget)}"
|
||||||
|
)
|
||||||
|
subparsers.add_parser(
|
||||||
|
constants.CMD_NUM_RUNNING,
|
||||||
|
help=get_first_doc_line(self._pool.__class__.num_running.fget),
|
||||||
|
parent=self._parser,
|
||||||
|
)
|
||||||
|
return subparsers
|
||||||
|
|
||||||
|
def _add_simple_commands(self):
|
||||||
|
subparsers = self._add_base_commands()
|
||||||
|
subparser = subparsers.add_parser(
|
||||||
|
constants.CMD_START,
|
||||||
|
prog=constants.CMD_START,
|
||||||
|
help=get_first_doc_line(self._pool.__class__.start),
|
||||||
|
parent=self._parser,
|
||||||
|
)
|
||||||
|
subparser.add_argument(
|
||||||
|
NUM,
|
||||||
|
nargs='?',
|
||||||
|
type=int,
|
||||||
|
default=1,
|
||||||
|
help="Number of tasks to start. Defaults to 1."
|
||||||
|
)
|
||||||
|
subparser = subparsers.add_parser(
|
||||||
|
constants.CMD_STOP,
|
||||||
|
prog=constants.CMD_STOP,
|
||||||
|
help=get_first_doc_line(self._pool.__class__.stop),
|
||||||
|
parent=self._parser,
|
||||||
|
)
|
||||||
|
subparser.add_argument(
|
||||||
|
NUM,
|
||||||
|
nargs='?',
|
||||||
|
type=int,
|
||||||
|
default=1,
|
||||||
|
help="Number of tasks to stop. Defaults to 1."
|
||||||
|
)
|
||||||
|
subparsers.add_parser(
|
||||||
|
constants.CMD_STOP_ALL,
|
||||||
|
prog=constants.CMD_STOP_ALL,
|
||||||
|
help=get_first_doc_line(self._pool.__class__.stop_all),
|
||||||
|
parent=self._parser,
|
||||||
|
)
|
||||||
|
subparsers.add_parser(
|
||||||
|
constants.CMD_FUNC_NAME,
|
||||||
|
prog=constants.CMD_FUNC_NAME,
|
||||||
|
help=get_first_doc_line(self._pool.__class__.func_name.fget),
|
||||||
|
parent=self._parser,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _init_parser(self, client_terminal_width: int) -> None:
|
||||||
|
self._parser = CommandParser(prog='', writer=self._writer, width=client_terminal_width)
|
||||||
|
if isinstance(self._pool, TaskPool):
|
||||||
|
pass # TODO
|
||||||
|
elif isinstance(self._pool, SimpleTaskPool):
|
||||||
|
self._add_simple_commands()
|
||||||
|
|
||||||
|
async def client_handshake(self) -> None:
|
||||||
|
client_info = json.loads((await self._reader.read(constants.MSG_BYTES)).decode().strip())
|
||||||
|
log.debug("%s connected", self._client_class_name)
|
||||||
|
self._init_parser(client_info[WIDTH])
|
||||||
|
self._writer.write(str(self._pool).encode())
|
||||||
|
await self._writer.drain()
|
||||||
|
|
||||||
|
async def _write_function_output(self, func: Callable, *args, **kwargs) -> None:
|
||||||
|
output = await return_or_exception(func, *args, **kwargs)
|
||||||
|
self._writer.write(b"ok" if output is None else str(output).encode())
|
||||||
|
|
||||||
|
async def _cmd_name(self, **_kwargs) -> None:
|
||||||
|
log.debug("%s requests task pool name", self._client_class_name)
|
||||||
|
await self._write_function_output(self._pool.__class__.__str__, self._pool)
|
||||||
|
|
||||||
|
async def _cmd_pool_size(self, **kwargs) -> None:
|
||||||
|
num = kwargs.get(NUM)
|
||||||
|
if num is None:
|
||||||
|
log.debug("%s requests pool size", self._client_class_name)
|
||||||
|
await self._write_function_output(self._pool.__class__.pool_size.fget, self._pool)
|
||||||
|
else:
|
||||||
|
log.debug("%s requests setting pool size to %s", self._client_class_name, num)
|
||||||
|
await self._write_function_output(self._pool.__class__.pool_size.fset, self._pool, int(num))
|
||||||
|
|
||||||
|
async def _cmd_num_running(self, **_kwargs) -> None:
|
||||||
|
log.debug("%s requests number of running tasks", self._client_class_name)
|
||||||
|
await self._write_function_output(self._pool.__class__.num_running.fget, self._pool)
|
||||||
|
|
||||||
|
async def _cmd_start(self, **kwargs) -> None:
|
||||||
|
num = kwargs[NUM]
|
||||||
|
log.debug("%s requests starting %s %s", self._client_class_name, num, tasks_str(num))
|
||||||
|
await self._write_function_output(self._pool.start, num)
|
||||||
|
|
||||||
|
async def _cmd_stop(self, **kwargs) -> None:
|
||||||
|
num = kwargs[NUM]
|
||||||
|
log.debug("%s requests stopping %s %s", self._client_class_name, num, tasks_str(num))
|
||||||
|
await self._write_function_output(self._pool.stop, num)
|
||||||
|
|
||||||
|
async def _cmd_stop_all(self, **_kwargs) -> None:
|
||||||
|
log.debug("%s requests stopping all tasks", self._client_class_name)
|
||||||
|
await self._write_function_output(self._pool.stop_all)
|
||||||
|
|
||||||
|
async def _cmd_func_name(self, **_kwargs) -> None:
|
||||||
|
log.debug("%s requests pool function name", self._client_class_name)
|
||||||
|
await self._write_function_output(self._pool.__class__.func_name.fget, self._pool)
|
||||||
|
|
||||||
|
async def _execute_command(self, args: Namespace) -> None:
|
||||||
|
args = vars(args)
|
||||||
|
cmd: str = args.pop(constants.CMD, None)
|
||||||
|
if cmd is not None:
|
||||||
|
method = getattr(self, f'_cmd_{cmd.replace("-", "_")}')
|
||||||
|
await method(**args)
|
||||||
|
|
||||||
|
async def _parse_command(self, msg: str) -> None:
|
||||||
|
try:
|
||||||
|
args, argv = self._parser.parse_known_args(msg.split(' '))
|
||||||
|
except ArgumentError as e:
|
||||||
|
self._writer.write(str(e).encode())
|
||||||
|
return
|
||||||
|
except HelpRequested:
|
||||||
|
return
|
||||||
|
if argv:
|
||||||
|
log.debug("%s sent unknown arguments: %s", self._client_class_name, msg)
|
||||||
|
self._writer.write(b"Invalid command!")
|
||||||
|
return
|
||||||
|
await self._execute_command(args)
|
||||||
|
|
||||||
|
async def listen(self) -> None:
|
||||||
|
while self._control_server.is_serving():
|
||||||
|
msg = (await self._reader.read(constants.MSG_BYTES)).decode().strip()
|
||||||
|
if not msg:
|
||||||
|
log.debug("%s disconnected", self._client_class_name)
|
||||||
|
break
|
||||||
|
await self._parse_command(msg)
|
||||||
|
await self._writer.drain()
|
@ -28,10 +28,11 @@ T = TypeVar('T')
|
|||||||
ArgsT = Iterable[Any]
|
ArgsT = Iterable[Any]
|
||||||
KwArgsT = Mapping[str, Any]
|
KwArgsT = Mapping[str, Any]
|
||||||
|
|
||||||
AnyCallableT = Callable[[...], Union[Awaitable[T], T]]
|
AnyCallableT = Callable[[...], Union[T, Awaitable[T]]]
|
||||||
CoroutineFunc = Callable[[...], Awaitable[Any]]
|
CoroutineFunc = Callable[[...], Awaitable[Any]]
|
||||||
|
|
||||||
EndCallbackT = Callable
|
EndCallbackT = Callable
|
||||||
CancelCallbackT = Callable
|
CancelCallbackT = Callable
|
||||||
|
|
||||||
|
ConnectedCallbackT = Callable[[StreamReader, StreamWriter], Awaitable[None]]
|
||||||
ClientConnT = Union[Tuple[StreamReader, StreamWriter], Tuple[None, None]]
|
ClientConnT = Union[Tuple[StreamReader, StreamWriter], Tuple[None, None]]
|
||||||
|
@ -86,3 +86,11 @@ class HelpersTestCase(IsolatedAsyncioTestCase):
|
|||||||
mock_queue = MagicMock(join=mock_join)
|
mock_queue = MagicMock(join=mock_join)
|
||||||
self.assertIsNone(await helpers.join_queue(mock_queue))
|
self.assertIsNone(await helpers.join_queue(mock_queue))
|
||||||
mock_join.assert_awaited_once_with()
|
mock_join.assert_awaited_once_with()
|
||||||
|
|
||||||
|
def test_task_str(self):
|
||||||
|
self.assertEqual("task", helpers.tasks_str(1))
|
||||||
|
self.assertEqual("tasks", helpers.tasks_str(0))
|
||||||
|
self.assertEqual("tasks", helpers.tasks_str(-1))
|
||||||
|
self.assertEqual("tasks", helpers.tasks_str(2))
|
||||||
|
self.assertEqual("tasks", helpers.tasks_str(-10))
|
||||||
|
self.assertEqual("tasks", helpers.tasks_str(42))
|
||||||
|
143
tests/test_server.py
Normal file
143
tests/test_server.py
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest import IsolatedAsyncioTestCase
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
from asyncio_taskpool import server
|
||||||
|
from asyncio_taskpool.client import ControlClient, UnixControlClient
|
||||||
|
|
||||||
|
|
||||||
|
FOO, BAR = 'foo', 'bar'
|
||||||
|
|
||||||
|
|
||||||
|
class ControlServerTestCase(IsolatedAsyncioTestCase):
|
||||||
|
log_lvl: int
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls) -> None:
|
||||||
|
cls.log_lvl = server.log.level
|
||||||
|
server.log.setLevel(999)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls) -> None:
|
||||||
|
server.log.setLevel(cls.log_lvl)
|
||||||
|
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.abstract_patcher = patch('asyncio_taskpool.server.ControlServer.__abstractmethods__', set())
|
||||||
|
self.mock_abstract_methods = self.abstract_patcher.start()
|
||||||
|
self.mock_pool = MagicMock()
|
||||||
|
self.kwargs = {FOO: 123, BAR: 456}
|
||||||
|
self.server = server.ControlServer(pool=self.mock_pool, **self.kwargs)
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.abstract_patcher.stop()
|
||||||
|
|
||||||
|
def test_client_class_name(self):
|
||||||
|
self.assertEqual(ControlClient.__name__, server.ControlServer.client_class_name)
|
||||||
|
|
||||||
|
async def test_abstract(self):
|
||||||
|
with self.assertRaises(NotImplementedError):
|
||||||
|
args = [AsyncMock()]
|
||||||
|
await self.server._get_server_instance(*args)
|
||||||
|
with self.assertRaises(NotImplementedError):
|
||||||
|
self.server._final_callback()
|
||||||
|
|
||||||
|
def test_init(self):
|
||||||
|
self.assertEqual(self.mock_pool, self.server._pool)
|
||||||
|
self.assertEqual(self.kwargs, self.server._server_kwargs)
|
||||||
|
self.assertIsNone(self.server._server)
|
||||||
|
|
||||||
|
def test_pool(self):
|
||||||
|
self.assertEqual(self.mock_pool, self.server.pool)
|
||||||
|
|
||||||
|
def test_is_serving(self):
|
||||||
|
self.server._server = MagicMock(is_serving=MagicMock(return_value=FOO + BAR))
|
||||||
|
self.assertEqual(FOO + BAR, self.server.is_serving())
|
||||||
|
|
||||||
|
@patch.object(server, 'ControlSession')
|
||||||
|
async def test__client_connected_cb(self, mock_client_session_cls: MagicMock):
|
||||||
|
mock_client_handshake, mock_listen = AsyncMock(), AsyncMock()
|
||||||
|
mock_client_session_cls.return_value = MagicMock(client_handshake=mock_client_handshake, listen=mock_listen)
|
||||||
|
mock_reader, mock_writer = MagicMock(), MagicMock()
|
||||||
|
self.assertIsNone(await self.server._client_connected_cb(mock_reader, mock_writer))
|
||||||
|
mock_client_session_cls.assert_called_once_with(self.server, mock_reader, mock_writer)
|
||||||
|
mock_client_handshake.assert_awaited_once_with()
|
||||||
|
mock_listen.assert_awaited_once_with()
|
||||||
|
|
||||||
|
@patch.object(server.ControlServer, '_final_callback')
|
||||||
|
async def test__serve_forever(self, mock__final_callback: MagicMock):
|
||||||
|
mock_aenter, mock_serve_forever = AsyncMock(), AsyncMock(side_effect=asyncio.CancelledError)
|
||||||
|
self.server._server = MagicMock(__aenter__=mock_aenter, serve_forever=mock_serve_forever)
|
||||||
|
with self.assertLogs(server.log, logging.DEBUG):
|
||||||
|
self.assertIsNone(await self.server._serve_forever())
|
||||||
|
mock_aenter.assert_awaited_once_with()
|
||||||
|
mock_serve_forever.assert_awaited_once_with()
|
||||||
|
mock__final_callback.assert_called_once_with()
|
||||||
|
|
||||||
|
mock_aenter.reset_mock()
|
||||||
|
mock_serve_forever.reset_mock(side_effect=True)
|
||||||
|
mock__final_callback.reset_mock()
|
||||||
|
|
||||||
|
self.assertIsNone(await self.server._serve_forever())
|
||||||
|
mock_aenter.assert_awaited_once_with()
|
||||||
|
mock_serve_forever.assert_awaited_once_with()
|
||||||
|
mock__final_callback.assert_called_once_with()
|
||||||
|
|
||||||
|
@patch.object(server, 'create_task')
|
||||||
|
@patch.object(server.ControlServer, '_serve_forever', new_callable=MagicMock())
|
||||||
|
@patch.object(server.ControlServer, '_get_server_instance')
|
||||||
|
async def test_serve_forever(self, mock__get_server_instance: AsyncMock, mock__serve_forever: MagicMock,
|
||||||
|
mock_create_task: MagicMock):
|
||||||
|
mock__serve_forever.return_value = mock_awaitable = 'some_coroutine'
|
||||||
|
mock_create_task.return_value = expected_output = 12345
|
||||||
|
output = await self.server.serve_forever()
|
||||||
|
self.assertEqual(expected_output, output)
|
||||||
|
mock__get_server_instance.assert_awaited_once_with(self.server._client_connected_cb, **self.kwargs)
|
||||||
|
mock__serve_forever.assert_called_once_with()
|
||||||
|
mock_create_task.assert_called_once_with(mock_awaitable)
|
||||||
|
|
||||||
|
|
||||||
|
class UnixControlServerTestCase(IsolatedAsyncioTestCase):
|
||||||
|
log_lvl: int
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls) -> None:
|
||||||
|
cls.log_lvl = server.log.level
|
||||||
|
server.log.setLevel(999)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls) -> None:
|
||||||
|
server.log.setLevel(cls.log_lvl)
|
||||||
|
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.base_init_patcher = patch.object(server.ControlServer, '__init__')
|
||||||
|
self.mock_base_init = self.base_init_patcher.start()
|
||||||
|
self.mock_pool = MagicMock()
|
||||||
|
self.path = '/tmp/asyncio_taskpool'
|
||||||
|
self.kwargs = {FOO: 123, BAR: 456}
|
||||||
|
self.server = server.UnixControlServer(pool=self.mock_pool, path=self.path, **self.kwargs)
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.base_init_patcher.stop()
|
||||||
|
|
||||||
|
def test__client_class(self):
|
||||||
|
self.assertEqual(UnixControlClient, self.server._client_class)
|
||||||
|
|
||||||
|
def test_init(self):
|
||||||
|
self.assertEqual(Path(self.path), self.server._socket_path)
|
||||||
|
self.mock_base_init.assert_called_once_with(self.mock_pool, **self.kwargs)
|
||||||
|
|
||||||
|
@patch.object(server, 'start_unix_server')
|
||||||
|
async def test__get_server_instance(self, mock_start_unix_server: AsyncMock):
|
||||||
|
mock_start_unix_server.return_value = expected_output = 'totally_a_server'
|
||||||
|
mock_callback, mock_kwargs = MagicMock(), {'a': 1, 'b': 2}
|
||||||
|
args = [mock_callback]
|
||||||
|
output = await self.server._get_server_instance(*args, **mock_kwargs)
|
||||||
|
self.assertEqual(expected_output, output)
|
||||||
|
mock_start_unix_server.assert_called_once_with(mock_callback, Path(self.path), **mock_kwargs)
|
||||||
|
|
||||||
|
def test__final_callback(self):
|
||||||
|
self.server._socket_path = MagicMock()
|
||||||
|
self.assertIsNone(self.server._final_callback())
|
||||||
|
self.server._socket_path.unlink.assert_called_once_with()
|
Reference in New Issue
Block a user