Compare commits


No commits in common. "ce0f9a1f6562672d769939a06c6c0d2d867dd702" and "ed6badb08855e124e8848dc28df7f27a164d1526" have entirely different histories.

10 changed files with 31 additions and 84 deletions

View File

@@ -5,6 +5,7 @@ omit =
     .venv/*
 
 [report]
+fail_under = 100
 show_missing = True
 skip_covered = False
 exclude_lines =

View File

@@ -1,6 +1,6 @@
 [metadata]
 name = asyncio-taskpool
-version = 0.5.0
+version = 0.4.0
 author = Daniil Fajnberg
 author_email = mail@daniil.fajnberg.de
 description = Dynamically manage pools of asyncio tasks

View File

@@ -20,4 +20,4 @@ Brings the main classes up to package level for import convenience.
 from .pool import TaskPool, SimpleTaskPool
-from .server import TCPControlServer, UnixControlServer
+from .server import UnixControlServer
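
The hunk header notes that these re-exports exist for import convenience: client code can pull the main classes straight from the package. A minimal sketch, assuming the right-hand (0.4.0) exports shown above:

# Package-level imports as exposed on the right-hand side of this diff:
from asyncio_taskpool import TaskPool, SimpleTaskPool, UnixControlServer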

View File

@@ -25,16 +25,15 @@ from asyncio import run
 from pathlib import Path
 from typing import Dict, Any
 
-from .client import ControlClient, TCPControlClient, UnixControlClient
+from .client import ControlClient, UnixControlClient
 from .constants import PACKAGE_NAME
 from .pool import TaskPool
-from .server import TCPControlServer, UnixControlServer
+from .server import ControlServer
 
 CONN_TYPE = 'conn_type'
 UNIX, TCP = 'unix', 'tcp'
 SOCKET_PATH = 'path'
-HOST, PORT = 'host', 'port'
 
 
 def parse_cli() -> Dict[str, Any]:
@@ -47,18 +46,7 @@ def parse_cli() -> Dict[str, Any]:
     unix_parser.add_argument(
         SOCKET_PATH,
         type=Path,
-        help=f"Path to the unix socket on which the {UnixControlServer.__name__} for the {TaskPool.__name__} is "
-             f"listening."
-    )
-    tcp_parser = subparsers.add_parser(TCP, help="Connect via TCP socket")
-    tcp_parser.add_argument(
-        HOST,
-        help=f"IP address or url that the {TCPControlServer.__name__} for the {TaskPool.__name__} is listening on."
-    )
-    tcp_parser.add_argument(
-        PORT,
-        type=int,
-        help=f"Port that the {TCPControlServer.__name__} for the {TaskPool.__name__} is listening on."
-    )
+        help=f"Path to the unix socket on which the {ControlServer.__name__} for the {TaskPool.__name__} is listening."
+    )
     return vars(parser.parse_args())
@@ -68,7 +56,8 @@ async def main():
     if kwargs[CONN_TYPE] == UNIX:
         client = UnixControlClient(socket_path=kwargs[SOCKET_PATH])
     elif kwargs[CONN_TYPE] == TCP:
-        client = TCPControlClient(host=kwargs[HOST], port=kwargs[PORT])
+        # TODO: Implement the TCP client class
+        client = UnixControlClient(socket_path=kwargs[SOCKET_PATH])
     else:
         print("Invalid connection type", file=sys.stderr)
         sys.exit(2)
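
With the TCP branch reduced to a TODO, only the unix connection type is functional on the right-hand side. A rough sketch of how the retained path would be used; the `python -m` invocation and the socket location (borrowed from the example script later in this diff) are assumptions beyond what the hunks show:

# Hypothetical CLI invocation of the module above (unix subcommand, positional socket path):
#   python -m asyncio_taskpool unix /tmp/py_asyncio_taskpool.sock
# Roughly equivalent direct use of the retained client class:
import asyncio

from asyncio_taskpool.client import UnixControlClient


async def main() -> None:
    # `socket_path` matches the keyword argument used in `main()` above.
    client = UnixControlClient(socket_path='/tmp/py_asyncio_taskpool.sock')
    # `start()` is the public entry point referenced in the client docstrings further down.
    await client.start()


if __name__ == '__main__':
    asyncio.run(main())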

View File

@@ -23,9 +23,9 @@ import json
 import shutil
 import sys
 from abc import ABC, abstractmethod
-from asyncio.streams import StreamReader, StreamWriter, open_connection
+from asyncio.streams import StreamReader, StreamWriter
 from pathlib import Path
-from typing import Optional, Union
+from typing import Optional
 
 from .constants import CLIENT_EXIT, CLIENT_INFO, SESSION_MSG_BYTES
 from .types import ClientConnT, PathT
@@ -50,8 +50,8 @@ class ControlClient(ABC):
         """
         Tries to connect to a socket using the provided arguments and return the associated reader-writer-pair.
 
-        This method will be invoked by the public `start()` method with the pre-defined internal `_conn_kwargs`
-        (unpacked) as keyword-arguments.
+        This method will be invoked by the public `start()` method with the pre-defined internal `_conn_kwargs` (unpacked)
+        as keyword-arguments.
         This method should return either a tuple of `asyncio.StreamReader` and `asyncio.StreamWriter` or a tuple of
         `None` and `None`, if it failed to establish the defined connection.
         """
@@ -144,34 +144,15 @@
         print("Disconnected from control server.")
 
 
-class TCPControlClient(ControlClient):
-    """Task pool control client that expects a TCP socket to be exposed by the control server."""
-
-    def __init__(self, host: str, port: Union[int, str], **conn_kwargs) -> None:
-        """In addition to what the base class does, `host` and `port` are expected as non-optional arguments."""
-        self._host = host
-        self._port = port
-        super().__init__(**conn_kwargs)
-
-    async def _open_connection(self, **kwargs) -> ClientConnT:
-        """
-        Wrapper around the `asyncio.open_connection` function.
-
-        Returns a tuple of `None` and `None`, if the connection can not be established;
-        otherwise, the stream-reader and -writer tuple is returned.
-        """
-        try:
-            return await open_connection(self._host, self._port, **kwargs)
-        except ConnectionError as e:
-            print(str(e), file=sys.stderr)
-            return None, None
-
-
 class UnixControlClient(ControlClient):
     """Task pool control client that expects a unix socket to be exposed by the control server."""
 
     def __init__(self, socket_path: PathT, **conn_kwargs) -> None:
-        """In addition to what the base class does, the `socket_path` is expected as a non-optional argument."""
+        """
+        In addition to what the base class does, the `socket_path` is expected as a non-optional argument.
+
+        The `_socket_path` attribute is set to the `Path` object created from the `socket_path` argument.
+        """
         from asyncio.streams import open_unix_connection
         self._open_unix_connection = open_unix_connection
         self._socket_path = Path(socket_path)
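
The docstrings above spell out the `_open_connection` contract for concrete clients: return a stream reader-writer pair on success, or a `None, None` tuple on failure. For illustration, a sketch of a TCP-based subclass that satisfies this contract, mirroring the `TCPControlClient` removed in this hunk:

import sys
from asyncio.streams import open_connection

from asyncio_taskpool.client import ControlClient


class ExampleTCPClient(ControlClient):
    """Illustrative control client that connects to a TCP socket instead of a unix socket."""

    def __init__(self, host: str, port: int, **conn_kwargs) -> None:
        self._host = host
        self._port = port
        super().__init__(**conn_kwargs)

    async def _open_connection(self, **kwargs):
        # Return the (StreamReader, StreamWriter) pair, or (None, None) if the connection fails.
        try:
            return await open_connection(self._host, self._port, **kwargs)
        except ConnectionError as e:
            print(str(e), file=sys.stderr)
            return None, None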

View File

@@ -23,12 +23,12 @@ import logging
 from abc import ABC, abstractmethod
 from asyncio import AbstractServer
 from asyncio.exceptions import CancelledError
-from asyncio.streams import StreamReader, StreamWriter, start_server
+from asyncio.streams import StreamReader, StreamWriter
 from asyncio.tasks import Task, create_task
 from pathlib import Path
 from typing import Optional, Union
 
-from .client import ControlClient, TCPControlClient, UnixControlClient
+from .client import ControlClient, UnixControlClient
 from .pool import TaskPool, SimpleTaskPool
 from .session import ControlSession
 from .types import ConnectedCallbackT
@@ -132,24 +132,6 @@ class ControlServer(ABC):  # TODO: Implement interface for normal TaskPool insta
         return create_task(self._serve_forever())
 
 
-class TCPControlServer(ControlServer):
-    """Task pool control server class that exposes a TCP socket for control clients to connect to."""
-    _client_class = TCPControlClient
-
-    def __init__(self, pool: SimpleTaskPool, **server_kwargs) -> None:
-        self._host = server_kwargs.pop('host')
-        self._port = server_kwargs.pop('port')
-        super().__init__(pool, **server_kwargs)
-
-    async def _get_server_instance(self, client_connected_cb: ConnectedCallbackT, **kwargs) -> AbstractServer:
-        server = await start_server(client_connected_cb, self._host, self._port, **kwargs)
-        log.debug("Opened socket at %s:%s", self._host, self._port)
-        return server
-
-    def _final_callback(self) -> None:
-        log.debug("Closed socket at %s:%s", self._host, self._port)
-
-
 class UnixControlServer(ControlServer):
     """Task pool control server class that exposes a unix socket for control clients to connect to."""
     _client_class = UnixControlClient
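
With the TCP variant removed, `UnixControlServer` is the only concrete `ControlServer` left on the right-hand side. A brief usage sketch, assuming the `path` keyword and socket location that appear in the example script at the end of this diff; the worker coroutine here is a trivial stand-in:

import asyncio

from asyncio_taskpool import SimpleTaskPool, UnixControlServer


async def tick(seconds: int) -> None:
    # Stand-in worker; any coroutine function accepted by SimpleTaskPool would do.
    await asyncio.sleep(seconds)


async def main() -> None:
    pool = SimpleTaskPool(tick, (1,))  # 0.4.0-style positional args, as in the examples below
    await pool.start(3)
    # serve_forever() hands back a task that keeps the control server listening on the unix socket.
    server_task = await UnixControlServer(pool, path='/tmp/py_asyncio_taskpool.sock').serve_forever()
    await asyncio.sleep(30)  # keep the pool alive for a while so a control client can connect
    server_task.cancel()
    pool.lock()
    pool.stop_all()
    await pool.gather_and_close(return_exceptions=True)


if __name__ == '__main__':
    asyncio.run(main())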

View File

@@ -20,11 +20,10 @@ Unittests for the `asyncio_taskpool.client` module.
 import json
-import os
 import shutil
 import sys
 from pathlib import Path
-from unittest import IsolatedAsyncioTestCase, skipIf
+from unittest import IsolatedAsyncioTestCase
 from unittest.mock import AsyncMock, MagicMock, patch
 
 from asyncio_taskpool import client
@@ -172,7 +171,6 @@ class ControlClientTestCase(IsolatedAsyncioTestCase):
         self.mock_print.assert_called_once_with("Disconnected from control server.")
 
 
-@skipIf(os.name == 'nt', "No Unix sockets on Windows :(")
 class UnixControlClientTestCase(IsolatedAsyncioTestCase):
 
     def setUp(self) -> None:

View File

@@ -21,9 +21,8 @@ Unittests for the `asyncio_taskpool.server` module.
 import asyncio
 import logging
-import os
 from pathlib import Path
-from unittest import IsolatedAsyncioTestCase, skipIf
+from unittest import IsolatedAsyncioTestCase
 from unittest.mock import AsyncMock, MagicMock, patch
 
 from asyncio_taskpool import server
@@ -120,7 +119,6 @@ class ControlServerTestCase(IsolatedAsyncioTestCase):
         mock_create_task.assert_called_once_with(mock_awaitable)
 
 
-@skipIf(os.name == 'nt', "No Unix sockets on Windows :(")
 class UnixControlServerTestCase(IsolatedAsyncioTestCase):
     log_lvl: int

View File

@@ -2,8 +2,6 @@
 
 ## Minimal example for `SimpleTaskPool`
 
-With a `SimpleTaskPool` the function to execute as well as the arguments with which to execute it must be defined during its initialization (and they cannot be changed later). The only control you have after initialization is how many of such tasks are being run.
-
 The minimum required setup is a "worker" coroutine function that can do something asynchronously, and a main coroutine function that sets up the `SimpleTaskPool`, starts/stops the tasks as desired, and eventually awaits them all.
 
 The following demo code enables full log output first for additional clarity. It is complete and should work as is.
@@ -34,12 +32,12 @@ async def work(n: int) -> None:
 
 
 async def main() -> None:
-    pool = SimpleTaskPool(work, args=(5,))  # initializes the pool; no work is being done yet
+    pool = SimpleTaskPool(work, (5,))  # initializes the pool; no work is being done yet
     await pool.start(3)  # launches work tasks 0, 1, and 2
     await asyncio.sleep(1.5)  # lets the tasks work for a bit
     await pool.start()  # launches work task 3
     await asyncio.sleep(1.5)  # lets the tasks work for a bit
-    pool.stop(2)  # cancels tasks 3 and 2 (LIFO order)
+    pool.stop(2)  # cancels tasks 3 and 2
     pool.lock()  # required for the last line
    await pool.gather_and_close()  # awaits all tasks, then flushes the pool
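
For orientation, here is a self-contained version of the minimal `SimpleTaskPool` demo that the two hunks above excerpt, written against the right-hand (0.4.0) call signatures. The body of `work` and the exact logging setup are not visible in this diff, so those parts are assumptions:

import asyncio
import logging

from asyncio_taskpool import SimpleTaskPool
from asyncio_taskpool.constants import PACKAGE_NAME

logging.getLogger(PACKAGE_NAME).setLevel(logging.DEBUG)
logging.getLogger(PACKAGE_NAME).addHandler(logging.StreamHandler())


async def work(n: int) -> None:
    # Assumed body: the diff only shows this function's signature in the hunk header above.
    for i in range(n):
        await asyncio.sleep(1)
        print("worked on", i)


async def main() -> None:
    pool = SimpleTaskPool(work, (5,))  # initializes the pool; no work is being done yet
    await pool.start(3)                # launches work tasks 0, 1, and 2
    await asyncio.sleep(1.5)           # lets the tasks work for a bit
    await pool.start()                 # launches work task 3
    await asyncio.sleep(1.5)           # lets the tasks work for a bit
    pool.stop(2)                       # cancels tasks 3 and 2
    pool.lock()                        # required before gather_and_close()
    await pool.gather_and_close()      # awaits all tasks, then flushes the pool


if __name__ == '__main__':
    asyncio.run(main())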
@@ -116,19 +114,19 @@ async def other_work(a: int, b: int) -> None:
 async def main() -> None:
     # Initialize a new task pool instance and limit its size to 3 tasks.
     pool = TaskPool(3)
-    # Queue up two tasks (IDs 0 and 1) to run concurrently (with the same keyword-arguments).
+    # Queue up two tasks (IDs 0 and 1) to run concurrently (with the same positional arguments).
     print("> Called `apply`")
     await pool.apply(work, kwargs={'start': 100, 'stop': 200, 'step': 10}, num=2)
     # Let the tasks work for a bit.
     await asyncio.sleep(1.5)
     # Now, let us enqueue four more tasks (which will receive IDs 2, 3, 4, and 5), each created with different
-    # positional arguments by using `starmap`, but we want no more than two of those to run concurrently.
+    # positional arguments by using `starmap`, but have **no more than two of those** run concurrently.
     # Since we set our pool size to 3, and already have two tasks working within the pool,
     # only the first one of these will start immediately (and receive ID 2).
     # The second one will start (with ID 3), only once there is room in the pool,
     # which -- in this example -- will be the case after ID 2 ends.
     # Once there is room in the pool again, the third and fourth will each start (with IDs 4 and 5)
-    # only once there is room in the pool and no more than one other task of these new ones is running.
+    # **only** once there is room in the pool **and** no more than one other task of these new ones is running.
     args_list = [(0, 10), (10, 20), (20, 30), (30, 40)]
     await pool.starmap(other_work, args_list, group_size=2)
     print("> Called `starmap`")

View File

@@ -23,7 +23,7 @@ Use the main CLI client to interface at the socket.
 import asyncio
 import logging
 
-from asyncio_taskpool import SimpleTaskPool, TCPControlServer
+from asyncio_taskpool import SimpleTaskPool, UnixControlServer
 from asyncio_taskpool.constants import PACKAGE_NAME
@@ -34,11 +34,11 @@ logging.getLogger(PACKAGE_NAME).addHandler(logging.StreamHandler())
 async def work(item: int) -> None:
     """The non-blocking sleep simulates something like an I/O operation that can be done asynchronously."""
     await asyncio.sleep(1)
-    print("worked on", item, flush=True)
+    print("worked on", item)
 
 
 async def worker(q: asyncio.Queue) -> None:
-    """Simulates doing asynchronous work that takes a bit of time to finish."""
+    """Simulates doing asynchronous work that takes a little bit of time to finish."""
     # We only want the worker to stop, when its task is cancelled; therefore we start an infinite loop.
     while True:
         # We want to block here, until we can get the next item from the queue.
@@ -67,7 +67,7 @@ async def main() -> None:
         q.put_nowait(item)
     pool = SimpleTaskPool(worker, (q,))  # initializes the pool
     await pool.start(3)  # launches three worker tasks
-    control_server_task = await TCPControlServer(pool, host='127.0.0.1', port=9999).serve_forever()
+    control_server_task = await UnixControlServer(pool, path='/tmp/py_asyncio_taskpool.sock').serve_forever()
     # We block until `.task_done()` has been called once by our workers for every item placed into the queue.
     await q.join()
     # Since we don't need any "work" done anymore, we can lock our control server by cancelling the task.
@@ -76,7 +76,7 @@ async def main() -> None:
     # we can now safely cancel their tasks.
     pool.lock()
     pool.stop_all()
-    # Finally, we allow for all tasks to do their cleanup (as if they need to do any) upon being cancelled.
+    # Finally we allow for all tasks to do do their cleanup, if they need to do any, upon being cancelled.
     # We block until they all return or raise an exception, but since we are not interested in any of their exceptions,
     # we just silently collect their exceptions along with their return values.
     await pool.gather_and_close(return_exceptions=True)