Fix server's control session reading from socket; improve docstrings
daniil-berg committed May 7, 2022
1 parent 72e380c commit 5a72a6d
Showing 5 changed files with 17 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -22,7 +22,7 @@
author = 'Daniil Fajnberg'

# The full version, including alpha/beta/rc tags
release = '1.1.1'
release = '1.1.2'


# -- General configuration ---------------------------------------------------
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = asyncio-taskpool
version = 1.1.1
version = 1.1.2
author = Daniil Fajnberg
author_email = [email protected]
description = Dynamically manage pools of asyncio tasks
2 changes: 2 additions & 0 deletions src/asyncio_taskpool/control/client.py
@@ -85,6 +85,7 @@ async def _server_handshake(self, reader: StreamReader, writer: StreamWriter) ->
"""
self._connected = True
writer.write(json.dumps(self._client_info()).encode())
writer.write(b'\n')
await writer.drain()
print("Connected to", (await reader.read(SESSION_MSG_BYTES)).decode())
print("Type '-h' to get help and usage instructions for all available commands.\n")
@@ -131,6 +132,7 @@ async def _interact(self, reader: StreamReader, writer: StreamWriter) -> None:
try:
# Send the command to the server.
writer.write(cmd.encode())
writer.write(b'\n')
await writer.drain()
except ConnectionError as e:
self._connected = False
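The two added writer.write(b'\n') calls terminate each client message with a newline, which is what lets the server frame messages by line instead of by a fixed byte count. A minimal standalone sketch of that sending pattern (the helper name send_json_line is illustrative, not part of the library):

import asyncio
import json

async def send_json_line(writer: asyncio.StreamWriter, payload: dict) -> None:
    # Serialize the payload and terminate it with a newline so the receiving
    # side can recover exactly one message per StreamReader.readline() call.
    writer.write(json.dumps(payload).encode())
    writer.write(b'\n')
    await writer.drain()  # wait until the transport buffer has been flushed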
7 changes: 4 additions & 3 deletions src/asyncio_taskpool/control/session.py
@@ -32,7 +32,7 @@
from .parser import ControlParser
from ..exceptions import CommandError, HelpRequested, ParserError
from ..pool import TaskPool, SimpleTaskPool
from ..internals.constants import CLIENT_INFO, CMD, CMD_OK, SESSION_MSG_BYTES
from ..internals.constants import CLIENT_INFO, CMD, CMD_OK
from ..internals.helpers import return_or_exception

if TYPE_CHECKING:
@@ -134,7 +134,8 @@ async def client_handshake(self) -> None:
Client info is retrieved, server info is sent back, and the
:class:`ControlParser <asyncio_taskpool.control.parser.ControlParser>` is set up.
"""
client_info = json.loads((await self._reader.read(SESSION_MSG_BYTES)).decode().strip())
msg = (await self._reader.readline()).decode().strip()
client_info = json.loads(msg)
log.debug("%s connected", self._client_class_name)
parser_kwargs = {
'stream': self._response_buffer,
@@ -187,7 +188,7 @@ async def listen(self) -> None:
It will obviously block indefinitely.
"""
while self._control_server.is_serving():
msg = (await self._reader.read(SESSION_MSG_BYTES)).decode().strip()
msg = (await self._reader.readline()).decode().strip()
if not msg:
log.debug("%s disconnected", self._client_class_name)
break
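On the receiving end, replacing read(SESSION_MSG_BYTES) with readline() is the actual fix: a fixed-size read() may return a partial message or several concatenated ones, while readline() buffers the stream and yields exactly one newline-terminated message per call. A rough sketch of such a receive loop (illustrative only, not the ControlSession implementation):

import asyncio
import json

async def read_json_lines(reader: asyncio.StreamReader) -> None:
    while True:
        # readline() returns one newline-terminated chunk, or b'' at EOF.
        msg = (await reader.readline()).decode().strip()
        if not msg:
            break  # empty message: the peer disconnected
        data = json.loads(msg)  # each line carries one complete JSON document
        print("received:", data)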
13 changes: 9 additions & 4 deletions src/asyncio_taskpool/pool.py
@@ -765,9 +765,10 @@ async def _arg_consumer(self, group_name: str, num_concurrent: int, func: Corout
def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_iter: ArgsT, arg_stars: int,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> None:
"""
Creates tasks in the pool with arguments from the supplied iterable.
Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool.
Each coroutine looks like `func(arg)`, `func(*arg)`, or `func(**arg)`, `arg` being taken from `arg_iter`.
The method is a task-based equivalent of the `multiprocessing.pool.Pool.map` method.
All the new tasks are added to the same task group.
@@ -819,10 +820,10 @@ def _map(self, group_name: str, num_concurrent: int, func: CoroutineFunc, arg_it
def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
A task-based equivalent of the `multiprocessing.pool.Pool.map` method.
Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool.
Each coroutine looks like `func(arg)`, `arg` being an element taken from `arg_iter`.
Each coroutine looks like `func(arg)`, `arg` being an element taken from `arg_iter`. The method is a task-based
equivalent of the `multiprocessing.pool.Pool.map` method.
All the new tasks are added to the same task group.
@@ -876,6 +877,8 @@ def map(self, func: CoroutineFunc, arg_iter: ArgsT, num_concurrent: int = 1, gro
def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_concurrent: int = 1, group_name: str = None,
end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool.
Like :meth:`map` except that the elements of `args_iter` are expected to be iterables themselves to be unpacked
as positional arguments to the function.
Each coroutine then looks like `func(*args)`, `args` being an element from `args_iter`.
@@ -893,6 +896,8 @@ def starmap(self, func: CoroutineFunc, args_iter: Iterable[ArgsT], num_concurren
def doublestarmap(self, func: CoroutineFunc, kwargs_iter: Iterable[KwArgsT], num_concurrent: int = 1,
group_name: str = None, end_callback: EndCB = None, cancel_callback: CancelCB = None) -> str:
"""
Creates coroutines with arguments from the supplied iterable and runs them as new tasks in the pool.
Like :meth:`map` except that the elements of `kwargs_iter` are expected to be iterables themselves to be
unpacked as keyword-arguments to the function.
Each coroutine then looks like `func(**kwargs)`, `kwargs` being an element from `kwargs_iter`.
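The reworked docstrings spell out the three mapping variants: map() calls func(arg) with each element of arg_iter, starmap() unpacks each element as positional arguments (func(*args)), and doublestarmap() unpacks each element as keyword arguments (func(**kwargs)). A brief usage sketch under those semantics; the coroutine fetch, the URLs, and the closing gather_and_close() call are assumptions for illustration, not taken from this diff:

import asyncio
from asyncio_taskpool.pool import TaskPool

async def fetch(url: str, retries: int = 0) -> None:
    await asyncio.sleep(0)  # stand-in for real work, purely illustrative

async def main() -> None:
    pool = TaskPool()
    # map: each element is passed as a single positional argument -> fetch(url)
    pool.map(fetch, ['https://a.example', 'https://b.example'], num_concurrent=2)
    # starmap: each element is unpacked as positional arguments -> fetch(url, retries)
    pool.starmap(fetch, [('https://c.example', 1), ('https://d.example', 3)])
    # doublestarmap: each element is unpacked as keyword arguments -> fetch(url=..., retries=...)
    pool.doublestarmap(fetch, [{'url': 'https://e.example', 'retries': 2}])
    # Assumed pool shutdown helper: wait for all tasks to finish, then close the pool.
    await pool.gather_and_close()

asyncio.run(main())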
