Compare commits

...

2 Commits

Author SHA1 Message Date
3fb451a00e docstrings and full test coverage for server module; small adjustments 2022-02-13 16:17:50 +01:00
be03097bf4 massive overhaul of the control server interface;
making use of `ArgumentParser` for client commands;
new `ControlSession` object instantiated upon connection
2022-02-12 22:51:52 +01:00
11 changed files with 500 additions and 87 deletions

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = asyncio-taskpool name = asyncio-taskpool
version = 0.2.1 version = 0.3.1
author = Daniil Fajnberg author = Daniil Fajnberg
author_email = mail@daniil.fajnberg.de author_email = mail@daniil.fajnberg.de
description = Dynamically manage pools of asyncio tasks description = Dynamically manage pools of asyncio tasks

View File

@ -19,6 +19,8 @@ Classes of control clients for a simply interface to a task pool control server.
""" """
import json
import shutil
import sys import sys
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from asyncio.streams import StreamReader, StreamWriter, open_unix_connection from asyncio.streams import StreamReader, StreamWriter, open_unix_connection
@ -34,10 +36,20 @@ class ControlClient(ABC):
async def open_connection(self, **kwargs) -> ClientConnT: async def open_connection(self, **kwargs) -> ClientConnT:
raise NotImplementedError raise NotImplementedError
@staticmethod
def client_info() -> dict:
return {'width': shutil.get_terminal_size().columns}
def __init__(self, **conn_kwargs) -> None: def __init__(self, **conn_kwargs) -> None:
self._conn_kwargs = conn_kwargs self._conn_kwargs = conn_kwargs
self._connected: bool = False self._connected: bool = False
async def _server_handshake(self, reader: StreamReader, writer: StreamWriter) -> None:
self._connected = True
writer.write(json.dumps(self.client_info()).encode())
await writer.drain()
print("Connected to", (await reader.read(constants.MSG_BYTES)).decode())
async def _interact(self, reader: StreamReader, writer: StreamWriter) -> None: async def _interact(self, reader: StreamReader, writer: StreamWriter) -> None:
try: try:
msg = input("> ").strip().lower() msg = input("> ").strip().lower()
@ -64,8 +76,7 @@ class ControlClient(ABC):
if reader is None: if reader is None:
print("Failed to connect.", file=sys.stderr) print("Failed to connect.", file=sys.stderr)
return return
self._connected = True await self._server_handshake(reader, writer)
print("Connected to", (await reader.read(constants.MSG_BYTES)).decode())
while self._connected: while self._connected:
await self._interact(reader, writer) await self._interact(reader, writer)
print("Disconnected from control server.") print("Disconnected from control server.")

View File

@ -20,10 +20,13 @@ Constants used by more than one module in the package.
PACKAGE_NAME = 'asyncio_taskpool' PACKAGE_NAME = 'asyncio_taskpool'
MSG_BYTES = 1024 MSG_BYTES = 1024000
CMD = 'command'
CMD_NAME = 'name'
CMD_POOL_SIZE = 'pool-size'
CMD_NUM_RUNNING = 'num-running'
CMD_START = 'start' CMD_START = 'start'
CMD_STOP = 'stop' CMD_STOP = 'stop'
CMD_STOP_ALL = 'stop_all' CMD_STOP_ALL = 'stop-all'
CMD_NUM_RUNNING = 'num_running' CMD_FUNC_NAME = 'func-name'
CMD_FUNC = 'func'
CLIENT_EXIT = 'exit' CLIENT_EXIT = 'exit'

View File

@ -49,3 +49,11 @@ class PoolStillUnlocked(PoolException):
class NotCoroutine(PoolException): class NotCoroutine(PoolException):
pass pass
class ServerException(Exception):
pass
class HelpRequested(ServerException):
pass

View File

