diff --git a/geochemistrypi/data_mining/cli_pipeline.py b/geochemistrypi/data_mining/cli_pipeline.py
index 2fdde4e8..a11ebd7f 100644
--- a/geochemistrypi/data_mining/cli_pipeline.py
+++ b/geochemistrypi/data_mining/cli_pipeline.py
@@ -27,8 +27,10 @@
     NON_AUTOML_MODELS,
     OPTION,
     OUTPUT_PATH,
+    PACKAGEDIR,
    REGRESSION_MODELS,
     REGRESSION_MODELS_WITH_MISSING_VALUES,
+    RELEASE_VERSION,
     SECTION,
     TEST_DATA_OPTION,
     TOGGLE_ADDRESS_STATUS,
@@ -61,6 +63,8 @@
 from .process.decompose import DecompositionModelSelection
 from .process.detect import AnomalyDetectionModelSelection
 from .process.regress import RegressionModelSelection
+from .update.anaconda_checker import anaconda_installer
+from .update.base import get_latest_release_version, run_bat_file
 from .utils.base import check_package, clear_output, copy_files, create_geopi_output_dir, get_os, install_package, log, save_data, show_warning
 from .utils.mlflow_utils import retrieve_previous_experiment_id
 from .utils.toggle_address_status import toggle_address_status
