Skip to content

Commit

Permalink
made a bunch of methods "non-async"
Browse files Browse the repository at this point in the history
  • Loading branch information
daniil-berg committed Mar 30, 2022
1 parent 91d546e commit a72a7cc
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 91 deletions.
4 changes: 2 additions & 2 deletions docs/source/pages/pool.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ By contrast, here is how you would do it with a task pool:
...
pool = TaskPool()
group_name = await pool.apply(queue_worker_function, args=(q_in, q_out), num=5)
group_name = pool.apply(queue_worker_function, args=(q_in, q_out), num=5)
...
pool.cancel_group(group_name)
...
Expand Down Expand Up @@ -141,7 +141,7 @@ Or we could use a task pool:
async def main():
...
pool = TaskPool()
await pool.map(another_worker_function, data_iterator, num_concurrent=5)
pool.map(another_worker_function, data_iterator, num_concurrent=5)
...
await pool.gather_and_close()
Expand Down
61 changes: 28 additions & 33 deletions src/asyncio_taskpool/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def _cancel_and_remove_all_from_group(self, group_name: str, group_reg: TaskGrou
continue
log.debug("%s cancelled tasks from group %s", str(self), group_name)

async def cancel_group(self, group_name: str, msg: str = None) -> None:
def cancel_group(self, group_name: str, msg: str = None) -> None:
"""
Cancels an entire group of tasks.
Expand All @@ -431,11 +431,10 @@ async def cancel_group(self, group_name: str, msg: str = None) -> None:
group_reg = self._task_groups.pop(group_name)
except KeyError:
raise exceptions.InvalidGroupName(f"No task group named {group_name} exists in this pool.")
async with group_reg:
self._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg)
self._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg)
log.debug("%s forgot task group %s", str(self), group_name)

async def cancel_all(self, msg: str = None) -> None:
def cancel_all(self, msg: str = None) -> None:
"""
Cancels all tasks still running within the pool (including meta tasks).
Expand All @@ -449,8 +448,7 @@ async def cancel_all(self, msg: str = None) -> None:
log.warning("%s cancelling all tasks!", str(self))
while self._task_groups:
group_name, group_reg = self._task_groups.popitem()
async with group_reg:
self._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg)
self._cancel_and_remove_all_from_group(group_name, group_reg, msg=msg)

def _pop_ended_meta_tasks(self) -> Set[Task]:
"""
Expand Down Expand Up @@ -598,8 +596,8 @@ async def _apply_num(self, group_name: str, func: CoroutineFunc, args: ArgsT = (
await gather(*(self._start_task(func(*args, **kwargs), group_name=group_name, end_callback=end_callback,
cancel_callback=cancel_callback) for _ in range(num)))

async def apply(self, func: CoroutineFunc, args: ArgsT = (), kwargs: KwArgsT = None, num: int = 1,
group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
def apply(self, func: CoroutineFunc, args: ArgsT = (), kwargs: KwArgsT = None, num: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
Creates tasks with the supplied arguments to be run in the pool.
Expand Down Expand Up @@ -646,11 +644,10 @@ async def apply(self, func: CoroutineFunc, args: ArgsT = (), kwargs: KwArgsT = N
self._check_start(function=func)
if group_name is None:
group_name = self._generate_group_name('apply', func)
group_reg = self._task_groups.setdefault(group_name, TaskGroupRegister())
async with group_reg:
meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
meta_tasks.add(create_task(self._apply_num(group_name, func, args, kwargs, num,
end_callback=end_callback, cancel_callback=cancel_callback)))
self._task_groups.setdefault(group_name, TaskGroupRegister())
meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
meta_tasks.add(create_task(self._apply_num(group_name, func, args, kwargs, num,
end_callback=end_callback, cancel_callback=cancel_callback)))
return group_name

@staticmethod
Expand Down Expand Up @@ -711,8 +708,8 @@ async def _arg_consumer(self, group_name: str, num_concurrent: int, func: Corout
str(e.__class__.__name__), func.__name__, '*' * arg_stars, str(next_arg))
map_semaphore.release()

async def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
"""
Creates tasks in the pool with arguments from the supplied iterable.
Expand Down Expand Up @@ -760,14 +757,13 @@ async def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc,
raise ValueError("`num_concurrent` must be a positive integer.")
if group_name in self._task_groups.keys():
raise exceptions.InvalidGroupName(f"Group named {group_name} already exists!")
self._task_groups[group_name] = group_reg = TaskGroupRegister()
async with group_reg:
meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
meta_tasks.add(create_task(self._arg_consumer(group_name, num_concurrent, func, arg_iter, arg_stars,
end_callback=end_callback, cancel_callback=cancel_callback)))
self._task_groups[group_name] = TaskGroupRegister()
meta_tasks = self._group_meta_tasks_running.setdefault(group_name, set())
meta_tasks.add(create_task(self._arg_consumer(group_name, num_concurrent, func, arg_iter, arg_stars,
end_callback=end_callback, cancel_callback=cancel_callback)))

async def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
A task-based equivalent of the `multiprocessing.pool.Pool.map` method.
Expand Down Expand Up @@ -819,12 +815,12 @@ async def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int =
"""
if group_name is None:
group_name = self._generate_group_name('map', func)
await self._map(group_name, num_concurrent, func, arg_iter, 0,
end_callback=end_callback, cancel_callback=cancel_callback)
self._map(group_name, num_concurrent, func, arg_iter, 0,
end_callback=end_callback, cancel_callback=cancel_callback)
return group_name

async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked
as positional arguments to the function.
Expand All @@ -836,13 +832,12 @@ async def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_con
"""
if group_name is None:
group_name = self._generate_group_name('starmap', func)
await self._map(group_name, num_concurrent, func, args_iter, 1,
end_callback=end_callback, cancel_callback=cancel_callback)
self._map(group_name, num_concurrent, func, args_iter, 1,
end_callback=end_callback, cancel_callback=cancel_callback)
return group_name

async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None,
cancel_callback: CancelCB = None) -> str:
def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
Like :meth:`map` except that the elements of `kwargs_iter` are expected to be iterables themselves to be
unpacked as keyword-arguments to the function.
Expand All @@ -854,8 +849,8 @@ async def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT
"""
if group_name is None:
group_name = self._generate_group_name('doublestarmap', func)
await self._map(group_name, num_concurrent, func, kwargs_iter, 2,
end_callback=end_callback, cancel_callback=cancel_callback)
self._map(group_name, num_concurrent, func, kwargs_iter, 2,
end_callback=end_callback, cancel_callback=cancel_callback)
return group_name


Expand Down
Loading

0 comments on commit a72a7cc

Please sign in to comment.