From d05f84b2c3c306faa0291ff2144da03b316501d2 Mon Sep 17 00:00:00 2001 From: Daniil Fajnberg Date: Mon, 7 Mar 2022 14:34:40 +0100 Subject: [PATCH] additional base commands for control server --- src/asyncio_taskpool/constants.py | 11 +++++ src/asyncio_taskpool/session.py | 67 ++++++++++++++++++++++++++++--- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/src/asyncio_taskpool/constants.py b/src/asyncio_taskpool/constants.py index 9d2b749..5266469 100644 --- a/src/asyncio_taskpool/constants.py +++ b/src/asyncio_taskpool/constants.py @@ -37,10 +37,21 @@ class CLIENT_INFO: class CMD: __slots__ = () + # Base commands: CMD = 'command' NAME = 'name' POOL_SIZE = 'pool-size' + IS_LOCKED = 'is-locked' + LOCK = 'lock' + UNLOCK = 'unlock' NUM_RUNNING = 'num-running' + NUM_CANCELLATIONS = 'num-cancellations' + NUM_ENDED = 'num-ended' + NUM_FINISHED = 'num-finished' + IS_FULL = 'is-full' + GET_GROUP_IDS = 'get-group-ids' + + # Simple commands: START = 'start' STOP = 'stop' STOP_ALL = 'stop-all' diff --git a/src/asyncio_taskpool/session.py b/src/asyncio_taskpool/session.py index 2216076..97f7982 100644 --- a/src/asyncio_taskpool/session.py +++ b/src/asyncio_taskpool/session.py @@ -23,7 +23,7 @@ import logging import json from argparse import ArgumentError, HelpFormatter from asyncio.streams import StreamReader, StreamWriter -from typing import Callable, Optional, Union, TYPE_CHECKING +from typing import Callable, Optional, Type, Union, TYPE_CHECKING from .constants import CMD, SESSION_WRITER, SESSION_MSG_BYTES, CLIENT_INFO from .exceptions import HelpRequested, NotATaskPool, UnknownTaskPoolClass @@ -108,19 +108,36 @@ class ControlSession: These include commands mapping to the following pool methods: - __str__ - pool_size (get/set property) + - is_locked + - lock & unlock - num_running """ - self._add_command(CMD.NAME, short_help=get_first_doc_line(self._pool.__class__.__str__)) + cls: Type[BaseTaskPool] = self._pool.__class__ + self._add_command(CMD.NAME, short_help=get_first_doc_line(cls.__str__)) self._add_command( CMD.POOL_SIZE, short_help="Get/set the maximum number of tasks in the pool.", formatter_class=HelpFormatter ).add_optional_num_argument( default=None, - help=f"If passed a number: {get_first_doc_line(self._pool.__class__.pool_size.fset)} " - f"If omitted: {get_first_doc_line(self._pool.__class__.pool_size.fget)}" + help=f"If passed a number: {get_first_doc_line(cls.pool_size.fset)} " + f"If omitted: {get_first_doc_line(cls.pool_size.fget)}" + ) + self._add_command(CMD.IS_LOCKED, short_help=get_first_doc_line(cls.is_locked.fget)) + self._add_command(CMD.LOCK, short_help=get_first_doc_line(cls.lock)) + self._add_command(CMD.UNLOCK, short_help=get_first_doc_line(cls.unlock)) + self._add_command(CMD.NUM_RUNNING, short_help=get_first_doc_line(cls.num_running.fget)) + self._add_command(CMD.NUM_CANCELLATIONS, short_help=get_first_doc_line(cls.num_cancellations.fget)) + self._add_command(CMD.NUM_ENDED, short_help=get_first_doc_line(cls.num_ended.fget)) + self._add_command(CMD.NUM_FINISHED, short_help=get_first_doc_line(cls.num_finished.fget)) + self._add_command(CMD.IS_FULL, short_help=get_first_doc_line(cls.is_full.fget)) + self._add_command( + CMD.GET_GROUP_IDS, short_help=get_first_doc_line(cls.get_group_ids) + ).add_argument( + 'group_name', + nargs='*', + help="Must be a name of a task group that exists within the pool." ) - self._add_command(CMD.NUM_RUNNING, short_help=get_first_doc_line(self._pool.__class__.num_running.fget)) def _add_simple_commands(self) -> None: """ @@ -227,11 +244,51 @@ class ControlSession: log.debug("%s requests setting pool size to %s", self._client_class_name, num) await self._write_function_output(self._pool.__class__.pool_size.fset, self._pool, num) + async def _cmd_is_locked(self, **_kwargs) -> None: + """Maps to the `is_locked` property of any task pool class.""" + log.debug("%s checks locked status", self._client_class_name) + await self._write_function_output(self._pool.__class__.is_locked.fget, self._pool) + + async def _cmd_lock(self, **_kwargs) -> None: + """Maps to the `lock` method of any task pool class.""" + log.debug("%s requests locking the pool", self._client_class_name) + await self._write_function_output(self._pool.lock) + + async def _cmd_unlock(self, **_kwargs) -> None: + """Maps to the `unlock` method of any task pool class.""" + log.debug("%s requests unlocking the pool", self._client_class_name) + await self._write_function_output(self._pool.unlock) + async def _cmd_num_running(self, **_kwargs) -> None: """Maps to the `num_running` property of any task pool class.""" log.debug("%s requests number of running tasks", self._client_class_name) await self._write_function_output(self._pool.__class__.num_running.fget, self._pool) + async def _cmd_num_cancellations(self, **_kwargs) -> None: + """Maps to the `num_cancellations` property of any task pool class.""" + log.debug("%s requests number of cancelled tasks", self._client_class_name) + await self._write_function_output(self._pool.__class__.num_cancellations.fget, self._pool) + + async def _cmd_num_ended(self, **_kwargs) -> None: + """Maps to the `num_ended` property of any task pool class.""" + log.debug("%s requests number of ended tasks", self._client_class_name) + await self._write_function_output(self._pool.__class__.num_ended.fget, self._pool) + + async def _cmd_num_finished(self, **_kwargs) -> None: + """Maps to the `num_finished` property of any task pool class.""" + log.debug("%s requests number of finished tasks", self._client_class_name) + await self._write_function_output(self._pool.__class__.num_finished.fget, self._pool) + + async def _cmd_is_full(self, **_kwargs) -> None: + """Maps to the `is_full` property of any task pool class.""" + log.debug("%s checks full status", self._client_class_name) + await self._write_function_output(self._pool.__class__.is_full.fget, self._pool) + + async def _cmd_get_group_ids(self, **kwargs) -> None: + """Maps to the `get_group_ids` method of any task pool class.""" + log.debug("%s requests task ids for groups %s", self._client_class_name, kwargs['group_name']) + await self._write_function_output(self._pool.get_group_ids, *kwargs['group_name']) + async def _cmd_start(self, **kwargs) -> None: """Maps to the `start` method of the `SimpleTaskPool` class.""" num = kwargs[NUM]