[F] Added retries for connection errors during async downloading
- now when a part cannot be downloaded for any reason (e.g. connection loss)
  the whole operation will be retried
- fixed multiple logging handlers created with multiple qget calls
dwojtasik committed May 15, 2023
1 parent 3fad7e7 commit b91548c
Showing 3 changed files with 86 additions and 21 deletions.
5 changes: 5 additions & 0 deletions README.rst
@@ -292,6 +292,11 @@ supplied by user is used as a top limit for calculated value.
History
=======
Unreleased
----------
- Fixed multiple logging handlers created with multiple qget calls.
- Added retries for connection errors during async downloading.

0.1.5 (2023-01-25)
------------------
- Updated copyright note.
14 changes: 14 additions & 0 deletions qget/__main__.py
@@ -137,6 +137,20 @@ def _get_parser() -> argparse.ArgumentParser:
default=default_args["max_part_mb"],
help="Desirable (if possible) max part size in megabytes.",
)
parser.add_argument(
"--retries",
type=int,
dest="retries",
default=default_args["retries"],
help="Retries number for part download.",
)
parser.add_argument(
"--retry_sec",
type=int,
dest="retry_sec",
default=default_args["retry_sec"],
help="Time to wait between retries of part download in seconds.",
)
parser.add_argument(
"--limit",
type=str,
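For example, the new options could be combined with an ordinary download on the command line (a hypothetical invocation through the package's __main__ entry point; the URL and values are illustrative):

python -m qget https://example.com/archive.zip --retries 5 --retry_sec 2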
88 changes: 67 additions & 21 deletions qget/qget_aio.py
@@ -16,7 +16,7 @@
import tempfile
import time
from dataclasses import dataclass
from typing import Callable, Dict
from typing import Awaitable, Callable, Dict
from urllib.parse import unquote, urlparse
from python_socks import parse_proxy_url

@@ -249,6 +249,8 @@ def _get_logger(debug: bool) -> logging.Logger:
logger.setLevel(logging.DEBUG if debug else logging.ERROR)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG if debug else logging.ERROR)
if logger.hasHandlers():
logger.handlers.clear()
logger.addHandler(handler)
return logger
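The hasHandlers() check added above is the fix for the duplicate-handler issue mentioned in the changelog: logging.getLogger() returns the same logger object for a given name, so before this change each qget call attached one more StreamHandler and every record was then emitted once per accumulated handler. A minimal sketch of the mechanism, with an illustrative logger name and messages (not taken from the qget source):

import logging
import sys

def get_logger() -> logging.Logger:
    logger = logging.getLogger("example")
    handler = logging.StreamHandler(sys.stdout)
    # Without this clear, a second call would attach a second handler
    # and every record would then be printed twice.
    if logger.hasHandlers():
        logger.handlers.clear()
    logger.addHandler(handler)
    return logger

get_logger().warning("printed once")
get_logger().warning("still printed once, not twice")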

@@ -496,12 +498,39 @@ def _parse_limit(limit: str) -> int:
return value


async def _with_retries(
    code: Callable[[], Awaitable[None]], retries: int = 10, retry_sec: int = 1, exception: Exception = None
) -> None:
    """Retries execution of a code block.

    Args:
        code (Callable[[], Awaitable[None]]): Code to execute in the form of an async callback.
        retries (int, optional): Number of retries. Defaults to 10.
        retry_sec (int, optional): Time to wait between retries in seconds. Defaults to 1.
        exception (Exception, optional): Exception thrown during a previous execution attempt. Defaults to None.

    Raises:
        exception: Exception thrown during execution. Raised only when retries run out.
    """
    if retries < 0:
        if exception is not None:
            raise exception
        return
    try:
        await code()
    except Exception as ex:  # pylint: disable=broad-except
        await asyncio.sleep(retry_sec)
        await _with_retries(code, retries - 1, retry_sec, ex)
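A minimal usage sketch of the helper (it assumes _with_retries is in scope; the failing coroutine and counts are illustrative): a coroutine that raises a connection error twice succeeds on its third attempt, with retry_sec seconds of sleep between attempts.

import asyncio

attempts = 0

async def flaky_download() -> None:
    # Simulates a part download that fails twice before succeeding.
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("simulated connection loss")

asyncio.run(_with_retries(flaky_download, retries=10, retry_sec=1))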


async def _download_part(
session: aiohttp.ClientSession,
url: str,
tmp_dir: str,
part_data: PartData,
chunk_bytes: int,
retries: int,
retry_sec: int,
limiter: Limiter = None,
progress_ref: ProgressState = None,
) -> None:
@@ -513,34 +542,43 @@ async def _download_part(
tmp_dir (str): Temporary directory path to save part file.
part_data (PartData): Basic information about the part.
chunk_bytes (int): Chunk of data read in each iteration from url and saved to the part file, in bytes.
retries (int): Number of retries.
retry_sec (int): Time to wait between retries in seconds.
limiter (Limiter, optional): Limiter for download rate. If supplied FileBuffer will be used and chunk_bytes
for stream iteration will be overridden with new calculated value. Defaults to None.
progress_ref (ProgressState, optional): Reference to progress state.
If passed part bytes will be updated in it. Defaults to None.
"""
has_limit = limiter is not None
iter_size = limiter.chunk_bytes if has_limit else chunk_bytes
async with session.get(
url,
headers={
"range": f"bytes={part_data.begin}-{part_data.end}",
},
timeout=None,
) as response:
async with aiofiles.open(f"{tmp_dir}/part-{part_data.part_id}.cr", "wb") as part_file:
if has_limit:
file_buffer = FileBuffer(part_file, chunk_bytes)
async for chunk in response.content.iter_chunked(iter_size):
byte_count = len(chunk)
if progress_ref:
progress_ref.update_part_progress(part_data.part_id, byte_count)

async def _download_call():
"""Download callable code block used for retries"""
async with session.get(
url,
headers={
"range": f"bytes={part_data.begin}-{part_data.end}",
},
timeout=None,
) as response:
if progress_ref:
progress_ref.parts_bytes[part_data.part_id] = 0
async with aiofiles.open(f"{tmp_dir}/part-{part_data.part_id}.cr", "wb") as part_file:
if has_limit:
file_buffer = FileBuffer(part_file, chunk_bytes)
async for chunk in response.content.iter_chunked(iter_size):
byte_count = len(chunk)
if progress_ref:
progress_ref.update_part_progress(part_data.part_id, byte_count)
if has_limit:
await limiter.throttle(byte_count)
await file_buffer.write(chunk)
else:
await part_file.write(chunk)
if has_limit:
await limiter.throttle(byte_count)
await file_buffer.write(chunk)
else:
await part_file.write(chunk)
if has_limit:
await file_buffer.close()
await file_buffer.close()

await _with_retries(_download_call, retries, retry_sec)
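Each part is requested with an HTTP Range header (the bytes={begin}-{end} header above), so the server returns only that slice of the resource. A minimal standalone sketch of such a ranged request with aiohttp (the URL and the byte range are illustrative, not the qget code):

import asyncio
import aiohttp

async def fetch_first_kilobyte(url: str) -> bytes:
    # A server that supports ranges answers with 206 Partial Content
    # and only the requested bytes 0-1023.
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers={"range": "bytes=0-1023"}) as response:
            return await response.read()

asyncio.run(fetch_first_kilobyte("https://example.com/archive.zip"))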


async def _rewrite_parts(filepath: str, tmp_dir: str, chunk_bytes: int, progress_ref: ProgressState = None) -> None:
@@ -583,6 +621,8 @@ async def qget_coro(
connection_test_sec: int = 5,
chunk_bytes: int = 2621440,
max_part_mb: float = 5.0,
retries: int = 10,
retry_sec: int = 1,
limit: str = None,
tmp_dir: str = None,
debug: bool = False,
@@ -612,6 +652,8 @@ async def qget_coro(
Will also be used when rewriting parts to the output file. If limit is supplied, this can be overridden for
stream iteration. Defaults to 2621440.
max_part_mb (float, optional): Desirable (if possible) max part size in megabytes. Defaults to 5.
retries (int, optional): Number of retries for part download. Defaults to 10.
retry_sec (int, optional): Time to wait between retries of part download in seconds. Defaults to 1.
limit (str, optional): Download rate limit in MBps. Can be supplied with a unit as "Nunit", e.g. "5M".
Valid units (case insensitive): b, k, m, g, kb, mb, gb. 0 bytes will be treated as no limit.
Defaults to None.
@@ -706,6 +748,8 @@ async def _semaphore_task(task: asyncio.coroutine) -> asyncio.coroutine:
resource_bytes if part == part_count - 1 else begin + part_bytes - 1,
),
chunk_bytes,
retries,
retry_sec,
limiter,
progress_ref,
)
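Each of these _download_part coroutines is wrapped in _semaphore_task (truncated above) so that only a limited number of parts are fetched at the same time. A minimal sketch of that gating pattern (the helper, the limit, and the gathering code are illustrative, not the qget implementation):

import asyncio

async def gather_limited(coros, max_concurrent: int = 8):
    # Run the coroutines concurrently, but never more than
    # max_concurrent of them at once.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))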
@@ -763,6 +807,8 @@ def qget(url: str, **kwargs) -> None:
chunk_bytes (int, optional): Chunk of data read in each iteration from url and saved to the part file, in bytes.
Will also be used when rewriting parts to the output file. Defaults to 2621440.
max_part_mb (float, optional): Desirable (if possible) max part size in megabytes. Defaults to 5.
retries (int, optional): Number of retries for part download. Defaults to 10.
retry_sec (int, optional): Time to wait between retries of part download in seconds. Defaults to 1.
limit (str, optional): Download rate limit in MBps. Can be supplied with a unit as "Nunit", e.g. "5M".
Valid units (case insensitive): b, k, m, g, kb, mb, gb. 0 bytes will be treated as no limit.
Defaults to None.
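With the new keyword arguments, a blocking call through the Python API could look like this (a hypothetical example: it assumes qget is importable from the package root; the URL and values are illustrative):

from qget import qget

# Retry each failed part up to 5 times, waiting 2 seconds between attempts.
qget(
    "https://example.com/archive.zip",
    retries=5,
    retry_sec=2,
)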