@ -19,9 +19,11 @@ Miscellaneous helper functions.
""" """
import re
from asyncio.coroutines import iscoroutinefunction from asyncio.coroutines import iscoroutinefunction
from asyncio.queues import Queue from asyncio.queues import Queue
from typing import Any, Optional from inspect import getdoc
from typing import Any, Optional, Union
from .types import T, AnyCallableT, ArgsT, KwArgsT from .types import T, AnyCallableT, ArgsT, KwArgsT
@ -48,3 +50,21 @@ def star_function(function: AnyCallableT, arg: Any, arg_stars: int = 0) -> T:
async def join_queue(q: Queue) -> None: async def join_queue(q: Queue) -> None:
await q.join() await q.join()
def tasks_str(num: int) -> str:
return "tasks" if num != 1 else "task"
def get_first_doc_line(obj: object) -> str:
return getdoc(obj).strip().split("\n", 1)[0]
async def return_or_exception(_function_to_execute: AnyCallableT, *args, **kwargs) -> Union[T, Exception]:
try:
if iscoroutinefunction(_function_to_execute):
return await _function_to_execute(*args, **kwargs)
else:
return _function_to_execute(*args, **kwargs)
except Exception as e:
return e

View File

@ -78,6 +78,7 @@ class BaseTaskPool:
log.debug("%s initialized", str(self)) log.debug("%s initialized", str(self))
def __str__(self) -> str: def __str__(self) -> str:
"""Returns the name of the task pool."""
return f'{self.__class__.__name__}-{self._name or self._idx}' return f'{self.__class__.__name__}-{self._name or self._idx}'
@property @property

View File

