generated from daniil-berg/boilerplate-py
Compare commits
8 Commits
c72a5035ea
...
v0.7.0-lw
Author | SHA1 | Date | |
---|---|---|---|
689a74c678 | |||
3503c0bf44 | |||
3d104c979e | |||
a92e646411 | |||
3d84e1552b | |||
38f4ec1b06 | |||
6f082288d8 | |||
9fde231250 |
17
README.md
17
README.md
@ -2,9 +2,18 @@
|
|||||||
|
|
||||||
**Dynamically manage pools of asyncio tasks**
|
**Dynamically manage pools of asyncio tasks**
|
||||||
|
|
||||||
|
## Contents
|
||||||
|
- [Contents](#contents)
|
||||||
|
- [Summary](#summary)
|
||||||
|
- [Usage](#usage)
|
||||||
|
- [Installation](#installation)
|
||||||
|
- [Dependencies](#dependencies)
|
||||||
|
- [Testing](#testing)
|
||||||
|
- [License](#license)
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
A task pool is an object with a simple interface for aggregating and dynamically managing asynchronous tasks.
|
A **task pool** is an object with a simple interface for aggregating and dynamically managing asynchronous tasks.
|
||||||
|
|
||||||
With an interface that is intentionally similar to the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) class from the standard library, the `TaskPool` provides you such methods as `apply`, `map`, and `starmap` to execute coroutines concurrently as [`asyncio.Task`](https://docs.python.org/3/library/asyncio-task.html#task-object) objects. There is no limitation imposed on what kind of tasks can be run or in what combination, when new ones can be added, or when they can be cancelled.
|
With an interface that is intentionally similar to the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) class from the standard library, the `TaskPool` provides you such methods as `apply`, `map`, and `starmap` to execute coroutines concurrently as [`asyncio.Task`](https://docs.python.org/3/library/asyncio-task.html#task-object) objects. There is no limitation imposed on what kind of tasks can be run or in what combination, when new ones can be added, or when they can be cancelled.
|
||||||
|
|
||||||
@ -22,7 +31,7 @@ from asyncio_taskpool import SimpleTaskPool
|
|||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
async def work(foo, bar): ...
|
async def work(_foo, _bar): ...
|
||||||
|
|
||||||
|
|
||||||
...
|
...
|
||||||
@ -55,7 +64,7 @@ Python Version 3.8+, tested on Linux
|
|||||||
|
|
||||||
## Testing
|
## Testing
|
||||||
|
|
||||||
Install `asyncio-taskpool[dev]` dependencies or just manually install `coverage` with `pip`.
|
Install `asyncio-taskpool[dev]` dependencies or just manually install [`coverage`](https://coverage.readthedocs.io/en/latest/) with `pip`.
|
||||||
Execute the [`./coverage.sh`](coverage.sh) shell script to run all unit tests and receive the coverage report.
|
Execute the [`./coverage.sh`](coverage.sh) shell script to run all unit tests and receive the coverage report.
|
||||||
|
|
||||||
## License
|
## License
|
||||||
@ -64,6 +73,6 @@ Execute the [`./coverage.sh`](coverage.sh) shell script to run all unit tests an
|
|||||||
|
|
||||||
The full license texts for the [GNU GPLv3.0](COPYING) and the [GNU LGPLv3.0](COPYING.LESSER) are included in this repository. If not, see https://www.gnu.org/licenses/.
|
The full license texts for the [GNU GPLv3.0](COPYING) and the [GNU LGPLv3.0](COPYING.LESSER) are included in this repository. If not, see https://www.gnu.org/licenses/.
|
||||||
|
|
||||||
## Copyright
|
---
|
||||||
|
|
||||||
© 2022 Daniil Fajnberg
|
© 2022 Daniil Fajnberg
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[metadata]
|
[metadata]
|
||||||
name = asyncio-taskpool
|
name = asyncio-taskpool
|
||||||
version = 0.6.0
|
version = 0.7.0
|
||||||
author = Daniil Fajnberg
|
author = Daniil Fajnberg
|
||||||
author_email = mail@daniil.fajnberg.de
|
author_email = mail@daniil.fajnberg.de
|
||||||
description = Dynamically manage pools of asyncio tasks
|
description = Dynamically manage pools of asyncio tasks
|
||||||
|
@ -19,5 +19,5 @@ Brings the main classes up to package level for import convenience.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
from .control.server import TCPControlServer, UnixControlServer
|
||||||
from .pool import TaskPool, SimpleTaskPool
|
from .pool import TaskPool, SimpleTaskPool
|
||||||
from .server import TCPControlServer, UnixControlServer
|
|
||||||
|
0
src/asyncio_taskpool/control/__init__.py
Normal file
0
src/asyncio_taskpool/control/__init__.py
Normal file
@ -19,37 +19,30 @@ CLI client entry point.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
import sys
|
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
from asyncio import run
|
from asyncio import run
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, Any
|
from typing import Any, Dict, Sequence
|
||||||
|
|
||||||
|
from ..constants import PACKAGE_NAME
|
||||||
|
from ..pool import TaskPool
|
||||||
from .client import ControlClient, TCPControlClient, UnixControlClient
|
from .client import ControlClient, TCPControlClient, UnixControlClient
|
||||||
from .constants import PACKAGE_NAME
|
|
||||||
from .pool import TaskPool
|
|
||||||
from .server import TCPControlServer, UnixControlServer
|
from .server import TCPControlServer, UnixControlServer
|
||||||
|
|
||||||
|
|
||||||
CONN_TYPE = 'conn_type'
|
CLIENT_CLASS = 'client_class'
|
||||||
UNIX, TCP = 'unix', 'tcp'
|
UNIX, TCP = 'unix', 'tcp'
|
||||||
SOCKET_PATH = 'path'
|
SOCKET_PATH = 'path'
|
||||||
HOST, PORT = 'host', 'port'
|
HOST, PORT = 'host', 'port'
|
||||||
|
|
||||||
|
|
||||||
def parse_cli() -> Dict[str, Any]:
|
def parse_cli(args: Sequence[str] = None) -> Dict[str, Any]:
|
||||||
parser = ArgumentParser(
|
parser = ArgumentParser(
|
||||||
prog=PACKAGE_NAME,
|
prog=f'{PACKAGE_NAME}.control',
|
||||||
description=f"CLI based {ControlClient.__name__} for {PACKAGE_NAME}"
|
description=f"Simple CLI based {ControlClient.__name__} for {PACKAGE_NAME}"
|
||||||
)
|
|
||||||
subparsers = parser.add_subparsers(title="Connection types", dest=CONN_TYPE)
|
|
||||||
unix_parser = subparsers.add_parser(UNIX, help="Connect via unix socket")
|
|
||||||
unix_parser.add_argument(
|
|
||||||
SOCKET_PATH,
|
|
||||||
type=Path,
|
|
||||||
help=f"Path to the unix socket on which the {UnixControlServer.__name__} for the {TaskPool.__name__} is "
|
|
||||||
f"listening."
|
|
||||||
)
|
)
|
||||||
|
subparsers = parser.add_subparsers(title="Connection types")
|
||||||
|
|
||||||
tcp_parser = subparsers.add_parser(TCP, help="Connect via TCP socket")
|
tcp_parser = subparsers.add_parser(TCP, help="Connect via TCP socket")
|
||||||
tcp_parser.add_argument(
|
tcp_parser.add_argument(
|
||||||
HOST,
|
HOST,
|
||||||
@ -60,19 +53,25 @@ def parse_cli() -> Dict[str, Any]:
|
|||||||
type=int,
|
type=int,
|
||||||
help=f"Port that the {TCPControlServer.__name__} for the {TaskPool.__name__} is listening on."
|
help=f"Port that the {TCPControlServer.__name__} for the {TaskPool.__name__} is listening on."
|
||||||
)
|
)
|
||||||
return vars(parser.parse_args())
|
tcp_parser.set_defaults(**{CLIENT_CLASS: TCPControlClient})
|
||||||
|
|
||||||
|
unix_parser = subparsers.add_parser(UNIX, help="Connect via unix socket")
|
||||||
|
unix_parser.add_argument(
|
||||||
|
SOCKET_PATH,
|
||||||
|
type=Path,
|
||||||
|
help=f"Path to the unix socket on which the {UnixControlServer.__name__} for the {TaskPool.__name__} is "
|
||||||
|
f"listening."
|
||||||
|
)
|
||||||
|
unix_parser.set_defaults(**{CLIENT_CLASS: UnixControlClient})
|
||||||
|
|
||||||
|
return vars(parser.parse_args(args))
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
kwargs = parse_cli()
|
kwargs = parse_cli()
|
||||||
if kwargs[CONN_TYPE] == UNIX:
|
client_cls = kwargs.pop(CLIENT_CLASS)
|
||||||
client = UnixControlClient(socket_path=kwargs[SOCKET_PATH])
|
await client_cls(**kwargs).start()
|
||||||
elif kwargs[CONN_TYPE] == TCP:
|
|
||||||
client = TCPControlClient(host=kwargs[HOST], port=kwargs[PORT])
|
|
||||||
else:
|
|
||||||
print("Invalid connection type", file=sys.stderr)
|
|
||||||
sys.exit(2)
|
|
||||||
await client.start()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
run(main())
|
run(main())
|
@ -27,8 +27,8 @@ from asyncio.streams import StreamReader, StreamWriter, open_connection
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional, Union
|
from typing import Optional, Union
|
||||||
|
|
||||||
from .constants import CLIENT_EXIT, CLIENT_INFO, SESSION_MSG_BYTES
|
from ..constants import CLIENT_EXIT, CLIENT_INFO, SESSION_MSG_BYTES
|
||||||
from .types import ClientConnT, PathT
|
from ..types import ClientConnT, PathT
|
||||||
|
|
||||||
|
|
||||||
class ControlClient(ABC):
|
class ControlClient(ABC):
|
||||||
@ -41,7 +41,7 @@ class ControlClient(ABC):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def client_info() -> dict:
|
def _client_info() -> dict:
|
||||||
"""Returns a dictionary of client information relevant for the handshake with the server."""
|
"""Returns a dictionary of client information relevant for the handshake with the server."""
|
||||||
return {CLIENT_INFO.TERMINAL_WIDTH: shutil.get_terminal_size().columns}
|
return {CLIENT_INFO.TERMINAL_WIDTH: shutil.get_terminal_size().columns}
|
||||||
|
|
||||||
@ -73,9 +73,10 @@ class ControlClient(ABC):
|
|||||||
writer: The `asyncio.StreamWriter` returned by the `_open_connection()` method
|
writer: The `asyncio.StreamWriter` returned by the `_open_connection()` method
|
||||||
"""
|
"""
|
||||||
self._connected = True
|
self._connected = True
|
||||||
writer.write(json.dumps(self.client_info()).encode())
|
writer.write(json.dumps(self._client_info()).encode())
|
||||||
await writer.drain()
|
await writer.drain()
|
||||||
print("Connected to", (await reader.read(SESSION_MSG_BYTES)).decode())
|
print("Connected to", (await reader.read(SESSION_MSG_BYTES)).decode())
|
||||||
|
print("Type '-h' to get help and usage instructions for all available commands.\n")
|
||||||
|
|
||||||
def _get_command(self, writer: StreamWriter) -> Optional[str]:
|
def _get_command(self, writer: StreamWriter) -> Optional[str]:
|
||||||
"""
|
"""
|
@ -20,14 +20,16 @@ This module contains the the definition of the `ControlParser` class used by a c
|
|||||||
|
|
||||||
|
|
||||||
from argparse import Action, ArgumentParser, ArgumentDefaultsHelpFormatter, HelpFormatter, SUPPRESS
|
from argparse import Action, ArgumentParser, ArgumentDefaultsHelpFormatter, HelpFormatter, SUPPRESS
|
||||||
|
from ast import literal_eval
|
||||||
from asyncio.streams import StreamWriter
|
from asyncio.streams import StreamWriter
|
||||||
from inspect import Parameter, getmembers, isfunction, signature
|
from inspect import Parameter, getmembers, isfunction, signature
|
||||||
from shutil import get_terminal_size
|
from shutil import get_terminal_size
|
||||||
from typing import Callable, Container, Dict, Set, Type, TypeVar
|
from typing import Any, Callable, Container, Dict, Iterable, Set, Type, TypeVar
|
||||||
|
|
||||||
from .constants import CLIENT_INFO, CMD, STREAM_WRITER
|
from ..constants import CLIENT_INFO, CMD, STREAM_WRITER
|
||||||
from .exceptions import HelpRequested
|
from ..exceptions import HelpRequested, ParserError
|
||||||
from .helpers import get_first_doc_line
|
from ..helpers import get_first_doc_line, resolve_dotted_path
|
||||||
|
from ..types import ArgsT, CancelCB, CoroutineFunc, EndCB, KwArgsT
|
||||||
|
|
||||||
|
|
||||||
FmtCls = TypeVar('FmtCls', bound=Type[HelpFormatter])
|
FmtCls = TypeVar('FmtCls', bound=Type[HelpFormatter])
|
||||||
@ -35,7 +37,6 @@ ParsersDict = Dict[str, 'ControlParser']
|
|||||||
|
|
||||||
OMIT_PARAMS_DEFAULT = ('self', )
|
OMIT_PARAMS_DEFAULT = ('self', )
|
||||||
|
|
||||||
FORMATTER_CLASS = 'formatter_class'
|
|
||||||
NAME, PROG, HELP, DESCRIPTION = 'name', 'prog', 'help', 'description'
|
NAME, PROG, HELP, DESCRIPTION = 'name', 'prog', 'help', 'description'
|
||||||
|
|
||||||
|
|
||||||
@ -79,24 +80,23 @@ class ControlParser(ArgumentParser):
|
|||||||
def __init__(self, stream_writer: StreamWriter, terminal_width: int = None,
|
def __init__(self, stream_writer: StreamWriter, terminal_width: int = None,
|
||||||
**kwargs) -> None:
|
**kwargs) -> None:
|
||||||
"""
|
"""
|
||||||
Sets additional internal attributes depending on whether a parent-parser was defined.
|
Subclass of the `ArgumentParser` geared towards asynchronous interaction with an object "from the outside".
|
||||||
|
|
||||||
The `help_formatter_factory` is called and the returned class is mapped to the `FORMATTER_CLASS` keyword.
|
Allows directing output to a specified writer rather than stdout/stderr and setting terminal width explicitly.
|
||||||
By default, `exit_on_error` is set to `False` (as opposed to how the parent class handles it).
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stream_writer:
|
stream_writer:
|
||||||
The instance of the `asyncio.StreamWriter` to use for message output.
|
The instance of the `asyncio.StreamWriter` to use for message output.
|
||||||
terminal_width (optional):
|
terminal_width (optional):
|
||||||
The terminal width to assume for all message formatting. Defaults to `shutil.get_terminal_size`.
|
The terminal width to use for all message formatting. Defaults to `shutil.get_terminal_size().columns`.
|
||||||
**kwargs(optional):
|
**kwargs(optional):
|
||||||
In addition to the regular `ArgumentParser` constructor parameters, this method expects the instance of
|
Passed to the parent class constructor. The exception is the `formatter_class` parameter: Even if a
|
||||||
the `StreamWriter` as well as the terminal width both to be passed explicitly, if the `parent` argument
|
class is specified, it will always be subclassed in the `help_formatter_factory`.
|
||||||
is empty.
|
Also, by default, `exit_on_error` is set to `False` (as opposed to how the parent class handles it).
|
||||||
"""
|
"""
|
||||||
self._stream_writer: StreamWriter = stream_writer
|
self._stream_writer: StreamWriter = stream_writer
|
||||||
self._terminal_width: int = terminal_width if terminal_width is not None else get_terminal_size().columns
|
self._terminal_width: int = terminal_width if terminal_width is not None else get_terminal_size().columns
|
||||||
kwargs[FORMATTER_CLASS] = self.help_formatter_factory(self._terminal_width, kwargs.get(FORMATTER_CLASS))
|
kwargs['formatter_class'] = self.help_formatter_factory(self._terminal_width, kwargs.get('formatter_class'))
|
||||||
kwargs.setdefault('exit_on_error', False)
|
kwargs.setdefault('exit_on_error', False)
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self._flags: Set[str] = set()
|
self._flags: Set[str] = set()
|
||||||
@ -219,7 +219,7 @@ class ControlParser(ArgumentParser):
|
|||||||
def error(self, message: str) -> None:
|
def error(self, message: str) -> None:
|
||||||
"""This just adds the custom `HelpRequested` exception after the parent class' method."""
|
"""This just adds the custom `HelpRequested` exception after the parent class' method."""
|
||||||
super().error(message=message)
|
super().error(message=message)
|
||||||
raise HelpRequested
|
raise ParserError
|
||||||
|
|
||||||
def print_help(self, file=None) -> None:
|
def print_help(self, file=None) -> None:
|
||||||
"""This just adds the custom `HelpRequested` exception after the parent class' method."""
|
"""This just adds the custom `HelpRequested` exception after the parent class' method."""
|
||||||
@ -267,9 +267,8 @@ class ControlParser(ArgumentParser):
|
|||||||
# This is to be able to later unpack an arbitrary number of positional arguments.
|
# This is to be able to later unpack an arbitrary number of positional arguments.
|
||||||
kwargs.setdefault('nargs', '*')
|
kwargs.setdefault('nargs', '*')
|
||||||
if not kwargs.get('action') == 'store_true':
|
if not kwargs.get('action') == 'store_true':
|
||||||
# The lambda wrapper around the type annotation is to avoid ValueError being raised on suppressed arguments.
|
# Set the type from the parameter annotation.
|
||||||
# See: https://bugs.python.org/issue36078
|
kwargs.setdefault('type', _get_type_from_annotation(parameter.annotation))
|
||||||
kwargs.setdefault('type', get_arg_type_wrapper(parameter.annotation))
|
|
||||||
return self.add_argument(*name_or_flags, **kwargs)
|
return self.add_argument(*name_or_flags, **kwargs)
|
||||||
|
|
||||||
def add_function_args(self, function: Callable, omit: Container[str] = OMIT_PARAMS_DEFAULT) -> None:
|
def add_function_args(self, function: Callable, omit: Container[str] = OMIT_PARAMS_DEFAULT) -> None:
|
||||||
@ -293,7 +292,21 @@ class ControlParser(ArgumentParser):
|
|||||||
self.add_function_arg(param, help=repr(param.annotation))
|
self.add_function_arg(param, help=repr(param.annotation))
|
||||||
|
|
||||||
|
|
||||||
def get_arg_type_wrapper(cls: Type) -> Callable:
|
def _get_arg_type_wrapper(cls: Type) -> Callable[[Any], Any]:
|
||||||
def wrapper(arg):
|
"""
|
||||||
return arg if arg is SUPPRESS else cls(arg)
|
Returns a wrapper for the constructor of `cls` to avoid a ValueError being raised on suppressed arguments.
|
||||||
|
|
||||||
|
See: https://bugs.python.org/issue36078
|
||||||
|
"""
|
||||||
|
def wrapper(arg: Any) -> Any: return arg if arg is SUPPRESS else cls(arg)
|
||||||
|
# Copy the name of the class to maintain useful help messages when incorrect arguments are passed.
|
||||||
|
wrapper.__name__ = cls.__name__
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def _get_type_from_annotation(annotation: Type) -> Callable[[Any], Any]:
|
||||||
|
if any(annotation is t for t in {CoroutineFunc, EndCB, CancelCB}):
|
||||||
|
annotation = resolve_dotted_path
|
||||||
|
if any(annotation is t for t in {ArgsT, KwArgsT, Iterable[ArgsT], Iterable[KwArgsT]}):
|
||||||
|
annotation = literal_eval
|
||||||
|
return _get_arg_type_wrapper(annotation)
|
@ -28,16 +28,16 @@ from asyncio.tasks import Task, create_task
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional, Union
|
from typing import Optional, Union
|
||||||
|
|
||||||
|
from ..pool import TaskPool, SimpleTaskPool
|
||||||
|
from ..types import ConnectedCallbackT
|
||||||
from .client import ControlClient, TCPControlClient, UnixControlClient
|
from .client import ControlClient, TCPControlClient, UnixControlClient
|
||||||
from .pool import TaskPool, SimpleTaskPool
|
|
||||||
from .session import ControlSession
|
from .session import ControlSession
|
||||||
from .types import ConnectedCallbackT
|
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ControlServer(ABC): # TODO: Implement interface for normal TaskPool instances, not just SimpleTaskPool
|
class ControlServer(ABC):
|
||||||
"""
|
"""
|
||||||
Abstract base class for a task pool control server.
|
Abstract base class for a task pool control server.
|
||||||
|
|
||||||
@ -125,6 +125,7 @@ class ControlServer(ABC): # TODO: Implement interface for normal TaskPool insta
|
|||||||
async def serve_forever(self) -> Task:
|
async def serve_forever(self) -> Task:
|
||||||
"""
|
"""
|
||||||
This method actually starts the server and begins listening to client connections on the specified interface.
|
This method actually starts the server and begins listening to client connections on the specified interface.
|
||||||
|
|
||||||
It should never block because the serving will be performed in a separate task.
|
It should never block because the serving will be performed in a separate task.
|
||||||
"""
|
"""
|
||||||
log.debug("Starting %s...", self.__class__.__name__)
|
log.debug("Starting %s...", self.__class__.__name__)
|
||||||
@ -136,7 +137,7 @@ class TCPControlServer(ControlServer):
|
|||||||
"""Task pool control server class that exposes a TCP socket for control clients to connect to."""
|
"""Task pool control server class that exposes a TCP socket for control clients to connect to."""
|
||||||
_client_class = TCPControlClient
|
_client_class = TCPControlClient
|
||||||
|
|
||||||
def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None:
|
def __init__(self, pool: Union[TaskPool, SimpleTaskPool], **server_kwargs) -> None:
|
||||||
self._host = server_kwargs.pop('host')
|
self._host = server_kwargs.pop('host')
|
||||||
self._port = server_kwargs.pop('port')
|
self._port = server_kwargs.pop('port')
|
||||||
super().__init__(pool, **server_kwargs)
|
super().__init__(pool, **server_kwargs)
|
||||||
@ -154,7 +155,7 @@ class UnixControlServer(ControlServer):
|
|||||||
"""Task pool control server class that exposes a unix socket for control clients to connect to."""
|
"""Task pool control server class that exposes a unix socket for control clients to connect to."""
|
||||||
_client_class = UnixControlClient
|
_client_class = UnixControlClient
|
||||||
|
|
||||||
def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None:
|
def __init__(self, pool: Union[TaskPool, SimpleTaskPool], **server_kwargs) -> None:
|
||||||
from asyncio.streams import start_unix_server
|
from asyncio.streams import start_unix_server
|
||||||
self._start_unix_server = start_unix_server
|
self._start_unix_server = start_unix_server
|
||||||
self._socket_path = Path(server_kwargs.pop('path'))
|
self._socket_path = Path(server_kwargs.pop('path'))
|
@ -26,10 +26,10 @@ from asyncio.streams import StreamReader, StreamWriter
|
|||||||
from inspect import isfunction, signature
|
from inspect import isfunction, signature
|
||||||
from typing import Callable, Optional, Union, TYPE_CHECKING
|
from typing import Callable, Optional, Union, TYPE_CHECKING
|
||||||
|
|
||||||
from .constants import CLIENT_INFO, CMD, CMD_OK, SESSION_MSG_BYTES, STREAM_WRITER
|
from ..constants import CLIENT_INFO, CMD, CMD_OK, SESSION_MSG_BYTES, STREAM_WRITER
|
||||||
from .exceptions import HelpRequested
|
from ..exceptions import CommandError, HelpRequested, ParserError
|
||||||
from .helpers import return_or_exception
|
from ..helpers import return_or_exception
|
||||||
from .pool import TaskPool, SimpleTaskPool
|
from ..pool import TaskPool, SimpleTaskPool
|
||||||
from .parser import ControlParser
|
from .parser import ControlParser
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@ -85,7 +85,7 @@ class ControlSession:
|
|||||||
Must correspond to the arguments expected by the `method`.
|
Must correspond to the arguments expected by the `method`.
|
||||||
Correctly unpacks arbitrary-length positional and keyword-arguments.
|
Correctly unpacks arbitrary-length positional and keyword-arguments.
|
||||||
"""
|
"""
|
||||||
log.warning("%s calls %s.%s", self._client_class_name, self._pool.__class__.__name__, method.__name__)
|
log.debug("%s calls %s.%s", self._client_class_name, self._pool.__class__.__name__, method.__name__)
|
||||||
normal_pos, var_pos = [], []
|
normal_pos, var_pos = [], []
|
||||||
for param in signature(method).parameters.values():
|
for param in signature(method).parameters.values():
|
||||||
if param.name == 'self':
|
if param.name == 'self':
|
||||||
@ -112,11 +112,11 @@ class ControlSession:
|
|||||||
executed and the response written to the stream will be its return value (as an encoded string).
|
executed and the response written to the stream will be its return value (as an encoded string).
|
||||||
"""
|
"""
|
||||||
if kwargs:
|
if kwargs:
|
||||||
log.warning("%s sets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fset.__name__)
|
log.debug("%s sets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fset.__name__)
|
||||||
await return_or_exception(prop.fset, self._pool, **kwargs)
|
await return_or_exception(prop.fset, self._pool, **kwargs)
|
||||||
self._writer.write(CMD_OK)
|
self._writer.write(CMD_OK)
|
||||||
else:
|
else:
|
||||||
log.warning("%s gets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fget.__name__)
|
log.debug("%s gets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fget.__name__)
|
||||||
self._writer.write(str(await return_or_exception(prop.fget, self._pool)).encode())
|
self._writer.write(str(await return_or_exception(prop.fget, self._pool)).encode())
|
||||||
|
|
||||||
async def client_handshake(self) -> None:
|
async def client_handshake(self) -> None:
|
||||||
@ -131,7 +131,7 @@ class ControlSession:
|
|||||||
STREAM_WRITER: self._writer,
|
STREAM_WRITER: self._writer,
|
||||||
CLIENT_INFO.TERMINAL_WIDTH: client_info[CLIENT_INFO.TERMINAL_WIDTH],
|
CLIENT_INFO.TERMINAL_WIDTH: client_info[CLIENT_INFO.TERMINAL_WIDTH],
|
||||||
'prog': '',
|
'prog': '',
|
||||||
'usage': f'%(prog)s [-h] [{CMD}] ...'
|
'usage': f'[-h] [{CMD}] ...'
|
||||||
}
|
}
|
||||||
self._parser = ControlParser(**parser_kwargs)
|
self._parser = ControlParser(**parser_kwargs)
|
||||||
self._parser.add_subparsers(title="Commands",
|
self._parser.add_subparsers(title="Commands",
|
||||||
@ -154,15 +154,19 @@ class ControlSession:
|
|||||||
try:
|
try:
|
||||||
kwargs = vars(self._parser.parse_args(msg.split(' ')))
|
kwargs = vars(self._parser.parse_args(msg.split(' ')))
|
||||||
except ArgumentError as e:
|
except ArgumentError as e:
|
||||||
|
log.debug("%s got an ArgumentError", self._client_class_name)
|
||||||
self._writer.write(str(e).encode())
|
self._writer.write(str(e).encode())
|
||||||
return
|
return
|
||||||
except HelpRequested:
|
except (HelpRequested, ParserError):
|
||||||
|
log.debug("%s received usage help", self._client_class_name)
|
||||||
return
|
return
|
||||||
command = kwargs.pop(CMD)
|
command = kwargs.pop(CMD)
|
||||||
if isfunction(command):
|
if isfunction(command):
|
||||||
await self._exec_method_and_respond(command, **kwargs)
|
await self._exec_method_and_respond(command, **kwargs)
|
||||||
elif isinstance(command, property):
|
elif isinstance(command, property):
|
||||||
await self._exec_property_and_respond(command, **kwargs)
|
await self._exec_property_and_respond(command, **kwargs)
|
||||||
|
else:
|
||||||
|
self._writer.write(str(CommandError(f"Unknown command object: {command}")).encode())
|
||||||
|
|
||||||
async def listen(self) -> None:
|
async def listen(self) -> None:
|
||||||
"""
|
"""
|
@ -65,3 +65,11 @@ class ServerException(Exception):
|
|||||||
|
|
||||||
class HelpRequested(ServerException):
|
class HelpRequested(ServerException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ParserError(ServerException):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CommandError(ServerException):
|
||||||
|
pass
|
||||||
|
@ -15,12 +15,13 @@ You should have received a copy of the GNU Lesser General Public License along w
|
|||||||
If not, see <https://www.gnu.org/licenses/>."""
|
If not, see <https://www.gnu.org/licenses/>."""
|
||||||
|
|
||||||
__doc__ = """
|
__doc__ = """
|
||||||
Miscellaneous helper functions.
|
Miscellaneous helper functions. None of these should be considered part of the public API.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
from asyncio.coroutines import iscoroutinefunction
|
from asyncio.coroutines import iscoroutinefunction
|
||||||
from asyncio.queues import Queue
|
from asyncio.queues import Queue
|
||||||
|
from importlib import import_module
|
||||||
from inspect import getdoc
|
from inspect import getdoc
|
||||||
from typing import Any, Optional, Union
|
from typing import Any, Optional, Union
|
||||||
|
|
||||||
@ -63,3 +64,22 @@ async def return_or_exception(_function_to_execute: AnyCallableT, *args, **kwarg
|
|||||||
return _function_to_execute(*args, **kwargs)
|
return _function_to_execute(*args, **kwargs)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return e
|
return e
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_dotted_path(dotted_path: str) -> object:
|
||||||
|
"""
|
||||||
|
Resolves a dotted path to a global object and returns that object.
|
||||||
|
|
||||||
|
Algorithm shamelessly stolen from the `logging.config` module from the standard library.
|
||||||
|
"""
|
||||||
|
names = dotted_path.split('.')
|
||||||
|
module_name = names.pop(0)
|
||||||
|
found = import_module(module_name)
|
||||||
|
for name in names:
|
||||||
|
try:
|
||||||
|
found = getattr(found, name)
|
||||||
|
except AttributeError:
|
||||||
|
module_name += f'.{name}'
|
||||||
|
import_module(module_name)
|
||||||
|
found = getattr(found, name)
|
||||||
|
return found
|
||||||
|
@ -120,7 +120,7 @@ class BaseTaskPool:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def is_locked(self) -> bool:
|
def is_locked(self) -> bool:
|
||||||
"""Returns `True` if more the pool has been locked (see below)."""
|
"""Returns `True` if the pool has been locked (see below)."""
|
||||||
return self._locked
|
return self._locked
|
||||||
|
|
||||||
def lock(self) -> None:
|
def lock(self) -> None:
|
||||||
|
@ -53,6 +53,6 @@ class Queue(_Queue):
|
|||||||
Implements an asynchronous context manager for the queue.
|
Implements an asynchronous context manager for the queue.
|
||||||
|
|
||||||
Upon exiting `item_processed()` is called. This is why this context manager may not always be what you want,
|
Upon exiting `item_processed()` is called. This is why this context manager may not always be what you want,
|
||||||
but in some situations it makes the codes much cleaner.
|
but in some situations it makes the code much cleaner.
|
||||||
"""
|
"""
|
||||||
self.item_processed()
|
self.item_processed()
|
||||||
|
0
tests/test_control/__init__.py
Normal file
0
tests/test_control/__init__.py
Normal file
45
tests/test_control/test___main__.py
Normal file
45
tests/test_control/test___main__.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
from unittest import IsolatedAsyncioTestCase
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
from asyncio_taskpool.control.client import TCPControlClient, UnixControlClient
|
||||||
|
from asyncio_taskpool.control import __main__ as module
|
||||||
|
|
||||||
|
|
||||||
|
class CLITestCase(IsolatedAsyncioTestCase):
|
||||||
|
|
||||||
|
def test_parse_cli(self):
|
||||||
|
socket_path = '/some/path/to.sock'
|
||||||
|
args = [module.UNIX, socket_path]
|
||||||
|
expected_kwargs = {
|
||||||
|
module.CLIENT_CLASS: UnixControlClient,
|
||||||
|
module.SOCKET_PATH: Path(socket_path)
|
||||||
|
}
|
||||||
|
parsed_kwargs = module.parse_cli(args)
|
||||||
|
self.assertDictEqual(expected_kwargs, parsed_kwargs)
|
||||||
|
|
||||||
|
host, port = '1.2.3.4', '1234'
|
||||||
|
args = [module.TCP, host, port]
|
||||||
|
expected_kwargs = {
|
||||||
|
module.CLIENT_CLASS: TCPControlClient,
|
||||||
|
module.HOST: host,
|
||||||
|
module.PORT: int(port)
|
||||||
|
}
|
||||||
|
parsed_kwargs = module.parse_cli(args)
|
||||||
|
self.assertDictEqual(expected_kwargs, parsed_kwargs)
|
||||||
|
|
||||||
|
with patch('sys.stderr'):
|
||||||
|
with self.assertRaises(SystemExit):
|
||||||
|
module.parse_cli(['invalid', 'foo', 'bar'])
|
||||||
|
|
||||||
|
@patch.object(module, 'parse_cli')
|
||||||
|
async def test_main(self, mock_parse_cli: MagicMock):
|
||||||
|
mock_client_start = AsyncMock()
|
||||||
|
mock_client = MagicMock(start=mock_client_start)
|
||||||
|
mock_client_cls = MagicMock(return_value=mock_client)
|
||||||
|
mock_client_kwargs = {'foo': 123, 'bar': 456, 'baz': 789}
|
||||||
|
mock_parse_cli.return_value = {module.CLIENT_CLASS: mock_client_cls} | mock_client_kwargs
|
||||||
|
self.assertIsNone(await module.main())
|
||||||
|
mock_parse_cli.assert_called_once_with()
|
||||||
|
mock_client_cls.assert_called_once_with(**mock_client_kwargs)
|
||||||
|
mock_client_start.assert_awaited_once_with()
|
@ -25,9 +25,9 @@ import shutil
|
|||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest import IsolatedAsyncioTestCase, skipIf
|
from unittest import IsolatedAsyncioTestCase, skipIf
|
||||||
from unittest.mock import AsyncMock, MagicMock, patch
|
from unittest.mock import AsyncMock, MagicMock, call, patch
|
||||||
|
|
||||||
from asyncio_taskpool import client
|
from asyncio_taskpool.control import client
|
||||||
from asyncio_taskpool.constants import CLIENT_INFO, SESSION_MSG_BYTES
|
from asyncio_taskpool.constants import CLIENT_INFO, SESSION_MSG_BYTES
|
||||||
|
|
||||||
|
|
||||||
@ -37,7 +37,7 @@ FOO, BAR = 'foo', 'bar'
|
|||||||
class ControlClientTestCase(IsolatedAsyncioTestCase):
|
class ControlClientTestCase(IsolatedAsyncioTestCase):
|
||||||
|
|
||||||
def setUp(self) -> None:
|
def setUp(self) -> None:
|
||||||
self.abstract_patcher = patch('asyncio_taskpool.client.ControlClient.__abstractmethods__', set())
|
self.abstract_patcher = patch('asyncio_taskpool.control.client.ControlClient.__abstractmethods__', set())
|
||||||
self.print_patcher = patch.object(client, 'print')
|
self.print_patcher = patch.object(client, 'print')
|
||||||
self.mock_abstract_methods = self.abstract_patcher.start()
|
self.mock_abstract_methods = self.abstract_patcher.start()
|
||||||
self.mock_print = self.print_patcher.start()
|
self.mock_print = self.print_patcher.start()
|
||||||
@ -55,7 +55,7 @@ class ControlClientTestCase(IsolatedAsyncioTestCase):
|
|||||||
|
|
||||||
def test_client_info(self):
|
def test_client_info(self):
|
||||||
self.assertEqual({CLIENT_INFO.TERMINAL_WIDTH: shutil.get_terminal_size().columns},
|
self.assertEqual({CLIENT_INFO.TERMINAL_WIDTH: shutil.get_terminal_size().columns},
|
||||||
client.ControlClient.client_info())
|
client.ControlClient._client_info())
|
||||||
|
|
||||||
async def test_abstract(self):
|
async def test_abstract(self):
|
||||||
with self.assertRaises(NotImplementedError):
|
with self.assertRaises(NotImplementedError):
|
||||||
@ -65,16 +65,19 @@ class ControlClientTestCase(IsolatedAsyncioTestCase):
|
|||||||
self.assertEqual(self.kwargs, self.client._conn_kwargs)
|
self.assertEqual(self.kwargs, self.client._conn_kwargs)
|
||||||
self.assertFalse(self.client._connected)
|
self.assertFalse(self.client._connected)
|
||||||
|
|
||||||
@patch.object(client.ControlClient, 'client_info')
|
@patch.object(client.ControlClient, '_client_info')
|
||||||
async def test__server_handshake(self, mock_client_info: MagicMock):
|
async def test__server_handshake(self, mock__client_info: MagicMock):
|
||||||
mock_client_info.return_value = mock_info = {FOO: 1, BAR: 9999}
|
mock__client_info.return_value = mock_info = {FOO: 1, BAR: 9999}
|
||||||
self.assertIsNone(await self.client._server_handshake(self.mock_reader, self.mock_writer))
|
self.assertIsNone(await self.client._server_handshake(self.mock_reader, self.mock_writer))
|
||||||
self.assertTrue(self.client._connected)
|
self.assertTrue(self.client._connected)
|
||||||
mock_client_info.assert_called_once_with()
|
mock__client_info.assert_called_once_with()
|
||||||
self.mock_write.assert_called_once_with(json.dumps(mock_info).encode())
|
self.mock_write.assert_called_once_with(json.dumps(mock_info).encode())
|
||||||
self.mock_drain.assert_awaited_once_with()
|
self.mock_drain.assert_awaited_once_with()
|
||||||
self.mock_read.assert_awaited_once_with(SESSION_MSG_BYTES)
|
self.mock_read.assert_awaited_once_with(SESSION_MSG_BYTES)
|
||||||
self.mock_print.assert_called_once_with("Connected to", self.mock_read.return_value.decode())
|
self.mock_print.assert_has_calls([
|
||||||
|
call("Connected to", self.mock_read.return_value.decode()),
|
||||||
|
call("Type '-h' to get help and usage instructions for all available commands.\n")
|
||||||
|
])
|
||||||
|
|
||||||
@patch.object(client, 'input')
|
@patch.object(client, 'input')
|
||||||
def test__get_command(self, mock_input: MagicMock):
|
def test__get_command(self, mock_input: MagicMock):
|
||||||
@ -172,6 +175,43 @@ class ControlClientTestCase(IsolatedAsyncioTestCase):
|
|||||||
self.mock_print.assert_called_once_with("Disconnected from control server.")
|
self.mock_print.assert_called_once_with("Disconnected from control server.")
|
||||||
|
|
||||||
|
|
||||||
|
class TCPControlClientTestCase(IsolatedAsyncioTestCase):
|
||||||
|
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.base_init_patcher = patch.object(client.ControlClient, '__init__')
|
||||||
|
self.mock_base_init = self.base_init_patcher.start()
|
||||||
|
self.host, self.port = 'localhost', 12345
|
||||||
|
self.kwargs = {FOO: 123, BAR: 456}
|
||||||
|
self.client = client.TCPControlClient(host=self.host, port=self.port, **self.kwargs)
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.base_init_patcher.stop()
|
||||||
|
|
||||||
|
def test_init(self):
|
||||||
|
self.assertEqual(self.host, self.client._host)
|
||||||
|
self.assertEqual(self.port, self.client._port)
|
||||||
|
self.mock_base_init.assert_called_once_with(**self.kwargs)
|
||||||
|
|
||||||
|
@patch.object(client, 'print')
|
||||||
|
@patch.object(client, 'open_connection')
|
||||||
|
async def test__open_connection(self, mock_open_connection: AsyncMock, mock_print: MagicMock):
|
||||||
|
mock_open_connection.return_value = expected_output = 'something'
|
||||||
|
kwargs = {'a': 1, 'b': 2}
|
||||||
|
output = await self.client._open_connection(**kwargs)
|
||||||
|
self.assertEqual(expected_output, output)
|
||||||
|
mock_open_connection.assert_awaited_once_with(self.host, self.port, **kwargs)
|
||||||
|
mock_print.assert_not_called()
|
||||||
|
|
||||||
|
mock_open_connection.reset_mock()
|
||||||
|
|
||||||
|
mock_open_connection.side_effect = e = ConnectionError()
|
||||||
|
output1, output2 = await self.client._open_connection(**kwargs)
|
||||||
|
self.assertIsNone(output1)
|
||||||
|
self.assertIsNone(output2)
|
||||||
|
mock_open_connection.assert_awaited_once_with(self.host, self.port, **kwargs)
|
||||||
|
mock_print.assert_called_once_with(str(e), file=sys.stderr)
|
||||||
|
|
||||||
|
|
||||||
@skipIf(os.name == 'nt', "No Unix sockets on Windows :(")
|
@skipIf(os.name == 'nt', "No Unix sockets on Windows :(")
|
||||||
class UnixControlClientTestCase(IsolatedAsyncioTestCase):
|
class UnixControlClientTestCase(IsolatedAsyncioTestCase):
|
||||||
|
|
@ -20,12 +20,16 @@ Unittests for the `asyncio_taskpool.control.parser` module.
|
|||||||
|
|
||||||
|
|
||||||
from argparse import ArgumentParser, HelpFormatter, ArgumentDefaultsHelpFormatter, RawTextHelpFormatter, SUPPRESS
|
from argparse import ArgumentParser, HelpFormatter, ArgumentDefaultsHelpFormatter, RawTextHelpFormatter, SUPPRESS
|
||||||
|
from ast import literal_eval
|
||||||
from inspect import signature
|
from inspect import signature
|
||||||
from unittest import TestCase
|
from unittest import TestCase
|
||||||
from unittest.mock import MagicMock, call, patch
|
from unittest.mock import MagicMock, call, patch
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
from asyncio_taskpool import parser
|
from asyncio_taskpool.control import parser
|
||||||
from asyncio_taskpool.exceptions import HelpRequested
|
from asyncio_taskpool.exceptions import HelpRequested, ParserError
|
||||||
|
from asyncio_taskpool.helpers import resolve_dotted_path
|
||||||
|
from asyncio_taskpool.types import ArgsT, CancelCB, CoroutineFunc, EndCB, KwArgsT
|
||||||
|
|
||||||
|
|
||||||
FOO, BAR = 'foo', 'bar'
|
FOO, BAR = 'foo', 'bar'
|
||||||
@ -41,7 +45,7 @@ class ControlServerTestCase(TestCase):
|
|||||||
self.kwargs = {
|
self.kwargs = {
|
||||||
'stream_writer': self.stream_writer,
|
'stream_writer': self.stream_writer,
|
||||||
'terminal_width': self.terminal_width,
|
'terminal_width': self.terminal_width,
|
||||||
parser.FORMATTER_CLASS: FOO
|
'formatter_class': FOO
|
||||||
}
|
}
|
||||||
self.parser = parser.ControlParser(**self.kwargs)
|
self.parser = parser.ControlParser(**self.kwargs)
|
||||||
|
|
||||||
@ -157,6 +161,14 @@ class ControlServerTestCase(TestCase):
|
|||||||
mock_add_property_command.assert_called_once_with(FooBar.prop, FooBar.__name__, **common_kwargs)
|
mock_add_property_command.assert_called_once_with(FooBar.prop, FooBar.__name__, **common_kwargs)
|
||||||
mock_set_defaults.assert_has_calls([call(**{x: FooBar.method}), call(**{x: FooBar.prop})])
|
mock_set_defaults.assert_has_calls([call(**{x: FooBar.method}), call(**{x: FooBar.prop})])
|
||||||
|
|
||||||
|
@patch.object(parser.ArgumentParser, 'add_subparsers')
|
||||||
|
def test_add_subparsers(self, mock_base_add_subparsers: MagicMock):
|
||||||
|
args, kwargs = [1, 2, 42], {FOO: 123, BAR: 456}
|
||||||
|
mock_base_add_subparsers.return_value = mock_action = MagicMock()
|
||||||
|
output = self.parser.add_subparsers(*args, **kwargs)
|
||||||
|
self.assertEqual(mock_action, output)
|
||||||
|
mock_base_add_subparsers.assert_called_once_with(*args, **kwargs)
|
||||||
|
|
||||||
def test__print_message(self):
|
def test__print_message(self):
|
||||||
self.stream_writer.write = MagicMock()
|
self.stream_writer.write = MagicMock()
|
||||||
self.assertIsNone(self.parser._print_message(''))
|
self.assertIsNone(self.parser._print_message(''))
|
||||||
@ -175,7 +187,7 @@ class ControlServerTestCase(TestCase):
|
|||||||
|
|
||||||
@patch.object(parser.ArgumentParser, 'error')
|
@patch.object(parser.ArgumentParser, 'error')
|
||||||
def test_error(self, mock_supercls_error: MagicMock):
|
def test_error(self, mock_supercls_error: MagicMock):
|
||||||
with self.assertRaises(HelpRequested):
|
with self.assertRaises(ParserError):
|
||||||
self.parser.error(FOO + BAR)
|
self.parser.error(FOO + BAR)
|
||||||
mock_supercls_error.assert_called_once_with(message=FOO + BAR)
|
mock_supercls_error.assert_called_once_with(message=FOO + BAR)
|
||||||
|
|
||||||
@ -186,11 +198,11 @@ class ControlServerTestCase(TestCase):
|
|||||||
self.parser.print_help(arg)
|
self.parser.print_help(arg)
|
||||||
mock_print_help.assert_called_once_with(arg)
|
mock_print_help.assert_called_once_with(arg)
|
||||||
|
|
||||||
@patch.object(parser, 'get_arg_type_wrapper')
|
@patch.object(parser, '_get_type_from_annotation')
|
||||||
@patch.object(parser.ArgumentParser, 'add_argument')
|
@patch.object(parser.ArgumentParser, 'add_argument')
|
||||||
def test_add_function_arg(self, mock_add_argument: MagicMock, mock_get_arg_type_wrapper: MagicMock):
|
def test_add_function_arg(self, mock_add_argument: MagicMock, mock__get_type_from_annotation: MagicMock):
|
||||||
mock_add_argument.return_value = expected_output = 'action'
|
mock_add_argument.return_value = expected_output = 'action'
|
||||||
mock_get_arg_type_wrapper.return_value = mock_type = 'fake'
|
mock__get_type_from_annotation.return_value = mock_type = 'fake'
|
||||||
|
|
||||||
foo_type, args_type, bar_type, baz_type, boo_type = tuple, str, int, float, complex
|
foo_type, args_type, bar_type, baz_type, boo_type = tuple, str, int, float, complex
|
||||||
bar_default, baz_default, boo_default = 1, 0.1, 1j
|
bar_default, baz_default, boo_default = 1, 0.1, 1j
|
||||||
@ -203,42 +215,42 @@ class ControlServerTestCase(TestCase):
|
|||||||
kwargs = {FOO + BAR: 'xyz'}
|
kwargs = {FOO + BAR: 'xyz'}
|
||||||
self.assertEqual(expected_output, self.parser.add_function_arg(param_foo, **kwargs))
|
self.assertEqual(expected_output, self.parser.add_function_arg(param_foo, **kwargs))
|
||||||
mock_add_argument.assert_called_once_with('foo', type=mock_type, **kwargs)
|
mock_add_argument.assert_called_once_with('foo', type=mock_type, **kwargs)
|
||||||
mock_get_arg_type_wrapper.assert_called_once_with(foo_type)
|
mock__get_type_from_annotation.assert_called_once_with(foo_type)
|
||||||
|
|
||||||
mock_add_argument.reset_mock()
|
mock_add_argument.reset_mock()
|
||||||
mock_get_arg_type_wrapper.reset_mock()
|
mock__get_type_from_annotation.reset_mock()
|
||||||
|
|
||||||
self.assertEqual(expected_output, self.parser.add_function_arg(param_args, **kwargs))
|
self.assertEqual(expected_output, self.parser.add_function_arg(param_args, **kwargs))
|
||||||
mock_add_argument.assert_called_once_with('args', nargs='*', type=mock_type, **kwargs)
|
mock_add_argument.assert_called_once_with('args', nargs='*', type=mock_type, **kwargs)
|
||||||
mock_get_arg_type_wrapper.assert_called_once_with(args_type)
|
mock__get_type_from_annotation.assert_called_once_with(args_type)
|
||||||
|
|
||||||
mock_add_argument.reset_mock()
|
mock_add_argument.reset_mock()
|
||||||
mock_get_arg_type_wrapper.reset_mock()
|
mock__get_type_from_annotation.reset_mock()
|
||||||
|
|
||||||
self.assertEqual(expected_output, self.parser.add_function_arg(param_bar, **kwargs))
|
self.assertEqual(expected_output, self.parser.add_function_arg(param_bar, **kwargs))
|
||||||
mock_add_argument.assert_called_once_with('-b', '--bar', default=bar_default, type=mock_type, **kwargs)
|
mock_add_argument.assert_called_once_with('-b', '--bar', default=bar_default, type=mock_type, **kwargs)
|
||||||
mock_get_arg_type_wrapper.assert_called_once_with(bar_type)
|
mock__get_type_from_annotation.assert_called_once_with(bar_type)
|
||||||
|
|
||||||
mock_add_argument.reset_mock()
|
mock_add_argument.reset_mock()
|
||||||
mock_get_arg_type_wrapper.reset_mock()
|
mock__get_type_from_annotation.reset_mock()
|
||||||
|
|
||||||
self.assertEqual(expected_output, self.parser.add_function_arg(param_baz, **kwargs))
|
self.assertEqual(expected_output, self.parser.add_function_arg(param_baz, **kwargs))
|
||||||
mock_add_argument.assert_called_once_with('-B', '--baz', default=baz_default, type=mock_type, **kwargs)
|
mock_add_argument.assert_called_once_with('-B', '--baz', default=baz_default, type=mock_type, **kwargs)
|
||||||
mock_get_arg_type_wrapper.assert_called_once_with(baz_type)
|
mock__get_type_from_annotation.assert_called_once_with(baz_type)
|
||||||
|
|
||||||
mock_add_argument.reset_mock()
|
mock_add_argument.reset_mock()
|
||||||
mock_get_arg_type_wrapper.reset_mock()
|
mock__get_type_from_annotation.reset_mock()
|
||||||
|
|
||||||
self.assertEqual(expected_output, self.parser.add_function_arg(param_boo, **kwargs))
|
self.assertEqual(expected_output, self.parser.add_function_arg(param_boo, **kwargs))
|
||||||
mock_add_argument.assert_called_once_with('--boo', default=boo_default, type=mock_type, **kwargs)
|
mock_add_argument.assert_called_once_with('--boo', default=boo_default, type=mock_type, **kwargs)
|
||||||
mock_get_arg_type_wrapper.assert_called_once_with(boo_type)
|
mock__get_type_from_annotation.assert_called_once_with(boo_type)
|
||||||
|
|
||||||
mock_add_argument.reset_mock()
|
mock_add_argument.reset_mock()
|
||||||
mock_get_arg_type_wrapper.reset_mock()
|
mock__get_type_from_annotation.reset_mock()
|
||||||
|
|
||||||
self.assertEqual(expected_output, self.parser.add_function_arg(param_flag, **kwargs))
|
self.assertEqual(expected_output, self.parser.add_function_arg(param_flag, **kwargs))
|
||||||
mock_add_argument.assert_called_once_with('-f', '--flag', action='store_true', **kwargs)
|
mock_add_argument.assert_called_once_with('-f', '--flag', action='store_true', **kwargs)
|
||||||
mock_get_arg_type_wrapper.assert_not_called()
|
mock__get_type_from_annotation.assert_not_called()
|
||||||
|
|
||||||
@patch.object(parser.ControlParser, 'add_function_arg')
|
@patch.object(parser.ControlParser, 'add_function_arg')
|
||||||
def test_add_function_args(self, mock_add_function_arg: MagicMock):
|
def test_add_function_args(self, mock_add_function_arg: MagicMock):
|
||||||
@ -253,7 +265,25 @@ class ControlServerTestCase(TestCase):
|
|||||||
|
|
||||||
|
|
||||||
class RestTestCase(TestCase):
|
class RestTestCase(TestCase):
|
||||||
def test_get_arg_type_wrapper(self):
|
def test__get_arg_type_wrapper(self):
|
||||||
type_wrap = parser.get_arg_type_wrapper(int)
|
type_wrap = parser._get_arg_type_wrapper(int)
|
||||||
|
self.assertEqual('int', type_wrap.__name__)
|
||||||
self.assertEqual(SUPPRESS, type_wrap(SUPPRESS))
|
self.assertEqual(SUPPRESS, type_wrap(SUPPRESS))
|
||||||
self.assertEqual(13, type_wrap('13'))
|
self.assertEqual(13, type_wrap('13'))
|
||||||
|
|
||||||
|
@patch.object(parser, '_get_arg_type_wrapper')
|
||||||
|
def test__get_type_from_annotation(self, mock__get_arg_type_wrapper: MagicMock):
|
||||||
|
mock__get_arg_type_wrapper.return_value = expected_output = FOO + BAR
|
||||||
|
dotted_path_ann = [CoroutineFunc, EndCB, CancelCB]
|
||||||
|
literal_eval_ann = [ArgsT, KwArgsT, Iterable[ArgsT], Iterable[KwArgsT]]
|
||||||
|
any_other_ann = MagicMock()
|
||||||
|
for a in dotted_path_ann:
|
||||||
|
self.assertEqual(expected_output, parser._get_type_from_annotation(a))
|
||||||
|
mock__get_arg_type_wrapper.assert_has_calls(len(dotted_path_ann) * [call(resolve_dotted_path)])
|
||||||
|
mock__get_arg_type_wrapper.reset_mock()
|
||||||
|
for a in literal_eval_ann:
|
||||||
|
self.assertEqual(expected_output, parser._get_type_from_annotation(a))
|
||||||
|
mock__get_arg_type_wrapper.assert_has_calls(len(literal_eval_ann) * [call(literal_eval)])
|
||||||
|
mock__get_arg_type_wrapper.reset_mock()
|
||||||
|
self.assertEqual(expected_output, parser._get_type_from_annotation(any_other_ann))
|
||||||
|
mock__get_arg_type_wrapper.assert_called_once_with(any_other_ann)
|
@ -26,8 +26,8 @@ from pathlib import Path
|
|||||||
from unittest import IsolatedAsyncioTestCase, skipIf
|
from unittest import IsolatedAsyncioTestCase, skipIf
|
||||||
from unittest.mock import AsyncMock, MagicMock, patch
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
from asyncio_taskpool import server
|
from asyncio_taskpool.control import server
|
||||||
from asyncio_taskpool.client import ControlClient, UnixControlClient
|
from asyncio_taskpool.control.client import ControlClient, TCPControlClient, UnixControlClient
|
||||||
|
|
||||||
|
|
||||||
FOO, BAR = 'foo', 'bar'
|
FOO, BAR = 'foo', 'bar'
|
||||||
@ -46,7 +46,7 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
|
|||||||
server.log.setLevel(cls.log_lvl)
|
server.log.setLevel(cls.log_lvl)
|
||||||
|
|
||||||
def setUp(self) -> None:
|
def setUp(self) -> None:
|
||||||
self.abstract_patcher = patch('asyncio_taskpool.server.ControlServer.__abstractmethods__', set())
|
self.abstract_patcher = patch('asyncio_taskpool.control.server.ControlServer.__abstractmethods__', set())
|
||||||
self.mock_abstract_methods = self.abstract_patcher.start()
|
self.mock_abstract_methods = self.abstract_patcher.start()
|
||||||
self.mock_pool = MagicMock()
|
self.mock_pool = MagicMock()
|
||||||
self.kwargs = {FOO: 123, BAR: 456}
|
self.kwargs = {FOO: 123, BAR: 456}
|
||||||
@ -120,6 +120,50 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
|
|||||||
mock_create_task.assert_called_once_with(mock_awaitable)
|
mock_create_task.assert_called_once_with(mock_awaitable)
|
||||||
|
|
||||||
|
|
||||||
|
class TCPControlServerTestCase(IsolatedAsyncioTestCase):
|
||||||
|
log_lvl: int
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls) -> None:
|
||||||
|
cls.log_lvl = server.log.level
|
||||||
|
server.log.setLevel(999)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls) -> None:
|
||||||
|
server.log.setLevel(cls.log_lvl)
|
||||||
|
|
||||||
|
def setUp(self) -> None:
|
||||||
|
self.base_init_patcher = patch.object(server.ControlServer, '__init__')
|
||||||
|
self.mock_base_init = self.base_init_patcher.start()
|
||||||
|
self.mock_pool = MagicMock()
|
||||||
|
self.host, self.port = 'localhost', 12345
|
||||||
|
self.kwargs = {FOO: 123, BAR: 456}
|
||||||
|
self.server = server.TCPControlServer(pool=self.mock_pool, host=self.host, port=self.port, **self.kwargs)
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
self.base_init_patcher.stop()
|
||||||
|
|
||||||
|
def test__client_class(self):
|
||||||
|
self.assertEqual(TCPControlClient, self.server._client_class)
|
||||||
|
|
||||||
|
def test_init(self):
|
||||||
|
self.assertEqual(self.host, self.server._host)
|
||||||
|
self.assertEqual(self.port, self.server._port)
|
||||||
|
self.mock_base_init.assert_called_once_with(self.mock_pool, **self.kwargs)
|
||||||
|
|
||||||
|
@patch.object(server, 'start_server')
|
||||||
|
async def test__get_server_instance(self, mock_start_server: AsyncMock):
|
||||||
|
mock_start_server.return_value = expected_output = 'totally_a_server'
|
||||||
|
mock_callback, mock_kwargs = MagicMock(), {'a': 1, 'b': 2}
|
||||||
|
args = [mock_callback]
|
||||||
|
output = await self.server._get_server_instance(*args, **mock_kwargs)
|
||||||
|
self.assertEqual(expected_output, output)
|
||||||
|
mock_start_server.assert_called_once_with(mock_callback, self.host, self.port, **mock_kwargs)
|
||||||
|
|
||||||
|
def test__final_callback(self):
|
||||||
|
self.assertIsNone(self.server._final_callback())
|
||||||
|
|
||||||
|
|
||||||
@skipIf(os.name == 'nt', "No Unix sockets on Windows :(")
|
@skipIf(os.name == 'nt', "No Unix sockets on Windows :(")
|
||||||
class UnixControlServerTestCase(IsolatedAsyncioTestCase):
|
class UnixControlServerTestCase(IsolatedAsyncioTestCase):
|
||||||
log_lvl: int
|
log_lvl: int
|
@ -24,7 +24,7 @@ from argparse import ArgumentError, Namespace
|
|||||||
from unittest import IsolatedAsyncioTestCase
|
from unittest import IsolatedAsyncioTestCase
|
||||||
from unittest.mock import AsyncMock, MagicMock, patch, call
|
from unittest.mock import AsyncMock, MagicMock, patch, call
|
||||||
|
|
||||||
from asyncio_taskpool import session
|
from asyncio_taskpool.control import session
|
||||||
from asyncio_taskpool.constants import CLIENT_INFO, CMD, SESSION_MSG_BYTES, STREAM_WRITER
|
from asyncio_taskpool.constants import CLIENT_INFO, CMD, SESSION_MSG_BYTES, STREAM_WRITER
|
||||||
from asyncio_taskpool.exceptions import HelpRequested
|
from asyncio_taskpool.exceptions import HelpRequested
|
||||||
from asyncio_taskpool.pool import SimpleTaskPool
|
from asyncio_taskpool.pool import SimpleTaskPool
|
||||||
@ -107,7 +107,7 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
|
|||||||
STREAM_WRITER: self.mock_writer,
|
STREAM_WRITER: self.mock_writer,
|
||||||
CLIENT_INFO.TERMINAL_WIDTH: width,
|
CLIENT_INFO.TERMINAL_WIDTH: width,
|
||||||
'prog': '',
|
'prog': '',
|
||||||
'usage': f'%(prog)s [-h] [{CMD}] ...'
|
'usage': f'[-h] [{CMD}] ...'
|
||||||
}
|
}
|
||||||
expected_subparsers_kwargs = {
|
expected_subparsers_kwargs = {
|
||||||
'title': "Commands",
|
'title': "Commands",
|
||||||
@ -142,9 +142,7 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
|
|||||||
mock__exec_method_and_respond.reset_mock()
|
mock__exec_method_and_respond.reset_mock()
|
||||||
mock_parse_args.reset_mock()
|
mock_parse_args.reset_mock()
|
||||||
|
|
||||||
mock_parse_args = MagicMock(return_value=Namespace(**{CMD: prop}, **kwargs))
|
mock_parse_args.return_value = Namespace(**{CMD: prop}, **kwargs)
|
||||||
self.session._parser = MagicMock(parse_args=mock_parse_args)
|
|
||||||
self.mock_writer.write = MagicMock()
|
|
||||||
self.assertIsNone(await self.session._parse_command(msg))
|
self.assertIsNone(await self.session._parse_command(msg))
|
||||||
mock_parse_args.assert_called_once_with(msg.split(' '))
|
mock_parse_args.assert_called_once_with(msg.split(' '))
|
||||||
self.mock_writer.write.assert_not_called()
|
self.mock_writer.write.assert_not_called()
|
||||||
@ -154,6 +152,21 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
|
|||||||
mock__exec_property_and_respond.reset_mock()
|
mock__exec_property_and_respond.reset_mock()
|
||||||
mock_parse_args.reset_mock()
|
mock_parse_args.reset_mock()
|
||||||
|
|
||||||
|
bad_command = 'definitely not a function or property'
|
||||||
|
mock_parse_args.return_value = Namespace(**{CMD: bad_command}, **kwargs)
|
||||||
|
with patch.object(session, 'CommandError') as cmd_err_cls:
|
||||||
|
cmd_err_cls.return_value = exc = MagicMock()
|
||||||
|
self.assertIsNone(await self.session._parse_command(msg))
|
||||||
|
cmd_err_cls.assert_called_once_with(f"Unknown command object: {bad_command}")
|
||||||
|
mock_parse_args.assert_called_once_with(msg.split(' '))
|
||||||
|
mock__exec_method_and_respond.assert_not_called()
|
||||||
|
mock__exec_property_and_respond.assert_not_called()
|
||||||
|
self.mock_writer.write.assert_called_once_with(str(exc).encode())
|
||||||
|
|
||||||
|
mock__exec_property_and_respond.reset_mock()
|
||||||
|
mock_parse_args.reset_mock()
|
||||||
|
self.mock_writer.write.reset_mock()
|
||||||
|
|
||||||
mock_parse_args.side_effect = exc = ArgumentError(MagicMock(), "oops")
|
mock_parse_args.side_effect = exc = ArgumentError(MagicMock(), "oops")
|
||||||
self.assertIsNone(await self.session._parse_command(msg))
|
self.assertIsNone(await self.session._parse_command(msg))
|
||||||
mock_parse_args.assert_called_once_with(msg.split(' '))
|
mock_parse_args.assert_called_once_with(msg.split(' '))
|
@ -20,7 +20,7 @@ Unittests for the `asyncio_taskpool.helpers` module.
|
|||||||
|
|
||||||
|
|
||||||
from unittest import IsolatedAsyncioTestCase
|
from unittest import IsolatedAsyncioTestCase
|
||||||
from unittest.mock import MagicMock, AsyncMock, NonCallableMagicMock
|
from unittest.mock import MagicMock, AsyncMock, NonCallableMagicMock, call, patch
|
||||||
|
|
||||||
from asyncio_taskpool import helpers
|
from asyncio_taskpool import helpers
|
||||||
|
|
||||||
@ -118,3 +118,13 @@ class HelpersTestCase(IsolatedAsyncioTestCase):
|
|||||||
output = await helpers.return_or_exception(mock_func, *args, **kwargs)
|
output = await helpers.return_or_exception(mock_func, *args, **kwargs)
|
||||||
self.assertEqual(test_exception, output)
|
self.assertEqual(test_exception, output)
|
||||||
mock_func.assert_called_once_with(*args, **kwargs)
|
mock_func.assert_called_once_with(*args, **kwargs)
|
||||||
|
|
||||||
|
def test_resolve_dotted_path(self):
|
||||||
|
from logging import WARNING
|
||||||
|
from urllib.request import urlopen
|
||||||
|
self.assertEqual(WARNING, helpers.resolve_dotted_path('logging.WARNING'))
|
||||||
|
self.assertEqual(urlopen, helpers.resolve_dotted_path('urllib.request.urlopen'))
|
||||||
|
with patch.object(helpers, 'import_module', return_value=object) as mock_import_module:
|
||||||
|
with self.assertRaises(AttributeError):
|
||||||
|
helpers.resolve_dotted_path('foo.bar.baz')
|
||||||
|
mock_import_module.assert_has_calls([call('foo'), call('foo.bar')])
|
||||||
|
@ -1,14 +1,18 @@
|
|||||||
# Using `asyncio-taskpool`
|
# Using `asyncio-taskpool`
|
||||||
|
|
||||||
|
## Contents
|
||||||
|
- [Contents](#contents)
|
||||||
|
- [Minimal example for `SimpleTaskPool`](#minimal-example-for-simpletaskpool)
|
||||||
|
- [Advanced example for `TaskPool`](#advanced-example-for-taskpool)
|
||||||
|
- [Control server example](#control-server-example)
|
||||||
|
|
||||||
## Minimal example for `SimpleTaskPool`
|
## Minimal example for `SimpleTaskPool`
|
||||||
|
|
||||||
With a `SimpleTaskPool` the function to execute as well as the arguments with which to execute it must be defined during its initialization (and they cannot be changed later). The only control you have after initialization is how many of such tasks are being run.
|
With a `SimpleTaskPool` the function to execute as well as the arguments with which to execute it must be defined during its initialization (and they cannot be changed later). The only control you have after initialization is how many of such tasks are being run.
|
||||||
|
|
||||||
The minimum required setup is a "worker" coroutine function that can do something asynchronously, and a main coroutine function that sets up the `SimpleTaskPool`, starts/stops the tasks as desired, and eventually awaits them all.
|
The minimum required setup is a "worker" coroutine function that can do something asynchronously, and a main coroutine function that sets up the `SimpleTaskPool`, starts/stops the tasks as desired, and eventually awaits them all.
|
||||||
|
|
||||||
The following demo code enables full log output first for additional clarity. It is complete and should work as is.
|
The following demo script enables full log output first for additional clarity. It is complete and should work as is.
|
||||||
|
|
||||||
### Code
|
|
||||||
|
|
||||||
```python
|
```python
|
||||||
import logging
|
import logging
|
||||||
@ -48,7 +52,9 @@ if __name__ == '__main__':
|
|||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
```
|
```
|
||||||
|
|
||||||
### Output
|
<details>
|
||||||
|
<summary>Output: (Click to expand)</summary>
|
||||||
|
|
||||||
```
|
```
|
||||||
SimpleTaskPool-0 initialized
|
SimpleTaskPool-0 initialized
|
||||||
Started SimpleTaskPool-0_Task-0
|
Started SimpleTaskPool-0_Task-0
|
||||||
@ -78,6 +84,7 @@ Ended SimpleTaskPool-0_Task-1
|
|||||||
> did 4
|
> did 4
|
||||||
> did 4
|
> did 4
|
||||||
```
|
```
|
||||||
|
</details>
|
||||||
|
|
||||||
## Advanced example for `TaskPool`
|
## Advanced example for `TaskPool`
|
||||||
|
|
||||||
@ -85,9 +92,7 @@ This time, we want to start tasks from _different_ coroutine functions **and** w
|
|||||||
|
|
||||||
As with the simple example, we need "worker" coroutine functions that can do something asynchronously, as well as a main coroutine function that sets up the pool, starts the tasks, and eventually awaits them.
|
As with the simple example, we need "worker" coroutine functions that can do something asynchronously, as well as a main coroutine function that sets up the pool, starts the tasks, and eventually awaits them.
|
||||||
|
|
||||||
The following demo code enables full log output first for additional clarity. It is complete and should work as is.
|
The following demo script enables full log output first for additional clarity. It is complete and should work as is.
|
||||||
|
|
||||||
### Code
|
|
||||||
|
|
||||||
```python
|
```python
|
||||||
import logging
|
import logging
|
||||||
@ -144,10 +149,9 @@ if __name__ == '__main__':
|
|||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
```
|
```
|
||||||
|
|
||||||
### Output
|
<details>
|
||||||
Additional comments for the output are provided with `<---` next to the output lines.
|
<summary>Output: (Click to expand)</summary>
|
||||||
|
|
||||||
(Keep in mind that the logger and `print` asynchronously write to `stdout`.)
|
|
||||||
```
|
```
|
||||||
TaskPool-0 initialized
|
TaskPool-0 initialized
|
||||||
Started TaskPool-0_Task-0
|
Started TaskPool-0_Task-0
|
||||||
@ -229,4 +233,37 @@ Ended TaskPool-0_Task-5
|
|||||||
> Done.
|
> Done.
|
||||||
```
|
```
|
||||||
|
|
||||||
|
(Added comments with `<---` next to the output lines.)
|
||||||
|
|
||||||
|
Keep in mind that the logger and `print` asynchronously write to `stdout`, so the order of lines in your output may be slightly different.
|
||||||
|
</details>
|
||||||
|
|
||||||
|
## Control server example
|
||||||
|
|
||||||
|
One of the main features of `asyncio-taskpool` is the ability to control a task pool "from the outside" at runtime.
|
||||||
|
|
||||||
|
The [example_server.py](./example_server.py) script launches a couple of worker tasks within a `SimpleTaskPool` instance and then starts a `TCPControlServer` instance for that task pool. The server is configured to locally bind to port `9999` and is stopped automatically after the "work" is done.
|
||||||
|
|
||||||
|
To run the script:
|
||||||
|
```shell
|
||||||
|
python usage/example_server.py
|
||||||
|
```
|
||||||
|
|
||||||
|
You can then connect to the server via the command line interface:
|
||||||
|
```shell
|
||||||
|
python -m asyncio_taskpool.control tcp localhost 9999
|
||||||
|
```
|
||||||
|
|
||||||
|
The CLI starts a `TCPControlClient` that connects to our example server. Once the connection is established, it gives you an input prompt allowing you to issue commands to the task pool:
|
||||||
|
```
|
||||||
|
Connected to SimpleTaskPool-0
|
||||||
|
Type '-h' to get help and usage instructions for all available commands.
|
||||||
|
|
||||||
|
>
|
||||||
|
```
|
||||||
|
|
||||||
|
It may be useful to run the server script and the client interface in two separate terminal windows side by side. The server script is configured with a verbose logger and will react to any commands issued by the client with detailed log messages in the terminal.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
© 2022 Daniil Fajnberg
|
© 2022 Daniil Fajnberg
|
||||||
|
@ -15,7 +15,7 @@ You should have received a copy of the GNU Lesser General Public License along w
|
|||||||
If not, see <https://www.gnu.org/licenses/>."""
|
If not, see <https://www.gnu.org/licenses/>."""
|
||||||
|
|
||||||
__doc__ = """
|
__doc__ = """
|
||||||
Working example of a UnixControlServer in combination with the SimpleTaskPool.
|
Working example of a TCPControlServer in combination with the SimpleTaskPool.
|
||||||
Use the main CLI client to interface at the socket.
|
Use the main CLI client to interface at the socket.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -65,12 +65,12 @@ async def main() -> None:
|
|||||||
# We just put some integers into our queue, since all our workers actually do, is print an item and sleep for a bit.
|
# We just put some integers into our queue, since all our workers actually do, is print an item and sleep for a bit.
|
||||||
for item in range(100):
|
for item in range(100):
|
||||||
q.put_nowait(item)
|
q.put_nowait(item)
|
||||||
pool = SimpleTaskPool(worker, (q,)) # initializes the pool
|
pool = SimpleTaskPool(worker, args=(q,)) # initializes the pool
|
||||||
await pool.start(3) # launches three worker tasks
|
await pool.start(3) # launches three worker tasks
|
||||||
control_server_task = await TCPControlServer(pool, host='127.0.0.1', port=9999).serve_forever()
|
control_server_task = await TCPControlServer(pool, host='127.0.0.1', port=9999).serve_forever()
|
||||||
# We block until `.task_done()` has been called once by our workers for every item placed into the queue.
|
# We block until `.task_done()` has been called once by our workers for every item placed into the queue.
|
||||||
await q.join()
|
await q.join()
|
||||||
# Since we don't need any "work" done anymore, we can lock our control server by cancelling the task.
|
# Since we don't need any "work" done anymore, we can get rid of our control server by cancelling the task.
|
||||||
control_server_task.cancel()
|
control_server_task.cancel()
|
||||||
# Since our workers should now be stuck waiting for more items to pick from the queue, but no items are left,
|
# Since our workers should now be stuck waiting for more items to pick from the queue, but no items are left,
|
||||||
# we can now safely cancel their tasks.
|
# we can now safely cancel their tasks.
|
||||||
|
Reference in New Issue
Block a user