generated from daniil-berg/boilerplate-py
improved and extended usage/readme docs; small fixes
This commit is contained in:
parent
3d84e1552b
commit
a92e646411
17
README.md
17
README.md
@ -2,9 +2,18 @@
|
||||
|
||||
**Dynamically manage pools of asyncio tasks**
|
||||
|
||||
## Contents
|
||||
- [Contents](#contents)
|
||||
- [Summary](#summary)
|
||||
- [Usage](#usage)
|
||||
- [Installation](#installation)
|
||||
- [Dependencies](#dependencies)
|
||||
- [Testing](#testing)
|
||||
- [License](#license)
|
||||
|
||||
## Summary
|
||||
|
||||
A task pool is an object with a simple interface for aggregating and dynamically managing asynchronous tasks.
|
||||
A **task pool** is an object with a simple interface for aggregating and dynamically managing asynchronous tasks.
|
||||
|
||||
With an interface that is intentionally similar to the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) class from the standard library, the `TaskPool` provides you such methods as `apply`, `map`, and `starmap` to execute coroutines concurrently as [`asyncio.Task`](https://docs.python.org/3/library/asyncio-task.html#task-object) objects. There is no limitation imposed on what kind of tasks can be run or in what combination, when new ones can be added, or when they can be cancelled.
|
||||
|
||||
@ -22,7 +31,7 @@ from asyncio_taskpool import SimpleTaskPool
|
||||
...
|
||||
|
||||
|
||||
async def work(foo, bar): ...
|
||||
async def work(_foo, _bar): ...
|
||||
|
||||
|
||||
...
|
||||
@ -55,7 +64,7 @@ Python Version 3.8+, tested on Linux
|
||||
|
||||
## Testing
|
||||
|
||||
Install `asyncio-taskpool[dev]` dependencies or just manually install `coverage` with `pip`.
|
||||
Install `asyncio-taskpool[dev]` dependencies or just manually install [`coverage`](https://coverage.readthedocs.io/en/latest/) with `pip`.
|
||||
Execute the [`./coverage.sh`](coverage.sh) shell script to run all unit tests and receive the coverage report.
|
||||
|
||||
## License
|
||||
@ -64,6 +73,6 @@ Execute the [`./coverage.sh`](coverage.sh) shell script to run all unit tests an
|
||||
|
||||
The full license texts for the [GNU GPLv3.0](COPYING) and the [GNU LGPLv3.0](COPYING.LESSER) are included in this repository. If not, see https://www.gnu.org/licenses/.
|
||||
|
||||
## Copyright
|
||||
---
|
||||
|
||||
© 2022 Daniil Fajnberg
|
||||
|
@ -1,6 +1,6 @@
|
||||
[metadata]
|
||||
name = asyncio-taskpool
|
||||
version = 0.6.2
|
||||
version = 0.6.3
|
||||
author = Daniil Fajnberg
|
||||
author_email = mail@daniil.fajnberg.de
|
||||
description = Dynamically manage pools of asyncio tasks
|
||||
|
@ -76,6 +76,7 @@ class ControlClient(ABC):
|
||||
writer.write(json.dumps(self.client_info()).encode())
|
||||
await writer.drain()
|
||||
print("Connected to", (await reader.read(SESSION_MSG_BYTES)).decode())
|
||||
print("Type '-h' to get help and usage instructions for all available commands.\n")
|
||||
|
||||
def _get_command(self, writer: StreamWriter) -> Optional[str]:
|
||||
"""
|
||||
|
@ -136,7 +136,7 @@ class TCPControlServer(ControlServer):
|
||||
"""Task pool control server class that exposes a TCP socket for control clients to connect to."""
|
||||
_client_class = TCPControlClient
|
||||
|
||||
def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None:
|
||||
def __init__(self, pool: Union[TaskPool, SimpleTaskPool], **server_kwargs) -> None:
|
||||
self._host = server_kwargs.pop('host')
|
||||
self._port = server_kwargs.pop('port')
|
||||
super().__init__(pool, **server_kwargs)
|
||||
@ -154,7 +154,7 @@ class UnixControlServer(ControlServer):
|
||||
"""Task pool control server class that exposes a unix socket for control clients to connect to."""
|
||||
_client_class = UnixControlClient
|
||||
|
||||
def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None:
|
||||
def __init__(self, pool: Union[TaskPool, SimpleTaskPool], **server_kwargs) -> None:
|
||||
from asyncio.streams import start_unix_server
|
||||
self._start_unix_server = start_unix_server
|
||||
self._socket_path = Path(server_kwargs.pop('path'))
|
||||
|
@ -85,7 +85,7 @@ class ControlSession:
|
||||
Must correspond to the arguments expected by the `method`.
|
||||
Correctly unpacks arbitrary-length positional and keyword-arguments.
|
||||
"""
|
||||
log.warning("%s calls %s.%s", self._client_class_name, self._pool.__class__.__name__, method.__name__)
|
||||
log.debug("%s calls %s.%s", self._client_class_name, self._pool.__class__.__name__, method.__name__)
|
||||
normal_pos, var_pos = [], []
|
||||
for param in signature(method).parameters.values():
|
||||
if param.name == 'self':
|
||||
@ -112,11 +112,11 @@ class ControlSession:
|
||||
executed and the response written to the stream will be its return value (as an encoded string).
|
||||
"""
|
||||
if kwargs:
|
||||
log.warning("%s sets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fset.__name__)
|
||||
log.debug("%s sets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fset.__name__)
|
||||
await return_or_exception(prop.fset, self._pool, **kwargs)
|
||||
self._writer.write(CMD_OK)
|
||||
else:
|
||||
log.warning("%s gets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fget.__name__)
|
||||
log.debug("%s gets %s.%s", self._client_class_name, self._pool.__class__.__name__, prop.fget.__name__)
|
||||
self._writer.write(str(await return_or_exception(prop.fget, self._pool)).encode())
|
||||
|
||||
async def client_handshake(self) -> None:
|
||||
@ -154,9 +154,11 @@ class ControlSession:
|
||||
try:
|
||||
kwargs = vars(self._parser.parse_args(msg.split(' ')))
|
||||
except ArgumentError as e:
|
||||
log.debug("%s got an ArgumentError", self._client_class_name)
|
||||
self._writer.write(str(e).encode())
|
||||
return
|
||||
except HelpRequested:
|
||||
log.debug("%s received usage help", self._client_class_name)
|
||||
return
|
||||
command = kwargs.pop(CMD)
|
||||
if isfunction(command):
|
||||
|
@ -25,7 +25,7 @@ import shutil
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest import IsolatedAsyncioTestCase, skipIf
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from unittest.mock import AsyncMock, MagicMock, call, patch
|
||||
|
||||
from asyncio_taskpool.control import client
|
||||
from asyncio_taskpool.constants import CLIENT_INFO, SESSION_MSG_BYTES
|
||||
@ -74,7 +74,10 @@ class ControlClientTestCase(IsolatedAsyncioTestCase):
|
||||
self.mock_write.assert_called_once_with(json.dumps(mock_info).encode())
|
||||
self.mock_drain.assert_awaited_once_with()
|
||||
self.mock_read.assert_awaited_once_with(SESSION_MSG_BYTES)
|
||||
self.mock_print.assert_called_once_with("Connected to", self.mock_read.return_value.decode())
|
||||
self.mock_print.assert_has_calls([
|
||||
call("Connected to", self.mock_read.return_value.decode()),
|
||||
call("Type '-h' to get help and usage instructions for all available commands.\n")
|
||||
])
|
||||
|
||||
@patch.object(client, 'input')
|
||||
def test__get_command(self, mock_input: MagicMock):
|
||||
|
@ -1,14 +1,18 @@
|
||||
# Using `asyncio-taskpool`
|
||||
|
||||
## Contents
|
||||
- [Contents](#contents)
|
||||
- [Minimal example for `SimpleTaskPool`](#minimal-example-for-simpletaskpool)
|
||||
- [Advanced example for `TaskPool`](#advanced-example-for-taskpool)
|
||||
- [Control server example](#control-server-example)
|
||||
|
||||
## Minimal example for `SimpleTaskPool`
|
||||
|
||||
With a `SimpleTaskPool` the function to execute as well as the arguments with which to execute it must be defined during its initialization (and they cannot be changed later). The only control you have after initialization is how many of such tasks are being run.
|
||||
|
||||
The minimum required setup is a "worker" coroutine function that can do something asynchronously, and a main coroutine function that sets up the `SimpleTaskPool`, starts/stops the tasks as desired, and eventually awaits them all.
|
||||
|
||||
The following demo code enables full log output first for additional clarity. It is complete and should work as is.
|
||||
|
||||
### Code
|
||||
The following demo script enables full log output first for additional clarity. It is complete and should work as is.
|
||||
|
||||
```python
|
||||
import logging
|
||||
@ -48,7 +52,9 @@ if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
### Output
|
||||
<details>
|
||||
<summary>Output: (Click to expand)</summary>
|
||||
|
||||
```
|
||||
SimpleTaskPool-0 initialized
|
||||
Started SimpleTaskPool-0_Task-0
|
||||
@ -78,6 +84,7 @@ Ended SimpleTaskPool-0_Task-1
|
||||
> did 4
|
||||
> did 4
|
||||
```
|
||||
</details>
|
||||
|
||||
## Advanced example for `TaskPool`
|
||||
|
||||
@ -85,9 +92,7 @@ This time, we want to start tasks from _different_ coroutine functions **and** w
|
||||
|
||||
As with the simple example, we need "worker" coroutine functions that can do something asynchronously, as well as a main coroutine function that sets up the pool, starts the tasks, and eventually awaits them.
|
||||
|
||||
The following demo code enables full log output first for additional clarity. It is complete and should work as is.
|
||||
|
||||
### Code
|
||||
The following demo script enables full log output first for additional clarity. It is complete and should work as is.
|
||||
|
||||
```python
|
||||
import logging
|
||||
@ -144,10 +149,9 @@ if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
### Output
|
||||
Additional comments for the output are provided with `<---` next to the output lines.
|
||||
<details>
|
||||
<summary>Output: (Click to expand)</summary>
|
||||
|
||||
(Keep in mind that the logger and `print` asynchronously write to `stdout`.)
|
||||
```
|
||||
TaskPool-0 initialized
|
||||
Started TaskPool-0_Task-0
|
||||
@ -229,4 +233,37 @@ Ended TaskPool-0_Task-5
|
||||
> Done.
|
||||
```
|
||||
|
||||
(Added comments with `<---` next to the output lines.)
|
||||
|
||||
Keep in mind that the logger and `print` asynchronously write to `stdout`, so the order of lines in your output may be slightly different.
|
||||
</details>
|
||||
|
||||
## Control server example
|
||||
|
||||
One of the main features of `asyncio_taskpool` is the ability to control a task pool "from the outside" at runtime.
|
||||
|
||||
The [example_server.py](./example_server.py) script launches a couple of worker tasks within a `SimpleTaskPool` instance and then starts a `TCPControlServer` instance for that task pool. The server is configured to locally bind to port `9999` and is stopped automatically after the "work" is done.
|
||||
|
||||
To run the script:
|
||||
```shell
|
||||
python usage/example_server.py
|
||||
```
|
||||
|
||||
You can then connect to the server via the command line interface:
|
||||
```shell
|
||||
python -m asyncio_taskpool.control tcp localhost 9999
|
||||
```
|
||||
|
||||
The CLI starts a `TCPControlClient` that connects to our example server. Once the connection is established, it gives you an input prompt allowing you to issue commands to the task pool:
|
||||
```
|
||||
Connected to SimpleTaskPool-0
|
||||
Type '-h' to get help and usage instructions for all available commands.
|
||||
|
||||
>
|
||||
```
|
||||
|
||||
It may be useful to run the server script and the client interface in two separate terminal windows side by side. The server script is configured with a verbose logger and will react to any commands issued by the client with detailed log messages in the terminal.
|
||||
|
||||
---
|
||||
|
||||
© 2022 Daniil Fajnberg
|
||||
|
@ -65,12 +65,12 @@ async def main() -> None:
|
||||
# We just put some integers into our queue, since all our workers actually do, is print an item and sleep for a bit.
|
||||
for item in range(100):
|
||||
q.put_nowait(item)
|
||||
pool = SimpleTaskPool(worker, (q,)) # initializes the pool
|
||||
pool = SimpleTaskPool(worker, args=(q,)) # initializes the pool
|
||||
await pool.start(3) # launches three worker tasks
|
||||
control_server_task = await TCPControlServer(pool, host='127.0.0.1', port=9999).serve_forever()
|
||||
# We block until `.task_done()` has been called once by our workers for every item placed into the queue.
|
||||
await q.join()
|
||||
# Since we don't need any "work" done anymore, we can lock our control server by cancelling the task.
|
||||
# Since we don't need any "work" done anymore, we can get rid of our control server by cancelling the task.
|
||||
control_server_task.cancel()
|
||||
# Since our workers should now be stuck waiting for more items to pick from the queue, but no items are left,
|
||||
# we can now safely cancel their tasks.
|
||||
|
Loading…
Reference in New Issue
Block a user