From ed376b6f82db07c1456acb416d89a60c2ecdeb48 Mon Sep 17 00:00:00 2001 From: Daniil Fajnberg Date: Fri, 4 Feb 2022 17:41:10 +0100 Subject: [PATCH] refactoring, additional command, improvements --- src/asyncio_taskpool/client.py | 15 +++++++++---- src/asyncio_taskpool/constants.py | 1 + src/asyncio_taskpool/server.py | 37 +++++++++++++++++++++---------- 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/src/asyncio_taskpool/client.py b/src/asyncio_taskpool/client.py index e63d52d..c27ac06 100644 --- a/src/asyncio_taskpool/client.py +++ b/src/asyncio_taskpool/client.py @@ -22,14 +22,21 @@ class ControlClient(ABC): msg = input("> ").strip().lower() except EOFError: msg = constants.CLIENT_EXIT + except KeyboardInterrupt: + print() + return if msg == constants.CLIENT_EXIT: writer.close() self._connected = False return - writer.write(msg.encode()) - await writer.drain() - print("Command sent; awaiting response...") - print("Server response:", (await reader.read(constants.MSG_BYTES)).decode()) + try: + writer.write(msg.encode()) + await writer.drain() + except ConnectionError as e: + self._connected = False + print(e, file=sys.stderr) + return + print((await reader.read(constants.MSG_BYTES)).decode()) async def start(self): reader, writer = await self.open_connection(**self._conn_kwargs) diff --git a/src/asyncio_taskpool/constants.py b/src/asyncio_taskpool/constants.py index 954c692..b3dbce2 100644 --- a/src/asyncio_taskpool/constants.py +++ b/src/asyncio_taskpool/constants.py @@ -3,4 +3,5 @@ MSG_BYTES = 1024 CMD_START = 'start' CMD_STOP = 'stop' CMD_STOP_ALL = 'stop_all' +CMD_SIZE = 'size' CLIENT_EXIT = 'exit' diff --git a/src/asyncio_taskpool/server.py b/src/asyncio_taskpool/server.py index 27231e8..72a8cbf 100644 --- a/src/asyncio_taskpool/server.py +++ b/src/asyncio_taskpool/server.py @@ -45,44 +45,57 @@ class ControlServer(ABC): self._server_kwargs = server_kwargs self._server: Optional[AbstractServer] = None - def _start_tasks(self, num: int, writer: StreamWriter) -> None: - log.debug("Client requests starting %s tasks", num) + def _start_tasks(self, writer: StreamWriter, num: int = None) -> None: + if num is None: + num = 1 + log.debug("%s requests starting %s %s", self.client_class.__name__, num, tasks_str(num)) self._pool.start(num) size = self._pool.size writer.write(f"{num} new {tasks_str(num)} started! {size} {tasks_str(size)} active now.".encode()) - def _stop_tasks(self, num: int, writer: StreamWriter) -> None: - log.debug("Client requests stopping %s tasks", num) + def _stop_tasks(self, writer: StreamWriter, num: int = None) -> None: + if num is None: + num = 1 + log.debug("%s requests stopping %s %s", self.client_class.__name__, num, tasks_str(num)) num = self._pool.stop(num) # the requested number may be greater than the total number of running tasks size = self._pool.size writer.write(f"{num} {tasks_str(num)} stopped! {size} {tasks_str(size)} left.".encode()) def _stop_all_tasks(self, writer: StreamWriter) -> None: - log.debug("Client requests stopping all tasks") + log.debug("%s requests stopping all tasks", self.client_class.__name__) num = self._pool.stop_all() writer.write(f"Remaining {num} {tasks_str(num)} stopped!".encode()) - async def _client_connected_cb(self, reader: StreamReader, writer: StreamWriter) -> None: - log.debug("%s connected", self.client_class.__name__) - writer.write(f"{self.__class__.__name__} for {self._pool}".encode()) - await writer.drain() - while True: + def _pool_size(self, writer: StreamWriter) -> None: + log.debug("%s requests pool size", self.client_class.__name__) + writer.write(f'{self._pool.size}'.encode()) + + async def _listen(self, reader: StreamReader, writer: StreamWriter) -> None: + while self._server.is_serving(): msg = (await reader.read(constants.MSG_BYTES)).decode().strip() if not msg: log.debug("%s disconnected", self.client_class.__name__) break cmd, arg = get_cmd_arg(msg) if cmd == constants.CMD_START: - self._start_tasks(arg, writer) + self._start_tasks(writer, arg) elif cmd == constants.CMD_STOP: - self._stop_tasks(arg, writer) + self._stop_tasks(writer, arg) elif cmd == constants.CMD_STOP_ALL: self._stop_all_tasks(writer) + elif cmd == constants.CMD_SIZE: + self._pool_size(writer) else: log.debug("%s sent invalid command: %s", self.client_class.__name__, msg) writer.write(b"Invalid command!") await writer.drain() + async def _client_connected_cb(self, reader: StreamReader, writer: StreamWriter) -> None: + log.debug("%s connected", self.client_class.__name__) + writer.write(f"{self.__class__.__name__} for {self._pool}".encode()) + await writer.drain() + await self._listen(reader, writer) + async def _serve_forever(self) -> None: try: async with self._server: