From 5a72a6d1d11752a5ee1b8f6f7efa0f32ef042aa2 Mon Sep 17 00:00:00 2001 From: Daniil Fajnberg Date: Sat, 7 May 2022 14:44:43 +0200 Subject: [PATCH] Fix server's control session reading from socket; improve docstrings --- docs/source/conf.py | 2 +- setup.cfg | 2 +- src/asyncio_taskpool/control/client.py | 2 ++ src/asyncio_taskpool/control/session.py | 7 ++++--- src/asyncio_taskpool/pool.py | 13 +++++++++---- 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index cdbf36a..4a589eb 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -22,7 +22,7 @@ copyright = '2022 Daniil Fajnberg' author = 'Daniil Fajnberg' # The full version, including alpha/beta/rc tags -release = '1.1.1' +release = '1.1.2' # -- General configuration --------------------------------------------------- diff --git a/setup.cfg b/setup.cfg index 3681ced..69013bd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = asyncio-taskpool -version = 1.1.1 +version = 1.1.2 author = Daniil Fajnberg author_email = mail@daniil.fajnberg.de description = Dynamically manage pools of asyncio tasks diff --git a/src/asyncio_taskpool/control/client.py b/src/asyncio_taskpool/control/client.py index 6bcda10..e0d5edb 100644 --- a/src/asyncio_taskpool/control/client.py +++ b/src/asyncio_taskpool/control/client.py @@ -85,6 +85,7 @@ class ControlClient(ABC): """ self._connected = True writer.write(json.dumps(self._client_info()).encode()) + writer.write(b'\n') await writer.drain() print("Connected to", (await reader.read(SESSION_MSG_BYTES)).decode()) print("Type '-h' to get help and usage instructions for all available commands.\n") @@ -131,6 +132,7 @@ class ControlClient(ABC): try: # Send the command to the server. writer.write(cmd.encode()) + writer.write(b'\n') await writer.drain() except ConnectionError as e: self._connected = False diff --git a/src/asyncio_taskpool/control/session.py b/src/asyncio_taskpool/control/session.py index 24786a2..9df3a33 100644 --- a/src/asyncio_taskpool/control/session.py +++ b/src/asyncio_taskpool/control/session.py @@ -32,7 +32,7 @@ from typing import Callable, Optional, Union, TYPE_CHECKING from .parser import ControlParser from ..exceptions import CommandError, HelpRequested, ParserError from ..pool import TaskPool, SimpleTaskPool -from ..internals.constants import CLIENT_INFO, CMD, CMD_OK, SESSION_MSG_BYTES +from ..internals.constants import CLIENT_INFO, CMD, CMD_OK from ..internals.helpers import return_or_exception if TYPE_CHECKING: @@ -134,7 +134,8 @@ class ControlSession: Client info is retrieved, server info is sent back, and the :class:`ControlParser ` is set up. """ - client_info = json.loads((await self._reader.read(SESSION_MSG_BYTES)).decode().strip()) + msg = (await self._reader.readline()).decode().strip() + client_info = json.loads(msg) log.debug("%s connected", self._client_class_name) parser_kwargs = { 'stream': self._response_buffer, @@ -187,7 +188,7 @@ class ControlSession: It will obviously block indefinitely. """ while self._control_server.is_serving(): - msg = (await self._reader.read(SESSION_MSG_BYTES)).decode().strip() + msg = (await self._reader.readline()).decode().strip() if not msg: log.debug("%s disconnected", self._client_class_name) break diff --git a/src/asyncio_taskpool/pool.py b/src/asyncio_taskpool/pool.py index 58a1c71..ca31b4d 100644 --- a/src/asyncio_taskpool/pool.py +++ b/src/asyncio_taskpool/pool.py @@ -765,9 +765,10 @@ class TaskPool(BaseTaskPool): def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None: """ - Creates tasks in the pool with arguments from the supplied iterable. + Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool. Each coroutine looks like `func(arg)`, `func(*arg)`, or `func(**arg)`, `arg` being taken from `arg_iter`. + The method is a task-based equivalent of the `multiprocessing.pool.Pool.map` method. All the new tasks are added to the same task group. @@ -819,10 +820,10 @@ class TaskPool(BaseTaskPool): def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int = 1, group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str: """ - A task-based equivalent of the `multiprocessing.pool.Pool.map` method. - Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool. - Each coroutine looks like `func(arg)`, `arg` being an element taken from `arg_iter`. + + Each coroutine looks like `func(arg)`, `arg` being an element taken from `arg_iter`. The method is a task-based + equivalent of the `multiprocessing.pool.Pool.map` method. All the new tasks are added to the same task group. @@ -876,6 +877,8 @@ class TaskPool(BaseTaskPool): def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_concurrent: int = 1, group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str: """ + Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool. + Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked as positional arguments to the function. Each coroutine then looks like `func(*args)`, `args` being an element from `args_iter`. @@ -893,6 +896,8 @@ class TaskPool(BaseTaskPool): def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_concurrent: int = 1, group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str: """ + Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool. + Like :meth:`map` except that the elements of `kwargs_iter` are expected to be iterables themselves to be unpacked as keyword-arguments to the function. Each coroutine then looks like `func(**kwargs)`, `kwargs` being an element from `kwargs_iter`.