From 1beb9fc9b0ddb533249458f30b6064a0bab5aa5b Mon Sep 17 00:00:00 2001 From: Daniil Fajnberg Date: Tue, 29 Mar 2022 19:43:21 +0200 Subject: [PATCH] `gather_and_close` now automatically locks the pool --- README.md | 1 - docs/source/pages/pool.rst | 1 - src/asyncio_taskpool/exceptions.py | 4 ---- src/asyncio_taskpool/pool.py | 11 +++-------- tests/test_pool.py | 13 +++---------- usage/USAGE.md | 5 +---- usage/example_server.py | 1 - 7 files changed, 7 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index dc33cb2..76ee5e3 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,6 @@ async def main(): ... pool.stop(3) ... - pool.lock() await pool.gather_and_close() ... ``` diff --git a/docs/source/pages/pool.rst b/docs/source/pages/pool.rst index 0452dd7..cd5a353 100644 --- a/docs/source/pages/pool.rst +++ b/docs/source/pages/pool.rst @@ -143,7 +143,6 @@ Or we could use a task pool: pool = TaskPool() await pool.map(another_worker_function, data_iterator, num_concurrent=5) ... - pool.lock() await pool.gather_and_close() Calling the :py:meth:`.map() ` method this way ensures that there will **always** -- i.e. at any given moment in time -- be exactly 5 tasks working concurrently on our data (assuming no other pool interaction). diff --git a/src/asyncio_taskpool/exceptions.py b/src/asyncio_taskpool/exceptions.py index c911e08..357e81d 100644 --- a/src/asyncio_taskpool/exceptions.py +++ b/src/asyncio_taskpool/exceptions.py @@ -51,10 +51,6 @@ class InvalidGroupName(PoolException): pass -class PoolStillUnlocked(PoolException): - pass - - class NotCoroutine(PoolException): pass diff --git a/src/asyncio_taskpool/pool.py b/src/asyncio_taskpool/pool.py index d8007d3..6a56c57 100644 --- a/src/asyncio_taskpool/pool.py +++ b/src/asyncio_taskpool/pool.py @@ -450,9 +450,7 @@ class BaseTaskPool: """ Gathers (i.e. awaits) **all** tasks in the pool, then closes it. - After this method returns, no more tasks can be started in the pool. 
- - :meth:`lock` must have been called prior to this. + Once this method is called, no more tasks can be started in the pool. This method may block, if one of the tasks blocks while catching a `asyncio.CancelledError` or if any of the callbacks registered for a task blocks for whatever reason. @@ -463,8 +461,5 @@ class BaseTaskPool: - Raises: - `PoolStillUnlocked`: The pool has not been locked yet. """ - if not self._locked: - raise exceptions.PoolStillUnlocked("Pool must be locked, before tasks can be gathered") + self.lock() await gather(*self._tasks_ended.values(), *self._tasks_cancelled.values(), *self._tasks_running.values(), return_exceptions=return_exceptions) self._tasks_ended.clear() @@ -595,9 +590,7 @@ class TaskPool(BaseTaskPool): """ Gathers (i.e. awaits) **all** tasks in the pool, then closes it. - After this method returns, no more tasks can be started in the pool. - - The `lock()` method must have been called prior to this. + Once this method is called, no more tasks can be started in the pool. Note that this method may block indefinitely as long as any task in the pool is not done. 
This includes meta tasks launched by methods such as :meth:`map`, which end by themselves, only once the arguments iterator is diff --git a/tests/test_pool.py b/tests/test_pool.py index 6d89f0a..e669024 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -366,16 +366,9 @@ class BaseTaskPoolTestCase(CommonTestCase): async def test_gather_and_close(self): mock_running_func = AsyncMock() mock_ended_func, mock_cancelled_func = AsyncMock(), AsyncMock(side_effect=Exception) - self.task_pool._tasks_ended = ended = {123: mock_ended_func()} - self.task_pool._tasks_cancelled = cancelled = {456: mock_cancelled_func()} - self.task_pool._tasks_running = running = {789: mock_running_func()} - - with self.assertRaises(exceptions.PoolStillUnlocked): - await self.task_pool.gather_and_close() - self.assertDictEqual(ended, self.task_pool._tasks_ended) - self.assertDictEqual(cancelled, self.task_pool._tasks_cancelled) - self.assertDictEqual(running, self.task_pool._tasks_running) - self.assertFalse(self.task_pool._closed) + self.task_pool._tasks_ended = {123: mock_ended_func()} + self.task_pool._tasks_cancelled = {456: mock_cancelled_func()} + self.task_pool._tasks_running = {789: mock_running_func()} self.task_pool._locked = True self.assertIsNone(await self.task_pool.gather_and_close(return_exceptions=True)) diff --git a/usage/USAGE.md b/usage/USAGE.md index f982b1f..7f227ee 100644 --- a/usage/USAGE.md +++ b/usage/USAGE.md @@ -44,7 +44,6 @@ async def main() -> None: await pool.start(1) # launches work task 3 await asyncio.sleep(1.5) # lets the tasks work for a bit pool.stop(2) # cancels tasks 3 and 2 (LIFO order) - pool.lock() # required for the last line await pool.gather_and_close() # awaits all tasks, then flushes the pool @@ -137,9 +136,7 @@ async def main() -> None: args_list = [(0, 10), (10, 20), (20, 30), (30, 40)] await pool.starmap(other_work, args_list, num_concurrent=2) print("> Called `starmap`") - # Now we lock the pool, so that we can safely await all our 
tasks. - pool.lock() - # Finally, we block, until all tasks have ended. + # We block, until all tasks have ended. print("> Calling `gather_and_close`...") await pool.gather_and_close() print("> Done.") diff --git a/usage/example_server.py b/usage/example_server.py index 3c85107..454c413 100644 --- a/usage/example_server.py +++ b/usage/example_server.py @@ -75,7 +75,6 @@ async def main() -> None: control_server_task.cancel() # Since our workers should now be stuck waiting for more items to pick from the queue, but no items are left, # we can now safely cancel their tasks. - pool.lock() pool.stop_all() # Finally, we allow for all tasks to do their cleanup (as if they need to do any) upon being cancelled. # We block until they all return or raise an exception, but since we are not interested in any of their exceptions,