Skip to content

Commit

Permalink
Started working on DELETE /job/{id}
Browse files Browse the repository at this point in the history
Refs #37
  • Loading branch information
sverhoeven committed Mar 7, 2024
1 parent 5e18bfd commit ae3a089
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 7 deletions.
15 changes: 15 additions & 0 deletions src/bartender/db/dao/job_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,20 @@ async def set_job_name(self, jobid: int, user: str, name: str) -> None:
job.name = name
await self.session.commit()

async def delete_job(self, jobid: int, user: str) -> None:
"""Delete job from database.
Args:
jobid: name of job instance.
user: Which user wants to delete the job.
Raises:
IndexError: if job was not found or user is not the owner.
"""
job = await self.session.get(Job, jobid)
if job is None or job.submitter != user:
raise IndexError("Job not found")
await self.session.delete(job)
await self.session.commit()


CurrentJobDAO = Annotated[JobDAO, Depends()]
10 changes: 10 additions & 0 deletions src/bartender/filesystems/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,15 @@ async def download(self, src: JobDescription, target: JobDescription) -> None:
target: Local directory to copy to.
"""


async def delete(self, description: JobDescription) -> None:
"""Delete job directory of description.
Args:
description: Remote directory to delete.
"""
# after download or cancellation you might want to
# delete the remote job directory

async def close(self) -> None:
"""Close filesystem."""
7 changes: 7 additions & 0 deletions src/bartender/filesystems/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ async def download(
target: Local directory to copy to.
"""

async def delete(self, description: JobDescription) -> None:
"""Delete job directory of description.
Args:
description: Remote directory to delete.
"""

async def close(self) -> None:
"""Close filesystem."""

Expand Down
14 changes: 11 additions & 3 deletions src/bartender/filesystems/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,22 @@ async def download(self, src: JobDescription, target: JobDescription) -> None:
remotepath = str(target.job_dir.parent)
await sftp.get(localpaths, remotepath, recurse=True)

async def delete(self, description: JobDescription) -> None:
"""Delete job directory of description.
Args:
description: Remote directory to delete.
"""
if self.conn is None:
self.conn = await ssh_connect(self.ssh_config)
async with self.conn.start_sftp_client() as sftp:
await sftp.rmtree(str(description.job_dir))

async def close(self) -> None:
"""Close SSH connection."""
if self.conn:
self.conn.close()

# TODO add delete(description),
# after download you might want to delete the remote job dir

def __eq__(self, other: object) -> bool:
return (
isinstance(other, SftpFileSystem)
Expand Down
39 changes: 35 additions & 4 deletions src/bartender/web/api/job/views.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from os import getloadavg, sched_getaffinity
from pathlib import Path
from shutil import rmtree
from typing import Annotated, Literal, Optional, Tuple, Type, Union

from aiofiles.os import unlink
from fastapi import (
APIRouter,
BackgroundTasks,
Expand All @@ -27,6 +29,7 @@
from bartender.context import CurrentContext, get_job_root_dir
from bartender.db.dao.job_dao import CurrentJobDAO
from bartender.db.models.job_model import MAX_LENGTH_NAME, CompletedStates, Job
from bartender.destinations import Destination
from bartender.filesystems.queue import CurrentFileOutStagingQueue
from bartender.walk_dir import DirectoryItem, walk_dir
from bartender.web.api.job.interactive_apps import InteractiveAppResult, run
Expand Down Expand Up @@ -218,11 +221,20 @@ def retrieve_job_file(

CurrentJob = Annotated[Job, Depends(retrieve_job)]

def get_destination(
job: CurrentJob,
context: CurrentContext,
):
if not job.destination or not job.internal_id:
raise ValueError("Job has no destination")
return context.destinations[job.destination]

CurrentDestination = Annotated[Destination, Depends(get_destination)]

async def get_completed_logs(
job_dir: CurrentCompletedJobDir,
job: CurrentJob,
context: CurrentContext,
destination: CurrentDestination,
) -> Tuple[str, str]:
"""Get the standard output and error of a completed job.
Expand All @@ -238,9 +250,6 @@ async def get_completed_logs(
Returns:
The standard output and error.
"""
if not job.destination or not job.internal_id:
raise ValueError("Job has no destination")
destination = context.destinations[job.destination]
try:
return await destination.scheduler.logs(job.internal_id, job_dir)
except FileNotFoundError as exc:
Expand Down Expand Up @@ -565,3 +574,25 @@ async def rename_job_name(
status_code=status.HTTP_404_NOT_FOUND,
detail="Job not found",
) from exc

@router.delete("/{jobid}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_job(
jobid: int,
job_dao: CurrentJobDAO,
job: Annotated[Job, Depends(retrieve_job)],
user: CurrentUser,
job_dir: Annotated[Path, Depends(get_job_dir)],
destination: CurrentDestination,
) -> None:
CancelableStates = { "queued", "running" }
if job.state in CancelableStates:
await destination.scheduler.cancel(job.internal_id)
# TODO cleanup remote job directory

if job_dir.is_symlink():
# We want to remove the symlink not its target
await unlink(job_dir)
else:
await async_wrap(rmtree)(job_dir)

await job_dao.delete_job(jobid, user.username)
56 changes: 56 additions & 0 deletions tests/web/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -916,3 +916,59 @@ async def test_rename_job_name_wrong_user(

assert response.status_code == status.HTTP_404_NOT_FOUND
assert response.json() == {"detail": "Job not found"}

@pytest.mark.anyio
async def test_delete_completed_job(
fastapi_app: FastAPI,
client: AsyncClient,
auth_headers: Dict[str, str],
mock_ok_job: int,
job_root_dir: Path,
):
job_id = str(mock_ok_job)
url = fastapi_app.url_path_for("delete_job", jobid=job_id)
response = await client.delete(url, headers=auth_headers)

assert response.status_code == status.HTTP_204_NO_CONTENT

job_dir = job_root_dir / job_id
assert not job_dir.exists()

response2 = await client.delete(url, headers=auth_headers)
assert response2.status_code == status.HTTP_404_NOT_FOUND

@pytest.mark.anyio
async def test_delete_queued_job(
dbsession: AsyncSession,
current_user: User,
demo_context: Context,
) -> None:
cancel_mock = Mock()

class FakeScheduler(AbstractScheduler):
async def state(self, job_id: str) -> State:
return "queued"

async def states(self, job_ids: list[str]) -> list[State]:
return ["queued"]

async def submit(self, description: JobDescription) -> str:
raise NotImplementedError()

async def cancel(self, job_id: str) -> None:
cancel_mock(job_id)

async def close(self) -> None:
raise NotImplementedError()

demo_context.destinations["dest1"].scheduler = FakeScheduler()


dao = JobDAO(dbsession)
job_id, download_mock = await prepare_job(
db_state="queued",
scheduler_state="queued",
dao=dao,
current_user=current_user,
demo_context=demo_context,
)

0 comments on commit ae3a089

Please sign in to comment.