diff --git a/rtwcli/rtw_cmds/rtw_cmds/docker/verbs.py b/rtwcli/rtw_cmds/rtw_cmds/docker/verbs.py index 83daf850..a9bfa73e 100644 --- a/rtwcli/rtw_cmds/rtw_cmds/docker/verbs.py +++ b/rtwcli/rtw_cmds/rtw_cmds/docker/verbs.py @@ -19,7 +19,7 @@ is_docker_container_running, ) from rtwcli.verb import VerbExtension -from rtwcli.workspace_manger import get_current_workspace +from rtwcli.workspace_utils import get_current_workspace class EnterVerb(VerbExtension): diff --git a/rtwcli/rtw_cmds/rtw_cmds/workspace/create_verb.py b/rtwcli/rtw_cmds/rtw_cmds/workspace/create_verb.py index bf5892ea..6ed42ddd 100644 --- a/rtwcli/rtw_cmds/rtw_cmds/workspace/create_verb.py +++ b/rtwcli/rtw_cmds/rtw_cmds/workspace/create_verb.py @@ -50,7 +50,7 @@ ) from rtwcli.verb import VerbExtension import docker -from rtwcli.workspace_manger import ( +from rtwcli.workspace_utils import ( Workspace, WorkspacesConfig, get_compile_cmd, @@ -64,7 +64,7 @@ DEFAULT_FINAL_IMAGE_NAME_FORMAT = "rtw_{workspace_name}_final" DEFAULT_CONTAINER_NAME_FORMAT = "{final_image_name}-instance" DEFAULT_HOSTNAME_FORMAT = "rtw-{workspace_name}-docker" -DEFAULT_RTW_DOCKER_BRANCH = "rtw_ws_create" +DEFAULT_RTW_DOCKER_BRANCH = "master" DEFAULT_RTW_DOCKER_PATH = os.path.expanduser("~/ros_team_workspace") DEFAULT_UPSTREAM_WS_NAME_FORMAT = "{workspace_name}_upstream" DEFAULT_WS_REPOS_FILE_FORMAT = "{repo_name}.{ros_distro}.repos" @@ -936,9 +936,9 @@ def main(self, *, args): ws_name=create_args.upstream_ws_name, ws_folder=create_args.upstream_ws_abs_path, distro=create_args.ros_distro, - ws_docker_support=True if create_args.docker else False, - docker_tag=create_args.final_image_name if create_args.docker else "", - docker_container_name=create_args.container_name if create_args.docker else "", + ws_docker_support=create_args.docker, + docker_tag=create_args.final_image_name if create_args.docker else None, + docker_container_name=create_args.container_name if create_args.docker else None, standalone=create_args.standalone, ) if not update_workspaces_config(WORKSPACES_PATH, local_upstream_ws): diff --git a/rtwcli/rtw_cmds/rtw_cmds/workspace/delete_verb.py b/rtwcli/rtw_cmds/rtw_cmds/workspace/delete_verb.py new file mode 100644 index 00000000..f9f01382 --- /dev/null +++ b/rtwcli/rtw_cmds/rtw_cmds/workspace/delete_verb.py @@ -0,0 +1,405 @@ +# Copyright (c) 2023, Stogl Robotics Consulting UG (haftungsbeschränkt) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from abc import ABC, abstractmethod +import argparse +from dataclasses import dataclass, field +from enum import Enum, auto +import os +import shutil +import questionary.styles +import rich +from rich.tree import Tree +from typing import Dict, List +import questionary +from rtwcli import logger +from rtwcli.constants import ( + RICH_TREE_FST_LVL_COLOR, + RICH_TREE_GUIDE_STYLE, + RICH_TREE_LABEL_COLOR, + RICH_TREE_SND_LVL_COLOR, + RICH_TREE_STATUS_COLOR, + RICH_TREE_TRD_LVL_COLOR, + WORKSPACES_PATH, +) +from rtwcli.docker_utils import ( + docker_container_exists, + docker_stop, + is_docker_tag_valid, + remove_docker_container, + remove_docker_image, +) +from rtwcli.verb import VerbExtension +from rtwcli.workspace_utils import ( + Workspace, + WorkspacesConfig, + get_selected_ws_names_from_user, + load_workspaces_config_from_yaml_file, + save_workspaces_config, + workspace_name_completer, +) + + +class StepStatus(Enum): + NOT_STARTED = auto() + COMPLETED = auto() + FAILED = auto() + SKIPPED = auto() + + +class WorkspaceRemovalStatus(Enum): + NOT_STARTED = auto() + COMPLETED = auto() + FAILED = auto() + INTERRUPTED = auto() + CAUGHT_EXCEPTION = auto() + + +class StepName(Enum): + REMOVE_DOCKER_CONTAINER = "Remove Docker Container" + REMOVE_DOCKER_IMAGE = "Remove Docker Image" + REMOVE_WORKSPACE_FOLDER = "Remove Workspace Folder" + REMOVE_CONFIGURATION = "Remove Configuration" + + +REMOVAL_STEPS = [ + StepName.REMOVE_DOCKER_CONTAINER, + StepName.REMOVE_DOCKER_IMAGE, + StepName.REMOVE_WORKSPACE_FOLDER, + StepName.REMOVE_CONFIGURATION, +] + + +@dataclass +class StepStats: + name: StepName + status: StepStatus = StepStatus.NOT_STARTED + message: str = "" + + +@dataclass +class WorkspaceRemovalStats: + status: WorkspaceRemovalStatus = WorkspaceRemovalStatus.NOT_STARTED + message: str = "" + steps: List[StepStats] = field(default_factory=list) + + +class RemovalStep(ABC): + def __init__(self, name: StepName): + self.name = name + self.status = StepStatus.NOT_STARTED + self.message = "" + + @abstractmethod + def should_run(self, workspace: Workspace) -> bool: + pass + + @abstractmethod + def run(self, workspace: Workspace, config: WorkspacesConfig) -> bool: + pass + + def execute(self, workspace: Workspace, config: WorkspacesConfig) -> StepStats: + if self.should_run(workspace): + logger.info(f"Starting ws removal step: '{self.name.value}'") + try: + if self.run(workspace, config): + self.status = StepStatus.COMPLETED + logger.info(f"Successfully completed {self.name.value}: {self.message}") + else: + self.status = StepStatus.FAILED + logger.error(f"Failed to complete {self.name.value}: {self.message}") + except Exception as e: + self.status = StepStatus.FAILED + self.message = ( + f"Exception caught in {self.name.value}: {str(e)}" + f"\nOriginal message: '{self.message}'" + ) + logger.error(self.message) + raise + else: + self.status = StepStatus.SKIPPED + self.message = f"Skipping {self.name.value} (not applicable, e.g. no Docker support)" + logger.info(self.message) + + return StepStats(name=self.name, status=self.status, message=self.message) + + +class RemoveDockerContainer(RemovalStep): + def __init__(self): + super().__init__(StepName.REMOVE_DOCKER_CONTAINER) + + def should_run(self, workspace: Workspace) -> bool: + return workspace.ws_docker_support + + def run(self, workspace: Workspace, config: WorkspacesConfig) -> bool: + if not workspace.docker_container_name: + self.message = "Docker support is enabled but container name not set." + return False + + if docker_container_exists(workspace.docker_container_name): + if docker_stop(workspace.docker_container_name): + logger.info(f"Stopped Docker container '{workspace.docker_container_name}'") + else: + self.message = ( + f"Failed to stop Docker container '{workspace.docker_container_name}'" + ) + return False + + if remove_docker_container(workspace.docker_container_name): + self.message = f"Removed Docker container '{workspace.docker_container_name}'" + return True + else: + self.message = ( + f"Failed to remove Docker container '{workspace.docker_container_name}'" + ) + return False + else: + self.message = ( + f"Container '{workspace.docker_container_name}' does not exist. Skipping removal." + ) + return True + + +class RemoveDockerImage(RemovalStep): + def __init__(self): + super().__init__(StepName.REMOVE_DOCKER_IMAGE) + + def should_run(self, workspace: Workspace) -> bool: + return workspace.ws_docker_support + + def run(self, workspace: Workspace, config: WorkspacesConfig) -> bool: + if not workspace.docker_tag: + self.message = "Docker support is enabled but Docker tag not set." + return False + + if is_docker_tag_valid(workspace.docker_tag): + if remove_docker_image(workspace.docker_tag): + self.message = f"Removed Docker image '{workspace.docker_tag}'" + return True + else: + self.message = f"Failed to remove Docker image '{workspace.docker_tag}'" + return False + else: + self.message = ( + f"Docker image '{workspace.docker_tag}' does not exist. Skipping removal." + ) + return True + + +class RemoveWorkspaceFolder(RemovalStep): + def __init__(self): + super().__init__(StepName.REMOVE_WORKSPACE_FOLDER) + + def should_run(self, workspace: Workspace) -> bool: + return not workspace.standalone + + def run(self, workspace: Workspace, config: WorkspacesConfig) -> bool: + if not workspace.ws_folder: + self.message = "Workspace is not standalone but workspace folder not set." + return False + + if not os.path.exists(workspace.ws_folder): + self.message = ( + f"Workspace folder '{workspace.ws_folder}' does not exist. Skipping removal." + ) + return True + + # ask the user what should be removed, everything per default + items_in_ws_folder = os.listdir(workspace.ws_folder) + if not items_in_ws_folder: + self.message = f"Nothing to remove in workspace folder '{workspace.ws_folder}'" + return True + + item_paths = [os.path.join(workspace.ws_folder, item) for item in items_in_ws_folder] + item_choices_to_delete = { + f"{item} {os.listdir(item_path) if os.path.isdir(item_path) else ''}": item_path + for item, item_path in zip(items_in_ws_folder, item_paths) + } + items_to_delete = questionary.checkbox( + message=f"Select items to delete in workspace folder '{workspace.ws_folder}'\n", + choices=list(item_choices_to_delete.keys()), + style=questionary.Style([("highlighted", "bold")]), + ).ask() + if items_to_delete is None: # cancelled by user + self.message = f"User cancelled removal of workspace folder '{workspace.ws_folder}'" + return False + + logger.debug(f"Items to delete: {items_to_delete}") + + if len(items_to_delete) == len(items_in_ws_folder): + logger.debug( + f"User selected to delete all items in workspace folder '{workspace.ws_folder}', " + "removing entire folder" + ) + item_paths_to_delete = [workspace.ws_folder] + else: + item_paths_to_delete = [item_choices_to_delete[item] for item in items_to_delete] + for item_path in item_paths_to_delete: + logger.debug(f"Removing item: {item_path}") + if os.path.isfile(item_path) or os.path.islink(item_path): + os.unlink(item_path) + elif os.path.isdir(item_path): + shutil.rmtree(item_path) + logger.debug(f"Removed item: {item_path}") + + self.message = f"Removed items: {item_paths_to_delete}" + return True + + +class RemoveFromConfig(RemovalStep): + def __init__(self): + super().__init__(StepName.REMOVE_CONFIGURATION) + + def should_run(self, workspace: Workspace) -> bool: + return True + + def run(self, workspace: Workspace, config: WorkspacesConfig) -> bool: + if config.remove_workspace(workspace.ws_name): + if save_workspaces_config(WORKSPACES_PATH, config): + self.message = ( + f"Removed workspace '{workspace.ws_name}' from config file '{WORKSPACES_PATH}'" + ) + return True + else: + self.message = ( + f"Failed to save config file '{WORKSPACES_PATH}' after removing workspace" + ) + return False + else: + self.message = ( + f"Failed to remove workspace '{workspace.ws_name}' from workspace config" + ) + return False + + +def get_removal_step(step_name: StepName) -> RemovalStep: + step_classes = { + StepName.REMOVE_DOCKER_CONTAINER: RemoveDockerContainer, + StepName.REMOVE_DOCKER_IMAGE: RemoveDockerImage, + StepName.REMOVE_WORKSPACE_FOLDER: RemoveWorkspaceFolder, + StepName.REMOVE_CONFIGURATION: RemoveFromConfig, + } + return step_classes[step_name]() + + +def remove_workspaces( + config: WorkspacesConfig, workspace_names: List[str] +) -> Dict[str, WorkspaceRemovalStats]: + removal_stats: Dict[str, WorkspaceRemovalStats] = { + ws_name: WorkspaceRemovalStats(status=WorkspaceRemovalStatus.NOT_STARTED) + for ws_name in workspace_names + } + + for ws_name in workspace_names: + workspace = config.workspaces.get(ws_name) + if not workspace: + removal_stats[ws_name].status = WorkspaceRemovalStatus.FAILED + removal_stats[ws_name].message = ( + f"Workspace '{ws_name}' not found in the configuration" + ) + logger.error(removal_stats[ws_name].message) + continue + + try: + interrupted = False + for step_name in REMOVAL_STEPS: + step = get_removal_step(step_name) + step_stats = step.execute(workspace, config) + removal_stats[ws_name].steps.append(step_stats) + if step_stats.status == StepStatus.FAILED: + removal_stats[ws_name].status = WorkspaceRemovalStatus.FAILED + removal_stats[ws_name].message = ( + f"Failed to remove workspace '{ws_name}' during {step.name.value}" + ) + interrupted = True + break + if interrupted: + logger.debug(f"Removal of workspace '{ws_name}' interrupted") + break + except KeyboardInterrupt: + removal_stats[ws_name].status = WorkspaceRemovalStatus.INTERRUPTED + removal_stats[ws_name].message = ( + f"Removal process for workspace '{ws_name}' interrupted by user" + ) + logger.warning(removal_stats[ws_name].message) + break + except Exception as e: + removal_stats[ws_name].status = WorkspaceRemovalStatus.CAUGHT_EXCEPTION + removal_stats[ws_name].message = ( + f"Error during removal of workspace '{ws_name}': {str(e)}" + ) + logger.exception(removal_stats[ws_name].message) + break + + removal_stats[ws_name].status = WorkspaceRemovalStatus.COMPLETED + removal_stats[ws_name].message = f"Successfully removed workspace '{ws_name}'" + logger.info(removal_stats[ws_name].message) + + return removal_stats + + +def print_removal_stats(stats: Dict[str, WorkspaceRemovalStats]): + total = len(stats) + completed = sum( + 1 for ws_stats in stats.values() if ws_stats.status == WorkspaceRemovalStatus.COMPLETED + ) + tree = Tree( + f"{RICH_TREE_LABEL_COLOR}Workspace Removal Statistics: {completed}/{total}", + guide_style=RICH_TREE_GUIDE_STYLE, + ) + for ws_i, (ws_name, ws_stats) in enumerate(stats.items(), start=1): + ws_node = tree.add( + f"{RICH_TREE_FST_LVL_COLOR}{ws_i}. {ws_name} - {RICH_TREE_STATUS_COLOR}{ws_stats.status.name}", + highlight=True, + ) + for step_i, step in enumerate(ws_stats.steps, start=1): + step_node = ws_node.add( + f"{RICH_TREE_SND_LVL_COLOR}{step_i}. {step.name.value}: {RICH_TREE_STATUS_COLOR}{step.status.name}" + ) + step_node.add(f"{RICH_TREE_TRD_LVL_COLOR}{step.message}") + rich.print(tree) + + +class DeleteVerb(VerbExtension): + """Delete an available ROS workspace in the config.""" + + def add_arguments(self, parser: argparse.ArgumentParser, cli_name: str) -> None: + arg = parser.add_argument( + "workspace_name", + help="The workspace name", + nargs="?", + ) + arg.completer = workspace_name_completer # type: ignore + + def main(self, *, args): + workspaces_config = load_workspaces_config_from_yaml_file(WORKSPACES_PATH) + if not workspaces_config.workspaces: + logger.info(f"No workspaces found in config file '{WORKSPACES_PATH}'") + return + + if args.workspace_name: + logger.debug(f"Deleting workspace from args: {args.workspace_name}") + ws_names_to_delete = [args.workspace_name] + else: + ws_names_to_delete = get_selected_ws_names_from_user( + workspaces=workspaces_config.workspaces, + select_question_msg="Select workspaces to delete", + confirm_question_msg="Are you sure you want to delete the selected workspaces?", + ) + + logger.debug(f"Ws names to delete: {ws_names_to_delete}") + + removal_stats = remove_workspaces(workspaces_config, ws_names_to_delete) + print_removal_stats(removal_stats) diff --git a/rtwcli/rtw_cmds/rtw_cmds/workspace/import_verb.py b/rtwcli/rtw_cmds/rtw_cmds/workspace/import_verb.py index 7f2bf4db..06a87e7a 100644 --- a/rtwcli/rtw_cmds/rtw_cmds/workspace/import_verb.py +++ b/rtwcli/rtw_cmds/rtw_cmds/workspace/import_verb.py @@ -26,7 +26,7 @@ from rtwcli.rocker_utils import execute_rocker_cmd, generate_rocker_flags from rtwcli.utils import get_filtered_args, replace_user_name_in_path from rtwcli.verb import VerbExtension -from rtwcli.workspace_manger import Workspace, update_workspaces_config +from rtwcli.workspace_utils import Workspace, update_workspaces_config @dataclass diff --git a/rtwcli/rtw_cmds/rtw_cmds/workspace/port_verb.py b/rtwcli/rtw_cmds/rtw_cmds/workspace/port_verb.py index fb51e9db..e99c0303 100644 --- a/rtwcli/rtw_cmds/rtw_cmds/workspace/port_verb.py +++ b/rtwcli/rtw_cmds/rtw_cmds/workspace/port_verb.py @@ -13,127 +13,229 @@ # limitations under the License. import argparse -import os +from enum import Enum +import re +from typing import Dict, List +import rich +import rich.tree +from rtwcli import logger from rtwcli.constants import ( - F_DISTRO, - F_WS_FOLDER, + RICH_TREE_FST_LVL_COLOR, + RICH_TREE_GUIDE_STYLE, + RICH_TREE_LABEL_COLOR, + RICH_TREE_STATUS_COLOR, + ROS_TEAM_WS_BASE_WS_ENV_VAR, + ROS_TEAM_WS_DISTRO_ENV_VAR, + ROS_TEAM_WS_DOCKER_CONTAINER_NAME_ENV_VAR, + ROS_TEAM_WS_DOCKER_TAG_ENV_VAR, + ROS_TEAM_WS_PREFIX, + ROS_TEAM_WS_WS_DOCKER_SUPPORT_ENV_VAR, ROS_TEAM_WS_WS_FOLDER_ENV_VAR, ROS_TEAM_WS_RC_PATH, - ROS_TEAM_WS_ENV_VARIABLES, + WORKSPACES_PATH, ) from rtwcli.verb import VerbExtension -from rtwcli.workspace_manger import ( - env_var_to_workspace_var, - try_port_workspace, - extract_workspaces_from_bash_script, +from rtwcli.workspace_utils import ( + Workspace, + get_selected_ws_names_from_user, + update_workspaces_config, ) -class PortVerb(VerbExtension): - """Port workspace(s) by creating the corresponding config entry.""" +WS_FUNCTION_PREFIX = ROS_TEAM_WS_PREFIX + "setup_" +BASE_WS_CURRENT = "" +EMPTY_DOCKER_TAG = "-" +DEFAULT_DOCKER_CONTAINER_NAME_FORMAT = "{docker_tag}-instance" - def add_arguments(self, parser: argparse.ArgumentParser, cli_name: str): - parser.add_argument( - "--all", - action="store_true", - default=False, - help=f"Port all workspaces (from '{ROS_TEAM_WS_RC_PATH}')", - ) - parser.add_argument( - "--current", - action="store_true", - default=False, - help="Port the currently sourced workspace", - ) - def main(self, *, args): - if args.all and args.current: - print("Please specify only single argument: '--all' or '--current'.") - elif args.all: - self.port_all_workspaces() - elif args.current: - self.port_current_workspace() - else: - print("Please specify what to port: '--all' or '--current'.") +class PortStatus(Enum): + """Port status enum.""" - def port_all_workspaces( - self, - var_str_format: str = "\t{:>30} -> {:<20}: {}", - sep_line: str = "-" * 50, - summary_sep_line: str = "#" * 50, - ): - print(f"Reading workspaces from script '{ROS_TEAM_WS_RC_PATH}'") + SUCCESS = "SUCCESS" + FAILED = "FAILED" - script_workspaces = extract_workspaces_from_bash_script(ROS_TEAM_WS_RC_PATH) - ws_num = len(script_workspaces) - print(f"Found {ws_num} workspaces in script '{ROS_TEAM_WS_RC_PATH}'") - # For statistics: - porting_stats = { - "successful": [], - "failed": [], +def extract_workspaces_from_bash_script(script_path: str) -> Dict[str, Workspace]: + """Extract workspaces from a bash script.""" + with open(script_path) as file: + data = file.read() + + # regex: find all functions in the script + ws_functions = re.findall(r"(\w+ \(\) \{[^}]+\})", data, re.MULTILINE) + logger.debug( + f"Found workspace functions: {len(ws_functions)}" + f"\nFirst ws function: \n{ws_functions[0]} \nLast ws function: \n{ws_functions[-1]}" + ) + + workspaces: Dict[str, Workspace] = {} + for ws_function in ws_functions: + ws_function_name = re.search(r"(\w+) \(", ws_function).group(1) # type: ignore + if WS_FUNCTION_PREFIX not in ws_function_name: + logger.info( + f"WS function name '{ws_function_name}' does not contain " + f"ws function prefix '{WS_FUNCTION_PREFIX}', skipping." + ) + continue + ws_name = ws_function_name[len(WS_FUNCTION_PREFIX) :] + logger.debug(f"WS name: '{ws_name}'") + + # regex: find all variables and values in the function + # e.g. (export) RosTeamWS_DISTRO="humble" + variables_and_values: Dict[str, str] = { + var: val.strip('"') + for var, val in re.findall(r'(?:export )?(\w+)=(".*?"|\'.*?\'|[^ ]+)', ws_function) } + logger.debug(f"WS function variables and values: {variables_and_values}") - for i, (script_ws, script_ws_data) in enumerate(script_workspaces.items()): - ws_name_to_print = f"{i+1}/{ws_num} script workspace '{script_ws}'" - print(f"{sep_line}\n" f"Processing {ws_name_to_print}," f" ws_data: {script_ws_data}") - workspace_data_to_port = {} - for env_var, env_var_value in script_ws_data.items(): - ws_var, ws_var_value = env_var_to_workspace_var(env_var, env_var_value) - print(var_str_format.format(env_var, ws_var, env_var_value)) - workspace_data_to_port[ws_var] = ws_var_value - - print("Generating workspace name from workspace path with first folder letters: ") - ws_path = script_ws_data[ROS_TEAM_WS_WS_FOLDER_ENV_VAR] - new_ws_name = os.path.basename(ws_path) - print(f"\t'{ws_path}' -> {new_ws_name}") - - success = try_port_workspace(workspace_data_to_port, new_ws_name) - if success: - print(f"Ported workspace '{new_ws_name}' successfully") - porting_stats["successful"].append(ws_name_to_print) - else: - print(f"Porting workspace '{new_ws_name}' failed") - porting_stats["failed"].append(ws_name_to_print) - - # Print the final summary: - print("\n" + summary_sep_line) - print("Porting Summary:") - print(f"Total Workspaces: {ws_num}") - print(f"Successfully Ported: {len(porting_stats['successful'])}") - for ws_name in porting_stats["successful"]: - print(f" - {ws_name}") - print(f"Failed to Port: {len(porting_stats['failed'])}") - for ws_name in porting_stats["failed"]: - print(f" - {ws_name}") - print(summary_sep_line) - - def port_current_workspace(self, var_str_format: str = "\t{:>30} -> {:<20}: {}"): - workspace_data_to_port = {} - print(f"Reading workspace environment variables: {ROS_TEAM_WS_ENV_VARIABLES}") - for env_var in ROS_TEAM_WS_ENV_VARIABLES: - env_var_value = os.environ.get(env_var, None) - ws_var, ws_var_value = env_var_to_workspace_var(env_var, env_var_value) - # check if variable is exported - if ws_var in [F_DISTRO, F_WS_FOLDER] and ws_var_value is None: - print(f"Variable {env_var} is not exported. Cannot proceed with porting.") - return - print(var_str_format.format(env_var, ws_var, env_var_value)) - workspace_data_to_port[ws_var] = ws_var_value - - print("Generating workspace name from workspace path with first folder letters: ") - ws_path = os.environ.get(ROS_TEAM_WS_WS_FOLDER_ENV_VAR) - if not ws_path: - raise RuntimeError( - f"Environment variable '{ROS_TEAM_WS_WS_FOLDER_ENV_VAR}' is not set." + if ROS_TEAM_WS_DISTRO_ENV_VAR not in variables_and_values: + logger.error( + f"Workspace '{ws_name}' does not have distro '{ROS_TEAM_WS_DISTRO_ENV_VAR}', " + "skipping." ) + continue + distro = variables_and_values[ROS_TEAM_WS_DISTRO_ENV_VAR] - new_ws_name = os.path.basename(ws_path) - print(f"\t'{ws_path}' -> {new_ws_name}") + if ROS_TEAM_WS_WS_FOLDER_ENV_VAR not in variables_and_values: + logger.error( + f"Workspace '{ws_name}' does not have ws folder '{ROS_TEAM_WS_WS_FOLDER_ENV_VAR}', " + "skipping." + ) + continue + ws_folder = variables_and_values[ROS_TEAM_WS_WS_FOLDER_ENV_VAR] + + base_ws = None + if ( + ROS_TEAM_WS_BASE_WS_ENV_VAR in variables_and_values + and variables_and_values[ROS_TEAM_WS_BASE_WS_ENV_VAR] != BASE_WS_CURRENT + ): + logger.debug( + f"Workspace '{ws_name}' has base ws that is not '{BASE_WS_CURRENT}', " + "setting it." + ) + base_ws = variables_and_values[ROS_TEAM_WS_BASE_WS_ENV_VAR] + + ws_docker_support = False + if ( + ROS_TEAM_WS_WS_DOCKER_SUPPORT_ENV_VAR in variables_and_values + and variables_and_values[ROS_TEAM_WS_WS_DOCKER_SUPPORT_ENV_VAR] == "true" + ): + logger.debug( + f"Workspace '{ws_name}' has docker support ('{ROS_TEAM_WS_WS_DOCKER_SUPPORT_ENV_VAR}')" + ) + ws_docker_support = True + + docker_tag = None + if ws_docker_support: + if ROS_TEAM_WS_DOCKER_TAG_ENV_VAR not in variables_and_values: + logger.error( + f"Workspace '{ws_name}' has docker support but does not have docker tag " + f"variable '{ROS_TEAM_WS_DOCKER_TAG_ENV_VAR}', skipping." + ) + continue + docker_tag = variables_and_values[ROS_TEAM_WS_DOCKER_TAG_ENV_VAR] + + docker_container_name = None + if ws_docker_support: + if ROS_TEAM_WS_DOCKER_CONTAINER_NAME_ENV_VAR in variables_and_values: + logger.debug( + f"Workspace '{ws_name}' has docker container name " + f"'{ROS_TEAM_WS_DOCKER_CONTAINER_NAME_ENV_VAR}', setting it." + ) + docker_container_name = variables_and_values[ + ROS_TEAM_WS_DOCKER_CONTAINER_NAME_ENV_VAR + ] + else: + docker_container_name = DEFAULT_DOCKER_CONTAINER_NAME_FORMAT.format( + docker_tag=docker_tag + ) + logger.info( + f"Workspace '{ws_name}' has docker support but does not have docker container name " + f"variable, setting it to {docker_container_name}" + ) + + ws = Workspace( + ws_name=ws_name, + distro=distro, + ws_folder=ws_folder, + ws_docker_support=ws_docker_support, + docker_tag=docker_tag if docker_tag else "", + docker_container_name=docker_container_name if docker_container_name else "", + base_ws=base_ws if base_ws else "", + ) + workspaces[ws_name] = ws + + return workspaces + + +def port_workspace_name_completer(**kwargs) -> List[str]: + workspaces = extract_workspaces_from_bash_script(ROS_TEAM_WS_RC_PATH) + if not workspaces: + return ["NO_WORKSPACES_FOUND"] + return [ws_name for ws_name in workspaces.keys()] + + +def print_port_stats(ported: List[str], to_be_ported: List[str]) -> None: + total = len(to_be_ported) + completed = len(ported) + tree = rich.tree.Tree( + f"{RICH_TREE_LABEL_COLOR}Workspace Porting Statistics: {completed}/{total}", + guide_style=RICH_TREE_GUIDE_STYLE, + ) + for ws_i, ws_name in enumerate(to_be_ported, start=1): + status = PortStatus.SUCCESS if ws_name in ported else PortStatus.FAILED + tree.add( + f"{RICH_TREE_FST_LVL_COLOR}{ws_i}. {ws_name} - {RICH_TREE_STATUS_COLOR}{status.name}", + highlight=True, + ) + rich.print(tree) + + +class PortVerb(VerbExtension): + """Port workspace(s) by creating the corresponding config entry.""" + + def add_arguments(self, parser: argparse.ArgumentParser, cli_name: str) -> None: + arg = parser.add_argument( + "workspace_name", + help="The workspace name", + nargs="?", + ) + arg.completer = port_workspace_name_completer # type: ignore + + def main(self, *, args): + script_workspaces = extract_workspaces_from_bash_script(ROS_TEAM_WS_RC_PATH) + if not script_workspaces: + logger.info(f"No workspaces found in script '{ROS_TEAM_WS_RC_PATH}'") + return + + logger.debug( + f"Script workspaces: {len(script_workspaces)}" + f"\nFirst script workspace: \n{list(script_workspaces.values())[0]}" + f"\nLast script workspace: \n{list(script_workspaces.values())[-1]}" + ) - success = try_port_workspace(workspace_data_to_port, new_ws_name) - if success: - print(f"Ported workspace '{new_ws_name}' successfully") + if args.workspace_name: + logger.debug(f"Porting workspace from args: {args.workspace_name}") + ws_names_to_port = [args.workspace_name] else: - print(f"Porting workspace '{new_ws_name}' failed") + ws_names_to_port = get_selected_ws_names_from_user( + workspaces=script_workspaces, + select_question_msg="Select workspaces to port", + confirm_question_msg="Are you sure you want to port the selected workspaces?", + ) + if not ws_names_to_port: + logger.info("No workspaces selected to port.") + return + logger.debug(f"Ws names to port: {ws_names_to_port}") + + ported = [] + for ws_name in ws_names_to_port: + if update_workspaces_config(WORKSPACES_PATH, script_workspaces[ws_name]): + ported.append(ws_name) + logger.debug(f"Ported workspace: {ws_name}") + else: + logger.error( + f"Failed to port workspace: {ws_name}. Please see the logs for details." + ) + + print_port_stats(ported=ported, to_be_ported=ws_names_to_port) diff --git a/rtwcli/rtw_cmds/rtw_cmds/workspace/use_verb.py b/rtwcli/rtw_cmds/rtw_cmds/workspace/use_verb.py index b657b1ed..edaf28e4 100644 --- a/rtwcli/rtw_cmds/rtw_cmds/workspace/use_verb.py +++ b/rtwcli/rtw_cmds/rtw_cmds/workspace/use_verb.py @@ -22,7 +22,7 @@ ) from rtwcli.utils import create_file_and_write from rtwcli.verb import VerbExtension -from rtwcli.workspace_manger import ( +from rtwcli.workspace_utils import ( create_bash_script_content_for_using_ws, load_workspaces_config_from_yaml_file, workspace_name_completer, diff --git a/rtwcli/rtw_cmds/setup.py b/rtwcli/rtw_cmds/setup.py index 3ba9678e..bc92cc23 100644 --- a/rtwcli/rtw_cmds/setup.py +++ b/rtwcli/rtw_cmds/setup.py @@ -55,6 +55,7 @@ ], "rtw_cmds.workspace.verbs": [ "create = rtw_cmds.workspace.create_verb:CreateVerb", + "delete = rtw_cmds.workspace.delete_verb:DeleteVerb", "import = rtw_cmds.workspace.import_verb:ImportVerb", "port = rtw_cmds.workspace.port_verb:PortVerb", "use = rtw_cmds.workspace.use_verb:UseVerb", diff --git a/rtwcli/rtwcli/rtwcli/__init__.py b/rtwcli/rtwcli/rtwcli/__init__.py index e69de29b..846e34be 100644 --- a/rtwcli/rtwcli/rtwcli/__init__.py +++ b/rtwcli/rtwcli/rtwcli/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) 2023, Stogl Robotics Consulting UG (haftungsbeschränkt) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .logging_config import logger + +logger.debug("rtwcli package initialized") diff --git a/rtwcli/rtwcli/rtwcli/constants.py b/rtwcli/rtwcli/rtwcli/constants.py index 4fe7bd57..42a7e24f 100644 --- a/rtwcli/rtwcli/rtwcli/constants.py +++ b/rtwcli/rtwcli/rtwcli/constants.py @@ -60,3 +60,10 @@ ] DISPLAY_MANAGER_WAYLAND = "wayland" + +RICH_TREE_LABEL_COLOR = "[bold green]" +RICH_TREE_GUIDE_STYLE = "bold" +RICH_TREE_FST_LVL_COLOR = "[bold blue]" +RICH_TREE_SND_LVL_COLOR = "[yellow]" +RICH_TREE_TRD_LVL_COLOR = "[cyan]" +RICH_TREE_STATUS_COLOR = "[bold magenta]" diff --git a/rtwcli/rtwcli/rtwcli/docker_utils.py b/rtwcli/rtwcli/rtwcli/docker_utils.py index 68270d00..03b5d3bc 100644 --- a/rtwcli/rtwcli/rtwcli/docker_utils.py +++ b/rtwcli/rtwcli/rtwcli/docker_utils.py @@ -15,6 +15,7 @@ import os from typing import Union +from rtwcli import logger from rtwcli.utils import create_file_if_not_exists, run_command import docker @@ -28,7 +29,7 @@ def is_docker_tag_valid(tag: str) -> bool: docker.errors.ImageNotFound, # type: ignore docker.errors.APIError, # type: ignore ) as e: - print(f"Failed to get docker image '{tag}': {e}") + logger.error(f"Failed to get docker image '{tag}': {e}") return False @@ -114,6 +115,50 @@ def docker_stop(container_name: str) -> bool: return run_command(["docker", "stop", container_name]) +def remove_docker_image(tag: str, force: bool = False) -> bool: + """Remove a docker image with the given tag.""" + try: + docker_client = docker.from_env() + image = docker_client.images.get(tag) + docker_client.images.remove(image.id, force=force) + return True + except ( + docker.errors.ImageNotFound, # type: ignore + docker.errors.APIError, # type: ignore + ) as e: + logger.error(f"Failed to remove docker image '{tag}': {e}") + return False + + +def docker_container_exists(id_or_name: str) -> bool: + """Check if a docker container with the given id or name exists.""" + try: + docker_client = docker.from_env() + docker_client.containers.get(id_or_name) + return True + except ( + docker.errors.NotFound, # type: ignore + docker.errors.APIError, # type: ignore + ) as e: + logger.error(f"Failed to get docker container '{id_or_name}': {e}") + return False + + +def remove_docker_container(id_or_name: str, force: bool = False) -> bool: + """Remove a docker container with the given id or name.""" + try: + docker_client = docker.from_env() + container = docker_client.containers.get(id_or_name) + container.remove(force=force) + return True + except ( + docker.errors.NotFound, # type: ignore + docker.errors.APIError, # type: ignore + ) as e: + logger.error(f"Failed to remove docker container '{id_or_name}': {e}") + return False + + def is_docker_container_running(id_or_name: str, running_status: str = "running") -> bool: """Check if a docker container with the given id or name is running.""" try: @@ -124,7 +169,7 @@ def is_docker_container_running(id_or_name: str, running_status: str = "running" docker.errors.NotFound, # type: ignore docker.errors.APIError, # type: ignore ) as e: - print(f"Failed to get docker container '{id_or_name}': {e}") + logger.error(f"Failed to get docker container '{id_or_name}': {e}") return False @@ -137,7 +182,9 @@ def change_docker_path_permissions( """Change the permissions of the given path in the given container to the given user and group.""" user = user_in if user_in else os.getuid() group = group_in if group_in else os.getgid() - print(f"Changing permissions of the path '{path}' to '{user}:{group}' in '{container_name}'.") + logger.info( + f"Changing permissions of the path '{path}' to '{user}:{group}' in '{container_name}'." + ) return docker_exec_bash_cmd(container_name, f"chown -R {user}:{group} {path}") @@ -155,19 +202,19 @@ def fix_missing_xauth_file( docker.errors.NotFound, # type: ignore docker.errors.APIError, # type: ignore ) as e: - print(f"Failed to get docker container '{container_name}': {e}") + logger.error(f"Failed to get docker container '{container_name}': {e}") return False if not container: - print(f"Container object is None for container '{container_name}'.") + logger.error(f"Container object is None for container '{container_name}'.") return False if not container.attrs: - print(f"Container attributes are None for container '{container_name}'.") + logger.error(f"Container attributes are None for container '{container_name}'.") return False if mounts_attr not in container.attrs: - print( + logger.error( f"Container attributes do not contain '{mounts_attr}' for container '{container_name}'." ) return False @@ -176,40 +223,42 @@ def fix_missing_xauth_file( for mount in container.attrs[mounts_attr]: if source_key in mount and xauth_file_ext in mount[source_key]: xauth_file_abs_path = mount[source_key] - print( + logger.info( f"Found {xauth_file_ext} file '{xauth_file_abs_path}' for container '{container_name}'." ) break if not xauth_file_abs_path: - print( + logger.info( f"There is no {xauth_file_ext} file for container '{container_name}'. Nothing to do." ) return True if os.path.isfile(xauth_file_abs_path): - print(f"File '{xauth_file_abs_path}' already exists.") + logger.info(f"File '{xauth_file_abs_path}' already exists.") return True if os.path.isdir(xauth_file_abs_path): - print(f"Path '{xauth_file_abs_path}' is a directory, removing it.") + logger.info(f"Path '{xauth_file_abs_path}' is a directory, removing it.") try: os.rmdir(xauth_file_abs_path) except OSError as e: - print(f"Failed to remove directory '{xauth_file_abs_path}': {e}") - print("========================================") - print("Please remove it manually and try again.") - print("========================================") + logger.error(f"Failed to remove directory '{xauth_file_abs_path}': {e}") + logger.error("========================================") + logger.error("Please remove it manually and try again.") + logger.error("========================================") return False if not create_file_if_not_exists(xauth_file_abs_path): - print(f"Failed to create file '{xauth_file_abs_path}'.") + logger.error(f"Failed to create file '{xauth_file_abs_path}'.") return False cmd = f"xauth nlist :0 | sed -e 's/^..../ffff/' | xauth -f {xauth_file_abs_path} nmerge -" if not run_command(cmd, shell=True): - print(f"Failed to run command '{cmd}'. File '{xauth_file_abs_path}' will be removed.") + logger.error( + f"Failed to run command '{cmd}'. File '{xauth_file_abs_path}' will be removed." + ) os.remove(xauth_file_abs_path) return False diff --git a/rtwcli/rtwcli/rtwcli/logging_config.py b/rtwcli/rtwcli/rtwcli/logging_config.py new file mode 100644 index 00000000..9d9fdd07 --- /dev/null +++ b/rtwcli/rtwcli/rtwcli/logging_config.py @@ -0,0 +1,32 @@ +# Copyright (c) 2023, Stogl Robotics Consulting UG (haftungsbeschränkt) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from rich.logging import RichHandler + + +def setup_logging() -> logging.Logger: + FORMAT = "%(message)s" + logging.basicConfig( + level="NOTSET", format=FORMAT, datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True)] + ) + + logger = logging.getLogger("rtwcli") + logger.setLevel(logging.DEBUG) # global log level + + return logger + + +# Create and export the logger +logger = setup_logging() diff --git a/rtwcli/rtwcli/rtwcli/workspace_manger.py b/rtwcli/rtwcli/rtwcli/workspace_utils.py similarity index 58% rename from rtwcli/rtwcli/rtwcli/workspace_manger.py rename to rtwcli/rtwcli/rtwcli/workspace_utils.py index b44416be..2e4c73ae 100644 --- a/rtwcli/rtwcli/rtwcli/workspace_manger.py +++ b/rtwcli/rtwcli/rtwcli/workspace_utils.py @@ -15,17 +15,12 @@ import dataclasses import datetime import os -import re import shutil -from typing import Any, Dict, List, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union import questionary from rtwcli.constants import ( BACKUP_DATETIME_FORMAT, - F_BASE_WS, - F_DOCKER_CONTAINER_NAME, - F_DOCKER_TAG, - F_WS_DOCKER_SUPPORT, F_WS_NAME, ROS_TEAM_WS_PREFIX, WORKSPACES_KEY, @@ -33,8 +28,8 @@ WORKSPACES_PATH_BACKUP_FORMAT, ROS_TEAM_WS_WS_NAME_ENV_VAR, ) -from rtwcli.docker_utils import is_docker_tag_valid from rtwcli.utils import create_file_if_not_exists, load_yaml_file, write_to_yaml_file +from rtwcli import logger @dataclasses.dataclass @@ -43,30 +38,42 @@ class Workspace: ws_name: str distro: str - ws_folder: str + ws_folder: Optional[str] = None ws_docker_support: bool = False - docker_tag: str = "" - docker_container_name: str = "" - base_ws: str = "" + docker_tag: Optional[str] = None + docker_container_name: Optional[str] = None + base_ws: Optional[str] = None standalone: bool = False def __post_init__(self): - self.distro = str(self.distro) - self.ws_folder = str(self.ws_folder) - self.ws_docker_support = bool(self.ws_docker_support) - if self.docker_tag is not None: - self.docker_tag = str(self.docker_tag) - if self.base_ws is not None: - self.base_ws = str(self.base_ws) - if self.docker_container_name is not None: - self.docker_container_name = str(self.docker_container_name) + if self.ws_folder == "": + self.ws_folder = None + if self.docker_tag == "": + self.docker_tag = None + if self.docker_container_name == "": + self.docker_container_name = None + if self.base_ws == "": + self.base_ws = None + if not self.ws_name: + raise ValueError("Workspace name cannot be empty.") + if not self.distro: + raise ValueError("Distro cannot be empty.") + if self.ws_folder and not os.path.isabs(self.ws_folder): + raise ValueError("Workspace folder must be an absolute path.") + if self.standalone and not self.ws_folder: + raise ValueError("Standalone workspace requires a workspace folder.") + if self.ws_docker_support and not self.docker_tag: + raise ValueError("Docker-supported workspace requires a docker tag.") + if self.ws_docker_support and not self.docker_container_name: + raise ValueError("Docker-supported workspace requires a docker container name.") def to_dict(self) -> Dict[str, Any]: - result = dataclasses.asdict(self) - for key, value in result.items(): - if value == "": - result[key] = None - return result + # result = dataclasses.asdict(self) + # for key, value in result.items(): + # if value == "": + # result[key] = None + # return result + return dataclasses.asdict(self) @dataclasses.dataclass @@ -101,18 +108,25 @@ def get_ws_names(self) -> List[str]: def add_workspace(self, workspace: Workspace) -> bool: if workspace.ws_name in self.workspaces: - print(f"Workspace '{workspace.ws_name}' already exists in the config.") + logger.error(f"Workspace '{workspace.ws_name}' already exists in the config.") return False self.workspaces[workspace.ws_name] = workspace return True + def remove_workspace(self, workspace_name: str) -> bool: + if workspace_name not in self.workspaces: + logger.warning(f"Workspace '{workspace_name}' does not exist in the config.") + return False + del self.workspaces[workspace_name] + return True + -def load_workspaces_config_from_yaml_file(file_path: str): +def load_workspaces_config_from_yaml_file(file_path: str) -> WorkspacesConfig: """Load a WorkspacesConfig from a YAML file.""" return WorkspacesConfig.from_dict(load_yaml_file(file_path)) -def save_workspaces_config(filepath: str, config: WorkspacesConfig): +def save_workspaces_config(filepath: str, config: WorkspacesConfig) -> bool: """Save a WorkspacesConfig to a YAML file.""" return write_to_yaml_file(filepath, config.to_dict()) @@ -120,12 +134,12 @@ def save_workspaces_config(filepath: str, config: WorkspacesConfig): def update_workspaces_config(config_path: str, workspace: Workspace) -> bool: """Update the workspaces config with a new workspace.""" if not create_file_if_not_exists(config_path): - print("Could not create workspaces config file. Cannot proceed with porting.") + logger.error("Could not create workspaces config file.") return False workspaces_config = load_workspaces_config_from_yaml_file(config_path) if not workspaces_config.add_workspace(workspace): - print(f"Failed to add workspace '{workspace.ws_name}' to the config.") + logger.error(f"Failed to add workspace '{workspace.ws_name}' to the config.") return False # Backup current config file @@ -133,13 +147,13 @@ def update_workspaces_config(config_path: str, workspace: Workspace) -> bool: backup_filename = WORKSPACES_PATH_BACKUP_FORMAT.format(current_date) create_file_if_not_exists(backup_filename) shutil.copy(config_path, backup_filename) - print(f"Backed up current workspaces config file to '{backup_filename}'") + logger.info(f"Backed up current workspaces config file to '{backup_filename}'") if not save_workspaces_config(config_path, workspaces_config): - print(f"Failed to update YAML file '{config_path}'.") + logger.error(f"Failed to update YAML file '{config_path}'.") return False - print(f"Updated YAML file '{config_path}' with a new workspace '{workspace.ws_name}'") + logger.info(f"Updated YAML file '{config_path}' with a new workspace '{workspace.ws_name}'") return True @@ -160,29 +174,12 @@ def get_current_workspace_name() -> Union[str, None]: """Retrieve the current workspace name from the environment variable.""" ros_ws_name = os.environ.get(ROS_TEAM_WS_WS_NAME_ENV_VAR, None) if not ros_ws_name: - print(f"Environment variable '{ROS_TEAM_WS_WS_NAME_ENV_VAR}' not set.") + logger.debug(f"Environment variable '{ROS_TEAM_WS_WS_NAME_ENV_VAR}' not set.") return None return ros_ws_name -def extract_workspaces_from_bash_script(script_path: str) -> Dict[str, dict]: - """Extract workspaces from a bash script.""" - with open(script_path) as file: - data = file.read() - - workspaces = re.findall(r"(\w+ \(\) \{[^}]+\})", data, re.MULTILINE) - workspaces_dict = {} - for workspace in workspaces: - ws_name = re.search(r"(\w+) \(", workspace).group(1) # type: ignore - workspaces_dict[ws_name] = {} - variables = re.findall(r'(?:export )?(\w+)=(".*?"|\'.*?\'|[^ ]+)', workspace) - for var, val in variables: - workspaces_dict[ws_name][var] = val.strip('"') - - return workspaces_dict - - def env_var_to_workspace_var(env_var: str, env_var_value: Union[str, None]) -> Tuple[str, Any]: """Convert an environment variable to a workspace variable.""" ws_var = env_var.replace(ROS_TEAM_WS_PREFIX, "").lower() @@ -228,73 +225,6 @@ def get_expected_ws_field_names() -> List[str]: return [field.name for field in dataclasses.fields(Workspace)] -def try_port_workspace(workspace_data_to_port: Dict[str, Any], new_ws_name: str) -> bool: - """Try to port a workspace.""" - # set workspace missing fields to default values - if F_WS_DOCKER_SUPPORT not in workspace_data_to_port: - workspace_data_to_port[F_WS_DOCKER_SUPPORT] = False - if F_DOCKER_TAG not in workspace_data_to_port: - workspace_data_to_port[F_DOCKER_TAG] = None - if F_BASE_WS not in workspace_data_to_port: - workspace_data_to_port[F_BASE_WS] = None - if F_DOCKER_CONTAINER_NAME not in workspace_data_to_port: - workspace_data_to_port[F_DOCKER_CONTAINER_NAME] = None - if F_WS_NAME not in workspace_data_to_port: - workspace_data_to_port[F_WS_NAME] = new_ws_name - - # validate workspace fields - if not workspace_data_to_port[F_WS_DOCKER_SUPPORT]: - workspace_data_to_port[F_DOCKER_TAG] = None - if workspace_data_to_port[F_WS_DOCKER_SUPPORT]: - if not is_docker_tag_valid(workspace_data_to_port[F_DOCKER_TAG]): - return False - - # Identify and inform about unexpected fields - expected_ws_field_names = get_expected_ws_field_names() - current_ws_field_names = list(workspace_data_to_port.keys()) - unexpected_fields = set(current_ws_field_names) - set(expected_ws_field_names) - if unexpected_fields: - print(f"Current fields to port: {', '.join(current_ws_field_names)}") - print(f"Found unexpected fields: {', '.join(unexpected_fields)}") - print(f"Available fields are: {', '.join(expected_ws_field_names)}") - print("These unexpected fields will be skipped.") - for field in unexpected_fields: - del workspace_data_to_port[field] - - # ask user for missing ws fields - choices = [ - "Skip this workspace", - "Enter the missing field (validation not implemented yet)", - "Stop porting entirely", - ] - for expected_ws_field_name in expected_ws_field_names: - if expected_ws_field_name in current_ws_field_names: - continue - choice = questionary.select( - f"Missing field '{expected_ws_field_name}'. What would you like to do?", - choices=choices, - ).ask() - if choice is None: # Cancelled by user - return False - elif choice == choices[0]: - return False - if choice == choices[1]: - value = questionary.text(f"Enter value for {expected_ws_field_name}:").ask() - workspace_data_to_port[expected_ws_field_name] = value - else: - exit("Stopped porting due to a missing field.") - - workspace_to_port = Workspace(**workspace_data_to_port) - print(f"Updating workspace config in '{WORKSPACES_PATH}'") - success = update_workspaces_config(WORKSPACES_PATH, workspace_to_port) - if success: - print(f"Updated workspace config in '{WORKSPACES_PATH}'") - return True - else: - print(f"Updating workspace config in '{WORKSPACES_PATH}' failed") - return False - - def get_compile_cmd( ws_path_abs: str, distro: str, @@ -339,3 +269,74 @@ def workspace_name_completer(**kwargs) -> List[str]: if not ws_names: return ["NO_WORKSPACES_FOUND"] return ws_names + + +def create_choice_format_string( + ws_name_width: int, distro_width: int, ws_folder_width: int, num_spaces: int = 2 +) -> str: + return ( + f"{{ws_name:<{ws_name_width+num_spaces}}}" + f" {{distro:<{distro_width+num_spaces}}}" + f" {{ws_folder:<{ws_folder_width+num_spaces}}}" + f" {{docker_tag}}" + ) + + +def get_choice_format_template(choice_format: str) -> str: + return ( + "[" + + ( + "".join(c for c in choice_format if c.isalpha() or c in "_ {}") + .replace("{", "<") + .replace("}", ">") + ) + + "]" + ) + + +def get_selected_ws_names_from_user( + workspaces: Dict[str, Workspace], + select_question_msg: str = "Select workspaces", + confirm_question_msg: str = "Are you sure you want to select the following workspaces?", + start_index: int = 1, +) -> List[str]: + logger.debug("ws selection") + ws_name_width = max(len(ws_name) for ws_name in workspaces.keys()) + distro_width = max(len(ws.distro) for ws in workspaces.values()) + ws_folder_width = max(len(str(ws.ws_folder)) for ws in workspaces.values()) + choice_format = create_choice_format_string(ws_name_width, distro_width, ws_folder_width) + + choice_data = {} + for ws_name, ws in workspaces.items(): + ws_choice_data = choice_format.format( + ws_name=ws_name, + distro=ws.distro, + ws_folder=ws.ws_folder, + docker_tag=ws.docker_tag, + ) + choice_data[ws_choice_data] = ws_name + + choice_format_template = get_choice_format_template(choice_format) + ws_choices_to_delete = questionary.checkbox( + message=f"{select_question_msg} {choice_format_template}\n", + choices=list(choice_data.keys()), + style=questionary.Style([("highlighted", "bold")]), + ).ask() + if not ws_choices_to_delete: # cancelled by user + exit(0) + + logger.debug("ws selection confirmation") + workspaces_to_confirm = "\n".join( + f"{i}. {ws_choice}" for i, ws_choice in enumerate(ws_choices_to_delete, start=start_index) + ) + + confirm_delete = questionary.confirm( + f"{confirm_question_msg} {choice_format_template}" f"\n{workspaces_to_confirm}\n" + ).ask() + if not confirm_delete: # cancelled by user + exit(0) + + ws_names_to_delete = [ + choice_data[ws_choice_to_delete] for ws_choice_to_delete in ws_choices_to_delete + ] + return ws_names_to_delete diff --git a/rtwcli/rtwcli/setup.py b/rtwcli/rtwcli/setup.py index dbf8561d..3546fa3a 100644 --- a/rtwcli/rtwcli/setup.py +++ b/rtwcli/rtwcli/setup.py @@ -19,7 +19,7 @@ name="rtwcli", version="0.2.0", packages=find_packages(exclude=["test"]), - install_requires=["argcomplete", "docker", "questionary", "rocker"], + install_requires=["argcomplete", "docker", "questionary", "rich", "rocker"], zip_safe=False, keywords=[], classifiers=[