@ -26,126 +26,126 @@ from asyncio.exceptions import CancelledError
from asyncio.streams import StreamReader, StreamWriter, start_unix_server from asyncio.streams import StreamReader, StreamWriter, start_unix_server
from asyncio.tasks import Task, create_task from asyncio.tasks import Task, create_task
from pathlib import Path from pathlib import Path
from typing import Tuple, Union, Optional from typing import Optional, Union
from . import constants
from .pool import SimpleTaskPool
from .client import ControlClient, UnixControlClient from .client import ControlClient, UnixControlClient
from .pool import TaskPool, SimpleTaskPool
from .session import ControlSession
from .types import ConnectedCallbackT
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def tasks_str(num: int) -> str:
return "tasks" if num != 1 else "task"
def get_cmd_arg(msg: str) -> Union[Tuple[str, Optional[int]], Tuple[None, None]]:
cmd = msg.strip().split(' ', 1)
if len(cmd) > 1:
try:
return cmd[0], int(cmd[1])
except ValueError:
return None, None
return cmd[0], None
class ControlServer(ABC): # TODO: Implement interface for normal TaskPool instances, not just SimpleTaskPool class ControlServer(ABC): # TODO: Implement interface for normal TaskPool instances, not just SimpleTaskPool
client_class = ControlClient """
Abstract base class for a task pool control server.
This class acts as a wrapper around an async server instance and initializes a `ControlSession` upon a client
connecting to it. The entire interface is defined within that session class.
"""
_client_class = ControlClient
@classmethod
@property
def client_class_name(cls) -> str:
"""Returns the name of the control client class matching the server class."""
return cls._client_class.__name__
@abstractmethod @abstractmethod
async def get_server_instance(self, client_connected_cb, **kwargs) -> AbstractServer: async def _get_server_instance(self, client_connected_cb: ConnectedCallbackT, **kwargs) -> AbstractServer:
"""
Initializes, starts, and returns an async server instance (Unix or TCP type).
Args:
client_connected_cb:
The callback for when a client connects to the server (as per `asyncio.start_server` or
`asyncio.start_unix_server`); will always be the internal `_client_connected_cb` method.
**kwargs (optional):
Keyword arguments to pass into the function that starts the server.
Returns:
The running server object (a type of `asyncio.Server`).
"""
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def final_callback(self) -> None: def _final_callback(self) -> None:
"""The method to run after the server's `serve_forever` methods ends for whatever reason."""
raise NotImplementedError raise NotImplementedError
def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None: def __init__(self, pool: Union[TaskPool, SimpleTaskPool], **server_kwargs) -> None:
self._pool: SimpleTaskPool = pool """
Initializes by merely saving the internal attributes, but without starting the server yet.
The task pool must be passed here and can not be set/changed afterwards. This means a control server is always
tied to one specific task pool.
Args:
pool:
An instance of a `BaseTaskPool` subclass to tie the server to.
**server_kwargs (optional):
Keyword arguments that will be passed into the function that starts the server.
"""
self._pool: Union[TaskPool, SimpleTaskPool] = pool
self._server_kwargs = server_kwargs self._server_kwargs = server_kwargs
self._server: Optional[AbstractServer] = None self._server: Optional[AbstractServer] = None
async def _start_tasks(self, writer: StreamWriter, num: int = None) -> None: @property
if num is None: def pool(self) -> Union[TaskPool, SimpleTaskPool]:
num = 1 """Read-only property for accessing the task pool instance controlled by the server."""
log.debug("%s requests starting %s %s", self.client_class.__name__, num, tasks_str(num)) return self._pool
writer.write(str(await self._pool.start(num)).encode())
def _stop_tasks(self, writer: StreamWriter, num: int = None) -> None: def is_serving(self) -> bool:
if num is None: """Wrapper around the `asyncio.Server.is_serving` method."""
num = 1 return self._server.is_serving()
log.debug("%s requests stopping %s %s", self.client_class.__name__, num, tasks_str(num))
# the requested number may be greater than the total number of running tasks
writer.write(str(self._pool.stop(num)).encode())
def _stop_all_tasks(self, writer: StreamWriter) -> None:
log.debug("%s requests stopping all tasks", self.client_class.__name__)
writer.write(str(self._pool.stop_all()).encode())
def _pool_size(self, writer: StreamWriter) -> None:
log.debug("%s requests number of running tasks", self.client_class.__name__)
writer.write(str(self._pool.num_running).encode())
def _pool_func(self, writer: StreamWriter) -> None:
log.debug("%s requests pool function", self.client_class.__name__)
writer.write(self._pool.func_name.encode())
async def _listen(self, reader: StreamReader, writer: StreamWriter) -> None:
while self._server.is_serving():
msg = (await reader.read(constants.MSG_BYTES)).decode().strip()
if not msg:
log.debug("%s disconnected", self.client_class.__name__)
break
cmd, arg = get_cmd_arg(msg)
if cmd == constants.CMD_START:
await self._start_tasks(writer, arg)
elif cmd == constants.CMD_STOP:
self._stop_tasks(writer, arg)
elif cmd == constants.CMD_STOP_ALL:
self._stop_all_tasks(writer)
elif cmd == constants.CMD_NUM_RUNNING:
self._pool_size(writer)
elif cmd == constants.CMD_FUNC:
self._pool_func(writer)
else:
log.debug("%s sent invalid command: %s", self.client_class.__name__, msg)
writer.write(b"Invalid command!")
await writer.drain()
async def _client_connected_cb(self, reader: StreamReader, writer: StreamWriter) -> None: async def _client_connected_cb(self, reader: StreamReader, writer: StreamWriter) -> None:
log.debug("%s connected", self.client_class.__name__) """
writer.write(str(self._pool).encode()) The universal client callback that will be passed into the `_get_server_instance` method.
await writer.drain() Instantiates a control session, performs the client handshake, and enters the session's `listen` loop.
await self._listen(reader, writer) """
session = ControlSession(self, reader, writer)
await session.client_handshake()
await session.listen()
async def _serve_forever(self) -> None: async def _serve_forever(self) -> None:
"""
To be run as an `asyncio.Task` by the following method.
Serves as a wrapper around the the `asyncio.Server.serve_forever` method that ensures that the `_final_callback`
method is called, when the former method ends for whatever reason.
"""
try: try:
async with self._server: async with self._server:
await self._server.serve_forever() await self._server.serve_forever()
except CancelledError: except CancelledError:
log.debug("%s stopped", self.__class__.__name__) log.debug("%s stopped", self.__class__.__name__)
finally: finally:
self.final_callback() self._final_callback()
async def serve_forever(self) -> Task: async def serve_forever(self) -> Task:
"""
This method actually starts the server and begins listening to client connections on the specified interface.
It should never block because the serving will be performed in a separate task.
"""
log.debug("Starting %s...", self.__class__.__name__) log.debug("Starting %s...", self.__class__.__name__)
self._server = await self.get_server_instance(self._client_connected_cb, **self._server_kwargs) self._server = await self._get_server_instance(self._client_connected_cb, **self._server_kwargs)
return create_task(self._serve_forever()) return create_task(self._serve_forever())
class UnixControlServer(ControlServer): class UnixControlServer(ControlServer):
client_class = UnixControlClient """Task pool control server class that exposes a unix socket for control clients to connect to."""
_client_class = UnixControlClient
def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None: def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None:
self._socket_path = Path(server_kwargs.pop('path')) self._socket_path = Path(server_kwargs.pop('path'))
super().__init__(pool, **server_kwargs) super().__init__(pool, **server_kwargs)
async def get_server_instance(self, client_connected_cb, **kwargs) -> AbstractServer: async def _get_server_instance(self, client_connected_cb: ConnectedCallbackT, **kwargs) -> AbstractServer:
srv = await start_unix_server(client_connected_cb, self._socket_path, **kwargs) server = await start_unix_server(client_connected_cb, self._socket_path, **kwargs)
log.debug("Opened socket '%s'", str(self._socket_path)) log.debug("Opened socket '%s'", str(self._socket_path))
return srv return server
def final_callback(self) -> None: def _final_callback(self) -> None:
"""Removes the unix socket on which the server was listening."""
self._socket_path.unlink() self._socket_path.unlink()
log.debug("Removed socket '%s'", str(self._socket_path)) log.debug("Removed socket '%s'", str(self._socket_path))

