From 91740c240c8eb5ce544aa42b9f76ce9f407d32ad Mon Sep 17 00:00:00 2001
From: Filippo Ledda
Date: Mon, 5 Feb 2024 15:23:21 +0100
Subject: [PATCH] #751 delete workflows on workspace delete

---
 .../controllers/crud/crud_controllers.py          | 13 +++
 .../server/workspaces/service/crud_service.py     | 88 ++++++++++---------
 2 files changed, 59 insertions(+), 42 deletions(-)

diff --git a/applications/workspaces/server/workspaces/controllers/crud/crud_controllers.py b/applications/workspaces/server/workspaces/controllers/crud/crud_controllers.py
index e10baf98..68df4eae 100644
--- a/applications/workspaces/server/workspaces/controllers/crud/crud_controllers.py
+++ b/applications/workspaces/server/workspaces/controllers/crud/crud_controllers.py
@@ -13,6 +13,8 @@
 from workspaces.controllers.crud.base_model_controller import BaseModelView
 
+from cloudharness.workflows.argo import delete_workflow
+
 
 class WorkspaceView(BaseModelView):
@@ -24,6 +26,14 @@ def post(self, body):
         except NotAllowed:
             return "Not allowed", 405
 
+    def delete(self, id_):
+        """Delete the workspace, stopping any import workflows still running for it."""
+        for wf in WorkspaceService.get_workspace_workflows(id_):
+            # stop the Argo workflow so it cannot keep writing into the deleted workspace
+            delete_workflow(wf.name)
+
+        return super().delete(id_)
+
 
 class OsbrepositoryView(BaseModelView):
     service = OsbrepositoryService()

diff --git a/applications/workspaces/server/workspaces/service/crud_service.py b/applications/workspaces/server/workspaces/service/crud_service.py
index 3d363c49..5b8694ba 100644
--- a/applications/workspaces/server/workspaces/service/crud_service.py
+++ b/applications/workspaces/server/workspaces/service/crud_service.py
@@ -2,6 +2,7 @@
 import json
 import os
 import shutil
+from typing import List
 
 from cloudharness.service.pvc import create_persistent_volume_claim, delete_persistent_volume_claim
 from cloudharness import log
@@ -34,6 +35,7 @@
 from workspaces.utils import dao_entity2dict, guess_resource_type
 from ..database import db
 
+
 def rm_null_values(dikt):
     tmp = {}
     for k, v in dikt.items():  # remove null fields from dict
@@ -45,9 +47,11 @@
 class NotAuthorized(Exception):
     pass
 
+
 class NotFoundException(Exception):
     pass
 
+
 class NotAllowed(Exception):
     pass
@@ -106,7 +110,6 @@ def search(self, page=1, per_page=20, *args, **kwargs) -> Pagination:
         objects = self.repository.search(
             page=page, per_page=per_page, *args, **kwargs)
-
         objects.items = [self.to_dto(obj) for obj in objects.items]
         return objects
@@ -153,7 +156,7 @@ def __str__(self):
     def is_authorized(self, object):
         raise NotImplementedError(
             f"Authorization not implemented for {self.__class__.__name__}")
-    
+
     @classmethod
     @cache
     def get_user_cached(cls, user_id):
@@ -185,20 +188,31 @@ def check_max_num_workspaces_per_user(self, user_id=None):
                 "limit exceeded"
             )
 
+    @staticmethod
+    def get_workspace_workflows(ws_id) -> List[argo.Workflow]:
+        """Return the running Argo workflows labelled with the given workspace id."""
+        workflows = []
+        for wf in argo.get_workflows(status="Running", limit=9999).items:
+            try:
+                label = wf.raw.spec.templates[0].metadata.labels.get("workspace")
+            except (AttributeError, IndexError):
+                # not a workspace import workflow (no templates/labels) --> skip it
+                continue
+            if label is not None and label.strip() == str(ws_id):
+                workflows.append(wf)
+        return workflows
+
     @send_event(message_type="workspace", operation="create")
     def post(self, body):
         if 'user_id' not in body:
             body['user_id'] = keycloak_user_id()
         self.check_max_num_workspaces_per_user(body['user_id'])
-
         workspace = super().post(body)
-
         create_volume(name=self.get_pvc_name(workspace.id),
                       size=self.get_workspace_volume_size(workspace))
-
-
+
         for workspace_resource in workspace.resources:
             WorkspaceresourceService.handle_resource_data(workspace_resource)
@@ -223,7 +237,7 @@ def clone(self, workspace_id):
         if workspace is None:
             raise NotFoundException(
                 f"Cannot clone workspace with id {workspace_id}: not found.")
-    
+
         workspace.name = f"Clone of {workspace.name}"
         workspace.user_id = user_id
         workspace.publicable = False
@@ -231,13 +245,10 @@ def clone(self, workspace_id):
         workspace.timestamp_created = None
         workspace.resources = []
-
-
-
         cloned = self.repository.post(workspace)
 
         create_volume(name=self.get_pvc_name(cloned.id),
-                       size=self.get_workspace_volume_size(workspace))
+                      size=self.get_workspace_volume_size(workspace))
 
         clone_workspaces_content(workspace_id, cloned.id)
         return cloned
 
@@ -285,25 +296,28 @@ def to_dto(cls, workspace_entity: TWorkspaceEntity) -> Workspace:
         workspace = super().to_dto(workspace_entity)
         for resource in workspace_entity.resources:
-            if resource.folder: # Legacy folder/path handling
+            if resource.folder:  # Legacy folder/path handling
                 resource.path = resource.folder
                 del resource.folder
             resource.origin = json.loads(resource.origin)
-            if resource.origin.get("folder", None) is not None: # Legacy folder/path handling
+            # Legacy folder/path handling
+            if resource.origin.get("folder", None) is not None:
                 resource.origin["path"] = resource.origin.get("folder")
                 del resource.origin["folder"]
-
-        workspace.resources = [WorkspaceresourceService.to_dto(r) for r in workspace_entity.resources] if workspace_entity.resources else []
+
+        workspace.resources = [WorkspaceresourceService.to_dto(
+            r) for r in workspace_entity.resources] if workspace_entity.resources else []
         return workspace
 
     @classmethod
     def to_dao(cls, d: dict) -> TWorkspaceEntity:
-    
+
         resources = d.get("resources", [])
         d["resources"] = []
         workspace: TWorkspaceEntity = super().to_dao(d)
         workspace.tags = TagRepository().get_tags_daos(workspace.tags)
-        workspace.resources = [WorkspaceresourceService.to_dao(r) for r in resources]
+        workspace.resources = [
+            WorkspaceresourceService.to_dao(r) for r in resources]
         return workspace
 
     def get(self, id_):
@@ -329,37 +343,28 @@ def get(self, id_):
         logger.debug(
             "Post get, check workflows for workspace %s....", workspace.id)
         try:
-            workflows = argo.get_workflows(status="Running", limit=9999)
-            if workflows and workflows.items:
-                for workflow in workflows.items:
-                    try:
-                        if workflow.status == "Running" and workflow.raw.spec.templates[0].metadata.labels.get(
-                            "workspace"
-                        ).strip() == str(workspace.id):
-                            fake_path = f"Importing resources, progress {workflow.raw.status.progress}".replace(
-                                "/", " of ")
-                            workspace.resources.append(
-                                WorkspaceResource.from_dict(
-                                    {
-                                        "id": -1,
-                                        "name": "Refreshing resources",
-                                        "origin": {"path": fake_path},
-                                        "resource_type": ResourceType.U,
-                                        "workspace_id": workspace.id,
-                                    }
-                                )
-                            )
-                            break
-                    except Exception as e:
-                        logger.exception("Error checking workflow for workspace %s: %s",
-                                         workspace.id, workflow.name)
-                        from pprint import pprint
-                        # pprint(workflow.raw)
-                        # probably not a workspace import workflow job --> skip it
-                        pass
+            for workflow in self.get_workspace_workflows(workspace.id):
+                try:
+                    # surface the import progress as a placeholder resource
+                    fake_path = f"Importing resources, progress {workflow.raw.status.progress}".replace(
+                        "/", " of ")
+                    workspace.resources.append(
+                        WorkspaceResource.from_dict(
+                            {
+                                "id": -1,
+                                "name": "Refreshing resources",
+                                "origin": {"path": fake_path},
+                                "resource_type": ResourceType.U,
+                                "workspace_id": workspace.id,
+                            }
+                        )
+                    )
+                except Exception:
+                    # probably not a workspace import workflow job --> skip it
+                    logger.exception("Error checking workflow %s for workspace %s",
+                                     workflow.name, workspace.id)
         except Exception as e:
             logger.exception("Error checking workflows for workspace %s: %s",
                              workspace.id, e)
 
         return workspace
@@ -435,7 +440,6 @@ def user(cls, osbrepo):
         return cls.get_user_cached(osbrepo.user_id)
 
-
 class VolumestorageService(BaseModelService):
     repository = VolumeStorageRepository()
@@ -493,7 +497,7 @@ def post(self, body) -> WorkspaceResource:
 
     @staticmethod
     def handle_resource_data(workspace_resource: WorkspaceResource) -> WorkspaceResource:
-        if workspace_resource.status == "p" and workspace_resource.origin: 
+        if workspace_resource.status == "p" and workspace_resource.origin:
            from workspaces.helpers.etl_helpers import copy_workspace_resource
            copy_workspace_resource(workspace_resource)
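
Note: the behaviour this patch introduces -- select the running import
workflows labelled with the workspace id, delete them, then delete the
workspace itself -- can be exercised in isolation. The sketch below is
illustrative and not part of the diff: it assumes only the label layout
used above (spec.templates[0].metadata.labels["workspace"]); the helper
names (workflows_for_workspace, delete_workspace_with_cleanup, make_wf)
are hypothetical, and the argo/repository calls are replaced by plain
callables so the snippet is self-contained and runnable.

    # workflow_cleanup_sketch.py -- standalone illustration, not part of the patch
    from types import SimpleNamespace

    def workflows_for_workspace(workflows, ws_id):
        """Mirror of WorkspaceService.get_workspace_workflows: keep only the
        workflows whose first template carries a matching 'workspace' label."""
        selected = []
        for wf in workflows:
            try:
                label = wf.raw.spec.templates[0].metadata.labels.get("workspace")
            except (AttributeError, IndexError):
                continue  # not a workspace import workflow --> skip it
            if label is not None and label.strip() == str(ws_id):
                selected.append(wf)
        return selected

    def delete_workspace_with_cleanup(ws_id, workflows, delete_workflow, delete_record):
        """Mirror of WorkspaceView.delete: stop the matching workflows first so
        they cannot keep writing into the workspace, then drop the record."""
        for wf in workflows_for_workspace(workflows, ws_id):
            delete_workflow(wf.name)
        return delete_record(ws_id)

    def make_wf(name, ws_label=None):
        """Build a minimal stand-in object with the label layout the patch reads."""
        labels = {"workspace": ws_label} if ws_label is not None else {}
        template = SimpleNamespace(metadata=SimpleNamespace(labels=labels))
        return SimpleNamespace(
            name=name, raw=SimpleNamespace(spec=SimpleNamespace(templates=[template])))

    wfs = [make_wf("import-a", "42"), make_wf("other-job"), make_wf("import-b", " 42 ")]
    deleted = []
    delete_workspace_with_cleanup(
        42, wfs, delete_workflow=deleted.append, delete_record=lambda _id: "ok")
    assert deleted == ["import-a", "import-b"]  # only the labelled workflows are removed

The per-workflow try/except matters: with the single try around the whole
lookup (as in an all-or-nothing comprehension), one unlabelled workflow in
the cluster would make the lookup return nothing and the cleanup would be
silently skipped.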