Skip to content

Commit

Permalink
refactor: refactor the data source loading.
Browse files Browse the repository at this point in the history
  • Loading branch information
SanyHe committed Dec 21, 2024
1 parent 2e5928f commit cc0a511
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 96 deletions.
2 changes: 1 addition & 1 deletion geochemistrypi/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.6.1"
__version__ = "0.7.0"
31 changes: 25 additions & 6 deletions geochemistrypi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
from typing import Optional

import typer
from rich import print

from ._version import __version__
from .data_mining.cli_pipeline import cli_pipeline
from .data_mining.constants import WORKING_PATH
from .data_mining.enum import DataSource

app = typer.Typer()

CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
FRONTEND_PATH = os.path.join(CURRENT_PATH, "frontend")
BACKEND_PATH = os.path.join(CURRENT_PATH, "start_dash_pipeline.py")
PIPELINE_PATH = os.path.join(CURRENT_PATH, "start_cli_pipeline.py")
MLFLOW_STORE_PATH = os.path.join(f"file:{WORKING_PATH}", "geopi_tracking")


def _version_callback(value: bool) -> None:
Expand All @@ -39,6 +39,7 @@ def main(version: Optional[bool] = typer.Option(None, "--version", "-v", help="S
@app.command()
def data_mining(
data: str = typer.Option("", help="The path of the training data without model inference."),
desktop: bool = typer.Option(False, help="Use the data in the directory 'geopi_input' on the desktop for training and inference."),
training: str = typer.Option("", help="The path of the training data."),
application: str = typer.Option("", help="The path of the inference data."),
mlflow: bool = typer.Option(False, help="Start the mlflow server."),
Expand All @@ -58,6 +59,21 @@ def start_frontend():

def start_mlflow():
"""Start the mlflow server."""
# Check if the current working directory has the 'geopi_tracking' directory to store the tracking data for mlflow
# If yes, set the MLFLOW_STORE_PATH to the current working directory
# If no, set the MLFLOW_STORE_PATH to the desktop
geopi_tracking_dir = os.path.join(os.getcwd(), "geopi_tracking")
if not os.path.exists(geopi_tracking_dir):
print("[bold red]The 'geopi_tracking' directory is not found in the current working directory.[bold red]")
geopi_tracking_dir = os.path.join(os.path.expanduser("~"), "Desktop", "geopi_tracking")
if not os.path.exists(geopi_tracking_dir):
print("[bold red]The 'geopi_tracking' directory is not found on the desktop.[bold red]")
print("[bold red]Our software will create a 'geopi_tracking' directory on the desktop to store the tracking data for mlflow.[bold red]")
else:
print("[bold green]The 'geopi_tracking' directory is found on the desktop.[bold green]")
print("[bold green]Our software will use the 'geopi_tracking' directory on the desktop to store the tracking data for mlflow.[bold green]")
MLFLOW_STORE_PATH = os.path.join("file:", geopi_tracking_dir)
print("[bold green]Press [bold magenta]Ctrl + C[/bold magenta] to close mlflow server at any time.[bold green]")
start_mlflow_command = f"mlflow ui --backend-store-uri {MLFLOW_STORE_PATH} "
subprocess.run(start_mlflow_command, shell=True)

Expand All @@ -76,16 +92,19 @@ def start_mlflow():
# Start mlflow server to track the experiment
mlflow_thread = threading.Thread(target=start_mlflow)
mlflow_thread.start()
elif desktop:
# Start the CLI pipeline with the data in the directory 'geopi_input' on the desktop
cli_pipeline(training_data_path="", application_data_path="", data_source=DataSource.DESKTOP)
else:
# If the data is provided, start the CLI pipeline with continuous training
if data:
cli_pipeline(data)
cli_pipeline(training_data_path=data, application_data_path="", data_source=DataSource.ANY_PATH)
# If the training data and inference data are provided, start the CLI pipeline with continuous training and inference
elif training and application:
cli_pipeline(training, application)
# If no data is provided, use built-in data to start the CLI pipeline with continuous training and inference
cli_pipeline(training_data_path=training, application_data_path=application, data_source=DataSource.ANY_PATH)
# If no data is provided, look for the data in the desktop to start the CLI pipeline with continuous training and inference
else:
cli_pipeline(training, application)
cli_pipeline(training_data_path="", application_data_path="", data_source=DataSource.BUILT_IN)


@app.command()
Expand Down
114 changes: 81 additions & 33 deletions geochemistrypi/data_mining/cli_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,10 @@
MODE_OPTION_WITH_MISSING_VALUES,
NON_AUTOML_MODELS,
OPTION,
OUTPUT_PATH,
REGRESSION_MODELS,
REGRESSION_MODELS_WITH_MISSING_VALUES,
SECTION,
TEST_DATA_OPTION,
TOGGLE_ADDRESS_STATUS,
WORKING_PATH,
)
from .data.data_readiness import (
basic_info,
Expand All @@ -53,19 +50,19 @@
from .data.inference import build_transform_pipeline, model_inference
from .data.preprocessing import feature_scaler, feature_selector
from .data.statistic import monte_carlo_simulator
from .enum import DataSource
from .plot.map_plot import process_world_map
from .plot.statistic_plot import basic_statistic, check_missing_value, correlation_plot, distribution_plot, is_null_value, log_distribution_plot, probability_plot, ratio_null_vs_filled
from .process.classify import ClassificationModelSelection
from .process.cluster import ClusteringModelSelection
from .process.decompose import DecompositionModelSelection
from .process.detect import AnomalyDetectionModelSelection
from .process.regress import RegressionModelSelection
from .utils.base import check_package, clear_output, copy_files, create_geopi_output_dir, get_os, install_package, log, save_data, show_warning
from .utils.base import check_package, clear_output, copy_files, create_geopi_output_dir, get_os, install_package, list_excel_files, log, save_data, show_warning
from .utils.mlflow_utils import retrieve_previous_experiment_id
from .utils.toggle_address_status import toggle_address_status


def cli_pipeline(training_data_path: str, application_data_path: Optional[str] = None) -> None:
def cli_pipeline(training_data_path: str, application_data_path: Optional[str] = None, data_source: Optional[DataSource] = None) -> None:
"""The command line interface software for Geochemistry π.
The business logic of this CLI software can be found in the figures in the README.md file.
It provides three MLOps core functionalities:
Expand All @@ -82,37 +79,76 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
The path of the application data, by default None
"""

# Local test: Uncomment the following line to utilize built-in datasets to test the pipeline. Don't forget to modify the path value to be consistent with your own location.
# training_data_path = "/Users/can/Documents/github/work/geo_ml/geochemistrypi/geochemistrypi/data_mining/data/dataset/Data_Classification.xlsx"
# application_data_path = "/Users/can/Documents/github/work/geo_ml/geochemistrypi/geochemistrypi/data_mining/data/dataset/Data_Classification.xlsx"

# Local test: If the argument is False, hide all Python level warnings. Developers can turn it on by setting the argument to True.
show_warning(False)

os.makedirs(OUTPUT_PATH, exist_ok=True)
logger = log(OUTPUT_PATH, "geopi_inner_test.log")
logger.info("Geochemistry Pi is running.")

# Display the interactive splash screen when launching the CLI software
console = Console()
print("\n[bold blue]Welcome to Geochemistry π![/bold blue]")
print("[bold]Initializing...[/bold]")
print("[bold blue]Three cores components:[/bold blue]")
print("✨ [bold blue]Continuous Training[/bold blue]")
print("✨ [bold blue]Model Inference[/bold blue]")
print("✨ [bold blue]Machine Learning Lifecycle Management[/bold blue]")
print("[bold green]Initializing...[/bold green]")

# Set the working path based on the data source
# If the user uses the built-in data, the working path is the desktop, the output path is the desktop.
# If the user uses the desktop data, the working path is the desktop, the output path is the desktop.
# If the user uses the any path, the working path is the current working directory, the output path is the current working directory.
if data_source == DataSource.BUILT_IN:
# If the user uses the built-in data, the working path is the desktop.
WORKING_PATH = os.path.join(os.path.expanduser("~"), "Desktop")
elif data_source == DataSource.DESKTOP:
WORKING_PATH = os.path.join(os.path.expanduser("~"), "Desktop")
INPUT_PATH = os.path.join(WORKING_PATH, "geopi_input")
if not os.path.exists(INPUT_PATH):
print("[bold red]The 'geopi_input' directory is not found on the desktop.[/bold red]")
os.makedirs(INPUT_PATH, exist_ok=True)
print("[bold green]Creating the 'geopi_input' directory ...[/bold green]")
print("[bold green]Successfully create 'geopi_input' directory on the desktop.[/bold green]")
print("Please restart the software after putting the data in the 'geopi_input' directory.")
print("Currently, the data file format only supports '.xlsx', '.xls', '.csv'.")
print("If you want to activate the model inference, please put the 'application data' in it as well.")
print("Check our online documentation for more information on the format of the 'application data'.")
clear_output("(Press Enter key to exit)")
exit(1)

with console.status("[bold green]Data Loading...[/bold green]", spinner="dots"):
sleep(1)

# List all existing Excel files in the 'geopi_input' directory on the desktop.
existing_excel_files = list_excel_files(INPUT_PATH)
if len(existing_excel_files) == 0:
print("[bold red]No data files found in the 'geopi_input' directory on the desktop.[/bold red]")
print("[bold green]Please put the data files in the 'geopi_input' directory on the desktop.[/bold green]")
clear_output("(Press Enter key to exit)")
exit(1)
show_excel_columns(existing_excel_files)

# Read the training data from the Excel file.
print("Please select the training data by index:")
# Limit the user input to a number within the range of available files and assign the result to training_data_path
training_data_path = existing_excel_files[limit_num_input(range(1, len(existing_excel_files) + 1), SECTION[0], num_input) - 1]
is_application_data = Confirm.ask("Do you want to activate the inference functionality", default=False)
if is_application_data:
# Read the application data from the Excel file.
print("Please select the application data by index:")
# Limit the user input to a number within the range of available files and assign the result to application_data_path
application_data_path = existing_excel_files[limit_num_input(range(1, len(existing_excel_files) + 1), SECTION[0], num_input) - 1]
elif data_source == DataSource.ANY_PATH:
WORKING_PATH = os.getcwd()

# Set the output path to the working path
OUTPUT_PATH = os.path.join(WORKING_PATH, "geopi_output")
os.makedirs(OUTPUT_PATH, exist_ok=True)

# Set the log file path
logger = log(OUTPUT_PATH, "geopi_inner_test.log")
logger.info("Geochemistry Pi is running.")

# <-- User Training Data Loading -->
with console.status("[bold green]Training Data Loading...[/bold green]", spinner="dots"):
sleep(0.75)

# Call toggle_address_status and pass status and training_data_path as parameters to obtain the address of the training data
training_data_path = toggle_address_status(status=TOGGLE_ADDRESS_STATUS, training_data_path=training_data_path)[0]

# Check if the length of training_data_path is greater than 1
if len(training_data_path) > 1:
# Display the columns of the Excel file located at training_data_path
show_excel_columns(training_data_path)
print("Please select only one file that you want to process:")
# Limit the user input to a number within the range of available files and assign the result to training_data_path
training_data_path = training_data_path[limit_num_input(range(1, len(training_data_path) + 1), SECTION[0], num_input) - 1]

if training_data_path:
# If the user provides file name, then load the training data from the file.
data = read_data(file_path=training_data_path, is_own_data=1)
Expand All @@ -124,6 +160,13 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
# <-- User Application Data Loading -->
with console.status("[bold green]Application Data Loading...[/bold green]", spinner="dots"):
sleep(0.75)
# Three scenarios for the application data loading:
# 1. The user provides the training data path and the application data path.
# - The user wants to use the model inference.
# 2. The user provides the training data path but doesn't provide the application data path.
# - The user doesn't want to use the model inference.
# 3. The user doesn't provide the training data path and the application data path.
# - The continuous training and model inference will use the built-in data.
is_built_in_inference_data = False
if training_data_path and application_data_path:
# If the user provides file name, then load the inference data from the file.
Expand Down Expand Up @@ -169,8 +212,8 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
# Create a new experiment or use the previous experiment
is_used_previous_experiment = Confirm.ask("✨ Use Previous Experiment", default=False)
# Set the tracking uri to the local directory, in the future, we can set it to the remote server.
experiments_localtion = f"file:{WORKING_PATH}/geopi_tracking"
mlflow.set_tracking_uri(experiments_localtion)
experiments_location = os.path.join("file:", WORKING_PATH, "geopi_tracking")
mlflow.set_tracking_uri(experiments_location)
# Print the tracking uri for debugging.
# print("tracking uri:", mlflow.get_tracking_uri())
if is_used_previous_experiment:
Expand Down Expand Up @@ -207,7 +250,7 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
# run_description = Prompt.ask("✨ Run Description", default="Use xgboost for GeoPi classification.")
# mlflow.start_run(run_name=run_name, experiment_id=experiment.experiment_id, tags={"version": run_tag, "description": run_description})
mlflow.start_run(run_name=run_name, experiment_id=experiment.experiment_id)
create_geopi_output_dir(experiment.name, run_name)
create_geopi_output_dir(OUTPUT_PATH, experiment.name, run_name)
clear_output()

# <--- Built-in Training Data Loading --->
Expand Down Expand Up @@ -235,6 +278,11 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
# <--- Built-in Application Data Loading --->
logger.debug("Built-in Application Data Loading")
# If the user doesn't provide training data path and inference data path, then use the built-in inference data.
# There are two scenarios for the built-in inference data loading:
# 1. The user chooses the built-in training data for regression or classification.
# - Only the supervised learning mode supports model inference.
# 2. The user chooses the built-in training data for clustering, decomposition or anomaly detection.
# - The unsupervised learning mode doesn't support model inference.
if is_built_in_inference_data and built_in_training_data_num == 1:
application_data_path = "ApplicationData_Regression.xlsx"
inference_data = read_data(file_path=application_data_path)
Expand Down Expand Up @@ -616,7 +664,7 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
# Add the option of all models
all_models_num = len(MODELS) + 1
print(str(all_models_num) + " - All models above to be trained")
print("Which model do you want to apply?(Enter the Corresponding Number)")
print("Which model do you want to apply?")
MODELS.append("all_models")
model_num = limit_num_input(MODELS, SECTION[2], num_input)
clear_output()
Expand All @@ -628,7 +676,7 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
if mode_num == 1 or mode_num == 2:
# If the model is not in the NON_AUTOML_MODELS, then ask the user whether to use AutoML.
if model_name not in NON_AUTOML_MODELS:
print("Do you want to employ automated machine learning with respect to this algorithm?" "(Enter the Corresponding Number):")
print("Do you want to employ automated machine learning with respect to this algorithm?")
num2option(OPTION)
automl_num = limit_num_input(OPTION, SECTION[2], num_input)
if automl_num == 1:
Expand Down Expand Up @@ -728,7 +776,7 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
for i in range(len(MODELS) - 1):
# Start a nested MLflow run within the current MLflow run
with mlflow.start_run(run_name=MODELS[i], experiment_id=experiment.experiment_id, nested=True):
create_geopi_output_dir(experiment.name, run_name, MODELS[i])
create_geopi_output_dir(OUTPUT_PATH, experiment.name, run_name, MODELS[i])
run = Modes2Initiators[mode_num](MODELS[i])
# If is_automl is False, then run all models without AutoML.
if not is_automl:
Expand Down
11 changes: 0 additions & 11 deletions geochemistrypi/data_mining/constants.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import os

from .utils.toggle_address_status import toggle_address_status

# Adjust the path of project data flow: The number 1 indicates standard mode, and the number 2 indicates APP mode.
TOGGLE_ADDRESS_STATUS = 1

# The number of uploading dataset per user is limited to 5.
MAX_UPLOADS_PER_USER = 5

Expand All @@ -14,12 +9,6 @@
# the directory where the built-in data set to be processed stays
BUILT_IN_DATASET_PATH = os.path.join(PACKAGEDIR, "data", "dataset")

# current working directory in which the user activates the application
WORKING_PATH = toggle_address_status(status=TOGGLE_ADDRESS_STATUS)[1]

# the root directory where all the output stays
OUTPUT_PATH = os.path.join(WORKING_PATH, "geopi_output")

# the directory where the artifact is saved within the MLflow run's artifact directory
MLFLOW_ARTIFACT_DATA_PATH = "data"
MLFLOW_ARTIFACT_IMAGE_STATISTIC_PATH = os.path.join("image", "statistic")
Expand Down
6 changes: 3 additions & 3 deletions geochemistrypi/data_mining/data/data_readiness.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ def select_column_name(data: pd.DataFrame) -> str:
The data set to be selected name.
"""
print(
"You need to choose the number of the column above as the output data identifier column.\n"
"You need to choose the number of the column above as [bold red]the output data identifier column[/bold red].\n"
"The data identifier column helps identify uniquely each row of data point in the output data.\n"
"** For example, when using built-in dataset, you can choose the column SAMPLE NAME’.**\n"
"Once finishing the whole run, in the output data file, all data point will have the value in the column SAMPLE NAME as its unique identifier.\n"
"For example, when using built-in dataset, you can choose the column [bold red]SAMPLE NAME[/bold red].\n"
"Once finishing the whole run, in the output data files, each row of data will have the value in the column [bold red]SAMPLE NAME[/bold red] as its unique identifier.\n"
"Enter the number of the output data identifier column."
)
while True:
Expand Down
6 changes: 6 additions & 0 deletions geochemistrypi/data_mining/enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ class ModeOptionWithMissingValues(Enum):
REGRESSION = "Regression"
CLASSIFICATION = "Classification"
CLUSTERING = "Clustering"


class DataSource(Enum):
BUILT_IN = "Built-in"
DESKTOP = "Desktop"
ANY_PATH = "Any Path"
Loading

0 comments on commit cc0a511

Please sign in to comment.