View File

@ -0,0 +1,218 @@
import logging
import json
from argparse import ArgumentError, ArgumentParser, HelpFormatter, Namespace
from asyncio.streams import StreamReader, StreamWriter
from typing import Callable, Optional, Type, Union, TYPE_CHECKING
from . import constants
from .exceptions import HelpRequested
from .helpers import get_first_doc_line, return_or_exception, tasks_str
from .pool import TaskPool, SimpleTaskPool
if TYPE_CHECKING:
from .server import ControlServer
log = logging.getLogger(__name__)
NUM = 'num'
WIDTH = 'width'
class CommandParser(ArgumentParser):
@staticmethod
def help_formatter_factory(terminal_width: int) -> Type[HelpFormatter]:
class ClientHelpFormatter(HelpFormatter):
def __init__(self, *args, **kwargs) -> None:
kwargs[WIDTH] = terminal_width
super().__init__(*args, **kwargs)
return ClientHelpFormatter
def __init__(self, *args, **kwargs) -> None:
parent: CommandParser = kwargs.pop('parent', None)
self._stream_writer: StreamWriter = parent.stream_writer if parent else kwargs.pop('writer')
self._terminal_width: int = parent.terminal_width if parent else kwargs.pop(WIDTH)
kwargs.setdefault('formatter_class', self.help_formatter_factory(self._terminal_width))
kwargs.setdefault('exit_on_error', False)
super().__init__(*args, **kwargs)
@property
def stream_writer(self) -> StreamWriter:
return self._stream_writer
@property
def terminal_width(self) -> int:
return self._terminal_width
def _print_message(self, message: str, *args, **kwargs) -> None:
if message:
self.stream_writer.write(message.encode())
def exit(self, status: int = 0, message: str = None) -> None:
if message:
self._print_message(message)
def print_help(self, file=None) -> None:
super().print_help(file)
raise HelpRequested
class ControlSession:
def __init__(self, server: 'ControlServer', reader: StreamReader, writer: StreamWriter) -> None:
self._control_server: 'ControlServer' = server
self._pool: Union[TaskPool, SimpleTaskPool] = server.pool
self._client_class_name = server.client_class_name
self._reader: StreamReader = reader
self._writer: StreamWriter = writer
self._parser: Optional[CommandParser] = None
def _add_base_commands(self):
subparsers = self._parser.add_subparsers(title="Commands", dest=constants.CMD)
subparsers.add_parser(
constants.CMD_NAME,
prog=constants.CMD_NAME,
help=get_first_doc_line(self._pool.__class__.__str__),
parent=self._parser,
)
subparser_pool_size = subparsers.add_parser(
constants.CMD_POOL_SIZE,
prog=constants.CMD_POOL_SIZE,
help="Get/set the maximum number of tasks in the pool",
parent=self._parser,
)
subparser_pool_size.add_argument(
NUM,
nargs='?',
help=f"If passed a number: {get_first_doc_line(self._pool.__class__.pool_size.fset)} "
f"If omitted: {get_first_doc_line(self._pool.__class__.pool_size.fget)}"
)
subparsers.add_parser(
constants.CMD_NUM_RUNNING,
help=get_first_doc_line(self._pool.__class__.num_running.fget),
parent=self._parser,
)
return subparsers
def _add_simple_commands(self):
subparsers = self._add_base_commands()
subparser = subparsers.add_parser(
constants.CMD_START,
prog=constants.CMD_START,
help=get_first_doc_line(self._pool.__class__.start),
parent=self._parser,
)
subparser.add_argument(
NUM,
nargs='?',
type=int,
default=1,
help="Number of tasks to start. Defaults to 1."
)
subparser = subparsers.add_parser(
constants.CMD_STOP,
prog=constants.CMD_STOP,
help=get_first_doc_line(self._pool.__class__.stop),
parent=self._parser,
)
subparser.add_argument(
NUM,
nargs='?',
type=int,
default=1,
help="Number of tasks to stop. Defaults to 1."
)
subparsers.add_parser(
constants.CMD_STOP_ALL,
prog=constants.CMD_STOP_ALL,
help=get_first_doc_line(self._pool.__class__.stop_all),
parent=self._parser,
)
subparsers.add_parser(
constants.CMD_FUNC_NAME,
prog=constants.CMD_FUNC_NAME,
help=get_first_doc_line(self._pool.__class__.func_name.fget),
parent=self._parser,
)
def _init_parser(self, client_terminal_width: int) -> None:
self._parser = CommandParser(prog='', writer=self._writer, width=client_terminal_width)
if isinstance(self._pool, TaskPool):
pass # TODO
elif isinstance(self._pool, SimpleTaskPool):
self._add_simple_commands()
async def client_handshake(self) -> None:
client_info = json.loads((await self._reader.read(constants.MSG_BYTES)).decode().strip())
log.debug("%s connected", self._client_class_name)
self._init_parser(client_info[WIDTH])
self._writer.write(str(self._pool).encode())
await self._writer.drain()
async def _write_function_output(self, func: Callable, *args, **kwargs) -> None:
output = await return_or_exception(func, *args, **kwargs)
self._writer.write(b"ok" if output is None else str(output).encode())
async def _cmd_name(self, **_kwargs) -> None:
log.debug("%s requests task pool name", self._client_class_name)
await self._write_function_output(self._pool.__class__.__str__, self._pool)
async def _cmd_pool_size(self, **kwargs) -> None:
num = kwargs.get(NUM)
if num is None:
log.debug("%s requests pool size", self._client_class_name)
await self._write_function_output(self._pool.__class__.pool_size.fget, self._pool)
else:
log.debug("%s requests setting pool size to %s", self._client_class_name, num)
await self._write_function_output(self._pool.__class__.pool_size.fset, self._pool, int(num))
async def _cmd_num_running(self, **_kwargs) -> None:
log.debug("%s requests number of running tasks", self._client_class_name)
await self._write_function_output(self._pool.__class__.num_running.fget, self._pool)
async def _cmd_start(self, **kwargs) -> None:
num = kwargs[NUM]
log.debug("%s requests starting %s %s", self._client_class_name, num, tasks_str(num))
await self._write_function_output(self._pool.start, num)
async def _cmd_stop(self, **kwargs) -> None:
num = kwargs[NUM]
log.debug("%s requests stopping %s %s", self._client_class_name, num, tasks_str(num))
await self._write_function_output(self._pool.stop, num)
async def _cmd_stop_all(self, **_kwargs) -> None:
log.debug("%s requests stopping all tasks", self._client_class_name)
await self._write_function_output(self._pool.stop_all)
async def _cmd_func_name(self, **_kwargs) -> None:
log.debug("%s requests pool function name", self._client_class_name)
await self._write_function_output(self._pool.__class__.func_name.fget, self._pool)
async def _execute_command(self, args: Namespace) -> None:
args = vars(args)
cmd: str = args.pop(constants.CMD, None)
if cmd is not None:
method = getattr(self, f'_cmd_{cmd.replace("-", "_")}')
await method(**args)
async def _parse_command(self, msg: str) -> None:
try:
args, argv = self._parser.parse_known_args(msg.split(' '))
except ArgumentError as e:
self._writer.write(str(e).encode())
return
except HelpRequested:
return
if argv:
log.debug("%s sent unknown arguments: %s", self._client_class_name, msg)
self._writer.write(b"Invalid command!")
return
await self._execute_command(args)
async def listen(self) -> None:
while self._control_server.is_serving():
msg = (await self._reader.read(constants.MSG_BYTES)).decode().strip()
if not msg:
log.debug("%s disconnected", self._client_class_name)
break
await self._parse_command(msg)
await self._writer.drain()

