Skip to content

Commit

Permalink
Merge pull request #86 from i-VRESSE/linkjob
Browse files Browse the repository at this point in the history
Create `bartender link` subcommand.
  • Loading branch information
sverhoeven authored Feb 23, 2024
2 parents 129b068 + fe7343d commit 57c55f7
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 16 deletions.
11 changes: 11 additions & 0 deletions docs/deploy.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,14 @@ If you want to generate a token with the
`docker compose -f deploy/docker-compose.yml exec api bartender generate-token` command
you should uncomment the private key volume bind in `deploy/docker-compose.yml`.
See [configuration.md#authentication](configuration.md#authentication).

## Link external directory as job

If you have a directory outside the bartender job root directory
that is the output of one the configured applications in bartender
then you might want to make it available as a job in bartender.

To do this you can create a symlink to the external directory
in the bartender job root directory,
by running the `bartender link` command.
See `bartender link --help` for more information.
77 changes: 77 additions & 0 deletions src/bartender/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import uvicorn

from bartender.config import build_config
from bartender.link import link_job
from bartender.schedulers.arq import ArqSchedulerConfig, run_workers
from bartender.settings import settings
from bartender.user import generate_token_subcommand
Expand Down Expand Up @@ -92,6 +93,7 @@ def build_parser() -> ArgumentParser:
perform_sp.set_defaults(func=perform)

add_generate_token_subcommand(subparsers)
add_link_job_subcommand(subparsers)

return parser

Expand Down Expand Up @@ -175,6 +177,81 @@ def add_generate_token_subcommand(
generate_token_sp.set_defaults(func=generate_token_subcommand)


def add_link_job_subcommand(subparsers: Any) -> None:
"""
Add the 'link' subcommand to the given subparsers.
Args:
subparsers (Any): The subparsers object to add the 'link' subcommand to.
"""
link_job_sp = subparsers.add_parser(
"link",
help="Link external directory as job",
formatter_class=Formatter,
description=dedent( # noqa: WPS462 -- docs
"""\
Link external directory as job.
The external directory should have same shape
as a completed job for the selected application.
For haddock3 application, the directory should have:
output/ directory and workflow.cfg, stderr.txt,
stdout.txt, returncode files.
Example:
```shell
# Link a directory as job
bartender link-job \\
--submitter someone \\
--application haddock3 \\
/path/to/myjob
# Prints job identifier
# The job in db has
# - name=internal_id=myjob
# - destination=local
# - state=ok
# - created_on=updated_on=now
```
""",
),
)
link_job_sp.add_argument(
"directory",
type=Path,
help=dedent( # noqa: WPS462 -- docs
"""Directory to link as job.
Its content should be readable by the user running bartender serve.
To run an interactive application on the linked job,
the directory should be writable by the user running bartender serve.
""",
),
)
link_job_sp.add_argument(
"--submitter",
default="someone",
help="Submitter of job",
)
link_job_sp.add_argument(
"--application",
default="ln",
help=dedent( # noqa: WPS462 -- docs
"""Application of job.
To run interative application on the linked job,
the application of the job should match the name of
the `job_application` of the interactive application.
""",
),
)
link_job_sp.add_argument(
"--config",
default=Path("config.yaml"),
type=Path,
help="Configuration with schedulers that need arq workers",
)
link_job_sp.set_defaults(func=link_job)


def main(argv: list[str] = sys.argv[1:]) -> None:
"""Entrypoint of the application.
Expand Down
75 changes: 75 additions & 0 deletions src/bartender/link.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import asyncio
from os import symlink
from pathlib import Path

from bartender.config import build_config
from bartender.db.dao.job_dao import JobDAO
from bartender.db.session import make_engine, make_session_factory


def link_job(
directory: Path,
submitter: str,
application: str,
config: Path,
destination: str = "local",
) -> None:
"""Link external directory as job.
Args:
directory: Directory to link as job
submitter: Submitter of job
application: Application of job
config: Configuration with schedulers that need arq workers
destination: Destination of job
"""
validated_config = build_config(config)
job_root_dir = validated_config.job_root_dir
name = directory.name

# Create job in db
job_id = asyncio.run(
create_job_in_db(name, application, submitter, destination),
)

# Sym link directory to job directory
job_dir = job_root_dir / str(job_id)
symlink(directory.absolute(), job_dir)
print(job_id) # noqa: WPS421 -- user feedback


async def create_job_in_db(
name: str,
application: str,
submitter: str,
destination: str,
) -> int:
"""
Create a job in the database.
Args:
name: The name of the job.
application: The application associated with the job.
submitter: The submitter of the job.
destination: The destination of the job.
Returns:
The ID of the created job.
Raises:
IndexError: If failed to create a database entry for the job.
"""
engine = make_engine()
factory = make_session_factory(engine)
async with factory() as session:
dao = JobDAO(session)
job_id = await dao.create_job(name, application, submitter)
if job_id is None:
raise IndexError("Failed to create database entry for job")
await dao.update_internal_job_id(
job_id,
internal_job_id=name,
destination=destination,
)
await dao.update_job_state(job_id, "ok")
return job_id
36 changes: 24 additions & 12 deletions src/bartender/web/api/job/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,26 +181,25 @@ def get_dir_of_completed_job(
},
response_class=FileResponse,
)
def retrieve_job_files(
def retrieve_job_file(
path: str,
job_dir: CurrentCompletedJobDir,
) -> FileResponse:
"""Retrieve files from a completed job.
"""Retrieve file from a completed job.
Args:
path: Path to file that job has produced.
job_dir: Directory with job output files.
Raises:
HTTPException: When file is not found or is outside job directory.
HTTPException: When file is not found or is not a file
or is outside job directory.
Returns:
The file content.
The file contents.
"""
try:
full_path = (job_dir / path).expanduser().resolve(strict=True)
if not full_path.is_relative_to(job_dir):
raise FileNotFoundError()
full_path = _resolve_path(path, job_dir)
if not full_path.is_file():
raise FileNotFoundError()
except FileNotFoundError as exc:
Expand Down Expand Up @@ -234,14 +233,21 @@ async def get_completed_logs(
Raises:
ValueError: When job has no destination.
HTTPException: When a log file is not found.
Returns:
The standard output and error.
"""
if not job.destination or not job.internal_id:
raise ValueError("Job has no destination")
destination = context.destinations[job.destination]
return await destination.scheduler.logs(job.internal_id, job_dir)
try:
return await destination.scheduler.logs(job.internal_id, job_dir)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="File not found",
) from exc


CurrentLogs = Annotated[Tuple[str, str], Depends(get_completed_logs)]
Expand Down Expand Up @@ -430,11 +436,17 @@ async def retrieve_job_subdirectory_as_archive( # noqa: WPS211
)


def _resolve_path(path: str, job_dir: Path) -> Path:
resolved_job_dir = job_dir.resolve(strict=True)
resolved = (resolved_job_dir / path).resolve(strict=True)
if not resolved.is_relative_to(resolved_job_dir):
raise FileNotFoundError()
return resolved


def _parse_subdirectory(path: str, job_dir: Path) -> Path:
try:
subdirectory = (job_dir / path).resolve(strict=True)
if not subdirectory.is_relative_to(job_dir):
raise FileNotFoundError()
subdirectory = _resolve_path(path, job_dir)
if not subdirectory.is_dir():
raise FileNotFoundError()
except FileNotFoundError as exc:
Expand All @@ -443,7 +455,7 @@ def _parse_subdirectory(path: str, job_dir: Path) -> Path:
detail="File not found",
) from exc

return subdirectory
return job_dir / path


def get_interactive_app(
Expand Down
24 changes: 24 additions & 0 deletions tests/test_walk_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,27 @@ async def test_given_single_dir_with_file_and_dir_as_subdir(
],
)
assert result == expected

@pytest.mark.anyio
async def test_given_symlink_as_job_dir(self, tmp_path: Path) -> None:
(tmp_path / "somedir").mkdir()
(tmp_path / "somedir" / "somefile").write_text("sometext")
(tmp_path / "symlink").symlink_to(tmp_path / "somedir")

result = await walk_dir(tmp_path / "symlink", tmp_path, max_depth=2)

expected = DirectoryItem(
name="symlink",
path=Path("symlink"),
is_dir=True,
is_file=False,
children=[
DirectoryItem(
name="somefile",
path=Path("symlink/somefile"),
is_dir=False,
is_file=True,
),
],
)
assert result == expected
Loading

0 comments on commit 57c55f7

Please sign in to comment.