Stream archive, no longer offer formats other than zip
Tested with a 390 MB job dir: streaming it as a 133 MB zip file takes about 5 s.
The compression level and chunk size were optimized for time, CPU load, and size.
sverhoeven committed Nov 17, 2024
1 parent 83898cb commit f041ab6
Showing 6 changed files with 294 additions and 26 deletions.
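The tuning mentioned in the commit message comes down to two knobs, both visible in the views.py diff below: zlib compression level 1 (favouring throughput over ratio) and a 1 MiB chunk size. A standalone sketch for comparing levels on your own data (not part of the commit; the payload here is made up):

import time
import zlib

payload = b"example job output, quite compressible " * 26214  # ~1 MiB of made-up data

for level in (1, 6, 9):
    # wbits=-zlib.MAX_WBITS yields raw deflate, the encoding zip members use.
    compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS, level=level)
    start = time.perf_counter()
    compressed = compressor.compress(payload) + compressor.flush()
    elapsed = time.perf_counter() - start
    print(f"level={level}: {len(compressed)} bytes in {elapsed:.4f}s")

Level 1 typically compresses several times faster than level 9 at a modest size penalty, which matters when every download request builds its archive on the fly.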
8 changes: 4 additions & 4 deletions .flake8
@@ -107,6 +107,10 @@ per-file-ignores =
     DAR201,
     ; Found too long name: test_ok_running_job_with_input_and_output_file
     WPS118,
+    ; Found too many variables used to unpack a tuple
+    WPS236,
+    ; Found too many `assert` statements
+    WPS218,
 ; all init files
 __init__.py:
     ; ignore not used imports
@@ -115,10 +119,6 @@ per-file-ignores =
     F403,
     ; Found wrong metadata variable
     WPS410,
-    ; Found too many variables used to unpack a tuple
-    WPS236,
-    ; Found too many `assert` statements
-    WPS218,
 
 
 exclude =
61 changes: 60 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -35,6 +35,7 @@ jsonschema = "<4.18.0"
 jinja2 = "^3.1.2"
 # Downgrade redis as arq 0.25.0 does not support redis>=5.1.0
 redis = {extras = ["hiredis"], version ="5.0.8"}
+stream-zip = "^0.0.83"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^7.0"
76 changes: 74 additions & 2 deletions src/bartender/walk_dir.py
@@ -1,9 +1,14 @@
-from os import DirEntry
+from collections.abc import AsyncIterable, Callable
+from datetime import datetime
+from os import DirEntry, walk
 from pathlib import Path
+from stat import S_IFREG
 from typing import Optional, Union
 
-from aiofiles.os import scandir
+from aiofiles import open
+from aiofiles.os import scandir, stat, wrap
 from pydantic import BaseModel
+from stream_zip import ZIP_32, AsyncMemberFile
 
 
 class DirectoryItem(BaseModel):
@@ -49,3 +54,70 @@ async def walk_dir(
     if children:
         item.children = sorted(children, key=lambda entry: entry.name)
     return item
+
+
+async_walk = wrap(walk)
+
+WalkFilter = Callable[[str], bool]
+
+
+def exclude_filter(excludes: list[str]) -> WalkFilter:
+    """Create a filter that excludes paths based on a list.
+
+    Args:
+        excludes: List of patterns to exclude.
+
+    Returns:
+        A function that takes a path and returns True if it should be included.
+    """
+    if not excludes:
+        return lambda _: True
+    return lambda path: not any(pattern in path for pattern in excludes)
+
+
+chunk_size = 1048576  # 1 MiB
+owner_read_only_file_mode = 0o400
+read_only_file_mode = S_IFREG | owner_read_only_file_mode
+
+
+async def _yield_file_contents(name: Path) -> AsyncIterable[bytes]:
+    async with open(name, "rb") as handle:
+        while chunk := await handle.read(chunk_size):
+            yield chunk
+
+
+async def _yield_file(path: Path, rpath: str) -> AsyncMemberFile:
+    stats = await stat(path)
+    return (
+        rpath,
+        datetime.fromtimestamp(stats.st_mtime),
+        read_only_file_mode,
+        ZIP_32,
+        _yield_file_contents(path),
+    )
+
+
+async def walk_dir_generator(  # noqa: WPS231
+    job_dir: Path,
+    wfilter: WalkFilter = lambda _: True,
+) -> AsyncIterable[AsyncMemberFile]:
+    """Walk a directory and yield its files.
+
+    Can be used as input for stream_zip.async_stream_zip
+
+    Args:
+        job_dir: The job directory.
+        wfilter: A function that takes a path and returns True if it should be included.
+
+    Yields:
+        Tuple of file name, m_time, mode, method, file.
+    """
+    for root, _, files in await async_walk(job_dir):
+        if not wfilter(str(Path(root).relative_to(job_dir))):
+            continue
+        for file in files:
+            path = Path(root) / file
+            rpath = str(path.relative_to(job_dir))
+            if not wfilter(rpath):
+                continue
+            yield await _yield_file(path, rpath)
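A minimal sketch of how the new helpers compose outside the web layer, assuming only the imports shown in this diff; the job directory and target zip path are placeholders. async_stream_zip consumes the (name, mtime, mode, method, contents) tuples that walk_dir_generator yields and emits zip bytes incrementally:

import asyncio
from pathlib import Path

from aiofiles import open as aopen
from stream_zip import async_stream_zip

from bartender.walk_dir import exclude_filter, walk_dir_generator


async def zip_job_dir(job_dir: Path, target: Path) -> None:
    # Skip any path containing "output"; the pattern list is purely illustrative.
    wfilter = exclude_filter(["output"])
    async with aopen(target, "wb") as archive:
        async for chunk in async_stream_zip(walk_dir_generator(job_dir, wfilter)):
            await archive.write(chunk)


asyncio.run(zip_job_dir(Path("/tmp/job42"), Path("/tmp/job42.zip")))

Because both the directory walk and the file reads are asynchronous, memory use stays bounded by the chunk size regardless of how large the job directory is.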
54 changes: 36 additions & 18 deletions src/bartender/web/api/job/views.py
@@ -1,3 +1,4 @@
+import zlib
 from pathlib import Path
 from shutil import rmtree
 from typing import Annotated, Optional, Set, Tuple
@@ -12,11 +13,12 @@
     Query,
     Request,
 )
-from fastapi.responses import FileResponse, PlainTextResponse
+from fastapi.responses import FileResponse, PlainTextResponse, StreamingResponse
 from jsonschema import ValidationError
 from pydantic import PositiveInt
 from sqlalchemy.exc import NoResultFound
 from starlette import status
+from stream_zip import async_stream_zip
 
 from bartender.async_utils import async_wrap
 from bartender.check_load import check_load
@@ -27,8 +29,14 @@
 from bartender.destinations import Destination
 from bartender.filesystems.queue import CurrentFileOutStagingQueue
 from bartender.schedulers.abstract import JobDescription
-from bartender.walk_dir import DirectoryItem, walk_dir
-from bartender.web.api.job.archive import ArchiveFormat, create_archive
+from bartender.walk_dir import (
+    DirectoryItem,
+    chunk_size,
+    exclude_filter,
+    walk_dir,
+    walk_dir_generator,
+)
+from bartender.web.api.job.archive import ArchiveFormat
 from bartender.web.api.job.interactive_apps import InteractiveAppResult, run
 from bartender.web.api.job.schema import JobModelDTO
 from bartender.web.api.job.sync import sync_state, sync_states
@@ -364,11 +372,10 @@ def _remove_archive(filename: str) -> None:
     responses={
         200: {
             "content": {
-                "application/octet-stream": {},
+                "application/zip": {},
             },
         },
     },
-    response_class=FileResponse,
 )
 async def retrieve_job_directory_as_archive(  # noqa: WPS211
     job_dir: CurrentCompletedJobDir,
@@ -382,7 +389,7 @@ async def retrieve_job_directory_as_archive(  # noqa: WPS211
     # /output that are not also called output. Might improve when globs will be
     # supported in next release (already documented):
     # https://github.com/PyFilesystem/pyfilesystem2/pull/464
-) -> FileResponse:
+) -> StreamingResponse:
     """Download contents of job directory as archive.
 
     Args:
@@ -396,18 +403,29 @@ async def retrieve_job_directory_as_archive(  # noqa: WPS211
             If not provided, uses id of the job.
 
     Returns:
-        FileResponse: Archive containing the content of job_dir
+        StreamingResponse: Archive containing the content of job_dir
+
+    Raises:
+        NotImplementedError: When archive format is not supported.
     """
-    archive_fn = str(job_dir.with_suffix(archive_format))
-    await create_archive(job_dir, exclude, exclude_dirs, archive_format, archive_fn)
-
-    background_tasks.add_task(_remove_archive, archive_fn)
-
-    return_fn = Path(archive_fn).name
-    if filename:
-        return_fn = filename
-    return FileResponse(archive_fn, filename=return_fn)
+    if archive_format != ".zip":
+        raise NotImplementedError("Only zip format is supported for now")
+    fn = filename or job_dir.with_suffix(".zip").name
+    headers = {
+        "Content-Disposition": f'attachment; filename="{fn}"',
+    }
+    all_excludes = []
+    if exclude:
+        all_excludes.extend(exclude)
+    if exclude_dirs:
+        all_excludes.extend(exclude_dirs)
+    wfilter = exclude_filter(all_excludes)
+    generator = async_stream_zip(
+        walk_dir_generator(job_dir, wfilter),
+        chunk_size=chunk_size,
+        get_compressobj=lambda: zlib.compressobj(wbits=-zlib.MAX_WBITS, level=1),
+    )
+    return StreamingResponse(generator, media_type="application/zip", headers=headers)
 
 
 @router.get("/{jobid}/archive/{path:path}")
@@ -419,7 +437,7 @@ async def retrieve_job_subdirectory_as_archive(  # noqa: WPS211
     exclude: Optional[list[str]] = Query(default=None),
     exclude_dirs: Optional[list[str]] = Query(default=None),
     filename: Optional[str] = Query(default=None),
-) -> FileResponse:
+) -> StreamingResponse:
     """Download job output as archive.
 
     Args:
@@ -434,7 +452,7 @@ async def retrieve_job_subdirectory_as_archive(  # noqa: WPS211
         If not provided, uses id of the job.
 
     Returns:
-        FileResponse: Archive containing the output of job_dir
+        StreamingResponse: Archive containing the output of job_dir
     """
     subdirectory = _parse_subdirectory(path, job_dir)