diff --git a/src/diracx/client/aio/operations/_operations.py b/src/diracx/client/aio/operations/_operations.py index 41506aea..49e113bc 100644 --- a/src/diracx/client/aio/operations/_operations.py +++ b/src/diracx/client/aio/operations/_operations.py @@ -35,6 +35,7 @@ build_auth_userinfo_request, build_config_serve_config_request, build_jobs_delete_bulk_jobs_request, + build_jobs_delete_single_job_request, build_jobs_get_job_status_bulk_request, build_jobs_get_job_status_history_bulk_request, build_jobs_get_sandbox_file_request, @@ -43,6 +44,9 @@ build_jobs_get_single_job_status_request, build_jobs_initiate_sandbox_upload_request, build_jobs_kill_bulk_jobs_request, + build_jobs_kill_single_job_request, + build_jobs_remove_bulk_jobs_request, + build_jobs_remove_single_job_request, build_jobs_reschedule_bulk_jobs_request, build_jobs_reschedule_single_job_request, build_jobs_search_request, @@ -1283,6 +1287,64 @@ async def kill_bulk_jobs(self, *, job_ids: List[int], **kwargs: Any) -> Any: return deserialized + @distributed_trace_async + async def remove_bulk_jobs(self, *, job_ids: List[int], **kwargs: Any) -> Any: + """Remove Bulk Jobs. + + Fully remove a list of jobs from the WMS databases. + + WARNING: This endpoint has been implemented for the compatibility with the legacy DIRAC WMS + and the JobCleaningAgent. However, once this agent is ported to diracx, this endpoint should + be removed, and the delete endpoint should be used instead for any other purpose. + + :keyword job_ids: Required. + :paramtype job_ids: list[int] + :return: any + :rtype: any + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[Any] = kwargs.pop("cls", None) + + request = build_jobs_remove_bulk_jobs_request( + job_ids=job_ids, + headers=_headers, + params=_params, + ) + request.url = self._client.format_url(request.url) + + _stream = False + pipeline_response: PipelineResponse = ( + await self._client._pipeline.run( # pylint: disable=protected-access + request, stream=_stream, **kwargs + ) + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error( + status_code=response.status_code, response=response, error_map=error_map + ) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("object", pipeline_response) + + if cls: + return cls(pipeline_response, deserialized, {}) + + return deserialized + @distributed_trace_async async def get_job_status_bulk( self, *, job_ids: List[int], **kwargs: Any @@ -1948,6 +2010,172 @@ async def get_single_job(self, job_id: int, **kwargs: Any) -> Any: return deserialized + @distributed_trace_async + async def delete_single_job(self, job_id: int, **kwargs: Any) -> Any: + """Delete Single Job. + + Delete a job by killing and setting the job status to DELETED. + + :param job_id: Required. + :type job_id: int + :return: any + :rtype: any + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[Any] = kwargs.pop("cls", None) + + request = build_jobs_delete_single_job_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + request.url = self._client.format_url(request.url) + + _stream = False + pipeline_response: PipelineResponse = ( + await self._client._pipeline.run( # pylint: disable=protected-access + request, stream=_stream, **kwargs + ) + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error( + status_code=response.status_code, response=response, error_map=error_map + ) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("object", pipeline_response) + + if cls: + return cls(pipeline_response, deserialized, {}) + + return deserialized + + @distributed_trace_async + async def kill_single_job(self, job_id: int, **kwargs: Any) -> Any: + """Kill Single Job. + + Kill a job. + + :param job_id: Required. + :type job_id: int + :return: any + :rtype: any + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[Any] = kwargs.pop("cls", None) + + request = build_jobs_kill_single_job_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + request.url = self._client.format_url(request.url) + + _stream = False + pipeline_response: PipelineResponse = ( + await self._client._pipeline.run( # pylint: disable=protected-access + request, stream=_stream, **kwargs + ) + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error( + status_code=response.status_code, response=response, error_map=error_map + ) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("object", pipeline_response) + + if cls: + return cls(pipeline_response, deserialized, {}) + + return deserialized + + @distributed_trace_async + async def remove_single_job(self, job_id: int, **kwargs: Any) -> Any: + """Remove Single Job. + + Fully remove a job from the WMS databases. + + WARNING: This endpoint has been implemented for the compatibility with the legacy DIRAC WMS + and the JobCleaningAgent. However, once this agent is ported to diracx, this endpoint should + be removed, and the delete endpoint should be used instead. + + :param job_id: Required. + :type job_id: int + :return: any + :rtype: any + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[Any] = kwargs.pop("cls", None) + + request = build_jobs_remove_single_job_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + request.url = self._client.format_url(request.url) + + _stream = False + pipeline_response: PipelineResponse = ( + await self._client._pipeline.run( # pylint: disable=protected-access + request, stream=_stream, **kwargs + ) + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error( + status_code=response.status_code, response=response, error_map=error_map + ) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("object", pipeline_response) + + if cls: + return cls(pipeline_response, deserialized, {}) + + return deserialized + @distributed_trace_async async def get_single_job_status( self, job_id: int, **kwargs: Any diff --git a/src/diracx/client/models/__init__.py b/src/diracx/client/models/__init__.py index 34678bc5..71619e80 100644 --- a/src/diracx/client/models/__init__.py +++ b/src/diracx/client/models/__init__.py @@ -22,6 +22,7 @@ from ._models import SandboxInfo from ._models import SandboxUploadResponse from ._models import ScalarSearchSpec +from ._models import ScalarSearchSpecValue from ._models import SetJobStatusReturn from ._models import SortSpec from ._models import SortSpecDirection @@ -32,6 +33,7 @@ from ._models import ValidationError from ._models import ValidationErrorLocItem from ._models import VectorSearchSpec +from ._models import VectorSearchSpecValues from ._enums import ChecksumAlgorithm from ._enums import Enum0 @@ -68,6 +70,7 @@ "SandboxInfo", "SandboxUploadResponse", "ScalarSearchSpec", + "ScalarSearchSpecValue", "SetJobStatusReturn", "SortSpec", "SortSpecDirection", @@ -78,6 +81,7 @@ "ValidationError", "ValidationErrorLocItem", "VectorSearchSpec", + "VectorSearchSpecValues", "ChecksumAlgorithm", "Enum0", "Enum1", diff --git a/src/diracx/client/models/_models.py b/src/diracx/client/models/_models.py index e3803575..7d997f8a 100644 --- a/src/diracx/client/models/_models.py +++ b/src/diracx/client/models/_models.py @@ -714,7 +714,7 @@ class ScalarSearchSpec(_serialization.Model): "like". :vartype operator: str or ~client.models.ScalarSearchOperator :ivar value: Value. Required. - :vartype value: str + :vartype value: ~client.models.ScalarSearchSpecValue """ _validation = { @@ -726,7 +726,7 @@ class ScalarSearchSpec(_serialization.Model): _attribute_map = { "parameter": {"key": "parameter", "type": "str"}, "operator": {"key": "operator", "type": "str"}, - "value": {"key": "value", "type": "str"}, + "value": {"key": "value", "type": "ScalarSearchSpecValue"}, } def __init__( @@ -734,7 +734,7 @@ def __init__( *, parameter: str, operator: Union[str, "_models.ScalarSearchOperator"], - value: str, + value: "_models.ScalarSearchSpecValue", **kwargs: Any ) -> None: """ @@ -744,7 +744,7 @@ def __init__( "like". :paramtype operator: str or ~client.models.ScalarSearchOperator :keyword value: Value. Required. - :paramtype value: str + :paramtype value: ~client.models.ScalarSearchSpecValue """ super().__init__(**kwargs) self.parameter = parameter @@ -752,6 +752,16 @@ def __init__( self.value = value +class ScalarSearchSpecValue(_serialization.Model): + """Value.""" + + _attribute_map = {} + + def __init__(self, **kwargs: Any) -> None: + """ """ + super().__init__(**kwargs) + + class SetJobStatusReturn(_serialization.Model): """SetJobStatusReturn. @@ -1093,7 +1103,7 @@ class VectorSearchSpec(_serialization.Model): :ivar operator: An enumeration. Required. Known values are: "in" and "not in". :vartype operator: str or ~client.models.VectorSearchOperator :ivar values: Values. Required. - :vartype values: list[str] + :vartype values: ~client.models.VectorSearchSpecValues """ _validation = { @@ -1105,7 +1115,7 @@ class VectorSearchSpec(_serialization.Model): _attribute_map = { "parameter": {"key": "parameter", "type": "str"}, "operator": {"key": "operator", "type": "str"}, - "values": {"key": "values", "type": "[str]"}, + "values": {"key": "values", "type": "VectorSearchSpecValues"}, } def __init__( @@ -1113,7 +1123,7 @@ def __init__( *, parameter: str, operator: Union[str, "_models.VectorSearchOperator"], - values: List[str], + values: "_models.VectorSearchSpecValues", **kwargs: Any ) -> None: """ @@ -1122,7 +1132,7 @@ def __init__( :keyword operator: An enumeration. Required. Known values are: "in" and "not in". :paramtype operator: str or ~client.models.VectorSearchOperator :keyword values: Values. Required. - :paramtype values: list[str] + :paramtype values: ~client.models.VectorSearchSpecValues """ super().__init__(**kwargs) self.parameter = parameter @@ -1130,6 +1140,16 @@ def __init__( self.values = values +class VectorSearchSpecValues(_serialization.Model): + """Values.""" + + _attribute_map = {} + + def __init__(self, **kwargs: Any) -> None: + """ """ + super().__init__(**kwargs) + + class VOInfo(_serialization.Model): """VOInfo. diff --git a/src/diracx/client/operations/_operations.py b/src/diracx/client/operations/_operations.py index 5cb6e97a..aa3101d6 100644 --- a/src/diracx/client/operations/_operations.py +++ b/src/diracx/client/operations/_operations.py @@ -410,6 +410,28 @@ def build_jobs_kill_bulk_jobs_request( ) +def build_jobs_remove_bulk_jobs_request( + *, job_ids: List[int], **kwargs: Any +) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/remove" + + # Construct parameters + _params["job_ids"] = _SERIALIZER.query("job_ids", job_ids, "[int]") + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest( + method="POST", url=_url, params=_params, headers=_headers, **kwargs + ) + + def build_jobs_get_job_status_bulk_request( *, job_ids: List[int], **kwargs: Any ) -> HttpRequest: @@ -597,6 +619,63 @@ def build_jobs_get_single_job_request(job_id: int, **kwargs: Any) -> HttpRequest return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) +def build_jobs_delete_single_job_request(job_id: int, **kwargs: Any) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/{job_id}" + path_format_arguments = { + "job_id": _SERIALIZER.url("job_id", job_id, "int"), + } + + _url: str = _format_url_section(_url, **path_format_arguments) # type: ignore + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest(method="DELETE", url=_url, headers=_headers, **kwargs) + + +def build_jobs_kill_single_job_request(job_id: int, **kwargs: Any) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/{job_id}/kill" + path_format_arguments = { + "job_id": _SERIALIZER.url("job_id", job_id, "int"), + } + + _url: str = _format_url_section(_url, **path_format_arguments) # type: ignore + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest(method="POST", url=_url, headers=_headers, **kwargs) + + +def build_jobs_remove_single_job_request(job_id: int, **kwargs: Any) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/jobs/{job_id}/remove" + path_format_arguments = { + "job_id": _SERIALIZER.url("job_id", job_id, "int"), + } + + _url: str = _format_url_section(_url, **path_format_arguments) # type: ignore + + # Construct headers + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + + return HttpRequest(method="POST", url=_url, headers=_headers, **kwargs) + + def build_jobs_get_single_job_status_request(job_id: int, **kwargs: Any) -> HttpRequest: _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) @@ -1889,6 +1968,64 @@ def kill_bulk_jobs(self, *, job_ids: List[int], **kwargs: Any) -> Any: return deserialized + @distributed_trace + def remove_bulk_jobs(self, *, job_ids: List[int], **kwargs: Any) -> Any: + """Remove Bulk Jobs. + + Fully remove a list of jobs from the WMS databases. + + WARNING: This endpoint has been implemented for the compatibility with the legacy DIRAC WMS + and the JobCleaningAgent. However, once this agent is ported to diracx, this endpoint should + be removed, and the delete endpoint should be used instead for any other purpose. + + :keyword job_ids: Required. + :paramtype job_ids: list[int] + :return: any + :rtype: any + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[Any] = kwargs.pop("cls", None) + + request = build_jobs_remove_bulk_jobs_request( + job_ids=job_ids, + headers=_headers, + params=_params, + ) + request.url = self._client.format_url(request.url) + + _stream = False + pipeline_response: PipelineResponse = ( + self._client._pipeline.run( # pylint: disable=protected-access + request, stream=_stream, **kwargs + ) + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error( + status_code=response.status_code, response=response, error_map=error_map + ) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("object", pipeline_response) + + if cls: + return cls(pipeline_response, deserialized, {}) + + return deserialized + @distributed_trace def get_job_status_bulk( self, *, job_ids: List[int], **kwargs: Any @@ -2552,6 +2689,172 @@ def get_single_job(self, job_id: int, **kwargs: Any) -> Any: return deserialized + @distributed_trace + def delete_single_job(self, job_id: int, **kwargs: Any) -> Any: + """Delete Single Job. + + Delete a job by killing and setting the job status to DELETED. + + :param job_id: Required. + :type job_id: int + :return: any + :rtype: any + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[Any] = kwargs.pop("cls", None) + + request = build_jobs_delete_single_job_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + request.url = self._client.format_url(request.url) + + _stream = False + pipeline_response: PipelineResponse = ( + self._client._pipeline.run( # pylint: disable=protected-access + request, stream=_stream, **kwargs + ) + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error( + status_code=response.status_code, response=response, error_map=error_map + ) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("object", pipeline_response) + + if cls: + return cls(pipeline_response, deserialized, {}) + + return deserialized + + @distributed_trace + def kill_single_job(self, job_id: int, **kwargs: Any) -> Any: + """Kill Single Job. + + Kill a job. + + :param job_id: Required. + :type job_id: int + :return: any + :rtype: any + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[Any] = kwargs.pop("cls", None) + + request = build_jobs_kill_single_job_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + request.url = self._client.format_url(request.url) + + _stream = False + pipeline_response: PipelineResponse = ( + self._client._pipeline.run( # pylint: disable=protected-access + request, stream=_stream, **kwargs + ) + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error( + status_code=response.status_code, response=response, error_map=error_map + ) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("object", pipeline_response) + + if cls: + return cls(pipeline_response, deserialized, {}) + + return deserialized + + @distributed_trace + def remove_single_job(self, job_id: int, **kwargs: Any) -> Any: + """Remove Single Job. + + Fully remove a job from the WMS databases. + + WARNING: This endpoint has been implemented for the compatibility with the legacy DIRAC WMS + and the JobCleaningAgent. However, once this agent is ported to diracx, this endpoint should + be removed, and the delete endpoint should be used instead. + + :param job_id: Required. + :type job_id: int + :return: any + :rtype: any + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[Any] = kwargs.pop("cls", None) + + request = build_jobs_remove_single_job_request( + job_id=job_id, + headers=_headers, + params=_params, + ) + request.url = self._client.format_url(request.url) + + _stream = False + pipeline_response: PipelineResponse = ( + self._client._pipeline.run( # pylint: disable=protected-access + request, stream=_stream, **kwargs + ) + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error( + status_code=response.status_code, response=response, error_map=error_map + ) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("object", pipeline_response) + + if cls: + return cls(pipeline_response, deserialized, {}) + + return deserialized + @distributed_trace def get_single_job_status( self, job_id: int, **kwargs: Any diff --git a/src/diracx/db/sql/jobs/db.py b/src/diracx/db/sql/jobs/db.py index 30d18126..21635ea2 100644 --- a/src/diracx/db/sql/jobs/db.py +++ b/src/diracx/db/sql/jobs/db.py @@ -8,7 +8,7 @@ from sqlalchemy import delete, func, insert, select, update from sqlalchemy.exc import IntegrityError, NoResultFound -from diracx.core.exceptions import InvalidQueryError +from diracx.core.exceptions import InvalidQueryError, JobNotFound from diracx.core.models import ( JobMinorStatus, JobStatus, @@ -17,8 +17,6 @@ ScalarSearchOperator, ScalarSearchSpec, ) -from diracx.core.exceptions import InvalidQueryError, JobNotFound -from diracx.core.models import JobStatus, JobStatusReturn, LimitedJobStatusReturn from diracx.core.properties import JOB_SHARING, SecurityProperty from ..utils import BaseSQLDB, apply_search_filters