Update errorcodes.json with new error messages and added timeout to submission function
MitchellAV committed May 22, 2024
1 parent 56c7507 commit 4f9e957
Showing 5 changed files with 275 additions and 79 deletions.
12 changes: 9 additions & 3 deletions valhub/base/errorcodes.json
@@ -6,7 +6,9 @@
"4": "No system metadata returned from API",
"5": "Configuration file not found in current evaluation directory",
"6": "Required function name not found within submission python file",
"7": "Failure cutoff met for submission evaluation and execution has been terminated"
"7": "Failure cutoff met for submission evaluation and execution has been terminated",
"8": "Submission result length does not match ground truth length",
"500": "Internal server error"
},
"wr": {
"1": "Error uploading file to s3",
@@ -19,9 +21,13 @@
"8": "No file metadata found in API for analysis ID",
"9": "Not all ground truth data files found in s3 bucket for analysis",
"10": "Not all analytical data files found in s3 bucket for analysis",
"11": "Runner module does not have a 'run' function"
"11": "Runner module does not have a 'run' function",
"12": "Error posting Error Report to API",
"13": "API did not return a valid response",
"500": "Internal server error"
},
"sb": {
"1": ""
"1": "Submission function exceeded maximum execution time",
"500": "Internal server error"
}
}
33 changes: 33 additions & 0 deletions workers/src/errorcodes.json
@@ -0,0 +1,33 @@
{
"op": {
"1": "Submission file is not a valid zipped file",
"2": "Error installing Python submission dependencies",
"3": "Failed to get system metadata information",
"4": "No system metadata returned from API",
"5": "Configuration file not found in current evaluation directory",
"6": "Required function name not found within submission python file",
"7": "Failure cutoff met for submission evaluation and execution has been terminated",
"8": "Submission result length does not match ground truth length",
"500": "Internal server error"
},
"wr": {
"1": "Error uploading file to s3",
"2": "Error downloading file from s3",
"3": "No files found in s3 bucket for analysis",
"4": "Error updating submission result to API",
"5": "Error updating submission status to API",
"6": "Required evaluation files not found in s3 bucket",
"7": "File metadata for file ID not found in API",
"8": "No file metadata found in API for analysis ID",
"9": "Not all ground truth data files found in s3 bucket for analysis",
"10": "Not all analytical data files found in s3 bucket for analysis",
"11": "Runner module does not have a 'run' function",
"12": "Error posting Error Report to API",
"13": "API did not return a valid response",
"500": "Internal server error"
},
"sb": {
"1": "Submission function exceeded maximum execution time",
"500": "Internal server error"
}
}
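
Both copies of errorcodes.json are consumed through two helpers imported from `utility` in this commit, `get_error_codes_dict` and `get_error_by_code`, whose implementations are not part of the diff. A minimal sketch of what they might look like, assuming errorcodes.json sits next to the calling module and that `get_error_by_code` returns a `(code, message)` pair for splatting into an exception:

```python
import json
import logging
import os


def get_error_codes_dict(
    file_dir: str, prefix: str, logger: logging.Logger
) -> dict[str, str]:
    """Load the section of errorcodes.json for one prefix ("op", "wr", "sb")."""
    path = os.path.join(file_dir, "errorcodes.json")
    with open(path) as f:
        all_codes: dict[str, dict[str, str]] = json.load(f)
    if prefix not in all_codes:
        logger.error(f"Error code prefix '{prefix}' not found in {path}")
        raise KeyError(prefix)
    return all_codes[prefix]


def get_error_by_code(
    error_code: int, error_codes: dict[str, str], logger: logging.Logger
) -> tuple[int, str]:
    """Return a (code, message) pair; callers splat it into an exception."""
    key = str(error_code)
    if key not in error_codes:
        # Fall back to the generic entry rather than failing the lookup.
        logger.error(f"Unknown error code {error_code}; falling back to 500")
        return 500, error_codes.get("500", "Internal server error")
    return error_code, error_codes[key]
```

A fallback along these lines would explain why every section of errorcodes.json now carries a "500": "Internal server error" entry.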
91 changes: 59 additions & 32 deletions workers/src/pvinsight-validation-runner.py
@@ -16,9 +16,6 @@
This section will be dependent on the type of analysis being run.
"""

from email import errors
from multiprocessing.spawn import prepare
import re
from typing import Any, Callable, cast
import pandas as pd
import os
@@ -39,9 +36,14 @@
import boto3
from logger import setup_logging
from utility import (
RUNNER_ERROR_PREFIX,
RunnerException,
SubmissionException,
dask_multiprocess,
get_error_by_code,
get_error_codes_dict,
pull_from_s3,
timeout,
timing,
is_local,
)
@@ -60,6 +62,15 @@

API_BASE_URL = "api:8005" if IS_LOCAL else "api.pv-validation-hub.org"

FILE_DIR = os.path.dirname(os.path.abspath(__file__))


runner_error_codes = get_error_codes_dict(
FILE_DIR, RUNNER_ERROR_PREFIX, logger
)

SUBMISSION_TIMEOUT = 30 * 60 # seconds


def push_to_s3(
local_file_path,
@@ -365,8 +376,9 @@ def run( # noqa: C901
logger.error("error installing submission dependencies:", e)
logger.info(f"update submission status to {FAILED}")
update_submission_status(analysis_id, submission_id, FAILED)
error_code = 2
raise RunnerException(
2, "error installing python submission dependencies"
*get_error_by_code(error_code, runner_error_codes, logger)
)

shutil.move(
@@ -389,7 +401,10 @@ def run( # noqa: C901
)
logger.info(f"update submission status to {FAILED}")
update_submission_status(analysis_id, submission_id, FAILED)
raise RunnerException(3, "Failed to get system metadata information")
error_code = 3
raise RunnerException(
*get_error_by_code(error_code, runner_error_codes, logger)
)

# Convert the responses to DataFrames

@@ -402,16 +417,20 @@ def run( # noqa: C901
logger.error("System metadata is empty")
logger.info(f"update submission status to {FAILED}")
update_submission_status(analysis_id, submission_id, FAILED)
raise RunnerException(4, "No system metadata returned from API")
error_code = 4
raise RunnerException(
*get_error_by_code(error_code, runner_error_codes, logger)
)

# Read in the configuration JSON for the particular run
with open(os.path.join(current_evaluation_dir, "config.json")) as f:
if not f:
logger.error("config.json not found")
logger.info(f"update submission status to {FAILED}")
update_submission_status(analysis_id, submission_id, FAILED)
error_code = 5
raise RunnerException(
5, "config.json not found in current evaluation directory"
*get_error_by_code(error_code, runner_error_codes, logger)
)
config_data: dict[str, Any] = json.load(f)

@@ -435,8 +454,9 @@ def run( # noqa: C901
)
logger.info(f"update submission status to {FAILED}")
update_submission_status(analysis_id, submission_id, FAILED)
error_code = 6
raise RunnerException(
6, f"function {function_name} not found in module {module_name}"
*get_error_by_code(error_code, runner_error_codes, logger)
)

total_number_of_files = len(file_metadata_df)
@@ -614,18 +634,6 @@ def prepare_function_args_for_parallel_processing(
function_name: str,
performance_metrics: list[str],
):
# logger.debug(
# f"index: {index}, FAILURE_CUTOFF: {FAILURE_CUTOFF}, number_of_errors: {number_of_errors}"
# )
# if index <= FAILURE_CUTOFF:
# if number_of_errors == FAILURE_CUTOFF:
# raise RunnerException(
# 7,
# f"Too many errors ({number_of_errors}) occurred in the first {FAILURE_CUTOFF} files. Exiting.",
# current_error_rate(number_of_errors, index),
# )

# logger.info(f"processing file {index + 1} of {total_number_of_files}")

function_args_list: list[tuple] = []

@@ -732,22 +740,40 @@ def loop_over_files_and_generate_results(
logger.error(
f"Too many errors ({number_of_errors}) occurred in the first {NUM_FILES_TO_TEST} files. Exiting."
)
error_code = 7
raise RunnerException(
7,
f"Too many errors ({number_of_errors}) occurred in the first {NUM_FILES_TO_TEST} files. Exiting.",
*get_error_by_code(error_code, runner_error_codes, logger)
)

# Test the rest of the files

logger.info(f"Testing the rest of the files...")
rest_results = dask_multiprocess(
run_submission_and_generate_performance_metrics,
rest_func_argument_list,
# n_workers=4,
threads_per_worker=1,
# memory_limit="16GiB",
logger=logger,
)
rest_results = []
try:
rest_results = dask_multiprocess(
run_submission_and_generate_performance_metrics,
rest_func_argument_list,
# n_workers=4,
threads_per_worker=1,
# memory_limit="16GiB",
logger=logger,
)
except SubmissionException as e:
logger.error(f"Submission error: {e}")
raise e
except RunnerException as e:
logger.error(f"Runner error: {e}")
raise e
except Exception as e:
if e.args:
if len(e.args) == 2:
logger.error(f"Submission error: {e}")

raise RunnerException(*e.args)
logger.error(f"Submission error: {e}")
raise RunnerException(
*get_error_by_code(500, runner_error_codes, logger)
)
errors = [error for _, error in rest_results]
number_of_errors += sum(errors)

@@ -790,10 +816,10 @@ def generate_performance_metrics_for_submission(
logger.error(
f"{file_name} submission result length {file_submission_result_length} does not match ground truth file length {ground_truth_file_length}"
)
error_code = 8

raise RunnerException(
100,
f"submission result length {file_submission_result_length} does not match ground truth file length {ground_truth_file_length}",
*get_error_by_code(error_code, runner_error_codes, logger)
)

# Convert the data outputs to a dictionary identical to the
@@ -832,6 +858,7 @@ def generate_performance_metrics_for_submission(
return results_dictionary


@timeout(SUBMISSION_TIMEOUT)
def run_submission_and_generate_performance_metrics(
file_name: str,
data_dir: str,
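
The headline change in this file is the `@timeout(SUBMISSION_TIMEOUT)` decorator on `run_submission_and_generate_performance_metrics`, but `utility.timeout` itself is not shown in the diff. A minimal sketch of one plausible implementation, assuming a POSIX signal-based approach; the placeholder `SubmissionException` stands in for the class imported from `utility`, and the raised tuple mirrors `sb` code 1 in errorcodes.json:

```python
import functools
import signal


class SubmissionException(Exception):
    """Placeholder for the exception class imported from utility."""


def timeout(seconds: int):
    """Raise SubmissionException if the wrapped call runs longer than `seconds`."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            def _handle_alarm(signum, frame):
                # Mirrors "sb" error 1 in errorcodes.json
                raise SubmissionException(
                    1, "Submission function exceeded maximum execution time"
                )

            # NOTE: signal.alarm is POSIX-only and only works in the main
            # thread, so a thread- or process-based variant would be needed
            # if dask runs the task off the main thread.
            old_handler = signal.signal(signal.SIGALRM, _handle_alarm)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)  # cancel any pending alarm
                signal.signal(signal.SIGALRM, old_handler)

        return wrapper

    return decorator
```

With `SUBMISSION_TIMEOUT = 30 * 60`, any single submission run exceeding thirty minutes would surface as the new `sb` timeout error instead of hanging the worker.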
57 changes: 46 additions & 11 deletions workers/src/submission_worker.py
@@ -1,6 +1,5 @@
from importlib import import_module
from typing import Any, Callable, Optional
from matplotlib.pylab import f
import requests
import sys
import os
@@ -22,6 +21,8 @@
RunnerException,
SubmissionException,
WorkerException,
get_error_by_code,
get_error_codes_dict,
pull_from_s3,
timing,
is_local,
@@ -52,8 +53,10 @@ def update_submission_status(
api_route = f"http://{API_BASE_URL}/submissions/analysis/{analysis_id}/change_submission_status/{submission_id}"
r = requests.put(api_route, data={"status": new_status})
if not r.ok:
logger.error(f"Error updating submission status to {new_status}")
error_code = 5
raise WorkerException(
5, f"Error updating submission status to {new_status}"
*get_error_by_code(error_code, worker_error_codes, logger),
)
data: dict[str, Any] = r.json()
return data
@@ -87,8 +90,10 @@ def push_to_s3(local_file_path, s3_file_path, analysis_id, submission_id):
r = requests.put(s3_file_full_path, data=file_content)
logger.info(f"Received S3 emulator response: {r.status_code}")
if not r.ok:
logger.error(f"S3 emulator error: {r.content}")
error_code = 1
raise WorkerException(
1, f"Error uploading file to s3 emulator: {r.content}"
*get_error_by_code(error_code, worker_error_codes, logger),
)
return {"status": "success"}
else:
@@ -99,7 +104,10 @@ def push_to_s3(local_file_path, s3_file_path, analysis_id, submission_id):
logger.error(f"Error: {e}")
logger.info(f"update submission status to {FAILED}")
update_submission_status(analysis_id, submission_id, FAILED)
raise WorkerException(1, f"Error uploading file to s3")
error_code = 1
raise WorkerException(
*get_error_by_code(error_code, worker_error_codes, logger)
)


def list_s3_bucket(s3_dir: str):
@@ -151,8 +159,10 @@ def update_submission_result(
api_route = f"http://{API_BASE_URL}/submissions/analysis/{analysis_id}/update_submission_result/{submission_id}"
r = requests.put(api_route, json=result_json, headers=headers)
if not r.ok:
logger.error(f"Error updating submission result to Django API")
error_code = 4
raise WorkerException(
4, "Error updating submission result to Django API"
*get_error_by_code(error_code, worker_error_codes, logger),
)
data: dict[str, Any] = r.json()
return data
@@ -231,19 +241,25 @@ def extract_analysis_data( # noqa: C901
)
response = requests.get(fmd_url)
if not response.ok:
error_code = 7
logger.error(
f"File metadata for file id {file_id} not found in Django API"
)
raise requests.exceptions.HTTPError(
7,
f"File metadata for file id {file_id} not found in Django API",
*get_error_by_code(error_code, worker_error_codes, logger),
)
file_metadata_list.append(response.json())

# Convert the list of file metadata to a DataFrame
file_metadata_df = pd.DataFrame(file_metadata_list)

if file_metadata_df.empty:
logger.error(
f"File metadata DataFrame is empty for analysis {analysis_id}"
)
error_code = 8
raise WorkerException(
8,
f"No file metadata found in Django API for analysis {analysis_id}",
*get_error_by_code(error_code, worker_error_codes, logger),
)

files_for_analysis: list[str] = file_metadata_df["file_name"].tolist()
@@ -329,6 +345,12 @@ def load_analysis(
os.path.join(current_evaluation_dir, "pvinsight-validation-runner.py"),
)

# Copy the error codes file into the current evaluation directory
shutil.copy(
os.path.join("/root/worker/src", "errorcodes.json"),
os.path.join(current_evaluation_dir, "errorcodes.json"),
)

# import analysis runner as a module
sys.path.insert(0, current_evaluation_dir)
runner_module_name = "pvinsight-validation-runner"
@@ -571,12 +593,20 @@ def post_error_to_api(
logger.info(f"Received response: {r.text}")

if not r.ok:
raise WorkerException(1, "Error posting error report to Django API")
logger.error("Error posting error report to API")
cur_error_code = 12
raise WorkerException(
*get_error_by_code(cur_error_code, worker_error_codes, logger)
)

try:
data: dict[str, Any] = r.json()
except json.JSONDecodeError:
raise WorkerException(1, "Django API did not return a JSON response")
logger.error("Django API did not return a JSON response")
cur_error_code = 13
raise WorkerException(
*get_error_by_code(cur_error_code, worker_error_codes, logger)
)

return data

@@ -773,6 +803,11 @@ def main():
logger.info(f"CURRENT_EVALUATION_DIR: {CURRENT_EVALUATION_DIR}")
# Set to folder where the evaluation scripts are stored
logger.info(f"BASE_TEMP_DIR: {BASE_TEMP_DIR}")

worker_error_codes = get_error_codes_dict(
FILE_DIR, WORKER_ERROR_PREFIX, logger
)

_, execution_time = main()
logger.info(
f"Shutting down Submission Worker. Runtime: {execution_time:.3f} seconds."
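
One convention worth noting across both files: `get_error_by_code` is always splatted into the exception constructor, so a known error yields `e.args` of length exactly 2, which is what the runner's `len(e.args) == 2` check in `loop_over_files_and_generate_results` relies on before re-raising. A hypothetical, self-contained illustration of that convention (the real `WorkerException` and lookup helpers come from `utility`; the inline stubs below are stand-ins):

```python
import logging

logger = logging.getLogger(__name__)


class WorkerException(Exception):
    """Placeholder for the exception class imported from utility."""


# Stand-in for the "wr" section loaded via get_error_codes_dict
worker_error_codes = {"4": "Error updating submission result to API"}


def get_error_by_code(code, codes, log):
    # Sketch only; see the fuller version earlier in this page
    return code, codes[str(code)]


try:
    raise WorkerException(*get_error_by_code(4, worker_error_codes, logger))
except WorkerException as e:
    code, message = e.args  # -> 4, "Error updating submission result to API"
    print(code, message)
```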