View File

@ -28,10 +28,11 @@ T = TypeVar('T')
ArgsT = Iterable[Any] ArgsT = Iterable[Any]
KwArgsT = Mapping[str, Any] KwArgsT = Mapping[str, Any]
AnyCallableT = Callable[[...], Union[Awaitable[T], T]] AnyCallableT = Callable[[...], Union[T, Awaitable[T]]]
CoroutineFunc = Callable[[...], Awaitable[Any]] CoroutineFunc = Callable[[...], Awaitable[Any]]
EndCallbackT = Callable EndCallbackT = Callable
CancelCallbackT = Callable CancelCallbackT = Callable
ConnectedCallbackT = Callable[[StreamReader, StreamWriter], Awaitable[None]]
ClientConnT = Union[Tuple[StreamReader, StreamWriter], Tuple[None, None]] ClientConnT = Union[Tuple[StreamReader, StreamWriter], Tuple[None, None]]

View File

@ -86,3 +86,11 @@ class HelpersTestCase(IsolatedAsyncioTestCase):
mock_queue = MagicMock(join=mock_join) mock_queue = MagicMock(join=mock_join)
self.assertIsNone(await helpers.join_queue(mock_queue)) self.assertIsNone(await helpers.join_queue(mock_queue))
mock_join.assert_awaited_once_with() mock_join.assert_awaited_once_with()
def test_task_str(self):
self.assertEqual("task", helpers.tasks_str(1))
self.assertEqual("tasks", helpers.tasks_str(0))
self.assertEqual("tasks", helpers.tasks_str(-1))
self.assertEqual("tasks", helpers.tasks_str(2))
self.assertEqual("tasks", helpers.tasks_str(-10))
self.assertEqual("tasks", helpers.tasks_str(42))

