From 91d546ebc2bedebfb587ed6e1ae0d48a23b0c8bd Mon Sep 17 00:00:00 2001 From: Daniil Fajnberg Date: Wed, 30 Mar 2022 16:17:34 +0200 Subject: [PATCH] moved the entire meta task logic to the base class --- src/asyncio_taskpool/pool.py | 191 ++++++++++++----------------------- tests/test_pool.py | 159 ++++++++++++----------------- 2 files changed, 133 insertions(+), 217 deletions(-) diff --git a/src/asyncio_taskpool/pool.py b/src/asyncio_taskpool/pool.py index ef10b05..5a998b2 100644 --- a/src/asyncio_taskpool/pool.py +++ b/src/asyncio_taskpool/pool.py @@ -83,6 +83,10 @@ class BaseTaskPool: self._enough_room: Semaphore = Semaphore() self._task_groups: Dict[str, TaskGroupRegister[int]] = {} + # Mapping task group names to sets of meta tasks, and a bucket for cancelled meta tasks. + self._group_meta_tasks_running: Dict[str, Set[Task]] = {} + self._meta_tasks_cancelled: Set[Task] = set() + # Finish with method/functions calls that add the pool to the internal list of pools, set its initial size, # and issue a log message. self._idx: int = self._add_pool(self) @@ -375,6 +379,17 @@ class BaseTaskPool: for task in tasks: task.cancel(msg=msg) + def _cancel_group_meta_tasks(self, group_name: str) -> None: + """Cancels and forgets all meta tasks associated with the task group named `group_name`.""" + try: + meta_tasks = self._group_meta_tasks_running.pop(group_name) + except KeyError: + return + for meta_task in meta_tasks: + meta_task.cancel() + self._meta_tasks_cancelled.update(meta_tasks) + log.debug("%s cancelled and forgot meta tasks from group %s", str(self), group_name) + def _cancel_and_remove_all_from_group(self, group_name: str, group_reg: TaskGroupRegister, msg: str = None) -> None: """ Removes all tasks from the specified group and cancels them. @@ -386,6 +401,7 @@ class BaseTaskPool: group_reg: The task group register object containing the task IDs. msg (optional): Passed to the `Task.cancel()` method of every task specified by the `task_ids`. """ + self._cancel_group_meta_tasks(group_name) while group_reg: try: self._tasks_running[group_reg.pop()].cancel(msg=msg) @@ -399,12 +415,16 @@ class BaseTaskPool: The task group is subsequently forgotten by the pool. + If any methods such launched meta tasks belonging to that group, these meta tasks are cancelled before the + actual tasks are cancelled. This means that any tasks "queued" to be started by a meta task will + **never even start**. + Args: - group_name: The name of the group of tasks that shall be cancelled. - msg (optional): Passed to the `Task.cancel()` method of every task specified by the `task_ids`. + group_name: The name of the group of tasks (and meta tasks) that shall be cancelled. + msg (optional): Passed to the `Task.cancel()` method of every task in the group. Raises: - `InvalidGroupName`: if no task group named `group_name` exists in the pool. + `InvalidGroupName`: No task group named `group_name` exists in the pool. """ log.debug("%s cancelling tasks in group %s", str(self), group_name) try: @@ -417,10 +437,14 @@ class BaseTaskPool: async def cancel_all(self, msg: str = None) -> None: """ - Cancels all tasks still running within the pool. + Cancels all tasks still running within the pool (including meta tasks). + + If any methods such launched meta tasks belonging to that group, these meta tasks are cancelled before the + actual tasks are cancelled. This means that any tasks "queued" to be started by a meta task will + **never even start**. Args: - msg (optional): Passed to the `Task.cancel()` method of every task specified by the `task_ids`. + msg (optional): Passed to the `Task.cancel()` method of every task. """ log.warning("%s cancelling all tasks!", str(self)) while self._task_groups: @@ -428,120 +452,6 @@ class BaseTaskPool: async with group_reg: self._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg) - async def flush(self, return_exceptions: bool = False): - """ - Gathers (i.e. awaits) all ended/cancelled tasks in the pool. - - The tasks are subsequently forgotten by the pool. This method exists mainly to free up memory of unneeded - `Task` objects. - - It blocks, **only if** any of the tasks block while catching a `asyncio.CancelledError` or any of the callbacks - registered for the tasks block. - - Args: - return_exceptions (optional): Passed directly into `gather`. - """ - await gather(*self._tasks_ended.values(), *self._tasks_cancelled.values(), return_exceptions=return_exceptions) - self._tasks_ended.clear() - self._tasks_cancelled.clear() - - async def gather_and_close(self, return_exceptions: bool = False): - """ - Gathers (i.e. awaits) **all** tasks in the pool, then closes it. - - Once this method is called, no more tasks can be started in the pool. - - This method may block, if one of the tasks blocks while catching a `asyncio.CancelledError` or if any of the - callbacks registered for a task blocks for whatever reason. - - Args: - return_exceptions (optional): Passed directly into `gather`. - - Raises: - `PoolStillUnlocked`: The pool has not been locked yet. - """ - self.lock() - await gather(*self._tasks_ended.values(), *self._tasks_cancelled.values(), *self._tasks_running.values(), - return_exceptions=return_exceptions) - self._tasks_ended.clear() - self._tasks_cancelled.clear() - self._tasks_running.clear() - self._closed = True - - -class TaskPool(BaseTaskPool): - """ - General purpose task pool class. - - Attempts to emulate part of the interface of `multiprocessing.pool.Pool` from the stdlib. - - A `TaskPool` instance can manage an arbitrary number of concurrent tasks from any coroutine function. - Tasks in the pool can all belong to the same coroutine function, - but they can also come from any number of different and unrelated coroutine functions. - - As long as there is room in the pool, more tasks can be added. (By default, there is no pool size limit.) - Each task started in the pool receives a unique ID, which can be used to cancel specific tasks at any moment. - - Adding tasks blocks **only if** the pool is full at that moment. - """ - - def __init__(self, pool_size: int = inf, name: str = None) -> None: - super().__init__(pool_size=pool_size, name=name) - # In addition to all the attributes of the base class, we need a dictionary mapping task group names to sets of - # meta tasks that are/were running in the context of that group, and a bucket for cancelled meta tasks. - self._group_meta_tasks_running: Dict[str, Set[Task]] = {} - self._meta_tasks_cancelled: Set[Task] = set() - - def _cancel_group_meta_tasks(self, group_name: str) -> None: - """Cancels and forgets all meta tasks associated with the task group named `group_name`.""" - try: - meta_tasks = self._group_meta_tasks_running.pop(group_name) - except KeyError: - return - for meta_task in meta_tasks: - meta_task.cancel() - self._meta_tasks_cancelled.update(meta_tasks) - log.debug("%s cancelled and forgot meta tasks from group %s", str(self), group_name) - - def _cancel_and_remove_all_from_group(self, group_name: str, group_reg: TaskGroupRegister, msg: str = None) -> None: - """See base class.""" - self._cancel_group_meta_tasks(group_name) - super()._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg) - - async def cancel_group(self, group_name: str, msg: str = None) -> None: - """ - Cancels an entire group of tasks. - - The task group is subsequently forgotten by the pool. - - If any methods such as :meth:`map` launched meta tasks belonging to that group, these meta tasks are cancelled - before the actual tasks are cancelled. This means that any tasks "queued" to be started by a meta task will - **never even start**. In the case of :meth:`map` this would mean that its `arg_iter` may be abandoned before it - was fully consumed (if that is even possible). - - Args: - group_name: The name of the group of tasks (and meta tasks) that shall be cancelled. - msg (optional): Passed to the `Task.cancel()` method of every task specified by the `task_ids`. - - Raises: - `InvalidGroupName`: No task group named `group_name` exists in the pool. - """ - await super().cancel_group(group_name=group_name, msg=msg) - - async def cancel_all(self, msg: str = None) -> None: - """ - Cancels all tasks still running within the pool (including meta tasks). - - If any methods such as :meth:`map` launched meta tasks, these meta tasks are cancelled before the actual tasks - are cancelled. This means that any tasks "queued" to be started by a meta task will **never even start**. In the - case of :meth:`map` this would mean that its `arg_iter` may be abandoned before it was fully consumed (if that - is even possible). - - Args: - msg (optional): Passed to the `Task.cancel()` method of every task specified by the `task_ids`. - """ - await super().cancel_all(msg=msg) - def _pop_ended_meta_tasks(self) -> Set[Task]: """ Goes through all not-cancelled meta tasks, checks if they are done already, and returns those that are. @@ -573,7 +483,7 @@ class TaskPool(BaseTaskPool): Gathers (i.e. awaits) all ended/cancelled tasks in the pool. The tasks are subsequently forgotten by the pool. This method exists mainly to free up memory of unneeded - `Task` objects. It also gets rid of unneeded meta tasks. + `Task` objects. It also gets rid of unneeded (ended/cancelled) meta tasks. It blocks, **only if** any of the tasks block while catching a `asyncio.CancelledError` or any of the callbacks registered for the tasks block. @@ -585,7 +495,9 @@ class TaskPool(BaseTaskPool): await gather(*self._meta_tasks_cancelled, *self._pop_ended_meta_tasks(), return_exceptions=return_exceptions) self._meta_tasks_cancelled.clear() - await super().flush(return_exceptions=return_exceptions) + await gather(*self._tasks_ended.values(), *self._tasks_cancelled.values(), return_exceptions=return_exceptions) + self._tasks_ended.clear() + self._tasks_cancelled.clear() async def gather_and_close(self, return_exceptions: bool = False): """ @@ -594,8 +506,8 @@ class TaskPool(BaseTaskPool): Once this method is called, no more tasks can be started in the pool. Note that this method may block indefinitely as long as any task in the pool is not done. This includes meta - tasks launched by methods such as :meth:`map`, which end by themselves, only once the arguments iterator is - fully consumed (which may not even be possible). To avoid this, make sure to call :meth:`cancel_all` first. + tasks launched by other methods, which may or may not even end by themselves. To avoid this, make sure to call + :meth:`cancel_all` first. This method may also block, if one of the tasks blocks while catching a `asyncio.CancelledError` or if any of the callbacks registered for a task blocks for whatever reason. @@ -612,7 +524,29 @@ class TaskPool(BaseTaskPool): await gather(*self._meta_tasks_cancelled, *not_cancelled_meta_tasks, return_exceptions=return_exceptions) self._meta_tasks_cancelled.clear() self._group_meta_tasks_running.clear() - await super().gather_and_close(return_exceptions=return_exceptions) + await gather(*self._tasks_ended.values(), *self._tasks_cancelled.values(), *self._tasks_running.values(), + return_exceptions=return_exceptions) + self._tasks_ended.clear() + self._tasks_cancelled.clear() + self._tasks_running.clear() + self._closed = True + + +class TaskPool(BaseTaskPool): + """ + General purpose task pool class. + + Attempts to emulate part of the interface of `multiprocessing.pool.Pool` from the stdlib. + + A `TaskPool` instance can manage an arbitrary number of concurrent tasks from any coroutine function. + Tasks in the pool can all belong to the same coroutine function, + but they can also come from any number of different and unrelated coroutine functions. + + As long as there is room in the pool, more tasks can be added. (By default, there is no pool size limit.) + Each task started in the pool receives a unique ID, which can be used to cancel specific tasks at any moment. + + Adding tasks blocks **only if** the pool is full at that moment. + """ def _generate_group_name(self, prefix: str, coroutine_function: CoroutineFunc) -> str: """ @@ -678,6 +612,9 @@ class TaskPool(BaseTaskPool): because this method returns immediately, this does not mean that any task was started or that any number of tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num`. + If the entire task group is cancelled, the meta task is cancelled first, which may cause the number of tasks + spawned to be less than `num`. + Args: func: The coroutine function to use for spawning the new tasks within the task pool. @@ -793,6 +730,9 @@ class TaskPool(BaseTaskPool): because this method returns immediately, this does not mean that any task was started or that any number of tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num_concurrent`. + If the entire task group is cancelled, the meta task is cancelled first, which means that `arg_iter` may be + abandoned before being fully consumed (if that is even possible). + Args: group_name: Name of the task group to add the new tasks to. It must be a name that doesn't exist yet. @@ -846,6 +786,9 @@ class TaskPool(BaseTaskPool): because this method returns immediately, this does not mean that any task was started or that any number of tasks will start soon, as this is solely determined by the :attr:`BaseTaskPool.pool_size` and `num_concurrent`. + If the entire task group is cancelled, the meta task is cancelled first, which means that `arg_iter` may be + abandoned before being fully consumed (if that is even possible). + Args: func: The coroutine function to use for spawning the new tasks within the task pool. diff --git a/tests/test_pool.py b/tests/test_pool.py index d2b43d0..ec3a4c6 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -93,6 +93,9 @@ class BaseTaskPoolTestCase(CommonTestCase): self.assertIsInstance(self.task_pool._enough_room, Semaphore) self.assertDictEqual(EMPTY_DICT, self.task_pool._task_groups) + self.assertDictEqual(EMPTY_DICT, self.task_pool._group_meta_tasks_running) + self.assertSetEqual(EMPTY_SET, self.task_pool._meta_tasks_cancelled) + self.assertEqual(self.mock_idx, self.task_pool._idx) self.mock__add_pool.assert_called_once_with(self.task_pool) @@ -316,10 +319,31 @@ class BaseTaskPoolTestCase(CommonTestCase): mock__get_running_task.assert_has_calls([call(task_id1), call(task_id2), call(task_id3)]) mock_cancel.assert_has_calls([call(msg=FOO), call(msg=FOO), call(msg=FOO)]) - def test__cancel_and_remove_all_from_group(self): + def test__cancel_group_meta_tasks(self): + mock_task1, mock_task2 = MagicMock(), MagicMock() + self.task_pool._group_meta_tasks_running[BAR] = {mock_task1, mock_task2} + self.assertIsNone(self.task_pool._cancel_group_meta_tasks(FOO)) + self.assertDictEqual({BAR: {mock_task1, mock_task2}}, self.task_pool._group_meta_tasks_running) + self.assertSetEqual(EMPTY_SET, self.task_pool._meta_tasks_cancelled) + mock_task1.cancel.assert_not_called() + mock_task2.cancel.assert_not_called() + + self.assertIsNone(self.task_pool._cancel_group_meta_tasks(BAR)) + self.assertDictEqual(EMPTY_DICT, self.task_pool._group_meta_tasks_running) + self.assertSetEqual({mock_task1, mock_task2}, self.task_pool._meta_tasks_cancelled) + mock_task1.cancel.assert_called_once_with() + mock_task2.cancel.assert_called_once_with() + + @patch.object(pool.BaseTaskPool, '_cancel_group_meta_tasks') + def test__cancel_and_remove_all_from_group(self, mock__cancel_group_meta_tasks: MagicMock): task_id = 555 mock_cancel = MagicMock() - self.task_pool._tasks_running[task_id] = MagicMock(cancel=mock_cancel) + + def add_mock_task_to_running(_): + self.task_pool._tasks_running[task_id] = MagicMock(cancel=mock_cancel) + # We add the fake task to the `_tasks_running` dictionary as a side effect of calling the mocked method, + # to verify that it is called first, before the cancellation loop starts. + mock__cancel_group_meta_tasks.side_effect = add_mock_task_to_running class MockRegister(set, MagicMock): pass @@ -351,11 +375,36 @@ class BaseTaskPoolTestCase(CommonTestCase): mock_grp_aenter.assert_awaited_once_with() mock_grp_aexit.assert_awaited_once() - async def test_flush(self): + def test__pop_ended_meta_tasks(self): + mock_task, mock_done_task1 = MagicMock(done=lambda: False), MagicMock(done=lambda: True) + self.task_pool._group_meta_tasks_running[FOO] = {mock_task, mock_done_task1} + mock_done_task2, mock_done_task3 = MagicMock(done=lambda: True), MagicMock(done=lambda: True) + self.task_pool._group_meta_tasks_running[BAR] = {mock_done_task2, mock_done_task3} + expected_output = {mock_done_task1, mock_done_task2, mock_done_task3} + output = self.task_pool._pop_ended_meta_tasks() + self.assertSetEqual(expected_output, output) + self.assertDictEqual({FOO: {mock_task}}, self.task_pool._group_meta_tasks_running) + + @patch.object(pool.BaseTaskPool, '_pop_ended_meta_tasks') + async def test_flush(self, mock__pop_ended_meta_tasks: MagicMock): + # Meta tasks: + mock_ended_meta_task = AsyncMock() + mock__pop_ended_meta_tasks.return_value = {mock_ended_meta_task()} + mock_cancelled_meta_task = AsyncMock(side_effect=CancelledError) + self.task_pool._meta_tasks_cancelled = {mock_cancelled_meta_task()} + # Actual tasks: mock_ended_func, mock_cancelled_func = AsyncMock(), AsyncMock(side_effect=Exception) self.task_pool._tasks_ended = {123: mock_ended_func()} self.task_pool._tasks_cancelled = {456: mock_cancelled_func()} + self.assertIsNone(await self.task_pool.flush(return_exceptions=True)) + + # Meta tasks: + mock__pop_ended_meta_tasks.assert_called_once_with() + mock_ended_meta_task.assert_awaited_once_with() + mock_cancelled_meta_task.assert_awaited_once_with() + self.assertSetEqual(EMPTY_SET, self.task_pool._meta_tasks_cancelled) + # Actual tasks: mock_ended_func.assert_awaited_once_with() mock_cancelled_func.assert_awaited_once_with() self.assertDictEqual(EMPTY_DICT, self.task_pool._tasks_ended) @@ -363,15 +412,28 @@ class BaseTaskPoolTestCase(CommonTestCase): @patch.object(pool.BaseTaskPool, 'lock') async def test_gather_and_close(self, mock_lock: MagicMock): + # Meta tasks: + mock_meta_task1, mock_meta_task2 = AsyncMock(), AsyncMock() + self.task_pool._group_meta_tasks_running = {FOO: {mock_meta_task1()}, BAR: {mock_meta_task2()}} + mock_cancelled_meta_task = AsyncMock(side_effect=CancelledError) + self.task_pool._meta_tasks_cancelled = {mock_cancelled_meta_task()} + # Actual tasks: mock_running_func = AsyncMock() mock_ended_func, mock_cancelled_func = AsyncMock(), AsyncMock(side_effect=Exception) self.task_pool._tasks_ended = {123: mock_ended_func()} self.task_pool._tasks_cancelled = {456: mock_cancelled_func()} self.task_pool._tasks_running = {789: mock_running_func()} - self.task_pool._locked = True self.assertIsNone(await self.task_pool.gather_and_close(return_exceptions=True)) + mock_lock.assert_called_once_with() + # Meta tasks: + mock_meta_task1.assert_awaited_once_with() + mock_meta_task2.assert_awaited_once_with() + mock_cancelled_meta_task.assert_awaited_once_with() + self.assertDictEqual(EMPTY_DICT, self.task_pool._group_meta_tasks_running) + self.assertSetEqual(EMPTY_SET, self.task_pool._meta_tasks_cancelled) + # Actual tasks: mock_ended_func.assert_awaited_once_with() mock_cancelled_func.assert_awaited_once_with() mock_running_func.assert_awaited_once_with() @@ -385,95 +447,6 @@ class TaskPoolTestCase(CommonTestCase): TEST_CLASS = pool.TaskPool task_pool: pool.TaskPool - def setUp(self) -> None: - self.base_class_init_patcher = patch.object(pool.BaseTaskPool, '__init__') - self.base_class_init = self.base_class_init_patcher.start() - super().setUp() - - def tearDown(self) -> None: - self.base_class_init_patcher.stop() - super().tearDown() - - def test_init(self): - self.assertDictEqual(EMPTY_DICT, self.task_pool._group_meta_tasks_running) - self.base_class_init.assert_called_once_with(pool_size=self.TEST_POOL_SIZE, name=self.TEST_POOL_NAME) - - def test__cancel_group_meta_tasks(self): - mock_task1, mock_task2 = MagicMock(), MagicMock() - self.task_pool._group_meta_tasks_running[BAR] = {mock_task1, mock_task2} - self.assertIsNone(self.task_pool._cancel_group_meta_tasks(FOO)) - self.assertDictEqual({BAR: {mock_task1, mock_task2}}, self.task_pool._group_meta_tasks_running) - self.assertSetEqual(EMPTY_SET, self.task_pool._meta_tasks_cancelled) - mock_task1.cancel.assert_not_called() - mock_task2.cancel.assert_not_called() - - self.assertIsNone(self.task_pool._cancel_group_meta_tasks(BAR)) - self.assertDictEqual(EMPTY_DICT, self.task_pool._group_meta_tasks_running) - self.assertSetEqual({mock_task1, mock_task2}, self.task_pool._meta_tasks_cancelled) - mock_task1.cancel.assert_called_once_with() - mock_task2.cancel.assert_called_once_with() - - @patch.object(pool.BaseTaskPool, '_cancel_and_remove_all_from_group') - @patch.object(pool.TaskPool, '_cancel_group_meta_tasks') - def test__cancel_and_remove_all_from_group(self, mock__cancel_group_meta_tasks: MagicMock, - mock_base__cancel_and_remove_all_from_group: MagicMock): - group_name, group_reg, msg = 'xyz', MagicMock(), FOO - self.assertIsNone(self.task_pool._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg)) - mock__cancel_group_meta_tasks.assert_called_once_with(group_name) - mock_base__cancel_and_remove_all_from_group.assert_called_once_with(group_name, group_reg, msg=msg) - - @patch.object(pool.BaseTaskPool, 'cancel_group') - async def test_cancel_group(self, mock_base_cancel_group: AsyncMock): - group_name, msg = 'abc', 'xyz' - await self.task_pool.cancel_group(group_name, msg=msg) - mock_base_cancel_group.assert_awaited_once_with(group_name=group_name, msg=msg) - - @patch.object(pool.BaseTaskPool, 'cancel_all') - async def test_cancel_all(self, mock_base_cancel_all: AsyncMock): - msg = 'xyz' - await self.task_pool.cancel_all(msg=msg) - mock_base_cancel_all.assert_awaited_once_with(msg=msg) - - def test__pop_ended_meta_tasks(self): - mock_task, mock_done_task1 = MagicMock(done=lambda: False), MagicMock(done=lambda: True) - self.task_pool._group_meta_tasks_running[FOO] = {mock_task, mock_done_task1} - mock_done_task2, mock_done_task3 = MagicMock(done=lambda: True), MagicMock(done=lambda: True) - self.task_pool._group_meta_tasks_running[BAR] = {mock_done_task2, mock_done_task3} - expected_output = {mock_done_task1, mock_done_task2, mock_done_task3} - output = self.task_pool._pop_ended_meta_tasks() - self.assertSetEqual(expected_output, output) - self.assertDictEqual({FOO: {mock_task}}, self.task_pool._group_meta_tasks_running) - - @patch.object(pool.TaskPool, '_pop_ended_meta_tasks') - @patch.object(pool.BaseTaskPool, 'flush') - async def test_flush(self, mock_base_flush: AsyncMock, mock__pop_ended_meta_tasks: MagicMock): - mock_ended_meta_task = AsyncMock() - mock__pop_ended_meta_tasks.return_value = {mock_ended_meta_task()} - mock_cancelled_meta_task = AsyncMock(side_effect=CancelledError) - self.task_pool._meta_tasks_cancelled = {mock_cancelled_meta_task()} - self.assertIsNone(await self.task_pool.flush(return_exceptions=False)) - mock_base_flush.assert_awaited_once_with(return_exceptions=False) - mock__pop_ended_meta_tasks.assert_called_once_with() - mock_ended_meta_task.assert_awaited_once_with() - mock_cancelled_meta_task.assert_awaited_once_with() - self.assertSetEqual(EMPTY_SET, self.task_pool._meta_tasks_cancelled) - - @patch.object(pool.BaseTaskPool, 'lock') - @patch.object(pool.BaseTaskPool, 'gather_and_close') - async def test_gather_and_close(self, mock_base_gather_and_close: AsyncMock, mock_lock: MagicMock): - mock_meta_task1, mock_meta_task2 = AsyncMock(), AsyncMock() - self.task_pool._group_meta_tasks_running = {FOO: {mock_meta_task1()}, BAR: {mock_meta_task2()}} - mock_cancelled_meta_task = AsyncMock(side_effect=CancelledError) - self.task_pool._meta_tasks_cancelled = {mock_cancelled_meta_task()} - self.assertIsNone(await self.task_pool.gather_and_close(return_exceptions=True)) - mock_lock.assert_called_once_with() - mock_base_gather_and_close.assert_awaited_once_with(return_exceptions=True) - mock_meta_task1.assert_awaited_once_with() - mock_meta_task2.assert_awaited_once_with() - mock_cancelled_meta_task.assert_awaited_once_with() - self.assertDictEqual(EMPTY_DICT, self.task_pool._group_meta_tasks_running) - self.assertSetEqual(EMPTY_SET, self.task_pool._meta_tasks_cancelled) - def test__generate_group_name(self): prefix, func = 'x y z', AsyncMock(__name__=BAR) base_name = f'{prefix}-{BAR}-group'