diff --git a/compressions/2/pvanalytics-submission/requirements.txt b/compressions/2/pvanalytics-submission/requirements.txt deleted file mode 100644 index bd124a40..00000000 --- a/compressions/2/pvanalytics-submission/requirements.txt +++ /dev/null @@ -1,5 +0,0 @@ -pandas==2.0.3 -pvanalytics==0.2.0 -pvlib==0.10.2 -matplotlib==3.7.2 -ruptures==1.1.8 \ No newline at end of file diff --git a/compressions/2/pvanalytics-submission.zip b/compressions/2/pvanalytics_submission.zip similarity index 75% rename from compressions/2/pvanalytics-submission.zip rename to compressions/2/pvanalytics_submission.zip index 57501d3f..535b1744 100644 Binary files a/compressions/2/pvanalytics-submission.zip and b/compressions/2/pvanalytics_submission.zip differ diff --git a/compressions/2/pvanalytics_submission/requirements.txt b/compressions/2/pvanalytics_submission/requirements.txt new file mode 100644 index 00000000..519c3aeb --- /dev/null +++ b/compressions/2/pvanalytics_submission/requirements.txt @@ -0,0 +1,6 @@ +matplotlib==3.9.1 +numpy==1.26.4 +pandas==2.2.2 +pvanalytics==0.2.0 +pvlib==0.10.5 +ruptures==1.1.8 \ No newline at end of file diff --git a/compressions/2/pvanalytics-submission/pvanalytics-cpd-module.py b/compressions/2/pvanalytics_submission/submission_wrapper.py similarity index 50% rename from compressions/2/pvanalytics-submission/pvanalytics-cpd-module.py rename to compressions/2/pvanalytics_submission/submission_wrapper.py index f31ac56f..1652c28d 100644 --- a/compressions/2/pvanalytics-submission/pvanalytics-cpd-module.py +++ b/compressions/2/pvanalytics_submission/submission_wrapper.py @@ -1,151 +1,128 @@ -""" -PVAnalytics-CPD based module. This module will be uploaded by the user -and tested using the data sets accordingly. -""" - -import pandas as pd -from pvanalytics.quality.time import shifts_ruptures -from pvanalytics.features import daytime -from pvanalytics.quality.outliers import zscore -from pvanalytics.quality import gaps -import pvlib -import matplotlib.pyplot as plt -import pvanalytics -import ruptures - - -def detect_time_shifts( - time_series, latitude, longitude, data_sampling_frequency -): - """ - Master function for testing for time shifts in a series and returning - time-shifted periods - - """ - # Save the dates of the time series index for reindexing at the end - date_index = pd.Series(time_series.index.date).drop_duplicates() - # Data pre-processing: - # 1) Removal of frozen/stuck data - # 2) Removal of data periods with low data 'completeness' - # 3) Removal of negative data - # 4) Removal of outliers via Hampel + outlier filter - # Trim based on frozen data values - # REMOVE STALE DATA (that isn't during nighttime periods) - # Day/night mask - daytime_mask = daytime.power_or_irradiance(time_series) - # Stale data mask - stale_data_mask = gaps.stale_values_round( - time_series, window=3, decimals=2 - ) - stale_data_mask = stale_data_mask & daytime_mask - - # REMOVE NEGATIVE DATA - negative_mask = time_series < 0 - - # FIND ABNORMAL PERIODS - daily_min = time_series.resample("D").min() - series_min = 0.1 * time_series.mean() - erroneous_mask = daily_min >= series_min - erroneous_mask = erroneous_mask.reindex( - index=time_series.index, method="ffill", fill_value=False - ) - # FIND OUTLIERS (Z-SCORE FILTER) - zscore_outlier_mask = zscore(time_series, zmax=3.5, nan_policy="omit") - - # Filter the time series, taking out all of the issues - issue_mask = ( - (~stale_data_mask) - & (~negative_mask) - & (~erroneous_mask) - & (~zscore_outlier_mask) - ) - - time_series = 
time_series[issue_mask] - time_series = time_series.asfreq(str(data_sampling_frequency) + "T") - # Data completeness - # Trim the series based on daily completeness score - trim_series_mask = pvanalytics.quality.gaps.trim_incomplete( - time_series, - minimum_completeness=0.25, - freq=str(data_sampling_frequency) + "T", - ) - - time_series = time_series[trim_series_mask] - if len(time_series) > 0: - # Calculate a nighttime offset - # Mask daytime periods for the time series - daytime_mask = daytime.power_or_irradiance( - time_series, - freq=str(data_sampling_frequency) + "T", - low_value_threshold=0.005, - ) - # Get the modeled sunrise and sunset time series based on the system's - # latitude-longitude coordinates - modeled_sunrise_sunset_df = ( - pvlib.solarposition.sun_rise_set_transit_spa( - time_series.index, latitude, longitude - ) - ) - - # Calculate the midday point between sunrise and sunset for each day - # in the modeled irradiance series - modeled_midday_series = ( - modeled_sunrise_sunset_df["sunrise"] - + ( - modeled_sunrise_sunset_df["sunset"] - - modeled_sunrise_sunset_df["sunrise"] - ) - / 2 - ) - - # Generate the sunrise, sunset, and halfway pts for the data stream - sunrise_series = daytime.get_sunrise(daytime_mask) - sunset_series = daytime.get_sunset(daytime_mask) - midday_series = sunrise_series + ((sunset_series - sunrise_series) / 2) - # Convert the midday and modeled midday series to daily values - midday_series_daily, modeled_midday_series_daily = ( - midday_series.resample("D").mean(), - modeled_midday_series.resample("D").mean(), - ) - - # Set midday value series as minutes since midnight, from midday datetime - # values - midday_series_daily = ( - midday_series_daily.dt.hour * 60 - + midday_series_daily.dt.minute - + midday_series_daily.dt.second / 60 - ) - modeled_midday_series_daily = ( - modeled_midday_series_daily.dt.hour * 60 - + modeled_midday_series_daily.dt.minute - + modeled_midday_series_daily.dt.second / 60 - ) - - # Estimate the time shifts by comparing the modelled midday point to the - # measured midday point. - is_shifted, time_shift_series = shifts_ruptures( - midday_series_daily, - modeled_midday_series_daily, - period_min=15, - shift_min=15, - zscore_cutoff=0.75, - ) - time_shift_series = -1 * time_shift_series - - # Create a midday difference series between modeled and measured midday, to - # visualize time shifts. First, resample each time series to daily frequency, - # and compare the data stream's daily halfway point to the modeled halfway - # point - midday_diff_series = ( - modeled_midday_series.resample("D").mean() - - midday_series.resample("D").mean() - ).dt.total_seconds() / 60 - - midday_diff_series.plot() - time_shift_series.plot() - plt.show() - plt.close() - time_shift_series.index = time_shift_series.index.date - return time_shift_series - else: - return pd.Series(0, index=date_index) +""" +PVAnalytics-CPD based module. This module will be uploaded by the user +and tested using the data sets accordingly. 
+""" + +import pandas as pd +from pvanalytics.quality.time import shifts_ruptures +from pvanalytics.features import daytime +from pvanalytics.quality.outliers import zscore +from pvanalytics.quality import gaps +import pvlib +import matplotlib.pyplot as plt +import pvanalytics +import ruptures + +def detect_time_shifts(time_series, + latitude, longitude, + data_sampling_frequency): + """ + Master function for testing for time shifts in a series and returning + time-shifted periods + + """ + # Save the dates of the time series index for reindexing at the end + date_index = pd.Series(time_series.index.date).drop_duplicates() + # Data pre-processing: + # 1) Removal of frozen/stuck data + # 2) Removal of data periods with low data 'completeness' + # 3) Removal of negative data + # 4) Removal of outliers via Hampel + outlier filter + # Trim based on frozen data values + # REMOVE STALE DATA (that isn't during nighttime periods) + # Day/night mask + daytime_mask = daytime.power_or_irradiance(time_series) + # Stale data mask + stale_data_mask = gaps.stale_values_round(time_series, + window=3, + decimals=2) + stale_data_mask = stale_data_mask & daytime_mask + + # REMOVE NEGATIVE DATA + negative_mask = (time_series < 0) + + # FIND ABNORMAL PERIODS + daily_min = time_series.resample('D').min() + series_min = 0.1 * time_series.mean() + erroneous_mask = (daily_min >= series_min) + erroneous_mask = erroneous_mask.reindex(index=time_series.index, + method='ffill', + fill_value=False) + # FIND OUTLIERS (Z-SCORE FILTER) + zscore_outlier_mask = zscore(time_series, zmax=3.5, + nan_policy='omit') + + # Filter the time series, taking out all of the issues + issue_mask = ((~stale_data_mask) & (~negative_mask) & + (~erroneous_mask) & (~zscore_outlier_mask)) + + time_series = time_series[issue_mask] + time_series = time_series.asfreq(str(data_sampling_frequency) + 'T') + # Data completeness + # Trim the series based on daily completeness score + trim_series_mask = pvanalytics.quality.gaps.trim_incomplete(time_series, + minimum_completeness=.25, + freq=str(data_sampling_frequency) + 'T') + + time_series = time_series[trim_series_mask] + if len(time_series) > 0: + # Calculate a nighttime offset + # Mask daytime periods for the time series + daytime_mask = daytime.power_or_irradiance(time_series, + freq=str(data_sampling_frequency) + 'T', + low_value_threshold=.005) + # Get the modeled sunrise and sunset time series based on the system's + # latitude-longitude coordinates + modeled_sunrise_sunset_df = pvlib.solarposition.sun_rise_set_transit_spa( + time_series.index, latitude, longitude) + + # Calculate the midday point between sunrise and sunset for each day + # in the modeled irradiance series + modeled_midday_series = modeled_sunrise_sunset_df['sunrise'] + \ + (modeled_sunrise_sunset_df['sunset'] - + modeled_sunrise_sunset_df['sunrise']) / 2 + + #Generate the sunrise, sunset, and halfway pts for the data stream + sunrise_series = daytime.get_sunrise(daytime_mask) + sunset_series = daytime.get_sunset(daytime_mask) + midday_series = sunrise_series + ((sunset_series - sunrise_series)/2) + # Convert the midday and modeled midday series to daily values + midday_series_daily, modeled_midday_series_daily = ( + midday_series.resample('D').mean(), + modeled_midday_series.resample('D').mean()) + + # Set midday value series as minutes since midnight, from midday datetime + # values + midday_series_daily = (midday_series_daily.dt.hour * 60 + + midday_series_daily.dt.minute + + midday_series_daily.dt.second / 60) + 
modeled_midday_series_daily = \ + (modeled_midday_series_daily.dt.hour * 60 + + modeled_midday_series_daily.dt.minute + + modeled_midday_series_daily.dt.second / 60) + + # Estimate the time shifts by comparing the modelled midday point to the + # measured midday point. + is_shifted, time_shift_series = shifts_ruptures(midday_series_daily, + modeled_midday_series_daily, + period_min=15, + shift_min=15, + zscore_cutoff=.75) + time_shift_series = -1 * time_shift_series + + # Create a midday difference series between modeled and measured midday, to + # visualize time shifts. First, resample each time series to daily frequency, + # and compare the data stream's daily halfway point to the modeled halfway + # point + midday_diff_series = (modeled_midday_series.resample('D').mean() - + midday_series.resample('D').mean() + ).dt.total_seconds() / 60 + + midday_diff_series.plot() + time_shift_series.plot() + plt.show() + plt.close() + time_shift_series.index = time_shift_series.index.date + return time_shift_series + else: + return pd.Series(0, index=date_index) \ No newline at end of file diff --git a/valhub/Dockerfile b/valhub/Dockerfile index 4abf6638..27724bc5 100644 --- a/valhub/Dockerfile +++ b/valhub/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.11-slim as base +FROM python:3.11-slim # docker build --progress=plain -t "hmm:Dockerfile" -f valhub/Dockerfile . WORKDIR /root @@ -13,16 +13,7 @@ COPY . . # Set Django admin username, password, and other environment variables -ARG admin_username -ARG admin_password -ARG admin_email -ARG djangosk - -ENV DJANGO_SUPERUSER_USERNAME=$admin_username -ENV DJANGO_SUPERUSER_PASSWORD=$admin_password -ENV DJANGO_SUPERUSER_EMAIL=$admin_email -ENV DJANGO_SECRET_KEY=${djangosk} -ENV PORT 8005 +ENV PORT=8005 RUN apt-get update -qq RUN apt-get install -y libpq-dev python3-psycopg2 @@ -34,6 +25,6 @@ WORKDIR /root/valhub RUN python3 -m pip install -r requirements.txt # RUN cd ./valhub -RUN python3 manage.py makemigrations -RUN python3 manage.py collectstatic --noinput +# RUN python3 manage.py makemigrations +# RUN python3 manage.py collectstatic --noinput CMD ["/usr/bin/supervisord", "-c", "/root/valhub/supervisord.conf"] diff --git a/valhub/supervisord.conf b/valhub/supervisord.conf index 078fbfcc..29ded7d1 100644 --- a/valhub/supervisord.conf +++ b/valhub/supervisord.conf @@ -1,12 +1,26 @@ [supervisord] nodaemon = true +[program:valhub_makemigrations] +directory=/root/valhub +command=python3 /root/valhub/manage.py makemigrations +autostart=true +autorestart=false + +[program:valhub_collectstatic] +directory=/root/valhub +command=python3 /root/valhub/manage.py collectstatic --noinput +autostart=true +autorestart=false +depends_on=valhub_makemigrations + [program:valhub_webserver] directory=/root/valhub command=python3 /root/valhub/manage.py runserver 0.0.0.0:8005 stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 redirect_stderr=true +depends_on=valhub_collectstatic autostart=true [program:valhub_migrate] diff --git a/workers/src/docker/Dockerfile b/workers/src/docker/Dockerfile index a2ea8946..450690fb 100644 --- a/workers/src/docker/Dockerfile +++ b/workers/src/docker/Dockerfile @@ -12,9 +12,10 @@ RUN apt-get update && apt-get install -y \ libhdf5-dev \ python3-dev \ cmake \ - pkg-config + pkg-config \ + unzip -COPY unzip.py . +# COPY unzip.py . COPY requirements.txt . # Install the Python dependencies for the submission wrapper @@ -27,12 +28,14 @@ COPY $zip_file . 
# Unzip the submission package -RUN python -m unzip $zip_file submission +RUN unzip -j $zip_file -d submission WORKDIR /app/submission # Install the Python dependencies +RUN pip install --upgrade pip RUN pip install -r requirements.txt +RUN pip install typing-extensions # Set the working directory in the container diff --git a/workers/src/docker/submission/requirements.txt b/workers/src/docker/submission/requirements.txt deleted file mode 100644 index ede8c739..00000000 --- a/workers/src/docker/submission/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -numpy -solar-data-tools diff --git a/workers/src/docker/submission/submission_wrapper.py b/workers/src/docker/submission/submission_wrapper.py deleted file mode 100644 index 78112f51..00000000 --- a/workers/src/docker/submission/submission_wrapper.py +++ /dev/null @@ -1,10 +0,0 @@ -import numpy as np -from solardatatools import DataHandler - - -def detect_time_shifts( - time_series, latitude=None, longitude=None, data_sampling_frequency=None -): - dh = DataHandler(time_series.to_frame()) - dh.run_pipeline(fix_shifts=True, verbose=False, round_shifts_to_hour=False) - return dh.time_shift_analysis.correction_estimate diff --git a/workers/src/docker/submission_wrapper.py b/workers/src/docker/submission_wrapper.py index 3a6ce613..cd37f237 100644 --- a/workers/src/docker/submission_wrapper.py +++ b/workers/src/docker/submission_wrapper.py @@ -1,23 +1,27 @@ from importlib import import_module import inspect import pathlib -from re import sub import sys +import json import pandas as pd import numpy as np from time import perf_counter -from functools import wraps -from typing import Any, ParamSpec, Union, Tuple, TypeVar, Callable, cast +from typing import Any, Union, Tuple, TypeVar, Callable, cast, Optional from logging import Logger import logging +if sys.version_info < (3, 10): + from typing_extensions import ParamSpec +else: + from typing import ParamSpec + T = TypeVar("T") P = ParamSpec("P") def logger_if_able( - message: object, logger: Logger | None = None, level: str = "INFO" + message: object, logger: Optional[Logger] = None, level: str = "INFO" ): if logger is not None: levels_dict = { @@ -116,7 +120,7 @@ def import_submission_function(submission_file_name: str, function_name: str): try: submission_function: Callable[ - [pd.Series, Any], np.ndarray | tuple[float, float] + [pd.Series, Any], Union[np.ndarray, tuple[float, float]] ] = getattr(submission_module, function_name) function_parameters = list( inspect.signature(submission_function).parameters.keys() @@ -141,6 +145,8 @@ def main(): function_name = args[1] data_file_name = args[2] + print(args) + print("Getting submission function...") try: @@ -170,17 +176,30 @@ def main(): print(f"Execution time: {execution_time}") - # save results to csv file + # Save information about the submission function for later use with worker + submission_info_file = pathlib.Path( + f"{results_dir}/submission_function_info.json" + ) + if not submission_info_file.exists(): + submission_function_info = { + "data_file_name": data_file_name, + "function_name": function_name, + "function_parameters": function_parameters, + } + + with open(f"{results_dir}/submission_function_info.json", "w") as fp: + json.dump(submission_function_info, fp) - print(f"Saving results to {results_dir}/{data_file_name}") + # save results to csv file + results_file_path = f"{results_dir}/files/{data_file_name}" + print(f"Saving results to {results_file_path}") if isinstance(results, tuple): results_df = pd.DataFrame([results]) else: 
results_df = pd.DataFrame(results) print(f"Results: {results_df}") - results_file = f"{results_dir}/{data_file_name}" - results_df.to_csv(results_file, header=True) + results_df.to_csv(results_file_path, header=True) columns = ["file_name", "execution_time"] diff --git a/workers/src/logging_config.json b/workers/src/logging_config.json index c4949fae..93bbcfbd 100644 --- a/workers/src/logging_config.json +++ b/workers/src/logging_config.json @@ -33,13 +33,13 @@ }, "file": { "class": "logging.handlers.RotatingFileHandler", - "level": "DEBUG", + "level": "INFO", "formatter": "detailed", "filename": "logs/submission.log" }, "json_file": { "class": "logging.handlers.RotatingFileHandler", - "level": "DEBUG", + "level": "INFO", "formatter": "json", "filename": "logs/submission.log.jsonl" }, diff --git a/workers/src/pvinsight-validation-runner.py b/workers/src/pvinsight-validation-runner.py index 91837a0e..0b1114a7 100644 --- a/workers/src/pvinsight-validation-runner.py +++ b/workers/src/pvinsight-validation-runner.py @@ -16,7 +16,16 @@ This section will be dependent on the type of analysis being run. """ -from typing import Any, Callable, Sequence, Tuple, TypeVar, cast, ParamSpec +from typing import ( + Any, + Callable, + Sequence, + Tuple, + TypeVar, + TypedDict, + cast, + ParamSpec, +) import pandas as pd import os from importlib import import_module @@ -345,6 +354,7 @@ def run( # noqa: C901 # Ensure results directory exists os.makedirs(results_dir, exist_ok=True) + os.makedirs(os.path.join(results_dir, "files"), exist_ok=True) # Ensure results directory exists os.makedirs(data_dir, exist_ok=True) @@ -559,6 +569,28 @@ def run( # noqa: C901 # raise Exception("Finished Successfully") + # Get information about the submission function + + class SubmissionFunctionInfo(TypedDict): + data_file_name: str + function_name: str + function_parameters: list[str] + + try: + with open(f"{results_dir}/submission_function_info.json", "r") as fp: + submission_function_info: SubmissionFunctionInfo = json.load(fp) + except FileNotFoundError as e: + logger.error("submission_function_info.json not found") + logger.exception(e) + + # logger.info(f"update submission status to {FAILED}") + # update_submission_status(submission_id, FAILED) + # TODO: add error code + error_code = 500 + raise RunnerException( + *get_error_by_code(error_code, runner_error_codes, logger) + ) + # Convert the results to a pandas dataframe and perform all of the # post-processing in the script results_df = pd.DataFrame(results_list) @@ -576,9 +608,11 @@ def run( # noqa: C901 # Get the mean and median run times public_metrics_dict["mean_runtime"] = results_df["runtime"].mean() public_metrics_dict["median_runtime"] = results_df["runtime"].median() - public_metrics_dict["function_parameters"] = [ - "time_series", - *config_data["allowable_kwargs"], + logger.info( + f'function_parameters: {submission_function_info["function_parameters"]}' + ) + public_metrics_dict["function_parameters"] = submission_function_info[ + "function_parameters" ] public_metrics_dict["data_requirements"] = results_df[ "data_requirements" @@ -721,15 +755,15 @@ def m_median(df: pd.DataFrame, column: str): logger.info(f"number_of_errors: {number_of_errors}") - success_rate = ( - (total_number_of_files - number_of_errors) / total_number_of_files - ) * 100 - logger.info(f"success_rate: {success_rate}%") - logger.info( - f"{total_number_of_files - number_of_errors} out of {total_number_of_files} files processed successfully" - ) + # success_rate = ( + # (total_number_of_files 
- number_of_errors) / total_number_of_files + # ) * 100 + # logger.info(f"success_rate: {success_rate}%") + # logger.info( + # f"{total_number_of_files - number_of_errors} out of {total_number_of_files} files processed successfully" + # ) - public_metrics_dict["success_rate"] = success_rate + # public_metrics_dict["success_rate"] = success_rate return public_metrics_dict @@ -988,8 +1022,8 @@ def loop_over_files_and_generate_results( logger=logger, ) - errors = [error for error, error_code in test_errors] - number_of_errors += sum(errors) + is_errors_list = [error for error, error_code in test_errors] + number_of_errors += sum(is_errors_list) if number_of_errors == NUM_FILES_TO_TEST: logger.error( @@ -1029,8 +1063,9 @@ def loop_over_files_and_generate_results( raise RunnerException( *get_error_by_code(500, runner_error_codes, logger) ) - errors = [error for error, error_code in rest_errors] - number_of_errors += sum(errors) + is_errors_list = [error for error, error_code in rest_errors] + + number_of_errors += sum(is_errors_list) # test_errors = [result for result, _ in test_errors if result is not None] # rest_errors = [result for result, _ in rest_errors if result is not None] @@ -1049,6 +1084,8 @@ def loop_over_results_and_generate_metrics( all_results: list[dict[str, Any]] = [] number_of_errors = 0 + result_files_dir = os.path.join(results_dir, "files") + file_metadata_df: pd.DataFrame = pd.read_csv( os.path.join(data_dir, "metadata", "file_metadata.csv") ) @@ -1096,7 +1133,7 @@ def loop_over_results_and_generate_metrics( file_name, config_data, system_metadata_dict, - results_dir, + result_files_dir, data_dir, submission_runtime, function_parameters, diff --git a/workers/src/utility.py b/workers/src/utility.py index 6e0be477..2fe366cd 100644 --- a/workers/src/utility.py +++ b/workers/src/utility.py @@ -998,6 +998,27 @@ def submission_task( error_code = 500 logger_if_able(f"Error: {e}", None, "ERROR") + # Finished a file for submission + # TODO: Add timestamp to API + # +1 file completed to API + # json = { + # "timestamp": datetime.now().isoformat(), + # "file": submission_file_name, + # } + + # TODO: Send to API + # Create an error report for all non breaking errors and send to API + + file_error_report = { + "error_code": error_code, + "error_type": SubmissionException, + "error_message": error, + "file_name": submission_file_name, + } + + # # Send error report to API + # send_error_report_to_API(json_errors, error_rate) + return error, error_code @@ -1058,6 +1079,7 @@ def create_docker_image( path=dir_path, tag=tag, rm=True, + nocache=True, dockerfile="Dockerfile", buildargs={ "zip_file": f"{submission_file_name}", @@ -1108,8 +1130,8 @@ def __exit__(self, exc_type, exc_value, exc_traceback): def initialize_docker_client(): base_url = os.environ.get("DOCKER_HOST", None) - if not base_url: - logger_if_able("Docker host not set", None, "WARNING") + # if not base_url: + # logger_if_able("Docker host not set", None, "WARNING") # cert_path = os.environ.get("DOCKER_CERT_PATH") # if not cert_path:
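
Note (not part of the changeset): the interface that `workers/src/docker/submission_wrapper.py` loads via `import_submission_function` is the same one implemented by `compressions/2/pvanalytics_submission/submission_wrapper.py` above and by the deleted solar-data-tools example: a `detect_time_shifts(time_series, latitude, longitude, data_sampling_frequency)` callable that returns a per-day shift estimate. A minimal illustrative submission is sketched below; the zero-shift body is only a placeholder (it mirrors the fallback branch of the PVAnalytics-CPD module), and a real submission would substitute its own analysis.

    # submission_wrapper.py -- minimal placeholder submission (illustrative only)
    import pandas as pd


    def detect_time_shifts(time_series, latitude=None, longitude=None,
                           data_sampling_frequency=None):
        """Return an estimated time shift (in minutes) for each date in the series.

        This placeholder reports zero shift for every day; an actual submission
        (e.g. the PVAnalytics-CPD module above) replaces this with its own
        change-point detection.
        """
        date_index = pd.Series(time_series.index.date).drop_duplicates()
        return pd.Series(0, index=date_index)

The runner imports this function, times it with `perf_counter`, writes the per-file output under `results_dir/files/`, and records the function's parameter names in `submission_function_info.json` for the validation step, as shown in the diff above.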