Compare commits

...

5 Commits

7 changed files with 41 additions and 32 deletions

View File

@ -22,7 +22,7 @@ copyright = '2022 Daniil Fajnberg'
author = 'Daniil Fajnberg'
# The full version, including alpha/beta/rc tags
release = '1.1.1'
release = '1.1.4'
# -- General configuration ---------------------------------------------------

View File

@ -1,6 +1,6 @@
[metadata]
name = asyncio-taskpool
version = 1.1.1
version = 1.1.4
author = Daniil Fajnberg
author_email = mail@daniil.fajnberg.de
description = Dynamically manage pools of asyncio tasks

View File

@ -85,6 +85,7 @@ class ControlClient(ABC):
"""
self._connected = True
writer.write(json.dumps(self._client_info()).encode())
writer.write(b'\n')
await writer.drain()
print("Connected to", (await reader.read(SESSION_MSG_BYTES)).decode())
print("Type '-h' to get help and usage instructions for all available commands.\n")
@ -131,6 +132,7 @@ class ControlClient(ABC):
try:
# Send the command to the server.
writer.write(cmd.encode())
writer.write(b'\n')
await writer.drain()
except ConnectionError as e:
self._connected = False

View File

@ -32,7 +32,7 @@ from typing import Callable, Optional, Union, TYPE_CHECKING
from .parser import ControlParser
from ..exceptions import CommandError, HelpRequested, ParserError
from ..pool import TaskPool, SimpleTaskPool
from ..internals.constants import CLIENT_INFO, CMD, CMD_OK, SESSION_MSG_BYTES
from ..internals.constants import CLIENT_INFO, CMD, CMD_OK
from ..internals.helpers import return_or_exception
if TYPE_CHECKING:
@ -103,7 +103,7 @@ class ControlSession:
elif param.kind == param.VAR_POSITIONAL:
var_pos = kwargs.pop(param.name)
output = await return_or_exception(method, *normal_pos, *var_pos, **kwargs)
self._writer.write(CMD_OK if output is None else str(output).encode())
self._response_buffer.write(CMD_OK.decode() if output is None else str(output))
async def _exec_property_and_respond(self, prop: property, **kwargs) -> None:
"""
@ -122,10 +122,10 @@ class ControlSession:
if kwargs:
log.debug("%s sets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fset.__name__)
await return_or_exception(prop.fset, self._pool, **kwargs)
self._writer.write(CMD_OK)
self._response_buffer.write(CMD_OK.decode())
else:
log.debug("%s gets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fget.__name__)
self._writer.write(str(await return_or_exception(prop.fget, self._pool)).encode())
self._response_buffer.write(str(await return_or_exception(prop.fget, self._pool)))
async def client_handshake(self) -> None:
"""
@ -134,7 +134,8 @@ class ControlSession:
Client info is retrieved, server info is sent back, and the
:class:`ControlParser <asyncio_taskpool.control.parser.ControlParser>` is set up.
"""
client_info = json.loads((await self._reader.read(SESSION_MSG_BYTES)).decode().strip())
msg = (await self._reader.readline()).decode().strip()
client_info = json.loads(msg)
log.debug("%s connected", self._client_class_name)
parser_kwargs = {
'stream': self._response_buffer,
@ -146,7 +147,7 @@ class ControlSession:
self._parser.add_subparsers(title="Commands",
metavar="(A command followed by '-h' or '--help' will show command-specific help.)")
self._parser.add_class_commands(self._pool.__class__)
self._writer.write(str(self._pool).encode())
self._writer.write(str(self._pool).encode() + b'\n')
await self._writer.drain()
async def _parse_command(self, msg: str) -> None:
@ -187,12 +188,12 @@ class ControlSession:
It will obviously block indefinitely.
"""
while self._control_server.is_serving():
msg = (await self._reader.read(SESSION_MSG_BYTES)).decode().strip()
msg = (await self._reader.readline()).decode().strip()
if not msg:
log.debug("%s disconnected", self._client_class_name)
break
await self._parse_command(msg)
response = self._response_buffer.getvalue()
response = self._response_buffer.getvalue() + "\n"
self._response_buffer.seek(0)
self._response_buffer.truncate()
self._writer.write(response.encode())

View File

@ -765,9 +765,10 @@ class TaskPool(BaseTaskPool):
def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
"""
Creates tasks in the pool with arguments from the supplied iterable.
Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool.
Each coroutine looks like `func(arg)`, `func(*arg)`, or `func(**arg)`, `arg` being taken from `arg_iter`.
The method is a task-based equivalent of the `multiprocessing.pool.Pool.map` method.
All the new tasks are added to the same task group.
@ -819,10 +820,10 @@ class TaskPool(BaseTaskPool):
def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
A task-based equivalent of the `multiprocessing.pool.Pool.map` method.
Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool.
Each coroutine looks like `func(arg)`, `arg` being an element taken from `arg_iter`.
Each coroutine looks like `func(arg)`, `arg` being an element taken from `arg_iter`. The method is a task-based
equivalent of the `multiprocessing.pool.Pool.map` method.
All the new tasks are added to the same task group.
@ -876,6 +877,8 @@ class TaskPool(BaseTaskPool):
def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool.
Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked
as positional arguments to the function.
Each coroutine then looks like `func(*args)`, `args` being an element from `args_iter`.
@ -893,6 +896,8 @@ class TaskPool(BaseTaskPool):
def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool.
Like :meth:`map` except that the elements of `kwargs_iter` are expected to be iterables themselves to be
unpacked as keyword-arguments to the function.
Each coroutine then looks like `func(**kwargs)`, `kwargs` being an element from `kwargs_iter`.

View File

@ -71,7 +71,7 @@ class ControlClientTestCase(IsolatedAsyncioTestCase):
self.assertIsNone(await self.client._server_handshake(self.mock_reader, self.mock_writer))
self.assertTrue(self.client._connected)
mock__client_info.assert_called_once_with()
self.mock_write.assert_called_once_with(json.dumps(mock_info).encode())
self.mock_write.assert_has_calls([call(json.dumps(mock_info).encode()), call(b'\n')])
self.mock_drain.assert_awaited_once_with()
self.mock_read.assert_awaited_once_with(SESSION_MSG_BYTES)
self.mock_print.assert_has_calls([
@ -121,7 +121,7 @@ class ControlClientTestCase(IsolatedAsyncioTestCase):
mock__get_command.return_value = cmd = FOO + BAR + ' 123'
self.mock_drain.side_effect = err = ConnectionError()
self.assertIsNone(await self.client._interact(self.mock_reader, self.mock_writer))
self.mock_write.assert_called_once_with(cmd.encode())
self.mock_write.assert_has_calls([call(cmd.encode()), call(b'\n')])
self.mock_drain.assert_awaited_once_with()
self.mock_read.assert_not_awaited()
self.mock_print.assert_called_once_with(err, file=sys.stderr)
@ -133,7 +133,7 @@ class ControlClientTestCase(IsolatedAsyncioTestCase):
self.mock_print.reset_mock()
self.assertIsNone(await self.client._interact(self.mock_reader, self.mock_writer))
self.mock_write.assert_called_once_with(cmd.encode())
self.mock_write.assert_has_calls([call(cmd.encode()), call(b'\n')])
self.mock_drain.assert_awaited_once_with()
self.mock_read.assert_awaited_once_with(SESSION_MSG_BYTES)
self.mock_print.assert_called_once_with(FOO)

View File

@ -26,7 +26,7 @@ from unittest import IsolatedAsyncioTestCase
from unittest.mock import AsyncMock, MagicMock, patch, call
from asyncio_taskpool.control import session
from asyncio_taskpool.internals.constants import CLIENT_INFO, CMD, SESSION_MSG_BYTES
from asyncio_taskpool.internals.constants import CLIENT_INFO, CMD
from asyncio_taskpool.exceptions import HelpRequested
from asyncio_taskpool.pool import SimpleTaskPool
@ -74,7 +74,7 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
mock_return_or_exception.assert_awaited_once_with(
method, self.mock_pool, test_arg1, test_arg2, *test_var_args, **test_rest
)
self.mock_writer.write.assert_called_once_with(session.CMD_OK)
self.assertEqual(session.CMD_OK.decode(), self.session._response_buffer.getvalue())
@patch.object(session, 'return_or_exception')
async def test__exec_property_and_respond(self, mock_return_or_exception: AsyncMock):
@ -85,15 +85,16 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
mock_return_or_exception.return_value = None
self.assertIsNone(await self.session._exec_property_and_respond(prop, **kwargs))
mock_return_or_exception.assert_awaited_once_with(prop_set, self.mock_pool, **kwargs)
self.mock_writer.write.assert_called_once_with(session.CMD_OK)
self.assertEqual(session.CMD_OK.decode(), self.session._response_buffer.getvalue())
mock_return_or_exception.reset_mock()
self.mock_writer.write.reset_mock()
self.session._response_buffer.seek(0)
self.session._response_buffer.truncate()
mock_return_or_exception.return_value = val = 420.69
self.assertIsNone(await self.session._exec_property_and_respond(prop))
mock_return_or_exception.assert_awaited_once_with(prop_get, self.mock_pool)
self.mock_writer.write.assert_called_once_with(str(val).encode())
self.assertEqual(str(val), self.session._response_buffer.getvalue())
@patch.object(session, 'ControlParser')
async def test_client_handshake(self, mock_parser_cls: MagicMock):
@ -102,8 +103,8 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
mock_parser_cls.return_value = mock_parser
width = 5678
msg = ' ' + json.dumps({CLIENT_INFO.TERMINAL_WIDTH: width, FOO: BAR}) + ' '
mock_read = AsyncMock(return_value=msg.encode())
self.mock_reader.read = mock_read
mock_readline = AsyncMock(return_value=msg.encode())
self.mock_reader.readline = mock_readline
self.mock_writer.drain = AsyncMock()
expected_parser_kwargs = {
'stream': self.session._response_buffer,
@ -117,11 +118,11 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
}
self.assertIsNone(await self.session.client_handshake())
self.assertEqual(mock_parser, self.session._parser)
mock_read.assert_awaited_once_with(SESSION_MSG_BYTES)
mock_readline.assert_awaited_once_with()
mock_parser_cls.assert_called_once_with(**expected_parser_kwargs)
mock_add_subparsers.assert_called_once_with(**expected_subparsers_kwargs)
mock_add_class_commands.assert_called_once_with(self.mock_pool.__class__)
self.mock_writer.write.assert_called_once_with(str(self.mock_pool).encode())
self.mock_writer.write.assert_called_once_with(str(self.mock_pool).encode() + b'\n')
self.mock_writer.drain.assert_awaited_once_with()
@patch.object(session.ControlSession, '_exec_property_and_respond')
@ -190,27 +191,27 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
@patch.object(session.ControlSession, '_parse_command')
async def test_listen(self, mock__parse_command: AsyncMock):
def make_reader_return_empty():
self.mock_reader.read.return_value = b''
self.mock_reader.readline.return_value = b''
self.mock_writer.drain = AsyncMock(side_effect=make_reader_return_empty)
msg = "fascinating"
self.mock_reader.read = AsyncMock(return_value=f' {msg} '.encode())
self.mock_reader.readline = AsyncMock(return_value=f' {msg} '.encode())
response = FOO + BAR + FOO
self.session._response_buffer.write(response)
self.assertIsNone(await self.session.listen())
self.mock_reader.read.assert_has_awaits([call(SESSION_MSG_BYTES), call(SESSION_MSG_BYTES)])
self.mock_reader.readline.assert_has_awaits([call(), call()])
mock__parse_command.assert_awaited_once_with(msg)
self.assertEqual('', self.session._response_buffer.getvalue())
self.mock_writer.write.assert_called_once_with(response.encode())
self.mock_writer.write.assert_called_once_with(response.encode() + b'\n')
self.mock_writer.drain.assert_awaited_once_with()
self.mock_reader.read.reset_mock()
self.mock_reader.readline.reset_mock()
mock__parse_command.reset_mock()
self.mock_writer.write.reset_mock()
self.mock_writer.drain.reset_mock()
self.mock_server.is_serving = MagicMock(return_value=False)
self.assertIsNone(await self.session.listen())
self.mock_reader.read.assert_not_awaited()
self.mock_reader.readline.assert_not_awaited()
mock__parse_command.assert_not_awaited()
self.mock_writer.write.assert_not_called()
self.mock_writer.drain.assert_not_awaited()