diff --git a/valhub/base/errorcodes.json b/valhub/base/errorcodes.json
index 16f2ac12..f942646f 100644
--- a/valhub/base/errorcodes.json
+++ b/valhub/base/errorcodes.json
@@ -6,7 +6,9 @@
         "4": "No system metadata returned from API",
         "5": "Configuration file not found in current evaluation directory",
         "6": "Required function name not found within submission python file",
-        "7": "Failure cutoff met for submission evaluation and execution has been terminated"
+        "7": "Failure cutoff met for submission evaluation and execution has been terminated",
+        "8": "Submission result length does not match ground truth length",
+        "500": "Internal server error"
     },
     "wr": {
         "1": "Error uploading file to s3",
@@ -19,9 +21,13 @@
         "8": "No file metadata found in API for analysis ID",
         "9": "Not all ground truth data files found in s3 bucket for analysis",
         "10": "Not all analytical data files found in s3 bucket for analysis",
-        "11": "Runner module does not have a 'run' function"
+        "11": "Runner module does not have a 'run' function",
+        "12": "Error posting Error Report to API",
+        "13": "API did not return a valid response",
+        "500": "Internal server error"
     },
     "sb": {
-        "1": ""
+        "1": "Submission function exceeded maximum execution time",
+        "500": "Internal server error"
     }
 }
\ No newline at end of file
diff --git a/workers/src/errorcodes.json b/workers/src/errorcodes.json
new file mode 100644
index 00000000..f942646f
--- /dev/null
+++ b/workers/src/errorcodes.json
@@ -0,0 +1,33 @@
+{
+    "op": {
+        "1": "Submission file is not a valid zipped file",
+        "2": "Error installing Python submission dependencies",
+        "3": "Failed to get system metadata information",
+        "4": "No system metadata returned from API",
+        "5": "Configuration file not found in current evaluation directory",
+        "6": "Required function name not found within submission python file",
+        "7": "Failure cutoff met for submission evaluation and execution has been terminated",
+        "8": "Submission result length does not match ground truth length",
+        "500": "Internal server error"
+    },
+    "wr": {
+        "1": "Error uploading file to s3",
+        "2": "Error downloading file from s3",
+        "3": "No files found in s3 bucket for analysis",
+        "4": "Error updating submission result to API",
+        "5": "Error updating submission status to API",
+        "6": "Required evaluation files not found in s3 bucket",
+        "7": "File metadata for file ID not found in API",
+        "8": "No file metadata found in API for analysis ID",
+        "9": "Not all ground truth data files found in s3 bucket for analysis",
+        "10": "Not all analytical data files found in s3 bucket for analysis",
+        "11": "Runner module does not have a 'run' function",
+        "12": "Error posting Error Report to API",
+        "13": "API did not return a valid response",
+        "500": "Internal server error"
+    },
+    "sb": {
+        "1": "Submission function exceeded maximum execution time",
+        "500": "Internal server error"
+    }
+}
\ No newline at end of file
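Reviewer note: the valhub copy and the new workers/src copy of errorcodes.json above are identical registries, keyed by component prefix ("op" for the runner, "wr" for the worker, "sb" for submission code), with "500" reserved as a catch-all internal error in each section. The sketch below is a minimal illustration of how such a registry resolves a numeric code to a (code, message) pair; the load_prefix helper and the directory path are illustrative only and are not part of this changeset (the real helpers, get_error_codes_dict and get_error_by_code, are added to workers/src/utility.py later in this diff).

```python
# Minimal sketch (assumptions noted above): resolve runner ("op") error code 7
# from errorcodes.json into the (code, message) pair that exceptions carry.
import json
import os


def load_prefix(errorcodes_dir: str, prefix: str) -> dict[str, str]:
    # errorcodes.json maps a component prefix to a {code: message} table
    with open(os.path.join(errorcodes_dir, "errorcodes.json")) as f:
        return json.load(f)[prefix]


runner_codes = load_prefix("workers/src", "op")  # path is illustrative
code, message = 7, runner_codes["7"]
print(code, message)
# 7 Failure cutoff met for submission evaluation and execution has been terminated
```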
""" -from email import errors -from multiprocessing.spawn import prepare -import re from typing import Any, Callable, cast import pandas as pd import os @@ -39,9 +36,14 @@ import boto3 from logger import setup_logging from utility import ( + RUNNER_ERROR_PREFIX, RunnerException, + SubmissionException, dask_multiprocess, + get_error_by_code, + get_error_codes_dict, pull_from_s3, + timeout, timing, is_local, ) @@ -60,6 +62,15 @@ API_BASE_URL = "api:8005" if IS_LOCAL else "api.pv-validation-hub.org" +FILE_DIR = os.path.dirname(os.path.abspath(__file__)) + + +runner_error_codes = get_error_codes_dict( + FILE_DIR, RUNNER_ERROR_PREFIX, logger +) + +SUBMISSION_TIMEOUT = 30 * 60 # seconds + def push_to_s3( local_file_path, @@ -365,8 +376,9 @@ def run( # noqa: C901 logger.error("error installing submission dependencies:", e) logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) + error_code = 2 raise RunnerException( - 2, "error installing python submission dependencies" + *get_error_by_code(error_code, runner_error_codes, logger) ) shutil.move( @@ -389,7 +401,10 @@ def run( # noqa: C901 ) logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) - raise RunnerException(3, "Failed to get system metadata information") + error_code = 3 + raise RunnerException( + *get_error_by_code(error_code, runner_error_codes, logger) + ) # Convert the responses to DataFrames @@ -402,7 +417,10 @@ def run( # noqa: C901 logger.error("System metadata is empty") logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) - raise RunnerException(4, "No system metadata returned from API") + error_code = 4 + raise RunnerException( + *get_error_by_code(error_code, runner_error_codes, logger) + ) # Read in the configuration JSON for the particular run with open(os.path.join(current_evaluation_dir, "config.json")) as f: @@ -410,8 +428,9 @@ def run( # noqa: C901 logger.error("config.json not found") logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) + error_code = 5 raise RunnerException( - 5, "config.json not found in current evaluation directory" + *get_error_by_code(error_code, runner_error_codes, logger) ) config_data: dict[str, Any] = json.load(f) @@ -435,8 +454,9 @@ def run( # noqa: C901 ) logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) + error_code = 6 raise RunnerException( - 6, f"function {function_name} not found in module {module_name}" + *get_error_by_code(error_code, runner_error_codes, logger) ) total_number_of_files = len(file_metadata_df) @@ -614,18 +634,6 @@ def prepare_function_args_for_parallel_processing( function_name: str, performance_metrics: list[str], ): - # logger.debug( - # f"index: {index}, FAILURE_CUTOFF: {FAILURE_CUTOFF}, number_of_errors: {number_of_errors}" - # ) - # if index <= FAILURE_CUTOFF: - # if number_of_errors == FAILURE_CUTOFF: - # raise RunnerException( - # 7, - # f"Too many errors ({number_of_errors}) occurred in the first {FAILURE_CUTOFF} files. Exiting.", - # current_error_rate(number_of_errors, index), - # ) - - # logger.info(f"processing file {index + 1} of {total_number_of_files}") function_args_list: list[tuple] = [] @@ -732,22 +740,40 @@ def loop_over_files_and_generate_results( logger.error( f"Too many errors ({number_of_errors}) occurred in the first {NUM_FILES_TO_TEST} files. 
Exiting." ) + error_code = 7 raise RunnerException( - 7, - f"Too many errors ({number_of_errors}) occurred in the first {NUM_FILES_TO_TEST} files. Exiting.", + *get_error_by_code(error_code, runner_error_codes, logger) ) # Test the rest of the files logger.info(f"Testing the rest of the files...") - rest_results = dask_multiprocess( - run_submission_and_generate_performance_metrics, - rest_func_argument_list, - # n_workers=4, - threads_per_worker=1, - # memory_limit="16GiB", - logger=logger, - ) + rest_results = [] + try: + rest_results = dask_multiprocess( + run_submission_and_generate_performance_metrics, + rest_func_argument_list, + # n_workers=4, + threads_per_worker=1, + # memory_limit="16GiB", + logger=logger, + ) + except SubmissionException as e: + logger.error(f"Submission error: {e}") + raise e + except RunnerException as e: + logger.error(f"Runner error: {e}") + raise e + except Exception as e: + if e.args: + if len(e.args) == 2: + logger.error(f"Submission error: {e}") + + raise RunnerException(*e.args) + logger.error(f"Submission error: {e}") + raise RunnerException( + *get_error_by_code(500, runner_error_codes, logger) + ) errors = [error for _, error in rest_results] number_of_errors += sum(errors) @@ -790,10 +816,10 @@ def generate_performance_metrics_for_submission( logger.error( f"{file_name} submission result length {file_submission_result_length} does not match ground truth file length {ground_truth_file_length}" ) + error_code = 8 raise RunnerException( - 100, - f"submission result length {file_submission_result_length} does not match ground truth file length {ground_truth_file_length}", + *get_error_by_code(error_code, runner_error_codes, logger) ) # Convert the data outputs to a dictionary identical to the @@ -832,6 +858,7 @@ def generate_performance_metrics_for_submission( return results_dictionary +@timeout(SUBMISSION_TIMEOUT) def run_submission_and_generate_performance_metrics( file_name: str, data_dir: str, diff --git a/workers/src/submission_worker.py b/workers/src/submission_worker.py index 5bfc1e56..c1f03552 100644 --- a/workers/src/submission_worker.py +++ b/workers/src/submission_worker.py @@ -1,6 +1,5 @@ from importlib import import_module from typing import Any, Callable, Optional -from matplotlib.pylab import f import requests import sys import os @@ -22,6 +21,8 @@ RunnerException, SubmissionException, WorkerException, + get_error_by_code, + get_error_codes_dict, pull_from_s3, timing, is_local, @@ -52,8 +53,10 @@ def update_submission_status( api_route = f"http://{API_BASE_URL}/submissions/analysis/{analysis_id}/change_submission_status/{submission_id}" r = requests.put(api_route, data={"status": new_status}) if not r.ok: + logger.error(f"Error updating submission status to {new_status}") + error_code = 5 raise WorkerException( - 5, f"Error updating submission status to {new_status}" + *get_error_by_code(error_code, worker_error_codes, logger), ) data: dict[str, Any] = r.json() return data @@ -87,8 +90,10 @@ def push_to_s3(local_file_path, s3_file_path, analysis_id, submission_id): r = requests.put(s3_file_full_path, data=file_content) logger.info(f"Received S3 emulator response: {r.status_code}") if not r.ok: + logger.error(f"S3 emulator error: {r.content}") + error_code = 1 raise WorkerException( - 1, f"Error uploading file to s3 emulator: {r.content}" + *get_error_by_code(error_code, worker_error_codes, logger), ) return {"status": "success"} else: @@ -99,7 +104,10 @@ def push_to_s3(local_file_path, s3_file_path, analysis_id, submission_id): 
logger.error(f"Error: {e}") logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) - raise WorkerException(1, f"Error uploading file to s3") + error_code = 1 + raise WorkerException( + *get_error_by_code(error_code, worker_error_codes, logger) + ) def list_s3_bucket(s3_dir: str): @@ -151,8 +159,10 @@ def update_submission_result( api_route = f"http://{API_BASE_URL}/submissions/analysis/{analysis_id}/update_submission_result/{submission_id}" r = requests.put(api_route, json=result_json, headers=headers) if not r.ok: + logger.error(f"Error updating submission result to Django API") + error_code = 4 raise WorkerException( - 4, "Error updating submission result to Django API" + *get_error_by_code(error_code, worker_error_codes, logger), ) data: dict[str, Any] = r.json() return data @@ -231,9 +241,12 @@ def extract_analysis_data( # noqa: C901 ) response = requests.get(fmd_url) if not response.ok: + error_code = 7 + logger.error( + f"File metadata for file id {file_id} not found in Django API" + ) raise requests.exceptions.HTTPError( - 7, - f"File metadata for file id {file_id} not found in Django API", + *get_error_by_code(error_code, worker_error_codes, logger), ) file_metadata_list.append(response.json()) @@ -241,9 +254,12 @@ def extract_analysis_data( # noqa: C901 file_metadata_df = pd.DataFrame(file_metadata_list) if file_metadata_df.empty: + logger.error( + f"File metadata DataFrame is empty for analysis {analysis_id}" + ) + error_code = 8 raise WorkerException( - 8, - f"No file metadata found in Django API for analysis {analysis_id}", + *get_error_by_code(error_code, worker_error_codes, logger), ) files_for_analysis: list[str] = file_metadata_df["file_name"].tolist() @@ -329,6 +345,12 @@ def load_analysis( os.path.join(current_evaluation_dir, "pvinsight-validation-runner.py"), ) + # Copy the error codes file into the current evaluation directory + shutil.copy( + os.path.join("/root/worker/src", "errorcodes.json"), + os.path.join(current_evaluation_dir, "errorcodes.json"), + ) + # import analysis runner as a module sys.path.insert(0, current_evaluation_dir) runner_module_name = "pvinsight-validation-runner" @@ -571,12 +593,20 @@ def post_error_to_api( logger.info(f"Received response: {r.text}") if not r.ok: - raise WorkerException(1, "Error posting error report to Django API") + logger.error("Error posting error report to API") + cur_error_code = 12 + raise WorkerException( + *get_error_by_code(cur_error_code, worker_error_codes, logger) + ) try: data: dict[str, Any] = r.json() except json.JSONDecodeError: - raise WorkerException(1, "Django API did not return a JSON response") + logger.error("Django API did not return a JSON response") + cur_error_code = 13 + raise WorkerException( + *get_error_by_code(cur_error_code, worker_error_codes, logger) + ) return data @@ -773,6 +803,11 @@ def main(): logger.info(f"CURRENT_EVALUATION_DIR: {CURRENT_EVALUATION_DIR}") # Set to folder where the evaluation scripts are stored logger.info(f"BASE_TEMP_DIR: {BASE_TEMP_DIR}") + + worker_error_codes = get_error_codes_dict( + FILE_DIR, WORKER_ERROR_PREFIX, logger + ) + _, execution_time = main() logger.info( f"Shutting down Submission Worker. Runtime: {execution_time:.3f} seconds." 
diff --git a/workers/src/utility.py b/workers/src/utility.py
index 5ca9f283..34a072ce 100644
--- a/workers/src/utility.py
+++ b/workers/src/utility.py
@@ -1,9 +1,14 @@
-import sys
+import json
 from dask.delayed import delayed
 from dask.distributed import Client
 from dask import config
-from concurrent.futures import ProcessPoolExecutor, as_completed, thread
+from concurrent.futures import (
+    ProcessPoolExecutor,
+    ThreadPoolExecutor,
+    as_completed,
+    thread,
+)
 from functools import wraps
 from logging import Logger
 from time import perf_counter, sleep, time
@@ -13,6 +18,7 @@
 import boto3
 import botocore.exceptions
 from distributed import LocalCluster
+from matplotlib.pylab import f
 import psutil
 import requests
 import math
@@ -22,8 +28,74 @@
 RUNNER_ERROR_PREFIX = "op"
 SUBMISSION_ERROR_PREFIX = "sb"
+
 T = TypeVar("T")
+FILE_DIR = os.path.dirname(os.path.abspath(__file__))
+
+
+def logger_if_able(
+    message: str, logger: Logger | None = None, level: str = "INFO"
+):
+    if logger is not None:
+        levels_dict = {
+            "DEBUG": logging.DEBUG,
+            "INFO": logging.INFO,
+            "WARNING": logging.WARNING,
+            "ERROR": logging.ERROR,
+            "CRITICAL": logging.CRITICAL,
+        }
+
+        level = level.upper()
+
+        if level not in levels_dict:
+            raise Exception(f"Invalid log level: {level}")
+
+        log_level = levels_dict[level]
+
+        logger.log(log_level, message)
+    else:
+        print(message)
+
+
+def get_error_codes_dict(
+    dir: str, prefix: str, logger: Logger | None
+) -> dict[str, str]:
+    try:
+        with open(
+            os.path.join(dir, "errorcodes.json"), "r"
+        ) as error_codes_file:
+            error_codes = json.load(error_codes_file)
+            if prefix not in error_codes:
+                logger_if_able(
+                    f"Error prefix {prefix} not found in error codes file",
+                    logger,
+                    "ERROR",
+                )
+
+                raise KeyError(
+                    f"Error prefix {prefix} not found in error codes file"
+                )
+            return error_codes[prefix]
+    except FileNotFoundError:
+        logger_if_able("Error codes file not found", logger, "ERROR")
+        raise FileNotFoundError("Error codes file not found")
+    except KeyError:
+        logger_if_able(
+            f"Error prefix {prefix} not found in error codes file",
+            logger,
+            "ERROR",
+        )
+        raise KeyError(f"Error prefix {prefix} not found in error codes file")
+    except Exception as e:
+        logger_if_able(f"Error loading error codes: {e}", logger, "ERROR")
+        raise e
+
+
+submission_error_codes = get_error_codes_dict(
+    FILE_DIR, SUBMISSION_ERROR_PREFIX, None
+)
+
 
 def timing(verbose: bool = True, logger: Union[Logger, None] = None):
     @wraps(timing)
@@ -38,10 +110,7 @@ def wrapper(*args, **kwargs) -> Tuple[T, float]:
             msg = (
                 f"{func.__name__} took {execution_time:.3f} seconds to run"
             )
-            if logger:
-                logger.info(msg)
-            else:
-                print(msg)
+            logger_if_able(msg, logger)
             return result, execution_time
 
         return wrapper
@@ -64,31 +133,25 @@ def multiprocess(
     return results
 
 
-def logger_if_able(
-    message: str, logger: Logger | None = None, level: str = "INFO"
-):
-    if logger is not None:
-        levels_dict = {
-            "DEBUG": logging.DEBUG,
-            "INFO": logging.INFO,
-            "WARNING": logging.WARNING,
-            "ERROR": logging.ERROR,
-            "CRITICAL": logging.CRITICAL,
-        }
-
-        level = level.upper()
-
-        if level not in levels_dict:
-            raise Exception(f"Invalid log level: {level}")
-
-        log_level = levels_dict[level]
-
-        logger.log(log_level, message)
-    else:
-        print(message)
+def timeout(seconds: int, logger: Union[Logger, None] = None):
+    def decorator(func: Callable[..., T]):
+        @wraps(func)
+        def wrapper(*args, **kwargs) -> T:
+            with ThreadPoolExecutor(max_workers=1) as executor:
+                future = executor.submit(func, *args, **kwargs)
+                try:
+                    return future.result(timeout=seconds)
+                except TimeoutError:
+                    error_code = 1
+                    raise SubmissionException(
+                        *get_error_by_code(
+                            error_code, submission_error_codes, logger
+                        )
+                    )
+        return wrapper
 
-MEMORY_PER_RUN = 7.0  # in GB
+    return decorator
 
 
 def set_workers_and_threads(
@@ -115,12 +178,13 @@ def handle_exceeded_resources(
     total_threads: int = 1
 
     if cpu_count is None:
-        raise Exception("Could not determine number of CPUs.")
+        raise Exception(10, "Could not determine number of CPUs.")
 
     if n_workers is not None and threads_per_worker is not None:
         if n_workers * threads_per_worker > cpu_count:
             raise Exception(
-                f"workers and threads exceed local resources, {cpu_count} cores present"
+                9,
+                f"workers and threads exceed local resources, {cpu_count} cores present",
            )
         handle_exceeded_resources(
             n_workers, threads_per_worker, memory_per_run, sys_memory
@@ -164,7 +228,7 @@ def handle_exceeded_resources(
     if n_workers is None and threads_per_worker is None:
         thread_worker_total = math.floor(sys_memory / memory_per_run)
-        if thread_worker_total < 2:
+        if thread_worker_total < 1:
             logger_if_able(
                 "Not enough memory for a worker, defaulting to 1 worker and 1 thread per worker",
                 logger,
@@ -198,13 +262,24 @@ def handle_exceeded_resources(
                 "ERROR",
             )
             raise Exception(
-                "Could not determine number of workers and threads"
+                9, "Could not determine number of workers and threads"
            )
         handle_exceeded_resources(
             n_workers, threads_per_worker, memory_per_run, sys_memory
         )
         total_workers, total_threads = n_workers, threads_per_worker
+
+    while total_workers * total_threads > cpu_count:
+        if total_workers > 1:
+            total_workers -= 1
+        elif total_threads > 1:
+            total_threads -= 1
+        else:
+            raise Exception(
+                9, "Could not determine number of workers and threads"
+            )
+
     return total_workers, total_threads
@@ -217,6 +292,9 @@ def dask_multiprocess(
     logger: Logger | None = None,
     **kwargs,
 ) -> list[T]:
+
+    MEMORY_PER_RUN = 7.0  # in GB
+
     memory_per_run = memory_per_run or MEMORY_PER_RUN
 
     cpu_count = os.cpu_count()
@@ -364,6 +442,23 @@ def pull_from_s3(
     return target_file_path
 
 
+def get_error_by_code(
+    error_code: int, error_codes_dict: dict[str, str], logger: Logger | None
+) -> tuple[int, str]:
+    if error_codes_dict is None:
+        logger_if_able("Error codes dictionary is None", logger, "ERROR")
+        raise ValueError("Error codes dictionary is None")
+    error_code_str = str(error_code)
+    if error_code_str not in error_codes_dict:
+        logger_if_able(
+            f"Error code {error_code} not found in error codes",
+            logger,
+            "ERROR",
+        )
+        raise KeyError(f"Error code {error_code} not found in error codes")
+    return error_code, error_codes_dict[error_code_str]
+
+
 if __name__ == "__main__":
 
     def expensive_function(x):
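Reviewer note: utility.py now hosts the shared pieces: logger_if_able (log if a logger is supplied, otherwise print), get_error_codes_dict / get_error_by_code for the registry lookups, and a thread-based timeout decorator that raises SubmissionException with "sb" code 1 when a submission exceeds SUBMISSION_TIMEOUT. Two small observations: the added "from matplotlib.pylab import f" import appears to be an unused auto-import, and on Python versions before 3.11 concurrent.futures.TimeoutError is not the builtin TimeoutError, so the bare "except TimeoutError" may not catch the timeout raised by future.result. The sketch below illustrates the decorator pattern with a stand-in exception; slow_function and the 0.2 s limit are illustrative.

```python
# Minimal sketch of the ThreadPoolExecutor-based timeout added above, with a
# stand-in RuntimeError instead of SubmissionException. Catching both
# TimeoutError names keeps behaviour consistent before and after Python 3.11.
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from time import sleep


def timeout(seconds: float):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(func, *args, **kwargs)
                try:
                    return future.result(timeout=seconds)
                except (TimeoutError, concurrent.futures.TimeoutError):
                    # The worker thread cannot be killed; shutdown still waits for it.
                    raise RuntimeError(f"{func.__name__} exceeded {seconds}s")

        return wrapper

    return decorator


@timeout(0.2)
def slow_function() -> str:
    sleep(1)
    return "done"


try:
    slow_function()
except RuntimeError as e:
    print(e)  # slow_function exceeded 0.2s
```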