143
tests/test_server.py Normal file
View File

@ -0,0 +1,143 @@
import asyncio
import logging
from pathlib import Path
from unittest import IsolatedAsyncioTestCase
from unittest.mock import AsyncMock, MagicMock, patch
from asyncio_taskpool import server
from asyncio_taskpool.client import ControlClient, UnixControlClient
FOO, BAR = 'foo', 'bar'
class ControlServerTestCase(IsolatedAsyncioTestCase):
log_lvl: int
@classmethod
def setUpClass(cls) -> None:
cls.log_lvl = server.log.level
server.log.setLevel(999)
@classmethod
def tearDownClass(cls) -> None:
server.log.setLevel(cls.log_lvl)
def setUp(self) -> None:
self.abstract_patcher = patch('asyncio_taskpool.server.ControlServer.__abstractmethods__', set())
self.mock_abstract_methods = self.abstract_patcher.start()
self.mock_pool = MagicMock()
self.kwargs = {FOO: 123, BAR: 456}
self.server = server.ControlServer(pool=self.mock_pool, **self.kwargs)
def tearDown(self) -> None:
self.abstract_patcher.stop()
def test_client_class_name(self):
self.assertEqual(ControlClient.__name__, server.ControlServer.client_class_name)
async def test_abstract(self):
with self.assertRaises(NotImplementedError):
args = [AsyncMock()]
await self.server._get_server_instance(*args)
with self.assertRaises(NotImplementedError):
self.server._final_callback()
def test_init(self):
self.assertEqual(self.mock_pool, self.server._pool)
self.assertEqual(self.kwargs, self.server._server_kwargs)
self.assertIsNone(self.server._server)
def test_pool(self):
self.assertEqual(self.mock_pool, self.server.pool)
def test_is_serving(self):
self.server._server = MagicMock(is_serving=MagicMock(return_value=FOO + BAR))
self.assertEqual(FOO + BAR, self.server.is_serving())
@patch.object(server, 'ControlSession')
async def test__client_connected_cb(self, mock_client_session_cls: MagicMock):
mock_client_handshake, mock_listen = AsyncMock(), AsyncMock()
mock_client_session_cls.return_value = MagicMock(client_handshake=mock_client_handshake, listen=mock_listen)
mock_reader, mock_writer = MagicMock(), MagicMock()
self.assertIsNone(await self.server._client_connected_cb(mock_reader, mock_writer))
mock_client_session_cls.assert_called_once_with(self.server, mock_reader, mock_writer)
mock_client_handshake.assert_awaited_once_with()
mock_listen.assert_awaited_once_with()
@patch.object(server.ControlServer, '_final_callback')
async def test__serve_forever(self, mock__final_callback: MagicMock):
mock_aenter, mock_serve_forever = AsyncMock(), AsyncMock(side_effect=asyncio.CancelledError)
self.server._server = MagicMock(__aenter__=mock_aenter, serve_forever=mock_serve_forever)
with self.assertLogs(server.log, logging.DEBUG):
self.assertIsNone(await self.server._serve_forever())
mock_aenter.assert_awaited_once_with()
mock_serve_forever.assert_awaited_once_with()
mock__final_callback.assert_called_once_with()
mock_aenter.reset_mock()
mock_serve_forever.reset_mock(side_effect=True)
mock__final_callback.reset_mock()
self.assertIsNone(await self.server._serve_forever())
mock_aenter.assert_awaited_once_with()
mock_serve_forever.assert_awaited_once_with()
mock__final_callback.assert_called_once_with()
@patch.object(server, 'create_task')
@patch.object(server.ControlServer, '_serve_forever', new_callable=MagicMock())
@patch.object(server.ControlServer, '_get_server_instance')
async def test_serve_forever(self, mock__get_server_instance: AsyncMock, mock__serve_forever: MagicMock,
mock_create_task: MagicMock):
mock__serve_forever.return_value = mock_awaitable = 'some_coroutine'
mock_create_task.return_value = expected_output = 12345
output = await self.server.serve_forever()
self.assertEqual(expected_output, output)
mock__get_server_instance.assert_awaited_once_with(self.server._client_connected_cb, **self.kwargs)
mock__serve_forever.assert_called_once_with()
mock_create_task.assert_called_once_with(mock_awaitable)
class UnixControlServerTestCase(IsolatedAsyncioTestCase):
log_lvl: int
@classmethod
def setUpClass(cls) -> None:
cls.log_lvl = server.log.level
server.log.setLevel(999)
@classmethod
def tearDownClass(cls) -> None:
server.log.setLevel(cls.log_lvl)
def setUp(self) -> None:
self.base_init_patcher = patch.object(server.ControlServer, '__init__')
self.mock_base_init = self.base_init_patcher.start()
self.mock_pool = MagicMock()
self.path = '/tmp/asyncio_taskpool'
self.kwargs = {FOO: 123, BAR: 456}
self.server = server.UnixControlServer(pool=self.mock_pool, path=self.path, **self.kwargs)
def tearDown(self) -> None:
self.base_init_patcher.stop()
def test__client_class(self):
self.assertEqual(UnixControlClient, self.server._client_class)
def test_init(self):
self.assertEqual(Path(self.path), self.server._socket_path)
self.mock_base_init.assert_called_once_with(self.mock_pool, **self.kwargs)
@patch.object(server, 'start_unix_server')
async def test__get_server_instance(self, mock_start_unix_server: AsyncMock):
mock_start_unix_server.return_value = expected_output = 'totally_a_server'
mock_callback, mock_kwargs = MagicMock(), {'a': 1, 'b': 2}
args = [mock_callback]
output = await self.server._get_server_instance(*args, **mock_kwargs)
self.assertEqual(expected_output, output)
mock_start_unix_server.assert_called_once_with(mock_callback, Path(self.path), **mock_kwargs)
def test__final_callback(self):
self.server._socket_path = MagicMock()
self.assertIsNone(self.server._final_callback())
self.server._socket_path.unlink.assert_called_once_with()