Skip to content

Commit

Permalink
Drop jobdir/meta file + refactor bartender.filesystem modules to stag…
Browse files Browse the repository at this point in the history
…ing and walk_dir module
  • Loading branch information
sverhoeven committed Feb 19, 2024
1 parent 83f2384 commit 129b068
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 226 deletions.
36 changes: 0 additions & 36 deletions src/bartender/filesystem/__init__.py

This file was deleted.

40 changes: 0 additions & 40 deletions src/bartender/filesystem/assemble_job.py

This file was deleted.

56 changes: 0 additions & 56 deletions src/bartender/filesystem/stage_job_input.py

This file was deleted.

115 changes: 115 additions & 0 deletions src/bartender/staging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from pathlib import Path
from shutil import unpack_archive
from typing import Literal, Optional

from aiofiles import open
from aiofiles.os import mkdir, remove
from fastapi import HTTPException, UploadFile
from starlette import status

from bartender.async_utils import async_wrap
from bartender.config import ApplicatonConfiguration

CHUNK_SIZE = 1024 * 1024 # 1Mb


class UnsupportedContentTypeError(Exception):
"""When content type is unsupported."""


def has_needed_files(
application: ApplicatonConfiguration,
job_dir: Path,
) -> Literal[True]:
"""Check if files required by application are present in job directory.
Args:
application: Name of application to check config file for.
job_dir: In which directory to look.
Raises:
IndexError: When one or more needed files can not be found
Returns:
True when found or no files where needed.
"""
missing_files = []
for needed_file in application.upload_needs:
file = job_dir / needed_file
file_exists = file.exists() and file.is_file()
if not file_exists:
missing_files.append(needed_file)
if missing_files:
raise IndexError(
f"Application requires files {missing_files}, "
"but where not found in uploaded zip archive",
)
return True


async def create_job_dir(job_id: int, job_root_dir: Path) -> Path:
"""Create job directory.
Args:
job_id: id of the job.
job_root_dir: Root directory for all jobs.
Raises:
HTTPException: When job directory could not be made.
Returns:
Directory of job.
"""
job_dir: Path = job_root_dir / str(job_id)

try:
await mkdir(job_dir)
except FileExistsError as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create directory for job.",
) from exc
return job_dir


def _is_valid_content_type(content_type: Optional[str]) -> bool:
supported_upload_content_types = {
"application/zip",
"application/x-zip-compressed",
} # TODO add support for other formats like tar.gz, tar.bz2, .7z?
if content_type not in supported_upload_content_types:
raise UnsupportedContentTypeError(
f"Unable to stage job input wrong mime type {content_type}, "
+ f"supported are {supported_upload_content_types}",
)
return True


async def unpack_upload(
job_dir: Path,
archive: UploadFile,
dest_fn: str = "archive.zip",
) -> None:
"""Unpack archive file to job id directory.
Args:
job_dir: Where to put archive file.
archive: The archive file with async read method.
dest_fn: Temporary archive filename.
"""
_is_valid_content_type(archive.content_type)

# Copy archive to disk
job_archive = job_dir / dest_fn
# If archive contains
async with open(job_archive, "wb") as out_file:
while content := await archive.read(CHUNK_SIZE):
if isinstance(content, str):
break # type narrowing for mypy, content is always bytes
await out_file.write(content)

if archive.content_type in {"application/zip", "application/x-zip-compressed"}:
await async_wrap(unpack_archive)(job_archive, extract_dir=job_dir, format="zip")
# TODO what happens when archive contains archive.zip, will it overwrite itself?

await remove(job_archive) # no longer needed?
File renamed without changes.
9 changes: 3 additions & 6 deletions src/bartender/web/api/applications/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
from bartender.config import ApplicatonConfiguration
from bartender.context import Context, CurrentContext
from bartender.db.dao.job_dao import CurrentJobDAO
from bartender.filesystem import has_needed_files
from bartender.filesystem.assemble_job import assemble_job
from bartender.filesystem.stage_job_input import stage_job_input
from bartender.staging import create_job_dir, has_needed_files, unpack_upload
from bartender.web.api.applications.submit import submit
from bartender.web.users import CurrentUser, User

Expand Down Expand Up @@ -58,16 +56,15 @@ async def upload_job( # noqa: WPS210, WPS211
if job_id is None:
raise IndexError("Failed to create database entry for job")

job_dir = assemble_job(
job_dir = await create_job_dir(
job_id,
submitter.apikey,
context.job_root_dir,
)
# TODO uploaded file can be big, and thus take long time to unpack,
# not nice to do it in request/response handling,
# as request could timeout on consumer side.
# Move to background task or have dedicated routes for preparing input files.
await stage_job_input(job_dir, upload)
await unpack_upload(job_dir, upload)
has_needed_files(context.applications[application], job_dir)
payload = await _validate_form(request, context.applications[application])

Expand Down
2 changes: 1 addition & 1 deletion src/bartender/web/api/job/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
from bartender.context import CurrentContext, get_job_root_dir
from bartender.db.dao.job_dao import CurrentJobDAO
from bartender.db.models.job_model import MAX_LENGTH_NAME, CompletedStates, Job
from bartender.filesystem.walk_dir import DirectoryItem, walk_dir
from bartender.filesystems.queue import CurrentFileOutStagingQueue
from bartender.walk_dir import DirectoryItem, walk_dir
from bartender.web.api.job.interactive_apps import InteractiveAppResult, run
from bartender.web.api.job.schema import JobModelDTO
from bartender.web.api.job.sync import sync_state, sync_states
Expand Down
78 changes: 78 additions & 0 deletions tests/test_staging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from pathlib import Path

import pytest

from bartender.config import ApplicatonConfiguration
from bartender.staging import create_job_dir, has_needed_files


@pytest.mark.anyio
async def test_create_job_dir(job_root_dir: Path) -> None:
"""Test the assembly the job."""
job_id = 1

await create_job_dir(job_id, job_root_dir)

job_dir = job_root_dir / str(job_id)
assert job_dir.exists()


class TestHasNeededFiles:
@pytest.fixture
def job_dir(self, tmp_path: Path) -> Path:
return tmp_path

@pytest.fixture
def application(self) -> ApplicatonConfiguration:
return ApplicatonConfiguration(
command_template="wc config.ini data.csv",
upload_needs=["config.ini", "data.csv"],
)

def test_has_needed_files_with_existing_files(
self,
job_dir: Path,
application: ApplicatonConfiguration,
) -> None:
# Create the needed files
(job_dir / "config.ini").touch()
(job_dir / "data.csv").touch()

# Check if the files exist
result = has_needed_files(application, job_dir)

# Assert that the function returns True
assert result is True

def test_has_needed_files_with_missing_files(
self,
job_dir: Path,
application: ApplicatonConfiguration,
) -> None:
# Check if the files are missing
with pytest.raises(IndexError):
has_needed_files(application, job_dir)

def test_has_needed_files_with_partial_missing_files(
self,
job_dir: Path,
application: ApplicatonConfiguration,
) -> None:
# Create one of the needed files
(job_dir / "config.ini").touch()

# Check if the files are missing
with pytest.raises(IndexError):
has_needed_files(application, job_dir)

def test_has_needed_files_with_no_files_needed(
self,
job_dir: Path,
application: ApplicatonConfiguration,
) -> None:
# Remove the upload_needs list
application.upload_needs = []

# Check if the function returns True
result = has_needed_files(application, job_dir)
assert result is True
Loading

0 comments on commit 129b068

Please sign in to comment.