#751 delete workflows on workspace delete
filippomc committed Feb 5, 2024
1 parent 18c7b5e commit 91740c2
Showing 2 changed files with 59 additions and 42 deletions.
@@ -13,6 +13,8 @@


from workspaces.controllers.crud.base_model_controller import BaseModelView
from cloudharness.workflows.argo import delete_workflow



class WorkspaceView(BaseModelView):
@@ -24,6 +26,17 @@ def post(self, body):
except NotAllowed:
return "Not allowed", 405



def delete(self, id_):
"""Delete an object from the repository."""
for wf in WorkspaceService.get_workspace_workflows(id_):
delete_workflow(wf.name)

return super().delete(id_)




class OsbrepositoryView(BaseModelView):
service = OsbrepositoryService()
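
Aside: read together, the two additions above give WorkspaceView a delete override that tears down any running Argo import workflows before removing the workspace itself. A reassembled sketch of that handler follows; the rest of the class is elided in the diff, and the import path for WorkspaceService is inferred from the changed service file below, so treat this as illustrative rather than verbatim.

from cloudharness.workflows.argo import delete_workflow
from workspaces.controllers.crud.base_model_controller import BaseModelView
# assumed import path, inferred from applications/workspaces/server/workspaces/service/crud_service.py
from workspaces.service.crud_service import WorkspaceService


class WorkspaceView(BaseModelView):
    # ... existing post() and other handlers elided ...

    def delete(self, id_):
        """Delete an object from the repository."""
        # Stop any Argo workflows still importing resources into this workspace,
        # then fall through to the generic CRUD delete.
        for wf in WorkspaceService.get_workspace_workflows(id_):
            delete_workflow(wf.name)
        return super().delete(id_)
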
applications/workspaces/server/workspaces/service/crud_service.py (46 additions, 42 deletions)
@@ -2,6 +2,7 @@
import json
import os
import shutil
from typing import List

from cloudharness.service.pvc import create_persistent_volume_claim, delete_persistent_volume_claim
from cloudharness import log
@@ -34,6 +35,7 @@
from workspaces.utils import dao_entity2dict, guess_resource_type
from ..database import db


def rm_null_values(dikt):
tmp = {}
for k, v in dikt.items(): # remove null fields from dict
@@ -45,9 +47,11 @@ def rm_null_values(dikt):
class NotAuthorized(Exception):
pass


class NotFoundException(Exception):
pass


class NotAllowed(Exception):
pass

@@ -106,7 +110,6 @@ def search(self, page=1, per_page=20, *args, **kwargs) -> Pagination:
objects = self.repository.search(
page=page, per_page=per_page, *args, **kwargs)


objects.items = [self.to_dto(obj) for obj in objects.items]
return objects

@@ -153,7 +156,7 @@ def __str__(self):
def is_authorized(self, object):
raise NotImplementedError(
f"Authorization not implemented for {self.__class__.__name__}")

@classmethod
@cache
def get_user_cached(cls, user_id):
@@ -185,20 +188,27 @@ def check_max_num_workspaces_per_user(self, user_id=None):
"limit exceeded"
)

@staticmethod
def get_workspace_workflows(ws_id) -> List[argo.Workflow]:
try:
return [w for w in argo.get_workflows(status="Running", limit=9999).items
if w.status == "Running" and w.raw.spec.templates[0].metadata.labels.get(
"workspace"
).strip() == str(ws_id)]
except AttributeError:
return []

@send_event(message_type="workspace", operation="create")
def post(self, body):
if 'user_id' not in body:
body['user_id'] = keycloak_user_id()
self.check_max_num_workspaces_per_user(body['user_id'])


workspace = super().post(body)


create_volume(name=self.get_pvc_name(workspace.id),
size=self.get_workspace_volume_size(workspace))



for workspace_resource in workspace.resources:
WorkspaceresourceService.handle_resource_data(workspace_resource)

@@ -223,21 +233,18 @@ def clone(self, workspace_id):
if workspace is None:
raise NotFoundException(
f"Cannot clone workspace with id {workspace_id}: not found.")

workspace.name = f"Clone of {workspace.name}"
workspace.user_id = user_id
workspace.publicable = False
workspace.featured = False
workspace.timestamp_created = None
workspace.resources = []




cloned = self.repository.post(workspace)

create_volume(name=self.get_pvc_name(cloned.id),
size=self.get_workspace_volume_size(workspace))
size=self.get_workspace_volume_size(workspace))
clone_workspaces_content(workspace_id, cloned.id)
return cloned

@@ -285,25 +292,28 @@ def to_dto(cls, workspace_entity: TWorkspaceEntity) -> Workspace:

workspace = super().to_dto(workspace_entity)
for resource in workspace_entity.resources:
if resource.folder: # Legacy folder/path handling
if resource.folder: # Legacy folder/path handling
resource.path = resource.folder
del resource.folder
resource.origin = json.loads(resource.origin)
if resource.origin.get("folder", None) is not None: # Legacy folder/path handling
# Legacy folder/path handling
if resource.origin.get("folder", None) is not None:
resource.origin["path"] = resource.origin.get("folder")
del resource.origin["folder"]

workspace.resources = [WorkspaceresourceService.to_dto(r) for r in workspace_entity.resources] if workspace_entity.resources else []

workspace.resources = [WorkspaceresourceService.to_dto(
r) for r in workspace_entity.resources] if workspace_entity.resources else []
return workspace

@classmethod
def to_dao(cls, d: dict) -> TWorkspaceEntity:

resources = d.get("resources", [])
d["resources"] = []
workspace: TWorkspaceEntity = super().to_dao(d)
workspace.tags = TagRepository().get_tags_daos(workspace.tags)
workspace.resources = [WorkspaceresourceService.to_dao(r) for r in resources]
workspace.resources = [
WorkspaceresourceService.to_dao(r) for r in resources]
return workspace

def get(self, id_):
@@ -329,37 +339,32 @@ def get(self, id_):
logger.debug(
"Post get, check workflows for workspace %s....", workspace.id)
try:
workflows = argo.get_workflows(status="Running", limit=9999)
if workflows and workflows.items:
for workflow in workflows.items:
try:
if workflow.status == "Running" and workflow.raw.spec.templates[0].metadata.labels.get(
"workspace"
).strip() == str(workspace.id):
fake_path = f"Importing resources, progress {workflow.raw.status.progress}".replace(
"/", " of ")
workspace.resources.append(
WorkspaceResource.from_dict(
{
"id": -1,
"name": "Refreshing resources",
"origin": {"path": fake_path},
"resource_type": ResourceType.U,
"workspace_id": workspace.id,
}
for workflow in self.get_workspace_workflows(workspace.id):
try:
fake_path = f"Importing resources, progress {workflow.raw.status.progress}".replace(
"/", " of ")
workspace.resources.append(
WorkspaceResource.from_dict(
{
"id": -1,
"name": "Refreshing resources",
"origin": {"path": fake_path},
"resource_type": ResourceType.U,
"workspace_id": workspace.id,
}
)
)
)
break
except Exception as e:

except Exception as e:
logger.exception("Error checking workflow for workspace %s: %s",
workspace.id, workflow.name)
workspace.id, workflow.name)
from pprint import pprint
# pprint(workflow.raw)
# probably not a workspace import workflow job --> skip it
pass
except Exception as e:
logger.exception("Error checking workflows for workspace %s: %s",
workspace.id, e)
workspace.id, e)

return workspace
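
Aside: the fake_path built in get() above turns Argo's progress counter into the placeholder text shown while an import is still running. Progress arrives as a "completed/total" string (hence the replace of "/" with " of "); a tiny illustration with an assumed value:

progress = "1/3"  # stand-in for workflow.raw.status.progress
fake_path = f"Importing resources, progress {progress}".replace("/", " of ")
print(fake_path)  # Importing resources, progress 1 of 3
# get() attaches this string as the origin path of the temporary
# "Refreshing resources" placeholder until the workflow completes.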

@@ -435,7 +440,6 @@ def user(cls, osbrepo):
return cls.get_user_cached(osbrepo.user_id)



class VolumestorageService(BaseModelService):
repository = VolumeStorageRepository()

@@ -493,7 +497,7 @@ def post(self, body) -> WorkspaceResource:

@staticmethod
def handle_resource_data(workspace_resource: WorkspaceResource) -> WorkspaceResource:
if workspace_resource.status == "p" and workspace_resource.origin:
if workspace_resource.status == "p" and workspace_resource.origin:
from workspaces.helpers.etl_helpers import copy_workspace_resource
copy_workspace_resource(workspace_resource)

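
Aside: both changes hinge on import workflows carrying a "workspace" label on the first template of their spec, which is what the new get_workspace_workflows helper matches on. A minimal, runnable sketch of that shape; every value here is illustrative, not taken from this commit:

from types import SimpleNamespace

# Stand-in object with the attribute shape the filter reads.
wf = SimpleNamespace(
    name="osb-import-abcde",  # illustrative; this is what gets passed to delete_workflow()
    status="Running",
    raw=SimpleNamespace(
        spec=SimpleNamespace(
            templates=[SimpleNamespace(metadata=SimpleNamespace(labels={"workspace": "42"}))]
        ),
        status=SimpleNamespace(progress="1/3"),  # rendered as "1 of 3" by get()
    ),
)

ws_id = 42
belongs = wf.raw.spec.templates[0].metadata.labels.get("workspace").strip() == str(ws_id)
print(belongs)  # True, so deleting workspace 42 would also delete this workflow by name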
