Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement kill, delete and remove endpoints #64

Merged
merged 2 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions run_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export DIRACX_DB_URL_AUTHDB="sqlite+aiosqlite:///:memory:"
export DIRACX_DB_URL_JOBDB="sqlite+aiosqlite:///:memory:"
export DIRACX_DB_URL_JOBLOGGINGDB="sqlite+aiosqlite:///:memory:"
export DIRACX_DB_URL_SANDBOXMETADATADB="sqlite+aiosqlite:///:memory:"
export DIRACX_DB_URL_TASKQUEUEDB="sqlite+aiosqlite:///:memory:"
export DIRACX_SERVICE_AUTH_TOKEN_KEY="file://${signing_key}"
export DIRACX_SERVICE_AUTH_ALLOWED_REDIRECTS='["http://'$(hostname| tr -s '[:upper:]' '[:lower:]')':8000/docs/oauth2-redirect"]'
export DIRACX_SANDBOX_STORE_BUCKET_NAME=sandboxes
Expand Down
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ diracx.db.sql =
JobDB = diracx.db.sql:JobDB
JobLoggingDB = diracx.db.sql:JobLoggingDB
SandboxMetadataDB = diracx.db.sql:SandboxMetadataDB
#DummyDB = diracx.db:DummyDB
TaskQueueDB = diracx.db.sql:TaskQueueDB
#DummyDB = diracx.db.sql:DummyDB
diracx.db.os =
JobParametersDB = diracx.db.os:JobParametersDB
diracx.services =
Expand Down
4 changes: 1 addition & 3 deletions src/diracx/cli/internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ def generate_cs(
DefaultGroup=user_group,
Users={},
Groups={
user_group: GroupConfig(
JobShare=None, Properties={"NormalUser"}, Quota=None, Users=set()
)
user_group: GroupConfig(Properties={"NormalUser"}, Quota=None, Users=set())
},
)
config = Config(
Expand Down
228 changes: 228 additions & 0 deletions src/diracx/client/aio/operations/_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
build_auth_userinfo_request,
build_config_serve_config_request,
build_jobs_delete_bulk_jobs_request,
build_jobs_delete_single_job_request,
build_jobs_get_job_status_bulk_request,
build_jobs_get_job_status_history_bulk_request,
build_jobs_get_sandbox_file_request,
Expand All @@ -43,6 +44,9 @@
build_jobs_get_single_job_status_request,
build_jobs_initiate_sandbox_upload_request,
build_jobs_kill_bulk_jobs_request,
build_jobs_kill_single_job_request,
build_jobs_remove_bulk_jobs_request,
build_jobs_remove_single_job_request,
build_jobs_reschedule_bulk_jobs_request,
build_jobs_reschedule_single_job_request,
build_jobs_search_request,
Expand Down Expand Up @@ -1283,6 +1287,64 @@ async def kill_bulk_jobs(self, *, job_ids: List[int], **kwargs: Any) -> Any:

return deserialized

@distributed_trace_async
async def remove_bulk_jobs(self, *, job_ids: List[int], **kwargs: Any) -> Any:
"""Remove Bulk Jobs.

Fully remove a list of jobs from the WMS databases.

WARNING: This endpoint has been implemented for the compatibility with the legacy DIRAC WMS
and the JobCleaningAgent. However, once this agent is ported to diracx, this endpoint should
be removed, and the delete endpoint should be used instead for any other purpose.

:keyword job_ids: Required.
:paramtype job_ids: list[int]
:return: any
:rtype: any
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}

cls: ClsType[Any] = kwargs.pop("cls", None)

request = build_jobs_remove_bulk_jobs_request(
job_ids=job_ids,
headers=_headers,
params=_params,
)
request.url = self._client.format_url(request.url)

_stream = False
pipeline_response: PipelineResponse = (
await self._client._pipeline.run( # pylint: disable=protected-access
request, stream=_stream, **kwargs
)
)

response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(
status_code=response.status_code, response=response, error_map=error_map
)
raise HttpResponseError(response=response)

deserialized = self._deserialize("object", pipeline_response)

if cls:
return cls(pipeline_response, deserialized, {})

return deserialized

@distributed_trace_async
async def get_job_status_bulk(
self, *, job_ids: List[int], **kwargs: Any
Expand Down Expand Up @@ -1948,6 +2010,172 @@ async def get_single_job(self, job_id: int, **kwargs: Any) -> Any:

return deserialized

@distributed_trace_async
async def delete_single_job(self, job_id: int, **kwargs: Any) -> Any:
"""Delete Single Job.

Delete a job by killing and setting the job status to DELETED.

:param job_id: Required.
:type job_id: int
:return: any
:rtype: any
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}

