Commit

Cleaned submission worker and validation runner to fix python errors + added ability to handle submission of zipped files or a zipped folder
MitchellAV committed Mar 14, 2024
1 parent 4cbc8aa commit b6b299b
Showing 2 changed files with 153 additions and 201 deletions.
160 changes: 137 additions & 23 deletions workers/pvinsight-validation-runner.py
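
For context, the two submission layouts the diff below is meant to accept ("zipped files" and "a zipped folder") can be sketched as follows; the archive and file names here are hypothetical, not part of the commit:

import zipfile

# Layout 1: files zipped directly at the archive root ("zipped files").
with zipfile.ZipFile("submission_flat.zip", "w") as zf:
    zf.writestr("submission.py", "def detect(): ...\n")
    zf.writestr("requirements.txt", "pandas\n")

# Layout 2: the same files wrapped in a single root folder ("a zipped folder"),
# including an explicit directory entry, as GUI archivers typically create.
with zipfile.ZipFile("submission_wrapped.zip", "w") as zf:
    zf.writestr("my-submission/", "")
    zf.writestr("my-submission/submission.py", "def detect(): ...\n")
    zf.writestr("my-submission/requirements.txt", "pandas\n")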
@@ -17,13 +17,11 @@
"""

import pandas as pd
-import ast
import os
from importlib import import_module
import inspect
import time
from collections import ChainMap
-from sklearn.metrics import mean_absolute_error
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
@@ -129,28 +127,124 @@ def convert_compressed_file_path_to_directory(compressed_file_path):
    return path_components


-def get_file_extension(path):
-    return path.split("/")[-1].split(".")[-1]
+def extract_zip(zip_path: str, extract_path: str):
+    if not os.path.exists(extract_path):
+        os.makedirs(extract_path)
+
+    if zipfile.is_zipfile(zip_path):
+        with zipfile.ZipFile(zip_path, "r") as zip_ref:
+            logger.info("Extracting files from: " + zip_path)
+            file_names = zip_ref.namelist()
+            logger.info("File names:")
+            logger.info(file_names)
+            folders = [f for f in file_names if f.endswith("/")]
+            logger.info("Folders:")
+            logger.info(folders)
+
-def decompress_file(path):
-    if get_file_extension(path) == "gz":
-        with tarfile.open(path, "r:gz") as tar:
-            tar.extractall(convert_compressed_file_path_to_directory(path))
+            if len(folders) == 0:
+                logger.info("Extracting all files...")
+                zip_ref.extractall(path=extract_path)
+            else:
+                # if all files share the same root, any of the folders can be used for the check
+                do_all_files_have_same_root = all(
+                    [f.startswith(folders[0]) for f in file_names]
+                )
+                logger.info(
+                    "Do all files have the same root? "
+                    + str(do_all_files_have_same_root)
+                )
+
+                if do_all_files_have_same_root:
+                    # extract the files inside the shared root folder of the zipfile
+                    root_folder_name = folders[0]
+
+                    logger.info("Extracting files...")
+                    for file in file_names:
+                        if file.endswith("/") and file != root_folder_name:
+                            os.makedirs(
+                                os.path.join(
+                                    extract_path, file.removeprefix(root_folder_name)
+                                )
+                            )
+                        if not file.endswith("/"):
+                            zip_ref.extract(file, path=extract_path)
+                            os.rename(
+                                os.path.join(extract_path, file),
+                                os.path.join(
+                                    extract_path, file.removeprefix(root_folder_name)
+                                ),
+                            )
+
+                    # remove the now-empty root folder and its leftover subfolders
+                    shutil.rmtree(os.path.join(extract_path, root_folder_name))
+
+                else:
+                    logger.info("Extracting all files...")
+                    zip_ref.extractall(path=extract_path)
+    elif tarfile.is_tarfile(zip_path):
+        with tarfile.open(zip_path, "r") as tar_ref:
+            logger.info("Extracting files from: " + zip_path)
+            file_names = tar_ref.getnames()
+            logger.info("File names:")
+            logger.info(file_names)
+            folders = [f for f in file_names if f.endswith("/")]
+            logger.info("Folders:")
+            logger.info(folders)
+
+            if len(folders) == 0:
+                logger.info("Extracting all files...")
+                tar_ref.extractall(path=extract_path, filter="data")
+            else:
+                # if all files share the same root, any of the folders can be used for the check
+                do_all_files_have_same_root = all(
+                    [f.startswith(folders[0]) for f in file_names]
+                )
+                logger.info(
+                    "Do all files have the same root? "
+                    + str(do_all_files_have_same_root)
+                )
+
+                if do_all_files_have_same_root:
+                    # extract the files inside the shared root folder of the tarfile
+                    root_folder_name = folders[0]
+
+                    logger.info("Extracting files...")
+                    for file in file_names:
+                        if file.endswith("/") and file != root_folder_name:
+                            os.makedirs(
+                                os.path.join(
+                                    extract_path, file.removeprefix(root_folder_name)
+                                )
+                            )
+                        if not file.endswith("/"):
+                            tar_ref.extract(file, path=extract_path, filter="data")
+                            os.rename(
+                                os.path.join(extract_path, file),
+                                os.path.join(
+                                    extract_path, file.removeprefix(root_folder_name)
+                                ),
+                            )
+
+                    # remove the now-empty root folder and its leftover subfolders
+                    shutil.rmtree(os.path.join(extract_path, root_folder_name))
+
+                else:
+                    logger.info("Extracting all files...")
+                    tar_ref.extractall(path=extract_path, filter="data")
    else:
-        with zipfile.ZipFile(path, "r") as zip_ref:
-            zip_ref.extractall(convert_compressed_file_path_to_directory(path))
-    return convert_compressed_file_path_to_directory(path)
+        raise Exception("File is not a zip or tar file.")
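
A quick usage sketch for the new helper (paths hypothetical, not from the commit): because a shared root folder is detected and stripped with removeprefix, both submission layouts shown earlier extract to the same flat directory.

extract_zip("submission_wrapped.zip", "/tmp/submission")
# /tmp/submission/submission.py
# /tmp/submission/requirements.txt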


-def get_module_file_name(module_dir):
+def get_module_file_name(module_dir: str):
    for root, _, files in os.walk(module_dir, topdown=True):
        for name in files:
            if name.endswith(".py"):
                return name.split("/")[-1]
    else:
        raise FileNotFoundError("No python file found in the module directory.")


-def get_module_name(module_dir):
+def get_module_name(module_dir: str):
    return get_module_file_name(module_dir)[:-3]
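
Assuming the extracted directory holds a single entry-point script, the two helpers above resolve it like so (paths and names hypothetical):

file_name = get_module_file_name("/tmp/submission")  # "submission.py" (first .py file found)
module_name = get_module_name("/tmp/submission")  # "submission" (drops the ".py" suffix)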


@@ -175,7 +269,7 @@ def generate_scatter_plot(dataframe, x_axis, y_axis, title):
    return plt


-def run(module_to_import_s3_path, current_evaluation_dir=None):
+def run(module_to_import_s3_path, current_evaluation_dir: str | None = None):
    # If a path is provided, set the directories to that path, otherwise use default
    if current_evaluation_dir is not None:
        results_dir = (
@@ -188,14 +282,13 @@ def run(module_to_import_s3_path, current_evaluation_dir=None):
            if not current_evaluation_dir.endswith("/")
            else current_evaluation_dir + "data"
        )
-    else:
-        results_dir = "./results"
-        data_dir = "./data"
-
-    if current_evaluation_dir is not None:
        sys.path.append(
            current_evaluation_dir
        )  # append current_evaluation_dir to sys.path
+    else:
+        results_dir = "./results"
+        data_dir = "./data"
+        current_evaluation_dir = os.getcwd()

    # Ensure results directory exists
    os.makedirs(results_dir, exist_ok=True)
@@ -204,16 +297,32 @@ def run(module_to_import_s3_path, current_evaluation_dir=None):
    os.makedirs(data_dir, exist_ok=True)

    # Load in the module that we're going to test on.

+    logger.info(f"module_to_import_s3_path: {module_to_import_s3_path}")
    target_module_compressed_file_path = pull_from_s3(module_to_import_s3_path)
+    logger.info(
+        f"target_module_compressed_file_path: {target_module_compressed_file_path}"
+    )
+    target_module_path = convert_compressed_file_path_to_directory(
+        target_module_compressed_file_path
+    )
+    logger.info(
+        f"decompressing file {target_module_compressed_file_path} to {target_module_path}"
+    )

-    target_module_path = decompress_file(target_module_compressed_file_path)
+    extract_zip(target_module_compressed_file_path, target_module_path)
+    logger.info(
+        f"decompressed file {target_module_compressed_file_path} to {target_module_path}"
+    )

+    logger.info(f"target_module_path: {target_module_path}")
    # get current directory, i.e. directory of runner.py file
    new_dir = os.path.dirname(os.path.abspath(__file__))
+    logger.info(f"new_dir: {new_dir}")

    file_name = get_module_file_name(target_module_path)
+    logger.info(f"file_name: {file_name}")
    module_name = get_module_name(target_module_path)
+    logger.info(f"module_name: {module_name}")

    # install submission dependency
    try:
@@ -299,6 +408,7 @@ def run(module_to_import_s3_path, current_evaluation_dir=None):
        # Get file_name, which will be pulled from database or S3 for
        # each analysis
        file_name = row["file_name"]
+        logger.info(f"file_name: {file_name}")
        # Get associated system ID
        system_id = row["system_id"]
        # Get all of the associated metadata for the particular file based
@@ -326,12 +436,16 @@ def run(module_to_import_s3_path, current_evaluation_dir=None):
        # Now that we've collected all of the information associated with the
        # test, let's read in the file as a pandas dataframe (this data
        # would most likely be stored in an S3 bucket)
-        time_series = pd.read_csv(
+        time_series: pd.DataFrame = pd.read_csv(
            os.path.join(data_dir + "/file_data/", file_name),
            index_col=0,
            parse_dates=True,
        )

-        time_series = time_series.asfreq(str(row["data_sampling_frequency"]) + "T")
+        time_series = time_series.asfreq(
+            str(row["data_sampling_frequency"]) + "min"
+        ).squeeze()

        # Filter the kwargs dictionary based on required function params
        kwargs = dict(
            (k, kwargs_dict[k]) for k in function_parameters if k in kwargs_dict
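
The time-series change above swaps the minute frequency alias "T" for "min" (recent pandas versions deprecate "T") and squeezes the single-column DataFrame into a Series. A minimal sketch of the same pattern, assuming hypothetical 5-minute sampling:

import pandas as pd

idx = pd.date_range("2024-01-01", periods=4, freq="5min")
df = pd.DataFrame({"power": [1.0, 2.0, 3.0, 4.0]}, index=idx)

series = df.asfreq("5min").squeeze()  # conform to a 5-minute grid, then DataFrame -> Series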