@@ -83,6 +87,18 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
         The path of the application data, by default None
     """
 
+    # Check if Anaconda is installed on the computer.
+    anaconda_installer()
+
+    # Check whether this is the latest release version; if not, download it.
+    repo_owner = "ZJUEarthData"
+    repo_name = "Geochemistrypi"
+    version = get_latest_release_version(repo_owner, repo_name)
+    if str(version) != RELEASE_VERSION:
+        update_path = os.path.join(PACKAGEDIR, "bat", "latestversion_installer.bat")
+        update_path = "{} '{}'".format(update_path, str(version))
+        run_bat_file(update_path)
+
     # Local test: Uncomment the following line to utilize built-in datasets to test the pipeline. Don't forget to modify the path value to be consistent with your own location.
     # training_data_path = "/Users/can/Documents/github/work/geo_ml/geochemistrypi/geochemistrypi/data_mining/data/dataset/Data_Classification.xlsx"
     # application_data_path = "/Users/can/Documents/github/work/geo_ml/geochemistrypi/geochemistrypi/data_mining/data/dataset/Data_Classification.xlsx"
@@ -104,7 +120,11 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
     sleep(0.75)
 
     # Call toggle_address_status and pass status and training_data_path as parameters to obtain the address of the training data
-    training_data_path = toggle_address_status(status=TOGGLE_ADDRESS_STATUS, training_data_path=training_data_path, user_conformation=1)[0]
+    training_data_path = toggle_address_status(
+        status=TOGGLE_ADDRESS_STATUS,
+        training_data_path=training_data_path,
+        user_conformation=1,
+    )[0]
 
     # Check if the length of training_data_path is greater than 1
     if len(training_data_path) > 1:
@@ -288,8 +308,20 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
         distribution_plot(data_selected.columns, data_selected, name_column_select)
         log_distribution_plot(data_selected.columns, data_selected, name_column_select)
         GEOPI_OUTPUT_ARTIFACTS_DATA_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_DATA_PATH")
-        save_data(data, name_column_origin, "Data Original", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
-        save_data(data_selected, name_column_select, "Data Selected", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+        save_data(
+            data,
+            name_column_origin,
+            "Data Original",
+            GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+            MLFLOW_ARTIFACT_DATA_PATH,
+        )
+        save_data(
+            data_selected,
+            name_column_select,
+            "Data Selected",
+            GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+            MLFLOW_ARTIFACT_DATA_PATH,
+        )
         data_selected_name = pd.concat([name_column_select, data_selected], axis=1)
         clear_output()
 
@@ -353,7 +385,13 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
                 print(data_selected_dropped)
                 print("Basic Statistical Information:")
                 drop_name_column = data_selected_dropped_name.iloc[:, 0]
-                save_data(data_selected_dropped, drop_name_column, "Data Selected Dropped-Imputed", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+                save_data(
+                    data_selected_dropped,
+                    drop_name_column,
+                    "Data Selected Dropped-Imputed",
+                    GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+                    MLFLOW_ARTIFACT_DATA_PATH,
+                )
                 drop_rows_with_missing_value_flag = True
                 imputed_flag = False
             elif drop_missing_value_strategy_num == 2:
@@ -377,7 +415,13 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
                 print(data_selected_dropped)
                 print("Basic Statistical Information:")
                 drop_name_column = data_selected_dropped_name.iloc[:, 0]
-                save_data(data_selected_dropped, drop_name_column, "Data Selected Dropped-Imputed", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+                save_data(
+                    data_selected_dropped,
+                    drop_name_column,
+                    "Data Selected Dropped-Imputed",
+                    GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+                    MLFLOW_ARTIFACT_DATA_PATH,
+                )
                 drop_rows_with_missing_value_flag = True
                 imputed_flag = False
                 missing_value_flag = check_missing_value(data_selected_dropped)
@@ -391,12 +435,24 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
         else:
             # Don't deal with the missing values, which means neither drop the rows with missing values nor use imputation techniques.
             imputed_flag = False
-            save_data(data_selected, name_column_select, "Data Selected Dropped-Imputed", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+            save_data(
+                data_selected,
+                name_column_select,
+                "Data Selected Dropped-Imputed",
+                GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+                MLFLOW_ARTIFACT_DATA_PATH,
+            )
             clear_output()
     else:
         # If the selected data set doesn't have missing values, then don't deal with the missing values.
         imputed_flag = False
-        save_data(data_selected, name_column_select, "Data Selected Dropped-Imputed", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+        save_data(
+            data_selected,
+            name_column_select,
+            "Data Selected Dropped-Imputed",
+            GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+            MLFLOW_ARTIFACT_DATA_PATH,
+        )
         clear_output()
     data_selected = data_selected_dropped if drop_rows_with_missing_value_flag else data_selected
     drop_name_column = data_selected_dropped_name.iloc[:, 0] if drop_rows_with_missing_value_flag else name_column_select
@@ -426,7 +482,13 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
             probability_plot(data_selected.columns, data_selected, data_selected_imputed)
             basic_info(data_selected_imputed)
             basic_statistic(data_selected_imputed)
-            save_data(data_selected_imputed, drop_name_column, "Data Selected Dropped-Imputed", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+            save_data(
+                data_selected_imputed,
+                drop_name_column,
+                "Data Selected Dropped-Imputed",
+                GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+                MLFLOW_ARTIFACT_DATA_PATH,
+            )
             del data_selected
             clear_output()
         else:
@@ -483,7 +545,13 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
     print(X)
     print("Basic Statistical Information: ")
     basic_statistic(X)
-    save_data(X, name_all, "X Without Scaling", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+    save_data(
+        X,
+        name_all,
+        "X Without Scaling",
+        GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+        MLFLOW_ARTIFACT_DATA_PATH,
+    )
     clear_output()
 
     # Create Y data set
@@ -499,7 +567,13 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
     print(y)
     print("Basic Statistical Information: ")
     basic_statistic(y)
-    save_data(y, name_all, "Y", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+    save_data(
+        y,
+        name_all,
+        "Y",
+        GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+        MLFLOW_ARTIFACT_DATA_PATH,
+    )
     clear_output()
 
     # <--- Feature Scaling --->
@@ -517,7 +591,13 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
         print(X)
         print("Basic Statistical Information: ")
         basic_statistic(X)
-        save_data(X, name_all, "X With Scaling", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+        save_data(
+            X,
+            name_all,
+            "X With Scaling",
+            GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+            MLFLOW_ARTIFACT_DATA_PATH,
+        )
     else:
         feature_scaling_config = {}
     clear_output()
@@ -533,7 +613,13 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
         feature_selection_config, X = feature_selector(X, y, mode_num, FEATURE_SELECTION_STRATEGY, feature_selection_num - 1)
         print("--Selected Features-")
         show_data_columns(X.columns)
-        save_data(X, name_all, "X After Feature Selection", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+        save_data(
+            X,
+            name_all,
+            "X After Feature Selection",
+            GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+            MLFLOW_ARTIFACT_DATA_PATH,
+        )
     else:
         feature_selection_config = {}
     clear_output()
@@ -555,10 +641,19 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
                 data_name_column = train_test_data["Name Train"]
             else:
                 data_name_column = train_test_data["Name Test"]
-            save_data(value, data_name_column, key, GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+            save_data(
+                value,
+                data_name_column,
+                key,
+                GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+                MLFLOW_ARTIFACT_DATA_PATH,
+            )
         X_train, X_test = train_test_data["X Train"], train_test_data["X Test"]
         y_train, y_test = train_test_data["Y Train"], train_test_data["Y Test"]
-        name_train, name_test = train_test_data["Name Train"], train_test_data["Name Test"]
+        name_train, name_test = (
+            train_test_data["Name Train"],
+            train_test_data["Name Test"],
+        )
         del data_selected_imputed_fe
         clear_output()
     else:
@@ -580,7 +675,13 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
             print(X)
             print("Basic Statistical Information: ")
             basic_statistic(X)
-            save_data(X, name_all, "X With Scaling", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+            save_data(
+                X,
+                name_all,
+                "X With Scaling",
+                GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+                MLFLOW_ARTIFACT_DATA_PATH,
+            )
         else:
             feature_scaling_config = {}
         clear_output()
@@ -588,7 +689,14 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
         feature_selection_config = {}
         # Create training data without data split because it is unsupervised learning
         X_train = X
-        y, X_test, y_train, y_test, name_train, name_test = None, None, None, None, None, None
+        y, X_test, y_train, y_test, name_train, name_test = (
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        )
         name_all = drop_name_column
     # <--- Model Selection --->
     logger.debug("Model Selection")
@@ -604,10 +712,24 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
     # If the selected data set is with missing values and is not been imputed, then only allow the user to choose regression, classification and clustering models.
     # Otherwise, allow the user to choose decomposition models.
     if missing_value_flag and not process_missing_value_flag:
-        Modes2Models = {1: REGRESSION_MODELS_WITH_MISSING_VALUES, 2: CLASSIFICATION_MODELS_WITH_MISSING_VALUES, 3: CLUSTERING_MODELS_WITH_MISSING_VALUES}
-        Modes2Initiators = {1: RegressionModelSelection, 2: ClassificationModelSelection, 3: ClusteringModelSelection}
+        Modes2Models = {
+            1: REGRESSION_MODELS_WITH_MISSING_VALUES,
+            2: CLASSIFICATION_MODELS_WITH_MISSING_VALUES,
+            3: CLUSTERING_MODELS_WITH_MISSING_VALUES,
+        }
+        Modes2Initiators = {
+            1: RegressionModelSelection,
+            2: ClassificationModelSelection,
+            3: ClusteringModelSelection,
+        }
     else:
-        Modes2Models = {1: REGRESSION_MODELS, 2: CLASSIFICATION_MODELS, 3: CLUSTERING_MODELS, 4: DECOMPOSITION_MODELS, 5: ANOMALYDETECTION_MODELS}
+        Modes2Models = {
+            1: REGRESSION_MODELS,
+            2: CLASSIFICATION_MODELS,
+            3: CLUSTERING_MODELS,
+            4: DECOMPOSITION_MODELS,
+            5: ANOMALYDETECTION_MODELS,
+        }
         Modes2Initiators = {
             1: RegressionModelSelection,
             2: ClassificationModelSelection,
@@ -654,16 +776,46 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
             inference_data_fe = new_feature_builder.batch_build(feature_engineering_config)
             inference_data_fe_selected = inference_data_fe[selected_columns]
             inference_name_column = inference_data[NAME]
-            save_data(inference_data, name_column_origin, "Application Data Original", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
-            save_data(inference_data_fe, inference_name_column, "Application Data Feature-Engineering", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
-            save_data(inference_data_fe_selected, inference_name_column, "Application Data Feature-Engineering Selected", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+            save_data(
+                inference_data,
+                name_column_origin,
+                "Application Data Original",
+                GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+                MLFLOW_ARTIFACT_DATA_PATH,
+            )
+            save_data(
+                inference_data_fe,
+                inference_name_column,
+                "Application Data Feature-Engineering",
+                GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+                MLFLOW_ARTIFACT_DATA_PATH,
+            )
+            save_data(
+                inference_data_fe_selected,
+                inference_name_column,
+                "Application Data Feature-Engineering Selected",
+                GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+                MLFLOW_ARTIFACT_DATA_PATH,
+            )
         else:
             print("You have not applied feature engineering to the training data.")
             print("Hence, no feature engineering operation will be applied to the inference data.")
             inference_data_fe_selected = inference_data[selected_columns]
             inference_name_column = inference_data[NAME]
-            save_data(inference_data, name_column_origin, "Application Data Original", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
-            save_data(inference_data_fe_selected, inference_name_column, "Application Data Selected", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
+            save_data(
+                inference_data,
+                name_column_origin,
+                "Application Data Original",
+                GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+                MLFLOW_ARTIFACT_DATA_PATH,
+            )
+            save_data(
+                inference_data_fe_selected,
+                inference_name_column,
+                "Application Data Selected",
+                GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
+                MLFLOW_ARTIFACT_DATA_PATH,
+            )
     else:
         # If the user doesn't provide the inference data path, it means that the user doesn't want to run the model inference.
print("You did not enter inference data.") @@ -687,14 +839,32 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] = if not is_automl: run.activate(X, y, X_train, X_test, y_train, y_test, name_train, name_test, name_all) else: - run.activate(X, y, X_train, X_test, y_train, y_test, name_train, name_test, name_all, is_automl) + run.activate( + X, + y, + X_train, + X_test, + y_train, + y_test, + name_train, + name_test, + name_all, + is_automl, + ) clear_output() # <--- Transform Pipeline ---> # Construct the transform pipeline using sklearn.pipeline.make_pipeline method. logger.debug("Transform Pipeline") print("-*-*- Transform Pipeline Construction -*-*-") - transformer_config, transform_pipeline = build_transform_pipeline(imputation_config, feature_scaling_config, feature_selection_config, run, X_train, y_train) + transformer_config, transform_pipeline = build_transform_pipeline( + imputation_config, + feature_scaling_config, + feature_selection_config, + run, + X_train, + y_train, + ) clear_output() # <--- Model Inference ---> @@ -706,7 +876,14 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] = if drop_rows_with_missing_value_flag: inference_data_fe_selected_dropped = inference_data_fe_selected.dropna() inference_name_column = inference_data[NAME] - model_inference(inference_data_fe_selected_dropped, inference_name_column, is_inference, run, transformer_config, transform_pipeline) + model_inference( + inference_data_fe_selected_dropped, + inference_name_column, + is_inference, + run, + transformer_config, + transform_pipeline, + ) save_data( inference_data_fe_selected_dropped, inference_name_column, @@ -716,7 +893,14 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] = ) else: inference_name_column = inference_data[NAME] - model_inference(inference_data_fe_selected, inference_name_column, is_inference, run, transformer_config, transform_pipeline) + model_inference( + inference_data_fe_selected, + inference_name_column, + is_inference, + run, + transformer_config, + transform_pipeline, + ) clear_output() # <--- Data Dumping ---> @@ -725,7 +909,12 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] = GEOPI_OUTPUT_ARTIFACTS_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_PATH") GEOPI_OUTPUT_METRICS_PATH = os.getenv("GEOPI_OUTPUT_METRICS_PATH") GEOPI_OUTPUT_PARAMETERS_PATH = os.getenv("GEOPI_OUTPUT_PARAMETERS_PATH") - copy_files(GEOPI_OUTPUT_ARTIFACTS_PATH, GEOPI_OUTPUT_METRICS_PATH, GEOPI_OUTPUT_PARAMETERS_PATH, GEOPI_OUTPUT_SUMMARY_PATH) + copy_files( + GEOPI_OUTPUT_ARTIFACTS_PATH, + GEOPI_OUTPUT_METRICS_PATH, + GEOPI_OUTPUT_PARAMETERS_PATH, + GEOPI_OUTPUT_SUMMARY_PATH, + ) else: # Run all models @@ -736,20 +925,58 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] = run = Modes2Initiators[mode_num](MODELS[i]) # If is_automl is False, then run all models without AutoML. if not is_automl: - run.activate(X, y, X_train, X_test, y_train, y_test, name_train, name_test, name_all) + run.activate( + X, + y, + X_train, + X_test, + y_train, + y_test, + name_train, + name_test, + name_all, + ) else: # If is_automl is True, but MODELS[i] is in the NON_AUTOML_MODELS, then run the model without AutoML. 
                 if MODELS[i] in NON_AUTOML_MODELS:
-                    run.activate(X, y, X_train, X_test, y_train, y_test, name_train, name_test, name_all)
+                    run.activate(
+                        X,
+                        y,
+                        X_train,
+                        X_test,
+                        y_train,
+                        y_test,
+                        name_train,
+                        name_test,
+                        name_all,
+                    )
                 else:
                     # If is_automl is True, and MODELS[i] is not in the NON_AUTOML_MODELS, then run the model with AutoML.
-                    run.activate(X, y, X_train, X_test, y_train, y_test, name_train, name_test, name_all, is_automl)
+                    run.activate(
+                        X,
+                        y,
+                        X_train,
+                        X_test,
+                        y_train,
+                        y_test,
+                        name_train,
+                        name_test,
+                        name_all,
+                        is_automl,
+                    )
 
             # <--- Transform Pipeline --->
             # Construct the transform pipeline using sklearn.pipeline.make_pipeline method.
             logger.debug("Transform Pipeline")
             print("-*-*- Transform Pipeline Construction -*-*-")
-            transformer_config, transform_pipeline = build_transform_pipeline(imputation_config, feature_scaling_config, feature_selection_config, run, X_train, y_train)
+            transformer_config, transform_pipeline = build_transform_pipeline(
+                imputation_config,
+                feature_scaling_config,
+                feature_selection_config,
+                run,
+                X_train,
+                y_train,
+            )
 
             # <--- Model Inference --->
             # If the user provides the inference data, then run the model inference.
@@ -760,7 +987,14 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
             if drop_rows_with_missing_value_flag:
                 inference_data_fe_selected_dropped = inference_data_fe_selected.dropna()
                 inference_name_column = inference_data[NAME]
-                model_inference(inference_data_fe_selected_dropped, inference_name_column, is_inference, run, transformer_config, transform_pipeline)
+                model_inference(
+                    inference_data_fe_selected_dropped,
+                    inference_name_column,
+                    is_inference,
+                    run,
+                    transformer_config,
+                    transform_pipeline,
+                )
                 save_data(
                     inference_data_fe_selected_dropped,
                     inference_name_column,
@@ -770,7 +1004,14 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
                 )
             else:
                 inference_name_column = inference_data[NAME]
-                model_inference(inference_data_fe_selected, inference_name_column, is_inference, run, transformer_config, transform_pipeline)
+                model_inference(
+                    inference_data_fe_selected,
+                    inference_name_column,
+                    is_inference,
+                    run,
+                    transformer_config,
+                    transform_pipeline,
+                )
             clear_output()
 
         # <--- Data Dumping --->
@@ -779,6 +1020,11 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
         GEOPI_OUTPUT_ARTIFACTS_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_PATH")
         GEOPI_OUTPUT_METRICS_PATH = os.getenv("GEOPI_OUTPUT_METRICS_PATH")
         GEOPI_OUTPUT_PARAMETERS_PATH = os.getenv("GEOPI_OUTPUT_PARAMETERS_PATH")
-        copy_files(GEOPI_OUTPUT_ARTIFACTS_PATH, GEOPI_OUTPUT_METRICS_PATH, GEOPI_OUTPUT_PARAMETERS_PATH, GEOPI_OUTPUT_SUMMARY_PATH)
+        copy_files(
+            GEOPI_OUTPUT_ARTIFACTS_PATH,
+            GEOPI_OUTPUT_METRICS_PATH,
+            GEOPI_OUTPUT_PARAMETERS_PATH,
+            GEOPI_OUTPUT_SUMMARY_PATH,
+        )
 
     mlflow.end_run()
diff --git a/geochemistrypi/data_mining/constants.py b/geochemistrypi/data_mining/constants.py
index 0ed654c0..4191607b 100644
--- a/geochemistrypi/data_mining/constants.py
+++ b/geochemistrypi/data_mining/constants.py
@@ -2,6 +2,9 @@
 
 from .utils.toggle_address_status import toggle_address_status
 
+# Release version
+RELEASE_VERSION = "0.5.0"
+
 # Adjust the path of project data flow: The number 1 indicates standard mode, and the number 2 indicates APP mode.
 TOGGLE_ADDRESS_STATUS = 1
diff --git a/geochemistrypi/data_mining/update/_base.py b/geochemistrypi/data_mining/update/_base.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/geochemistrypi/data_mining/update/anaconda_checker.py b/geochemistrypi/data_mining/update/anaconda_checker.py
index e69de29b..a571adef 100644
--- a/geochemistrypi/data_mining/update/anaconda_checker.py
+++ b/geochemistrypi/data_mining/update/anaconda_checker.py
@@ -0,0 +1,17 @@
+import os
+
+from ..constants import PACKAGEDIR
+from .base import is_software_installed, run_bat_file
+
+
+def anaconda_installer() -> None:
+    """
+    Install Anaconda if it is not already installed.
+    """
+
+    detection = is_software_installed("conda.exe")
+    if detection:
+        pass
+    else:
+        conda_installer_path = os.path.join(PACKAGEDIR, "bat", "pre-installer.bat")
+        run_bat_file(conda_installer_path)
diff --git a/geochemistrypi/data_mining/update/base.py b/geochemistrypi/data_mining/update/base.py
new file mode 100644
index 00000000..058dcc74
--- /dev/null
+++ b/geochemistrypi/data_mining/update/base.py
@@ -0,0 +1,96 @@
+import subprocess
+from typing import Optional
+
+import requests
+
+
+def is_software_installed(software_name: str) -> bool:
+    """
+    Check if a specified software is installed on the system.
+
+    Parameters
+    ----------
+    software_name : str
+        The name of the software to check for installation.
+
+    Returns
+    -------
+    bool
+        True if the software is installed, False otherwise.
+
+    """
+
+    try:
+        # Use the where command to find the executable file of the software
+        result = subprocess.run(["where", software_name], capture_output=True, text=True)
+
+        # If the where command returns 0, it means the software was found
+        if result.returncode == 0:
+            print(f"{software_name} is installed.")
+            return True
+        else:
+            print(f"{software_name} is not installed.")
+            return False
+    except Exception as e:
+        print(f"An error occurred: {e}")
+        return False
+
+
+def run_bat_file(bat_file_path: str) -> None:
+    """
+    Execute a specified batch file.
+
+    Parameters
+    ----------
+    bat_file_path : str
+        The path to the batch file to be executed, optionally followed by arguments.
+    """
+
+    try:
+        subprocess.run([bat_file_path], check=True, shell=True)
+        print(f"Bat file executed successfully: {bat_file_path}")
+    except subprocess.CalledProcessError as e:
+        print(f"An error occurred while executing the bat file: {e}")
+    except Exception as e:
+        print(f"An unexpected error occurred: {e}")
+
+
+def get_latest_release_version(repo_owner: str, repo_name: str) -> Optional[str]:
+    """
+    Retrieve the latest release version of a GitHub repository.
+
+    Parameters
+    ----------
+    repo_owner : str
+        The owner of the GitHub repository.
+    repo_name : str
+        The name of the GitHub repository.
+
+    Returns
+    -------
+    Optional[str]
+        The latest release version number, or None if the version cannot be retrieved.
+ """ + + # GitHub API URL for the latest release + api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/releases/latest" + + try: + # Send a GET request to the GitHub API + response = requests.get(api_url) + response.raise_for_status() # Raise an exception for HTTP errors + + # Parse the JSON response + release_info = response.json() + + # Extract the version number from the tag_name + if "tag_name" in release_info: + version_number = release_info["tag_name"] + print(f"Latest release version: {version_number}") + return version_number + else: + print("No tag_name found in the latest release.") + return None + except requests.exceptions.RequestException as e: + print(f"An error occurred: {e}") + return None