cls: ClsType[Any] = kwargs.pop("cls", None)

request = build_jobs_delete_single_job_request(
job_id=job_id,
headers=_headers,
params=_params,
)
request.url = self._client.format_url(request.url)

_stream = False
pipeline_response: PipelineResponse = (
await self._client._pipeline.run( # pylint: disable=protected-access
request, stream=_stream, **kwargs
)
)

response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(
status_code=response.status_code, response=response, error_map=error_map
)
raise HttpResponseError(response=response)

deserialized = self._deserialize("object", pipeline_response)

if cls:
return cls(pipeline_response, deserialized, {})

return deserialized

@distributed_trace_async
async def kill_single_job(self, job_id: int, **kwargs: Any) -> Any:
"""Kill Single Job.

Kill a job.

:param job_id: Required.
:type job_id: int
:return: any
:rtype: any
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}

cls: ClsType[Any] = kwargs.pop("cls", None)

request = build_jobs_kill_single_job_request(
job_id=job_id,
headers=_headers,
params=_params,
)
request.url = self._client.format_url(request.url)

_stream = False
pipeline_response: PipelineResponse = (
await self._client._pipeline.run( # pylint: disable=protected-access
request, stream=_stream, **kwargs
)
)

response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(
status_code=response.status_code, response=response, error_map=error_map
)
raise HttpResponseError(response=response)

deserialized = self._deserialize("object", pipeline_response)

if cls:
return cls(pipeline_response, deserialized, {})

return deserialized

@distributed_trace_async
async def remove_single_job(self, job_id: int, **kwargs: Any) -> Any:
"""Remove Single Job.

Fully remove a job from the WMS databases.

WARNING: This endpoint has been implemented for the compatibility with the legacy DIRAC WMS
and the JobCleaningAgent. However, once this agent is ported to diracx, this endpoint should
be removed, and the delete endpoint should be used instead.

:param job_id: Required.
:type job_id: int
:return: any
:rtype: any
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}

cls: ClsType[Any] = kwargs.pop("cls", None)

request = build_jobs_remove_single_job_request(
job_id=job_id,
headers=_headers,
params=_params,
)
request.url = self._client.format_url(request.url)

_stream = False
pipeline_response: PipelineResponse = (
await self._client._pipeline.run( # pylint: disable=protected-access
request, stream=_stream, **kwargs
)
)

response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(
status_code=response.status_code, response=response, error_map=error_map
)
raise HttpResponseError(response=response)

deserialized = self._deserialize("object", pipeline_response)

if cls:
return cls(pipeline_response, deserialized, {})

return deserialized

@distributed_trace_async
async def get_single_job_status(
self, job_id: int, **kwargs: Any
Expand Down
4 changes: 4 additions & 0 deletions src/diracx/client/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ._models import SandboxInfo
from ._models import SandboxUploadResponse
from ._models import ScalarSearchSpec
from ._models import ScalarSearchSpecValue
from ._models import SetJobStatusReturn
from ._models import SortSpec
from ._models import SortSpecDirection
Expand All @@ -32,6 +33,7 @@
from ._models import ValidationError
from ._models import ValidationErrorLocItem
from ._models import VectorSearchSpec
from ._models import VectorSearchSpecValues

from ._enums import ChecksumAlgorithm
from ._enums import Enum0
Expand Down Expand Up @@ -68,6 +70,7 @@
"SandboxInfo",
"SandboxUploadResponse",
"ScalarSearchSpec",
"ScalarSearchSpecValue",
"SetJobStatusReturn",
"SortSpec",
"SortSpecDirection",
Expand All @@ -78,6 +81,7 @@
"ValidationError",
"ValidationErrorLocItem",
"VectorSearchSpec",
"VectorSearchSpecValues",
"ChecksumAlgorithm",
"Enum0",
"Enum1",
Expand Down
Loading
Loading