From 1474830685ed8ed9cf4cf009080a88745b1bb7e8 Mon Sep 17 00:00:00 2001 From: Mitchell Victoriano <47313912+MitchellAV@users.noreply.github.com> Date: Wed, 8 May 2024 23:36:03 -0700 Subject: [PATCH 01/10] Refactored runner to isolate submission run function and implemented a basic Dask client --- docker-compose.yml | 3 +- workers/requirements.txt | 5 +- workers/src/logging_config.json | 6 +- workers/src/pvinsight-validation-runner.py | 390 +++++++++++++++------ workers/src/submission_worker.py | 10 +- workers/src/utility.py | 96 ++++- 6 files changed, 386 insertions(+), 124 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 5840066a..f979696d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -111,6 +111,7 @@ services: - .env ports: - "8500:8500" + - "8787:8787" depends_on: - api - s3 @@ -118,7 +119,7 @@ services: volumes: - ./workers/current_evaluation:/root/worker/current_evaluation - ./workers/logs:/root/worker/logs - - ./workers/tmp:/tmp + # - ./workers/tmp:/tmp - ./workers/requirements.txt:/root/worker/requirements.txt - ./workers/src:/root/worker/src restart: unless-stopped diff --git a/workers/requirements.txt b/workers/requirements.txt index 67bd07a5..d5ae2f31 100644 --- a/workers/requirements.txt +++ b/workers/requirements.txt @@ -3,4 +3,7 @@ boto3 pandas numpy seaborn -matplotlib \ No newline at end of file +matplotlib +dask +distributed +bokeh \ No newline at end of file diff --git a/workers/src/logging_config.json b/workers/src/logging_config.json index 7a65643f..80019893 100644 --- a/workers/src/logging_config.json +++ b/workers/src/logging_config.json @@ -27,19 +27,19 @@ "handlers": { "stdout": { "class": "logging.StreamHandler", - "level": "INFO", + "level": "DEBUG", "formatter": "simple", "stream": "ext://sys.stdout" }, "file": { "class": "logging.handlers.RotatingFileHandler", - "level": "INFO", + "level": "DEBUG", "formatter": "detailed", "filename": "logs/submission.log" }, "json_file": { "class": "logging.handlers.RotatingFileHandler", - "level": "INFO", + "level": "DEBUG", "formatter": "json", "filename": "logs/submission.log.jsonl" } diff --git a/workers/src/pvinsight-validation-runner.py b/workers/src/pvinsight-validation-runner.py index dc34d6da..48ed98eb 100644 --- a/workers/src/pvinsight-validation-runner.py +++ b/workers/src/pvinsight-validation-runner.py @@ -16,6 +16,7 @@ This section will be dependent on the type of analysis being run. 
""" +from multiprocessing.spawn import prepare from typing import Any, Callable, cast import pandas as pd import os @@ -35,7 +36,13 @@ import logging import boto3 from logger import setup_logging -from utility import RunnerException, pull_from_s3, timing, is_local +from utility import ( + RunnerException, + dask_multiprocess, + pull_from_s3, + timing, + is_local, +) FAILED = "failed" @@ -263,7 +270,11 @@ def generate_scatter_plot(dataframe, x_axis, y_axis, title): @timing(verbose=True, logger=logger) -def run_user_submission(fn: Callable, *args: Any, **kwargs: Any) -> Any: +def run_user_submission( + fn: Callable[..., pd.Series], + *args: Any, + **kwargs: Any, +): return fn(*args, **kwargs) @@ -403,7 +414,7 @@ def run( # noqa: C901 config_data: dict[str, Any] = json.load(f) # Get the associated metrics we're supposed to calculate - performance_metrics = config_data["performance_metrics"] + performance_metrics: list[str] = config_data["performance_metrics"] logger.info(f"performance_metrics: {performance_metrics}") # Get the name of the function we want to import associated with this @@ -438,114 +449,17 @@ def current_error_rate(number_of_errors: int, index: int): # Loop through each file and generate predictions - for index, (_, row) in enumerate(file_metadata_df.iterrows()): - logger.debug( - f"index: {index}, FAILURE_CUTOFF: {FAILURE_CUTOFF}, number_of_errors: {number_of_errors}" - ) - if index <= FAILURE_CUTOFF: - if number_of_errors == FAILURE_CUTOFF: - raise RunnerException( - 7, - f"Too many errors ({number_of_errors}) occurred in the first {FAILURE_CUTOFF} files. Exiting.", - current_error_rate(number_of_errors, index), - ) - file_name = row["file_name"] - - # Get associated system ID - system_id = row["system_id"] - - # Get all of the associated metadata for the particular file based - # on its system ID. 
This metadata will be passed in via kwargs for - # any necessary arguments - associated_metadata: dict[str, Any] = dict( - system_metadata_df[ - system_metadata_df["system_id"] == system_id - ].iloc[0] - ) - - logger.info(f"processing file {index + 1} of {total_number_of_files}") - try: - # Get file_name, which will be pulled from database or S3 for - # each analysis - ( - data_outputs, - function_run_time, - ) = run_submission( - file_name, - data_dir, - associated_metadata, - config_data, - submission_function, - function_parameters, - row, - ) - - except Exception as e: - logger.error(f"error running function {function_name}: {e}") - number_of_errors += 1 - continue - - # Get the ground truth scalars that we will compare to - ground_truth_dict = dict() - if config_data["comparison_type"] == "scalar": - for val in config_data["ground_truth_compare"]: - ground_truth_dict[val] = associated_metadata[val] - if config_data["comparison_type"] == "time_series": - ground_truth_series: pd.Series = pd.read_csv( - os.path.join(data_dir + "/validation_data/", file_name), - index_col=0, - parse_dates=True, - ).squeeze() - ground_truth_dict["time_series"] = ground_truth_series - - ground_truth_file_length = len(ground_truth_series) - - file_submission_result_length = len(data_outputs) - if file_submission_result_length != ground_truth_file_length: - logger.error( - f"{file_name} submission result length {file_submission_result_length} does not match ground truth file length {ground_truth_file_length}" - ) - - number_of_errors += 1 - continue - - # Convert the data outputs to a dictionary identical to the - # ground truth dictionary - output_dictionary: dict[str, Any] = dict() - if config_data["comparison_type"] == "scalar": - for idx in range(len(config_data["ground_truth_compare"])): - output_dictionary[config_data["ground_truth_compare"][idx]] = ( - data_outputs[idx] - ) - if config_data["comparison_type"] == "time_series": - output_dictionary["time_series"] = data_outputs - # Run routine for all of the performance metrics and append - # results to the dictionary - results_dictionary: dict[str, Any] = dict() - results_dictionary["file_name"] = file_name - # Set the runtime in the results dictionary - results_dictionary["run_time"] = function_run_time - # Set the data requirements in the dictionary - results_dictionary["data_requirements"] = function_parameters - # Loop through the rest of the performance metrics and calculate them - # (this predominantly applies to error metrics) - for metric in performance_metrics: - if metric == "absolute_error": - # Loop through the input and the output dictionaries, - # and calculate the absolute error - for val in config_data["ground_truth_compare"]: - error = np.abs( - output_dictionary[val] - ground_truth_dict[val] - ) - results_dictionary[metric + "_" + val] = error - elif metric == "mean_absolute_error": - for val in config_data["ground_truth_compare"]: - error = np.mean( - np.abs(output_dictionary[val] - ground_truth_dict[val]) - ) - results_dictionary[metric + "_" + val] = error - results_list.append(results_dictionary) - logger.info(f"results_dictionary: {results_dictionary}") + results_list = loop_over_files_and_generate_results( + file_metadata_df, + system_metadata_df, + data_dir, + config_data, + submission_function, + function_parameters, + number_of_errors, + function_name, + performance_metrics, + ) # Convert the results to a pandas dataframe and perform all of the # post-processing in the script results_df = pd.DataFrame(results_list) @@ -653,12 
+567,103 @@ def current_error_rate(number_of_errors: int, index: int): return public_metrics_dict +def create_function_args_for_file( + file_metadata_row: pd.Series, + system_metadata_df: pd.DataFrame, + data_dir: str, + config_data: dict[str, Any], + submission_function: Callable[..., pd.Series], + function_parameters: list[str], + number_of_errors: int, + function_name: str, + performance_metrics: list[str], + file_number: int, +): + + file_name: str = file_metadata_row["file_name"] + + # Get associated system ID + system_id = file_metadata_row["system_id"] + + # Get all of the associated metadata for the particular file based + # on its system ID. This metadata will be passed in via kwargs for + # any necessary arguments + associated_system_metadata: dict[str, Any] = dict( + system_metadata_df[system_metadata_df["system_id"] == system_id].iloc[ + 0 + ] + ) + + function_args = ( + file_name, + data_dir, + associated_system_metadata, + config_data, + submission_function, + function_parameters, + file_metadata_row, + number_of_errors, + function_name, + performance_metrics, + file_number, + ) + + return function_args + + +def prepare_function_args_for_parallel_processing( + file_metadata_df: pd.DataFrame, + system_metadata_df: pd.DataFrame, + data_dir: str, + config_data: dict[str, Any], + submission_function: Callable[..., pd.Series], + function_parameters: list[str], + number_of_errors: int, + function_name: str, + performance_metrics: list[str], +): + # logger.debug( + # f"index: {index}, FAILURE_CUTOFF: {FAILURE_CUTOFF}, number_of_errors: {number_of_errors}" + # ) + # if index <= FAILURE_CUTOFF: + # if number_of_errors == FAILURE_CUTOFF: + # raise RunnerException( + # 7, + # f"Too many errors ({number_of_errors}) occurred in the first {FAILURE_CUTOFF} files. 
Exiting.", + # current_error_rate(number_of_errors, index), + # ) + + # logger.info(f"processing file {index + 1} of {total_number_of_files}") + + function_args_list: list[tuple] = [] + + for file_number, (_, file_metadata_row) in enumerate( + file_metadata_df.iterrows() + ): + + function_args = create_function_args_for_file( + file_metadata_row, + system_metadata_df, + data_dir, + config_data, + submission_function, + function_parameters, + number_of_errors, + function_name, + performance_metrics, + file_number, + ) + function_args_list.append(function_args) + + return function_args_list + + def run_submission( file_name: str, data_dir: str, associated_metadata: dict[str, Any], config_data: dict[str, Any], - submission_function: Callable, + submission_function: Callable[..., pd.Series], function_parameters: list[str], row: pd.Series, ): @@ -690,6 +695,165 @@ def run_submission( ) +def loop_over_files_and_generate_results( + file_metadata_df: pd.DataFrame, + system_metadata_df: pd.DataFrame, + data_dir: str, + config_data: dict[str, Any], + submission_function: Callable[..., pd.Series], + function_parameters: list[str], + number_of_errors: int, + function_name: str, + performance_metrics: list[str], +): + + func_arguments_list = prepare_function_args_for_parallel_processing( + file_metadata_df, + system_metadata_df, + data_dir, + config_data, + submission_function, + function_parameters, + number_of_errors, + function_name, + performance_metrics, + ) + + results = dask_multiprocess( + run_submission_and_generate_performance_metrics, + func_arguments_list, + n_workers=2, + threads_per_worker=1, + memory_limit="16GiB", + logger=logger, + ) + return results + + +def generate_performance_metrics_for_submission( + data_outputs: pd.Series, + function_run_time: float, + file_name: str, + data_dir: str, + associated_metadata: dict[str, Any], + config_data: dict[str, Any], + function_parameters: list[str], + number_of_errors: int, + performance_metrics: list[str], +): + # Get the ground truth scalars that we will compare to + ground_truth_dict = dict() + if config_data["comparison_type"] == "scalar": + for val in config_data["ground_truth_compare"]: + ground_truth_dict[val] = associated_metadata[val] + if config_data["comparison_type"] == "time_series": + ground_truth_series: pd.Series = pd.read_csv( + os.path.join(data_dir + "/validation_data/", file_name), + index_col=0, + parse_dates=True, + ).squeeze() + ground_truth_dict["time_series"] = ground_truth_series + + ground_truth_file_length = len(ground_truth_series) + + file_submission_result_length = len(data_outputs) + if file_submission_result_length != ground_truth_file_length: + logger.error( + f"{file_name} submission result length {file_submission_result_length} does not match ground truth file length {ground_truth_file_length}" + ) + + number_of_errors += 1 + raise RunnerException( + 100, + f"submission result length {file_submission_result_length} does not match ground truth file length {ground_truth_file_length}", + ) + + # Convert the data outputs to a dictionary identical to the + # ground truth dictionary + output_dictionary: dict[str, Any] = dict() + if config_data["comparison_type"] == "scalar": + for idx in range(len(config_data["ground_truth_compare"])): + output_dictionary[config_data["ground_truth_compare"][idx]] = ( + data_outputs[idx] + ) + if config_data["comparison_type"] == "time_series": + output_dictionary["time_series"] = data_outputs + # Run routine for all of the performance metrics and append + # results to the 
dictionary + results_dictionary: dict[str, Any] = dict() + results_dictionary["file_name"] = file_name + # Set the runtime in the results dictionary + results_dictionary["run_time"] = function_run_time + # Set the data requirements in the dictionary + results_dictionary["data_requirements"] = function_parameters + # Loop through the rest of the performance metrics and calculate them + # (this predominantly applies to error metrics) + for metric in performance_metrics: + if metric == "absolute_error": + # Loop through the input and the output dictionaries, + # and calculate the absolute error + for val in config_data["ground_truth_compare"]: + error = np.abs(output_dictionary[val] - ground_truth_dict[val]) + results_dictionary[metric + "_" + val] = error + elif metric == "mean_absolute_error": + for val in config_data["ground_truth_compare"]: + error = np.mean( + np.abs(output_dictionary[val] - ground_truth_dict[val]) + ) + results_dictionary[metric + "_" + val] = error + logger.info(f"results_dictionary: {results_dictionary}") + return results_dictionary + + +def run_submission_and_generate_performance_metrics( + file_name: str, + data_dir: str, + associated_system_metadata: dict[str, Any], + config_data: dict[str, Any], + submission_function: Callable[..., pd.Series], + function_parameters: list[str], + file_metadata_row: pd.Series, + number_of_errors: int, + function_name: str, + performance_metrics: list[str], + file_number: int, +): + logger.info(f"{file_number} - running submission for file {file_name}") + try: + # Get file_name, which will be pulled from database or S3 for + # each analysis + ( + data_outputs, + function_run_time, + ) = run_submission( + file_name, + data_dir, + associated_system_metadata, + config_data, + submission_function, + function_parameters, + file_metadata_row, + ) + + except Exception as e: + logger.error(f"error running function {function_name}: {e}") + number_of_errors += 1 + + results_dictionary = generate_performance_metrics_for_submission( + data_outputs, + function_run_time, + file_name, + data_dir, + associated_system_metadata, + config_data, + function_parameters, + number_of_errors, + performance_metrics, + ) + + return results_dictionary + + def prepare_kwargs_for_submission_function( config_data: dict[str, Any], function_parameters: list[str], @@ -729,7 +893,7 @@ def prepare_time_series( if __name__ == "__main__": pass # run( - # "submission_files/submission_user_1/submission_5/2aa7ab7c-57b7-403e-b42e-097c15cb7421_Archive.zip", + # "submission_files/submission_user_1/submission_118/dfec718f-bb6e-4194-98cf-2edea6f3f717_sdt-submission.zip", # "/root/worker/current_evaluation", # ) # push_to_s3( diff --git a/workers/src/submission_worker.py b/workers/src/submission_worker.py index 215c4b40..8e57164c 100644 --- a/workers/src/submission_worker.py +++ b/workers/src/submission_worker.py @@ -91,7 +91,7 @@ def push_to_s3(local_file_path, s3_file_path, analysis_id, submission_id): with open(local_file_path, "rb") as f: file_content = f.read() logger.info( - f"Sending emulator PUT request to {s3_file_full_path} with file content: {file_content}" + f"Sending emulator PUT request to {s3_file_full_path} with file content (100 chars): {file_content[:100]}" ) r = requests.put(s3_file_full_path, data=file_content) logger.info(f"Received S3 emulator response: {r.status_code}") @@ -394,6 +394,14 @@ def process_submission_message( f"execute runner module function with argument {s3_submission_zip_file_path}" ) + logger.debug(f"s3_submission_zip_file_path: 
{s3_submission_zip_file_path}") + logger.debug(f"file_metadata_df: {file_metadata_df}") + logger.debug(f"update_submission_status: {update_submission_status}") + logger.debug(f"analysis_id: {analysis_id}") + logger.debug(f"submission_id: {submission_id}") + logger.debug(f"current_evaluation_dir: {current_evaluation_dir}") + logger.debug(f"BASE_TEMP_DIR: {BASE_TEMP_DIR}") + ret = analysis_function( s3_submission_zip_file_path, file_metadata_df, diff --git a/workers/src/utility.py b/workers/src/utility.py index 7aee2215..1a0f4171 100644 --- a/workers/src/utility.py +++ b/workers/src/utility.py @@ -1,12 +1,17 @@ -from concurrent.futures import ProcessPoolExecutor, as_completed +from dask.delayed import delayed +from dask.distributed import Client + +from concurrent.futures import ProcessPoolExecutor, as_completed, thread from functools import wraps from logging import Logger -from time import perf_counter +from time import perf_counter, sleep import os -from typing import Callable, Tuple, TypeVar, Union +from typing import Any, Callable, Tuple, TypeVar, Union import boto3 import botocore.exceptions +from distributed import LocalCluster +from matplotlib.pylab import f import requests @@ -42,8 +47,9 @@ def wrapper(*args, **kwargs) -> Tuple[T, float]: def multiprocess( - func: Callable[..., T], data: list, n_processes: int, logger: Logger + func: Callable[..., T], data: list, n_processes: int, logger: Logger | None ) -> list[T]: + log = logger or print with ProcessPoolExecutor(max_workers=n_processes) as executor: futures = {executor.submit(func, d): d for d in data} results: list[T] = [] @@ -51,7 +57,65 @@ def multiprocess( try: results.append(future.result()) except Exception as e: - logger.error(f"Error: {e}") + log.error(f"Error: {e}") + return results + + +def dask_multiprocess( + func: Callable[..., T], + func_arguments: list[tuple[Any, ...]], + n_workers: int | None = None, + threads_per_worker: int | None = None, + memory_limit: str | float | int | None = None, + logger: Logger | None = None, +) -> T | list[T] | tuple[T, ...]: + + # if n_workers is None: + # n_workers = os.cpu_count() + # if n_workers is None: + # msg = ( + # "Could not determine number of CPUs. Defaulting to 4 workers." 
+ # ) + # if logger: + # logger.warning(msg) + # else: + # print(msg) + # n_workers = 4 + + # if threads_per_worker is None: + # threads_per_worker = None + + client = Client( + n_workers=n_workers, + threads_per_worker=threads_per_worker, + memory_limit=memory_limit, + ) + + # LocalCluster() + + if logger is not None: + print(f"logger name: {logger.name}") + logger.info(f"Forwarding logging to dask client") + client.forward_logging(logger.name) + + if logger is not None: + logger.info(f"Created dask client") + logger.info(f"Client: {client}") + else: + print(f"Created dask client") + print(f"Client: {client}") + + lazy_results = [] + for args in func_arguments: + lazy_result = delayed(func)(*args) + lazy_results.append(lazy_result) + + futures = client.compute(lazy_results) + + results = client.gather(futures) + + client.close() + return results @@ -148,3 +212,25 @@ def pull_from_s3( ) return target_file_path + + +if __name__ == "__main__": + + def expensive_function(x): + print(x) + sleep(2) + return x**2 + + data = list(range(10)) + func_args = [(d,) for d in data] + n_processes = 2 + threads_per_worker = 1 + logger = None + results = dask_multiprocess( + expensive_function, + [(d,) for d in data], + n_workers=n_processes, + threads_per_worker=threads_per_worker, + logger=logger, + ) + print(results) From 018744cbb162cff828c34d36976d53ca9f67dab5 Mon Sep 17 00:00:00 2001 From: Mitchell Victoriano <47313912+MitchellAV@users.noreply.github.com> Date: Wed, 8 May 2024 23:56:54 -0700 Subject: [PATCH 02/10] Fixed logging level --- workers/src/logging_config.json | 6 +++--- workers/src/utility.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/workers/src/logging_config.json b/workers/src/logging_config.json index 80019893..7a65643f 100644 --- a/workers/src/logging_config.json +++ b/workers/src/logging_config.json @@ -27,19 +27,19 @@ "handlers": { "stdout": { "class": "logging.StreamHandler", - "level": "DEBUG", + "level": "INFO", "formatter": "simple", "stream": "ext://sys.stdout" }, "file": { "class": "logging.handlers.RotatingFileHandler", - "level": "DEBUG", + "level": "INFO", "formatter": "detailed", "filename": "logs/submission.log" }, "json_file": { "class": "logging.handlers.RotatingFileHandler", - "level": "DEBUG", + "level": "INFO", "formatter": "json", "filename": "logs/submission.log.jsonl" } diff --git a/workers/src/utility.py b/workers/src/utility.py index 1a0f4171..538c091b 100644 --- a/workers/src/utility.py +++ b/workers/src/utility.py @@ -7,7 +7,7 @@ from time import perf_counter, sleep import os from typing import Any, Callable, Tuple, TypeVar, Union - +import logging import boto3 import botocore.exceptions from distributed import LocalCluster @@ -96,7 +96,7 @@ def dask_multiprocess( if logger is not None: print(f"logger name: {logger.name}") logger.info(f"Forwarding logging to dask client") - client.forward_logging(logger.name) + client.forward_logging(logger.name, level=logging.INFO) if logger is not None: logger.info(f"Created dask client") From 1698aad73fd9e1db6ff93eb4a1124267c2e2c702 Mon Sep 17 00:00:00 2001 From: Mitchell Victoriano <47313912+MitchellAV@users.noreply.github.com> Date: Mon, 13 May 2024 18:19:42 -0700 Subject: [PATCH 03/10] Added dask memory limits --- workers/src/logging_config.json | 3 +- workers/src/pvinsight-validation-runner.py | 12 +-- workers/src/utility.py | 96 ++++++++++++++++++++-- 3 files changed, 95 insertions(+), 16 deletions(-) diff --git a/workers/src/logging_config.json b/workers/src/logging_config.json index 
7a65643f..09df840d 100644 --- a/workers/src/logging_config.json +++ b/workers/src/logging_config.json @@ -51,7 +51,8 @@ "stdout", "file", "json_file" - ] + ], + "propagate": false } } } \ No newline at end of file diff --git a/workers/src/pvinsight-validation-runner.py b/workers/src/pvinsight-validation-runner.py index 48ed98eb..e4ab5641 100644 --- a/workers/src/pvinsight-validation-runner.py +++ b/workers/src/pvinsight-validation-runner.py @@ -667,9 +667,6 @@ def run_submission( function_parameters: list[str], row: pd.Series, ): - - logger.info(f"file_name: {file_name}") - # Create master dictionary of all possible function kwargs kwargs = prepare_kwargs_for_submission_function( config_data, function_parameters, row, associated_metadata @@ -722,9 +719,9 @@ def loop_over_files_and_generate_results( results = dask_multiprocess( run_submission_and_generate_performance_metrics, func_arguments_list, - n_workers=2, - threads_per_worker=1, - memory_limit="16GiB", + # n_workers=2, + # threads_per_worker=1, + # memory_limit="16GiB", logger=logger, ) return results @@ -801,7 +798,6 @@ def generate_performance_metrics_for_submission( np.abs(output_dictionary[val] - ground_truth_dict[val]) ) results_dictionary[metric + "_" + val] = error - logger.info(f"results_dictionary: {results_dictionary}") return results_dictionary @@ -818,8 +814,8 @@ def run_submission_and_generate_performance_metrics( performance_metrics: list[str], file_number: int, ): - logger.info(f"{file_number} - running submission for file {file_name}") try: + logger.info(f"{file_number} - running submission for file {file_name}") # Get file_name, which will be pulled from database or S3 for # each analysis ( diff --git a/workers/src/utility.py b/workers/src/utility.py index 538c091b..4f9e0ff4 100644 --- a/workers/src/utility.py +++ b/workers/src/utility.py @@ -1,18 +1,21 @@ +import sys from dask.delayed import delayed from dask.distributed import Client +from dask import config from concurrent.futures import ProcessPoolExecutor, as_completed, thread from functools import wraps from logging import Logger -from time import perf_counter, sleep +from time import perf_counter, sleep, time import os from typing import Any, Callable, Tuple, TypeVar, Union import logging import boto3 import botocore.exceptions from distributed import LocalCluster -from matplotlib.pylab import f +import psutil import requests +import math WORKER_ERROR_PREFIX = "wr" @@ -27,9 +30,9 @@ def timing(verbose: bool = True, logger: Union[Logger, None] = None): def decorator(func: Callable[..., T]): @wraps(func) def wrapper(*args, **kwargs) -> Tuple[T, float]: - start_time = perf_counter() + start_time = time() result = func(*args, **kwargs) - end_time = perf_counter() + end_time = time() execution_time = end_time - start_time if verbose: msg = ( @@ -66,8 +69,9 @@ def dask_multiprocess( func_arguments: list[tuple[Any, ...]], n_workers: int | None = None, threads_per_worker: int | None = None, - memory_limit: str | float | int | None = None, + memory_limit: float | int | None = None, logger: Logger | None = None, + **kwargs, ) -> T | list[T] | tuple[T, ...]: # if n_workers is None: @@ -85,10 +89,88 @@ def dask_multiprocess( # if threads_per_worker is None: # threads_per_worker = None + # if n_workers is None: + # n_workers = cpu_count + # if n_workers * n_processes > cpu_count: + # raise Exception(f"workers and threads exceed local resources, {cpu_count} cores present") + # if n_workers * memory_limit > sys_memory: + # config.set({'distributed.worker.memory.spill': True}) 
+ # print(f"Memory per worker exceeds system memory ({memory_limit} GB), activating memory spill\n") + + memory_limit = memory_limit or 7.0 + + cpu_count = os.cpu_count() + # memory limit in GB + sys_memory = psutil.virtual_memory().total / (1024.0**3) + + if cpu_count is None: + raise Exception("Could not determine number of CPUs.") + + if n_workers is not None and threads_per_worker is not None: + if n_workers * threads_per_worker > cpu_count: + raise Exception( + f"workers and threads exceed local resources, {cpu_count} cores present" + ) + if memory_limit * n_workers * threads_per_worker > sys_memory: + config.set({"distributed.worker.memory.spill": True}) + print( + f"Memory per worker exceeds system memory ({memory_limit} GB), activating memory spill\n" + ) + + if n_workers is not None and threads_per_worker is None: + threads_per_worker = int( + math.floor(sys_memory / (memory_limit * n_workers)) + ) + if threads_per_worker == 0: + print( + "Not enough memory for a worker, defaulting to 1 thread per worker" + ) + threads_per_worker = 1 + + if n_workers is None and threads_per_worker is not None: + n_workers = int( + math.floor(sys_memory / (memory_limit * threads_per_worker)) + ) + if n_workers == 0: + print("Not enough memory for a worker, defaulting to 1 worker") + n_workers = 1 + + if n_workers is None and threads_per_worker is None: + if memory_limit == 0: + raise Exception("Memory limit cannot be 0") + thread_worker_total = sys_memory / memory_limit + if thread_worker_total < 2: + print( + "Not enough memory for a worker, defaulting to 1 worker and 1 thread per worker" + ) + n_workers = 1 + threads_per_worker = 1 + if memory_limit * n_workers > sys_memory: + config.set({"distributed.worker.memory.spill": True}) + print( + f"Memory per worker exceeds system memory ({memory_limit} GB), activating memory spill\n" + ) + else: + print(f"thread_worker_total: {thread_worker_total}") + n_workers = int(math.floor(thread_worker_total / 2)) + threads_per_worker = int(math.floor(thread_worker_total / 2)) + + # config.set({"distributed.worker.memory.spill": True}) + config.set({"distributed.worker.memory.pause": True}) + config.set({"distributed.worker.memory.target": 0.95}) + config.set({"distributed.worker.memory.terminate": False}) + + print(f"cpu count: {cpu_count}") + print(f"memory: {sys_memory}") + print(f"memory limit per worker: {memory_limit}") + print(f"n_workers: {n_workers}") + print(f"threads_per_worker: {threads_per_worker}") + client = Client( n_workers=n_workers, threads_per_worker=threads_per_worker, - memory_limit=memory_limit, + memory_limit=f"{memory_limit}GiB", + **kwargs, ) # LocalCluster() @@ -107,7 +189,7 @@ def dask_multiprocess( lazy_results = [] for args in func_arguments: - lazy_result = delayed(func)(*args) + lazy_result = delayed(func, pure=True)(*args) lazy_results.append(lazy_result) futures = client.compute(lazy_results) From 6fb14e59674c35ddef83cdcd50ca6c409dbde672 Mon Sep 17 00:00:00 2001 From: Mitchell Victoriano <47313912+MitchellAV@users.noreply.github.com> Date: Wed, 15 May 2024 20:44:22 -0700 Subject: [PATCH 04/10] Fixed memory issues --- workers/src/submission_worker.py | 11 +++---- workers/src/utility.py | 52 ++++++++++++++++++++------------ 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/workers/src/submission_worker.py b/workers/src/submission_worker.py index 8e57164c..095247f7 100644 --- a/workers/src/submission_worker.py +++ b/workers/src/submission_worker.py @@ -59,12 +59,6 @@ def update_submission_status( return data -# base 
-BASE_TEMP_DIR = tempfile.mkdtemp() -# Set to folder where the evaluation scripts are stored -logger.info(f"BASE_TEMP_DIR: {BASE_TEMP_DIR}") - - FILE_DIR = os.path.dirname(os.path.abspath(__file__)) LOG_FILE_DIR = os.path.abspath(os.path.join(FILE_DIR, "..", "logs")) CURRENT_EVALUATION_DIR = os.path.abspath( @@ -769,6 +763,11 @@ def main(): if __name__ == "__main__": logger.info(f"Starting Submission Worker.") + + # base + BASE_TEMP_DIR = tempfile.mkdtemp() + # Set to folder where the evaluation scripts are stored + logger.info(f"BASE_TEMP_DIR: {BASE_TEMP_DIR}") _, execution_time = main() logger.info(f"Submission Worker took {execution_time:.3f} seconds to run") logger.info(f"Submission Worker finished.") diff --git a/workers/src/utility.py b/workers/src/utility.py index 4f9e0ff4..64e5ba57 100644 --- a/workers/src/utility.py +++ b/workers/src/utility.py @@ -30,9 +30,9 @@ def timing(verbose: bool = True, logger: Union[Logger, None] = None): def decorator(func: Callable[..., T]): @wraps(func) def wrapper(*args, **kwargs) -> Tuple[T, float]: - start_time = time() + start_time = perf_counter() result = func(*args, **kwargs) - end_time = time() + end_time = perf_counter() execution_time = end_time - start_time if verbose: msg = ( @@ -69,7 +69,7 @@ def dask_multiprocess( func_arguments: list[tuple[Any, ...]], n_workers: int | None = None, threads_per_worker: int | None = None, - memory_limit: float | int | None = None, + memory_per_run: float | int | None = None, logger: Logger | None = None, **kwargs, ) -> T | list[T] | tuple[T, ...]: @@ -97,7 +97,7 @@ def dask_multiprocess( # config.set({'distributed.worker.memory.spill': True}) # print(f"Memory per worker exceeds system memory ({memory_limit} GB), activating memory spill\n") - memory_limit = memory_limit or 7.0 + memory_per_run = memory_per_run or 7.0 cpu_count = os.cpu_count() # memory limit in GB @@ -111,15 +111,15 @@ def dask_multiprocess( raise Exception( f"workers and threads exceed local resources, {cpu_count} cores present" ) - if memory_limit * n_workers * threads_per_worker > sys_memory: + if memory_per_run * n_workers * threads_per_worker > sys_memory: config.set({"distributed.worker.memory.spill": True}) print( - f"Memory per worker exceeds system memory ({memory_limit} GB), activating memory spill\n" + f"Memory per worker exceeds system memory ({memory_per_run} GB), activating memory spill\n" ) if n_workers is not None and threads_per_worker is None: threads_per_worker = int( - math.floor(sys_memory / (memory_limit * n_workers)) + math.floor(sys_memory / (memory_per_run * n_workers)) ) if threads_per_worker == 0: print( @@ -129,56 +129,70 @@ def dask_multiprocess( if n_workers is None and threads_per_worker is not None: n_workers = int( - math.floor(sys_memory / (memory_limit * threads_per_worker)) + math.floor(sys_memory / (memory_per_run * threads_per_worker)) ) if n_workers == 0: print("Not enough memory for a worker, defaulting to 1 worker") n_workers = 1 if n_workers is None and threads_per_worker is None: - if memory_limit == 0: + if memory_per_run == 0: raise Exception("Memory limit cannot be 0") - thread_worker_total = sys_memory / memory_limit + thread_worker_total = math.floor(sys_memory / memory_per_run) if thread_worker_total < 2: print( "Not enough memory for a worker, defaulting to 1 worker and 1 thread per worker" ) n_workers = 1 threads_per_worker = 1 - if memory_limit * n_workers > sys_memory: + if memory_per_run * n_workers > sys_memory: config.set({"distributed.worker.memory.spill": True}) print( - f"Memory per 
worker exceeds system memory ({memory_limit} GB), activating memory spill\n" + f"Memory per worker exceeds system memory ({memory_per_run} GB), activating memory spill\n" ) else: print(f"thread_worker_total: {thread_worker_total}") - n_workers = int(math.floor(thread_worker_total / 2)) + n_workers = int(math.ceil(thread_worker_total / 2)) threads_per_worker = int(math.floor(thread_worker_total / 2)) + if n_workers + threads_per_worker != thread_worker_total: + print( + f"n_workers: {n_workers}, threads_per_worker: {threads_per_worker}, thread_worker_total: {thread_worker_total}" + ) + raise Exception( + "Could not determine number of workers and threads" + ) # config.set({"distributed.worker.memory.spill": True}) config.set({"distributed.worker.memory.pause": True}) config.set({"distributed.worker.memory.target": 0.95}) config.set({"distributed.worker.memory.terminate": False}) + if threads_per_worker is None: + threads_per_worker = 1 + + memory_per_worker = memory_per_run * threads_per_worker + print(f"cpu count: {cpu_count}") print(f"memory: {sys_memory}") - print(f"memory limit per worker: {memory_limit}") + print(f"memory per run: {memory_per_run}") print(f"n_workers: {n_workers}") print(f"threads_per_worker: {threads_per_worker}") + print(f"memory per worker: {memory_per_worker}") client = Client( n_workers=n_workers, threads_per_worker=threads_per_worker, - memory_limit=f"{memory_limit}GiB", + memory_limit=f"{memory_per_worker}GiB", **kwargs, ) + # client = LocalCluster() # LocalCluster() - if logger is not None: - print(f"logger name: {logger.name}") - logger.info(f"Forwarding logging to dask client") - client.forward_logging(logger.name, level=logging.INFO) + # if logger is not None: + # print(f"logger name: {logger.name}") + # logger.info(f"Forwarding logging to dask client") + # client.forward_logging(logger.name, level=logging.INFO) if logger is not None: logger.info(f"Created dask client") From 521cf604505c04d0697980ab7509247a4836bccd Mon Sep 17 00:00:00 2001 From: Mitchell Victoriano <47313912+MitchellAV@users.noreply.github.com> Date: Wed, 15 May 2024 20:44:49 -0700 Subject: [PATCH 05/10] Removed deprecated version --- docker-compose.yml | 2 -- prod.docker-compose.yml | 2 -- 2 files changed, 4 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index f979696d..c95abc07 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.8" - services: db: image: postgres:15-alpine diff --git a/prod.docker-compose.yml b/prod.docker-compose.yml index fbadccc1..b89c92a3 100644 --- a/prod.docker-compose.yml +++ b/prod.docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.8" - services: db: image: postgres:15-alpine From 886f77831515063f265f1de60e91029ced41901e Mon Sep 17 00:00:00 2001 From: Mitchell Victoriano <47313912+MitchellAV@users.noreply.github.com> Date: Wed, 15 May 2024 21:47:41 -0700 Subject: [PATCH 06/10] Changed dask threads to 1 --- workers/src/pvinsight-validation-runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workers/src/pvinsight-validation-runner.py b/workers/src/pvinsight-validation-runner.py index e4ab5641..83587ea9 100644 --- a/workers/src/pvinsight-validation-runner.py +++ b/workers/src/pvinsight-validation-runner.py @@ -719,8 +719,8 @@ def loop_over_files_and_generate_results( results = dask_multiprocess( run_submission_and_generate_performance_metrics, func_arguments_list, - # n_workers=2, - # threads_per_worker=1, + # n_workers=4, + threads_per_worker=1, # memory_limit="16GiB", logger=logger, ) From 
dc4cba5dab1d48bd89fbd32a9190ee03fcb70186 Mon Sep 17 00:00:00 2001 From: Mitchell Victoriano <47313912+MitchellAV@users.noreply.github.com> Date: Thu, 16 May 2024 12:25:58 -0700 Subject: [PATCH 07/10] Refactored dask utility function --- workers/src/utility.py | 232 +++++++++++++++++++++++++---------------- 1 file changed, 143 insertions(+), 89 deletions(-) diff --git a/workers/src/utility.py b/workers/src/utility.py index 64e5ba57..1281a137 100644 --- a/workers/src/utility.py +++ b/workers/src/utility.py @@ -64,44 +64,55 @@ def multiprocess( return results -def dask_multiprocess( - func: Callable[..., T], - func_arguments: list[tuple[Any, ...]], +def logger_if_able( + message: str, logger: Logger | None = None, level: str = "INFO" +): + if logger is not None: + levels_dict = { + "DEBUG": logging.DEBUG, + "INFO": logging.INFO, + "WARNING": logging.WARNING, + "ERROR": logging.ERROR, + "CRITICAL": logging.CRITICAL, + } + + level = level.upper() + + if level not in levels_dict: + raise Exception(f"Invalid log level: {level}") + + log_level = levels_dict[level] + + logger.log(log_level, message) + else: + print(message) + + +MEMORY_PER_RUN = 7.0 # in GB + + +def set_workers_and_threads( + cpu_count: int | None, + sys_memory: float, + memory_per_run: float | int, n_workers: int | None = None, threads_per_worker: int | None = None, - memory_per_run: float | int | None = None, logger: Logger | None = None, - **kwargs, -) -> T | list[T] | tuple[T, ...]: - - # if n_workers is None: - # n_workers = os.cpu_count() - # if n_workers is None: - # msg = ( - # "Could not determine number of CPUs. Defaulting to 4 workers." - # ) - # if logger: - # logger.warning(msg) - # else: - # print(msg) - # n_workers = 4 - - # if threads_per_worker is None: - # threads_per_worker = None - - # if n_workers is None: - # n_workers = cpu_count - # if n_workers * n_processes > cpu_count: - # raise Exception(f"workers and threads exceed local resources, {cpu_count} cores present") - # if n_workers * memory_limit > sys_memory: - # config.set({'distributed.worker.memory.spill': True}) - # print(f"Memory per worker exceeds system memory ({memory_limit} GB), activating memory spill\n") - - memory_per_run = memory_per_run or 7.0 +) -> Tuple[int, int]: - cpu_count = os.cpu_count() - # memory limit in GB - sys_memory = psutil.virtual_memory().total / (1024.0**3) + def handle_exceeded_resources( + n_workers, threads_per_worker, memory_per_run, sys_memory + ): + if memory_per_run * n_workers * threads_per_worker > sys_memory: + config.set({"distributed.worker.memory.spill": True}) + logger_if_able( + f"Memory per worker exceeds system memory ({memory_per_run} GB), activating memory spill", + logger, + "WARNING", + ) + + total_workers: int = 1 + total_threads: int = 1 if cpu_count is None: raise Exception("Could not determine number of CPUs.") @@ -111,106 +122,149 @@ def dask_multiprocess( raise Exception( f"workers and threads exceed local resources, {cpu_count} cores present" ) - if memory_per_run * n_workers * threads_per_worker > sys_memory: - config.set({"distributed.worker.memory.spill": True}) - print( - f"Memory per worker exceeds system memory ({memory_per_run} GB), activating memory spill\n" - ) + handle_exceeded_resources( + n_workers, threads_per_worker, memory_per_run, sys_memory + ) + total_workers, total_threads = n_workers, threads_per_worker if n_workers is not None and threads_per_worker is None: threads_per_worker = int( math.floor(sys_memory / (memory_per_run * n_workers)) ) if threads_per_worker == 0: - print( - 
"Not enough memory for a worker, defaulting to 1 thread per worker" + logger_if_able( + "Not enough memory for a worker, defaulting to 1 thread per worker", + logger, + "WARNING", ) threads_per_worker = 1 + handle_exceeded_resources( + n_workers, threads_per_worker, memory_per_run, sys_memory + ) + + total_workers, total_threads = n_workers, threads_per_worker if n_workers is None and threads_per_worker is not None: n_workers = int( math.floor(sys_memory / (memory_per_run * threads_per_worker)) ) if n_workers == 0: - print("Not enough memory for a worker, defaulting to 1 worker") + logger_if_able( + "Not enough memory for a worker, defaulting to 1 worker", + logger, + "WARNING", + ) n_workers = 1 + handle_exceeded_resources( + n_workers, threads_per_worker, memory_per_run, sys_memory + ) + + total_workers, total_threads = n_workers, threads_per_worker if n_workers is None and threads_per_worker is None: - if memory_per_run == 0: - raise Exception("Memory limit cannot be 0") + thread_worker_total = math.floor(sys_memory / memory_per_run) if thread_worker_total < 2: - print( - "Not enough memory for a worker, defaulting to 1 worker and 1 thread per worker" + logger_if_able( + "Not enough memory for a worker, defaulting to 1 worker and 1 thread per worker", + logger, + "WARNING", ) n_workers = 1 threads_per_worker = 1 - if memory_per_run * n_workers > sys_memory: - config.set({"distributed.worker.memory.spill": True}) - print( - f"Memory per worker exceeds system memory ({memory_per_run} GB), activating memory spill\n" - ) + handle_exceeded_resources( + n_workers, threads_per_worker, memory_per_run, sys_memory + ) + + total_workers, total_threads = n_workers, threads_per_worker + return total_workers, total_threads else: - print(f"thread_worker_total: {thread_worker_total}") + logger_if_able( + f"thread_worker_total: {thread_worker_total}", + logger, + "DEBUG", + ) n_workers = int(math.ceil(thread_worker_total / 2)) threads_per_worker = int(math.floor(thread_worker_total / 2)) if n_workers + threads_per_worker != thread_worker_total: - print( - f"n_workers: {n_workers}, threads_per_worker: {threads_per_worker}, thread_worker_total: {thread_worker_total}" + logger_if_able( + f"n_workers: {n_workers}, threads_per_worker: {threads_per_worker}, thread_worker_total: {thread_worker_total}", + logger, + "INFO", + ) + logger_if_able( + "Could not determine number of workers and threads", + logger, + "ERROR", ) raise Exception( "Could not determine number of workers and threads" ) + handle_exceeded_resources( + n_workers, threads_per_worker, memory_per_run, sys_memory + ) + + total_workers, total_threads = n_workers, threads_per_worker + return total_workers, total_threads + + +def dask_multiprocess( + func: Callable[..., T], + func_arguments: list[tuple[Any, ...]], + n_workers: int | None = None, + threads_per_worker: int | None = None, + memory_per_run: float | int | None = None, + logger: Logger | None = None, + **kwargs, +): + memory_per_run = memory_per_run or MEMORY_PER_RUN + + cpu_count = os.cpu_count() + + sys_memory = psutil.virtual_memory().total / (1024.0**3) # in GB # config.set({"distributed.worker.memory.spill": True}) config.set({"distributed.worker.memory.pause": True}) config.set({"distributed.worker.memory.target": 0.95}) config.set({"distributed.worker.memory.terminate": False}) - if threads_per_worker is None: - threads_per_worker = 1 + total_workers, total_threads = set_workers_and_threads( + cpu_count, + sys_memory, + memory_per_run, + n_workers, + threads_per_worker, + logger, + 
) - memory_per_worker = memory_per_run * threads_per_worker + memory_per_worker = memory_per_run * total_threads - print(f"cpu count: {cpu_count}") - print(f"memory: {sys_memory}") - print(f"memory per run: {memory_per_run}") - print(f"n_workers: {n_workers}") - print(f"threads_per_worker: {threads_per_worker}") - print(f"memory per worker: {memory_per_worker}") + logger_if_able(f"cpu count: {cpu_count}", logger, "INFO") + logger_if_able(f"memory: {sys_memory}", logger, "INFO") + logger_if_able(f"memory per run: {memory_per_run}", logger, "INFO") + logger_if_able(f"n_workers: {total_workers}", logger, "INFO") + logger_if_able(f"threads_per_worker: {total_threads}", logger, "INFO") + logger_if_able(f"memory per worker: {memory_per_worker}", logger, "INFO") - client = Client( - n_workers=n_workers, - threads_per_worker=threads_per_worker, + results = [] + + with Client( + n_workers=total_workers, + threads_per_worker=total_threads, memory_limit=f"{memory_per_worker}GiB", **kwargs, - ) - # client = LocalCluster() - - # LocalCluster() - - # if logger is not None: - # print(f"logger name: {logger.name}") - # logger.info(f"Forwarding logging to dask client") - # client.forward_logging(logger.name, level=logging.INFO) - - if logger is not None: - logger.info(f"Created dask client") - logger.info(f"Client: {client}") - else: - print(f"Created dask client") - print(f"Client: {client}") + ) as client: - lazy_results = [] - for args in func_arguments: - lazy_result = delayed(func, pure=True)(*args) - lazy_results.append(lazy_result) + logger_if_able(f"client: {client}", logger, "INFO") - futures = client.compute(lazy_results) + lazy_results = [] + for args in func_arguments: + lazy_result = delayed(func, pure=True)(*args) + lazy_results.append(lazy_result) - results = client.gather(futures) + futures = client.compute(lazy_results) - client.close() + results = client.gather(futures) return results From a9761fbfe2911a411257621d90ba549d9477c330 Mon Sep 17 00:00:00 2001 From: Mitchell Victoriano <47313912+MitchellAV@users.noreply.github.com> Date: Thu, 16 May 2024 16:34:38 -0700 Subject: [PATCH 08/10] Added back in error testing of submission --- workers/src/pvinsight-validation-runner.py | 102 ++++++++++++++------- workers/src/submission_worker.py | 12 ++- workers/src/utility.py | 6 +- 3 files changed, 81 insertions(+), 39 deletions(-) diff --git a/workers/src/pvinsight-validation-runner.py b/workers/src/pvinsight-validation-runner.py index 83587ea9..e11dc7ef 100644 --- a/workers/src/pvinsight-validation-runner.py +++ b/workers/src/pvinsight-validation-runner.py @@ -16,7 +16,9 @@ This section will be dependent on the type of analysis being run. 
""" +from email import errors from multiprocessing.spawn import prepare +import re from typing import Any, Callable, cast import pandas as pd import os @@ -440,26 +442,19 @@ def run( # noqa: C901 total_number_of_files = len(file_metadata_df) logger.info(f"total_number_of_files: {total_number_of_files}") - number_of_errors = 0 - - FAILURE_CUTOFF = 3 - - def current_error_rate(number_of_errors: int, index: int): - return (number_of_errors / (index + 1)) * 100 - # Loop through each file and generate predictions - results_list = loop_over_files_and_generate_results( + results_list, number_of_errors = loop_over_files_and_generate_results( file_metadata_df, system_metadata_df, data_dir, config_data, submission_function, function_parameters, - number_of_errors, function_name, performance_metrics, ) + # Convert the results to a pandas dataframe and perform all of the # post-processing in the script results_df = pd.DataFrame(results_list) @@ -574,7 +569,6 @@ def create_function_args_for_file( config_data: dict[str, Any], submission_function: Callable[..., pd.Series], function_parameters: list[str], - number_of_errors: int, function_name: str, performance_metrics: list[str], file_number: int, @@ -602,7 +596,6 @@ def create_function_args_for_file( submission_function, function_parameters, file_metadata_row, - number_of_errors, function_name, performance_metrics, file_number, @@ -618,7 +611,6 @@ def prepare_function_args_for_parallel_processing( config_data: dict[str, Any], submission_function: Callable[..., pd.Series], function_parameters: list[str], - number_of_errors: int, function_name: str, performance_metrics: list[str], ): @@ -648,7 +640,6 @@ def prepare_function_args_for_parallel_processing( config_data, submission_function, function_parameters, - number_of_errors, function_name, performance_metrics, file_number, @@ -699,10 +690,9 @@ def loop_over_files_and_generate_results( config_data: dict[str, Any], submission_function: Callable[..., pd.Series], function_parameters: list[str], - number_of_errors: int, function_name: str, performance_metrics: list[str], -): +) -> tuple[list[dict[str, Any]], int]: func_arguments_list = prepare_function_args_for_parallel_processing( file_metadata_df, @@ -711,20 +701,63 @@ def loop_over_files_and_generate_results( config_data, submission_function, function_parameters, - number_of_errors, function_name, performance_metrics, ) - results = dask_multiprocess( + NUM_FILES_TO_TEST = 3 + + test_func_argument_list, rest_func_argument_list = ( + func_arguments_list[:NUM_FILES_TO_TEST], + func_arguments_list[NUM_FILES_TO_TEST:], + ) + + results: list[dict[str, Any]] = [] + number_of_errors = 0 + + # Test the first two files + logger.info(f"Testing the first {NUM_FILES_TO_TEST} files...") + test_results = dask_multiprocess( + run_submission_and_generate_performance_metrics, + test_func_argument_list, + n_workers=NUM_FILES_TO_TEST, + threads_per_worker=1, + # memory_limit="16GiB", + logger=logger, + ) + errors = [error for _, error in test_results] + number_of_errors += sum(errors) + + if number_of_errors == NUM_FILES_TO_TEST: + logger.error( + f"Too many errors ({number_of_errors}) occurred in the first {NUM_FILES_TO_TEST} files. Exiting." + ) + raise RunnerException( + 7, + f"Too many errors ({number_of_errors}) occurred in the first {NUM_FILES_TO_TEST} files. 
Exiting.", + ) + + # Test the rest of the files + + logger.info(f"Testing the rest of the files...") + rest_results = dask_multiprocess( run_submission_and_generate_performance_metrics, - func_arguments_list, + rest_func_argument_list, # n_workers=4, threads_per_worker=1, # memory_limit="16GiB", logger=logger, ) - return results + errors = [error for _, error in rest_results] + number_of_errors += sum(errors) + + test_results = [result for result, _ in test_results if result is not None] + rest_results = [result for result, _ in rest_results if result is not None] + + results.extend(test_results) + results.extend(rest_results) + + return results, number_of_errors def generate_performance_metrics_for_submission( @@ -814,6 +847,8 @@ def run_submission_and_generate_performance_metrics( performance_metrics: list[str], file_number: int, ): + + error = False try: logger.info(f"{file_number} - running submission for file {file_name}") # Get file_name, which will be pulled from database or S3 for @@ -831,23 +866,24 @@ def run_submission_and_generate_performance_metrics( file_metadata_row, ) + results_dictionary = generate_performance_metrics_for_submission( + data_outputs, + function_run_time, + file_name, + data_dir, + associated_system_metadata, + config_data, + function_parameters, + number_of_errors, + performance_metrics, + ) + + return results_dictionary, error except Exception as e: logger.error(f"error running function {function_name}: {e}") number_of_errors += 1 - - results_dictionary = generate_performance_metrics_for_submission( - data_outputs, - function_run_time, - file_name, - data_dir, - associated_system_metadata, - config_data, - function_parameters, - number_of_errors, - performance_metrics, - ) - - return results_dictionary + error = True + return None, error def prepare_kwargs_for_submission_function( diff --git a/workers/src/submission_worker.py b/workers/src/submission_worker.py index 095247f7..0809ce6b 100644 --- a/workers/src/submission_worker.py +++ b/workers/src/submission_worker.py @@ -359,6 +359,7 @@ def load_analysis( return analysis_function, function_parameters, file_metadata_df +@timing(verbose=True, logger=logger) def process_submission_message( analysis_id: int, submission_id: int, @@ -666,12 +667,16 @@ def main(): t.start() try: - process_submission_message( + _, execution_time = process_submission_message( int(analysis_id), int(submission_id), int(user_id), submission_filename, ) + + logger.info( + f"Submission Worker took {execution_time:.3f} seconds to process submission_id={submission_id} and analysis_id={analysis_id}" + ) except ( WorkerException, RunnerException, @@ -769,5 +774,6 @@ def main(): # Set to folder where the evaluation scripts are stored logger.info(f"BASE_TEMP_DIR: {BASE_TEMP_DIR}") _, execution_time = main() - logger.info(f"Submission Worker took {execution_time:.3f} seconds to run") - logger.info(f"Submission Worker finished.") + logger.info( + f"Shutting down Submission Worker. Runtime: {execution_time:.3f} seconds." 
+ ) diff --git a/workers/src/utility.py b/workers/src/utility.py index 1281a137..5ca9f283 100644 --- a/workers/src/utility.py +++ b/workers/src/utility.py @@ -216,7 +216,7 @@ def dask_multiprocess( memory_per_run: float | int | None = None, logger: Logger | None = None, **kwargs, -): +) -> list[T]: memory_per_run = memory_per_run or MEMORY_PER_RUN cpu_count = os.cpu_count() @@ -246,7 +246,7 @@ def dask_multiprocess( logger_if_able(f"threads_per_worker: {total_threads}", logger, "INFO") logger_if_able(f"memory per worker: {memory_per_worker}", logger, "INFO") - results = [] + results: list[T] = [] with Client( n_workers=total_workers, @@ -264,7 +264,7 @@ def dask_multiprocess( futures = client.compute(lazy_results) - results = client.gather(futures) + results = client.gather(futures) # type: ignore return results From 56c7507c46982ec480fd3936004d4bd8d842e092 Mon Sep 17 00:00:00 2001 From: Mitchell Victoriano <47313912+MitchellAV@users.noreply.github.com> Date: Thu, 16 May 2024 17:17:04 -0700 Subject: [PATCH 09/10] Remove unused variables and error handling in validation runner and submission worker --- workers/src/pvinsight-validation-runner.py | 5 ----- workers/src/submission_worker.py | 6 +++--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/workers/src/pvinsight-validation-runner.py b/workers/src/pvinsight-validation-runner.py index e11dc7ef..8547c30a 100644 --- a/workers/src/pvinsight-validation-runner.py +++ b/workers/src/pvinsight-validation-runner.py @@ -768,7 +768,6 @@ def generate_performance_metrics_for_submission( associated_metadata: dict[str, Any], config_data: dict[str, Any], function_parameters: list[str], - number_of_errors: int, performance_metrics: list[str], ): # Get the ground truth scalars that we will compare to @@ -792,7 +791,6 @@ def generate_performance_metrics_for_submission( f"{file_name} submission result length {file_submission_result_length} does not match ground truth file length {ground_truth_file_length}" ) - number_of_errors += 1 raise RunnerException( 100, f"submission result length {file_submission_result_length} does not match ground truth file length {ground_truth_file_length}", @@ -842,7 +840,6 @@ def run_submission_and_generate_performance_metrics( submission_function: Callable[..., pd.Series], function_parameters: list[str], file_metadata_row: pd.Series, - number_of_errors: int, function_name: str, performance_metrics: list[str], file_number: int, @@ -874,14 +871,12 @@ def run_submission_and_generate_performance_metrics( associated_system_metadata, config_data, function_parameters, - number_of_errors, performance_metrics, ) return results_dictionary, error except Exception as e: logger.error(f"error running function {function_name}: {e}") - number_of_errors += 1 error = True return None, error diff --git a/workers/src/submission_worker.py b/workers/src/submission_worker.py index 0809ce6b..5bfc1e56 100644 --- a/workers/src/submission_worker.py +++ b/workers/src/submission_worker.py @@ -64,9 +64,6 @@ def update_submission_status( CURRENT_EVALUATION_DIR = os.path.abspath( os.path.join(FILE_DIR, "..", "current_evaluation") ) -logger.info(f"FILE_DIR: {FILE_DIR}") -logger.info(f"LOG_FILE_DIR: {LOG_FILE_DIR}") -logger.info(f"CURRENT_EVALUATION_DIR: {CURRENT_EVALUATION_DIR}") def push_to_s3(local_file_path, s3_file_path, analysis_id, submission_id): @@ -771,6 +768,9 @@ def main(): # base BASE_TEMP_DIR = tempfile.mkdtemp() + logger.info(f"FILE_DIR: {FILE_DIR}") + logger.info(f"LOG_FILE_DIR: {LOG_FILE_DIR}") + 
logger.info(f"CURRENT_EVALUATION_DIR: {CURRENT_EVALUATION_DIR}") # Set to folder where the evaluation scripts are stored logger.info(f"BASE_TEMP_DIR: {BASE_TEMP_DIR}") _, execution_time = main() From 4f9e957b55acde3c65d0b936b4834cd6e6eab769 Mon Sep 17 00:00:00 2001 From: Mitchell Victoriano <47313912+MitchellAV@users.noreply.github.com> Date: Tue, 21 May 2024 23:47:33 -0700 Subject: [PATCH 10/10] Update errorcodes.json with new error messages and added timeout to submission function --- valhub/base/errorcodes.json | 12 +- workers/src/errorcodes.json | 33 +++++ workers/src/pvinsight-validation-runner.py | 91 ++++++++---- workers/src/submission_worker.py | 57 ++++++-- workers/src/utility.py | 161 ++++++++++++++++----- 5 files changed, 275 insertions(+), 79 deletions(-) create mode 100644 workers/src/errorcodes.json diff --git a/valhub/base/errorcodes.json b/valhub/base/errorcodes.json index 16f2ac12..f942646f 100644 --- a/valhub/base/errorcodes.json +++ b/valhub/base/errorcodes.json @@ -6,7 +6,9 @@ "4": "No system metadata returned from API", "5": "Configuration file not found in current evaluation directory", "6": "Required function name not found within submission python file", - "7": "Failure cutoff met for submission evaluation and execution has been terminated" + "7": "Failure cutoff met for submission evaluation and execution has been terminated", + "8": "Submission result length does not match ground truth length", + "500": "Internal server error" }, "wr": { "1": "Error uploading file to s3", @@ -19,9 +21,13 @@ "8": "No file metadata found in API for analysis ID", "9": "Not all ground truth data files found in s3 bucket for analysis", "10": "Not all analytical data files found in s3 bucket for analysis", - "11": "Runner module does not have a 'run' function" + "11": "Runner module does not have a 'run' function", + "12": "Error posting Error Report to API", + "13": "API did not return a valid response", + "500": "Internal server error" }, "sb": { - "1": "" + "1": "Submission function exceeded maximum execution time", + "500": "Internal server error" } } \ No newline at end of file diff --git a/workers/src/errorcodes.json b/workers/src/errorcodes.json new file mode 100644 index 00000000..f942646f --- /dev/null +++ b/workers/src/errorcodes.json @@ -0,0 +1,33 @@ +{ + "op": { + "1": "Submission file is not a valid zipped file", + "2": "Error installing Python submission dependencies", + "3": "Failed to get system metadata information", + "4": "No system metadata returned from API", + "5": "Configuration file not found in current evaluation directory", + "6": "Required function name not found within submission python file", + "7": "Failure cutoff met for submission evaluation and execution has been terminated", + "8": "Submission result length does not match ground truth length", + "500": "Internal server error" + }, + "wr": { + "1": "Error uploading file to s3", + "2": "Error downloading file from s3", + "3": "No files found in s3 bucket for analysis", + "4": "Error updating submission result to API", + "5": "Error updating submission status to API", + "6": "Required evaluation files not found in s3 bucket", + "7": "File metadata for file ID not found in API", + "8": "No file metadata found in API for analysis ID", + "9": "Not all ground truth data files found in s3 bucket for analysis", + "10": "Not all analytical data files found in s3 bucket for analysis", + "11": "Runner module does not have a 'run' function", + "12": "Error posting Error Report to API", + "13": "API did not return a 
valid response", + "500": "Internal server error" + }, + "sb": { + "1": "Submission function exceeded maximum execution time", + "500": "Internal server error" + } +} \ No newline at end of file diff --git a/workers/src/pvinsight-validation-runner.py b/workers/src/pvinsight-validation-runner.py index 8547c30a..979c22af 100644 --- a/workers/src/pvinsight-validation-runner.py +++ b/workers/src/pvinsight-validation-runner.py @@ -16,9 +16,6 @@ This section will be dependent on the type of analysis being run. """ -from email import errors -from multiprocessing.spawn import prepare -import re from typing import Any, Callable, cast import pandas as pd import os @@ -39,9 +36,14 @@ import boto3 from logger import setup_logging from utility import ( + RUNNER_ERROR_PREFIX, RunnerException, + SubmissionException, dask_multiprocess, + get_error_by_code, + get_error_codes_dict, pull_from_s3, + timeout, timing, is_local, ) @@ -60,6 +62,15 @@ API_BASE_URL = "api:8005" if IS_LOCAL else "api.pv-validation-hub.org" +FILE_DIR = os.path.dirname(os.path.abspath(__file__)) + + +runner_error_codes = get_error_codes_dict( + FILE_DIR, RUNNER_ERROR_PREFIX, logger +) + +SUBMISSION_TIMEOUT = 30 * 60 # seconds + def push_to_s3( local_file_path, @@ -365,8 +376,9 @@ def run( # noqa: C901 logger.error("error installing submission dependencies:", e) logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) + error_code = 2 raise RunnerException( - 2, "error installing python submission dependencies" + *get_error_by_code(error_code, runner_error_codes, logger) ) shutil.move( @@ -389,7 +401,10 @@ def run( # noqa: C901 ) logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) - raise RunnerException(3, "Failed to get system metadata information") + error_code = 3 + raise RunnerException( + *get_error_by_code(error_code, runner_error_codes, logger) + ) # Convert the responses to DataFrames @@ -402,7 +417,10 @@ def run( # noqa: C901 logger.error("System metadata is empty") logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) - raise RunnerException(4, "No system metadata returned from API") + error_code = 4 + raise RunnerException( + *get_error_by_code(error_code, runner_error_codes, logger) + ) # Read in the configuration JSON for the particular run with open(os.path.join(current_evaluation_dir, "config.json")) as f: @@ -410,8 +428,9 @@ def run( # noqa: C901 logger.error("config.json not found") logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) + error_code = 5 raise RunnerException( - 5, "config.json not found in current evaluation directory" + *get_error_by_code(error_code, runner_error_codes, logger) ) config_data: dict[str, Any] = json.load(f) @@ -435,8 +454,9 @@ def run( # noqa: C901 ) logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) + error_code = 6 raise RunnerException( - 6, f"function {function_name} not found in module {module_name}" + *get_error_by_code(error_code, runner_error_codes, logger) ) total_number_of_files = len(file_metadata_df) @@ -614,18 +634,6 @@ def prepare_function_args_for_parallel_processing( function_name: str, performance_metrics: list[str], ): - # logger.debug( - # f"index: {index}, FAILURE_CUTOFF: {FAILURE_CUTOFF}, number_of_errors: {number_of_errors}" - # ) - # if index <= FAILURE_CUTOFF: - # if 
number_of_errors == FAILURE_CUTOFF: - # raise RunnerException( - # 7, - # f"Too many errors ({number_of_errors}) occurred in the first {FAILURE_CUTOFF} files. Exiting.", - # current_error_rate(number_of_errors, index), - # ) - - # logger.info(f"processing file {index + 1} of {total_number_of_files}") function_args_list: list[tuple] = [] @@ -732,22 +740,40 @@ def loop_over_files_and_generate_results( logger.error( f"Too many errors ({number_of_errors}) occurred in the first {NUM_FILES_TO_TEST} files. Exiting." ) + error_code = 7 raise RunnerException( - 7, - f"Too many errors ({number_of_errors}) occurred in the first {NUM_FILES_TO_TEST} files. Exiting.", + *get_error_by_code(error_code, runner_error_codes, logger) ) # Test the rest of the files logger.info(f"Testing the rest of the files...") - rest_results = dask_multiprocess( - run_submission_and_generate_performance_metrics, - rest_func_argument_list, - # n_workers=4, - threads_per_worker=1, - # memory_limit="16GiB", - logger=logger, - ) + rest_results = [] + try: + rest_results = dask_multiprocess( + run_submission_and_generate_performance_metrics, + rest_func_argument_list, + # n_workers=4, + threads_per_worker=1, + # memory_limit="16GiB", + logger=logger, + ) + except SubmissionException as e: + logger.error(f"Submission error: {e}") + raise e + except RunnerException as e: + logger.error(f"Runner error: {e}") + raise e + except Exception as e: + if e.args: + if len(e.args) == 2: + logger.error(f"Submission error: {e}") + + raise RunnerException(*e.args) + logger.error(f"Submission error: {e}") + raise RunnerException( + *get_error_by_code(500, runner_error_codes, logger) + ) errors = [error for _, error in rest_results] number_of_errors += sum(errors) @@ -790,10 +816,10 @@ def generate_performance_metrics_for_submission( logger.error( f"{file_name} submission result length {file_submission_result_length} does not match ground truth file length {ground_truth_file_length}" ) + error_code = 8 raise RunnerException( - 100, - f"submission result length {file_submission_result_length} does not match ground truth file length {ground_truth_file_length}", + *get_error_by_code(error_code, runner_error_codes, logger) ) # Convert the data outputs to a dictionary identical to the @@ -832,6 +858,7 @@ def generate_performance_metrics_for_submission( return results_dictionary +@timeout(SUBMISSION_TIMEOUT) def run_submission_and_generate_performance_metrics( file_name: str, data_dir: str, diff --git a/workers/src/submission_worker.py b/workers/src/submission_worker.py index 5bfc1e56..c1f03552 100644 --- a/workers/src/submission_worker.py +++ b/workers/src/submission_worker.py @@ -1,6 +1,5 @@ from importlib import import_module from typing import Any, Callable, Optional -from matplotlib.pylab import f import requests import sys import os @@ -22,6 +21,8 @@ RunnerException, SubmissionException, WorkerException, + get_error_by_code, + get_error_codes_dict, pull_from_s3, timing, is_local, @@ -52,8 +53,10 @@ def update_submission_status( api_route = f"http://{API_BASE_URL}/submissions/analysis/{analysis_id}/change_submission_status/{submission_id}" r = requests.put(api_route, data={"status": new_status}) if not r.ok: + logger.error(f"Error updating submission status to {new_status}") + error_code = 5 raise WorkerException( - 5, f"Error updating submission status to {new_status}" + *get_error_by_code(error_code, worker_error_codes, logger), ) data: dict[str, Any] = r.json() return data @@ -87,8 +90,10 @@ def push_to_s3(local_file_path, s3_file_path, 
analysis_id, submission_id): r = requests.put(s3_file_full_path, data=file_content) logger.info(f"Received S3 emulator response: {r.status_code}") if not r.ok: + logger.error(f"S3 emulator error: {r.content}") + error_code = 1 raise WorkerException( - 1, f"Error uploading file to s3 emulator: {r.content}" + *get_error_by_code(error_code, worker_error_codes, logger), ) return {"status": "success"} else: @@ -99,7 +104,10 @@ def push_to_s3(local_file_path, s3_file_path, analysis_id, submission_id): logger.error(f"Error: {e}") logger.info(f"update submission status to {FAILED}") update_submission_status(analysis_id, submission_id, FAILED) - raise WorkerException(1, f"Error uploading file to s3") + error_code = 1 + raise WorkerException( + *get_error_by_code(error_code, worker_error_codes, logger) + ) def list_s3_bucket(s3_dir: str): @@ -151,8 +159,10 @@ def update_submission_result( api_route = f"http://{API_BASE_URL}/submissions/analysis/{analysis_id}/update_submission_result/{submission_id}" r = requests.put(api_route, json=result_json, headers=headers) if not r.ok: + logger.error(f"Error updating submission result to Django API") + error_code = 4 raise WorkerException( - 4, "Error updating submission result to Django API" + *get_error_by_code(error_code, worker_error_codes, logger), ) data: dict[str, Any] = r.json() return data @@ -231,9 +241,12 @@ def extract_analysis_data( # noqa: C901 ) response = requests.get(fmd_url) if not response.ok: + error_code = 7 + logger.error( + f"File metadata for file id {file_id} not found in Django API" + ) raise requests.exceptions.HTTPError( - 7, - f"File metadata for file id {file_id} not found in Django API", + *get_error_by_code(error_code, worker_error_codes, logger), ) file_metadata_list.append(response.json()) @@ -241,9 +254,12 @@ def extract_analysis_data( # noqa: C901 file_metadata_df = pd.DataFrame(file_metadata_list) if file_metadata_df.empty: + logger.error( + f"File metadata DataFrame is empty for analysis {analysis_id}" + ) + error_code = 8 raise WorkerException( - 8, - f"No file metadata found in Django API for analysis {analysis_id}", + *get_error_by_code(error_code, worker_error_codes, logger), ) files_for_analysis: list[str] = file_metadata_df["file_name"].tolist() @@ -329,6 +345,12 @@ def load_analysis( os.path.join(current_evaluation_dir, "pvinsight-validation-runner.py"), ) + # Copy the error codes file into the current evaluation directory + shutil.copy( + os.path.join("/root/worker/src", "errorcodes.json"), + os.path.join(current_evaluation_dir, "errorcodes.json"), + ) + # import analysis runner as a module sys.path.insert(0, current_evaluation_dir) runner_module_name = "pvinsight-validation-runner" @@ -571,12 +593,20 @@ def post_error_to_api( logger.info(f"Received response: {r.text}") if not r.ok: - raise WorkerException(1, "Error posting error report to Django API") + logger.error("Error posting error report to API") + cur_error_code = 12 + raise WorkerException( + *get_error_by_code(cur_error_code, worker_error_codes, logger) + ) try: data: dict[str, Any] = r.json() except json.JSONDecodeError: - raise WorkerException(1, "Django API did not return a JSON response") + logger.error("Django API did not return a JSON response") + cur_error_code = 13 + raise WorkerException( + *get_error_by_code(cur_error_code, worker_error_codes, logger) + ) return data @@ -773,6 +803,11 @@ def main(): logger.info(f"CURRENT_EVALUATION_DIR: {CURRENT_EVALUATION_DIR}") # Set to folder where the evaluation scripts are stored logger.info(f"BASE_TEMP_DIR: 
{BASE_TEMP_DIR}") + + worker_error_codes = get_error_codes_dict( + FILE_DIR, WORKER_ERROR_PREFIX, logger + ) + _, execution_time = main() logger.info( f"Shutting down Submission Worker. Runtime: {execution_time:.3f} seconds." diff --git a/workers/src/utility.py b/workers/src/utility.py index 5ca9f283..34a072ce 100644 --- a/workers/src/utility.py +++ b/workers/src/utility.py @@ -1,9 +1,14 @@ -import sys +import json from dask.delayed import delayed from dask.distributed import Client from dask import config -from concurrent.futures import ProcessPoolExecutor, as_completed, thread +from concurrent.futures import ( + ProcessPoolExecutor, + ThreadPoolExecutor, + as_completed, + thread, +) from functools import wraps from logging import Logger from time import perf_counter, sleep, time @@ -13,6 +18,7 @@ import boto3 import botocore.exceptions from distributed import LocalCluster +from matplotlib.pylab import f import psutil import requests import math @@ -22,8 +28,74 @@ RUNNER_ERROR_PREFIX = "op" SUBMISSION_ERROR_PREFIX = "sb" + T = TypeVar("T") +FILE_DIR = os.path.dirname(os.path.abspath(__file__)) + + +def logger_if_able( + message: str, logger: Logger | None = None, level: str = "INFO" +): + if logger is not None: + levels_dict = { + "DEBUG": logging.DEBUG, + "INFO": logging.INFO, + "WARNING": logging.WARNING, + "ERROR": logging.ERROR, + "CRITICAL": logging.CRITICAL, + } + + level = level.upper() + + if level not in levels_dict: + raise Exception(f"Invalid log level: {level}") + + log_level = levels_dict[level] + + logger.log(log_level, message) + else: + print(message) + + +def get_error_codes_dict( + dir: str, prefix: str, logger: Logger | None +) -> dict[str, str]: + try: + with open( + os.path.join(dir, "errorcodes.json"), "r" + ) as error_codes_file: + error_codes = json.load(error_codes_file) + if prefix not in error_codes: + logger_if_able( + f"Error prefix {prefix} not found in error codes file", + logger, + "ERROR", + ) + + raise KeyError( + f"Error prefix {prefix} not found in error codes file" + ) + return error_codes[prefix] + except FileNotFoundError: + logger_if_able("Error codes file not found", logger, "ERROR") + raise FileNotFoundError("Error codes file not found") + except KeyError: + logger_if_able( + f"Error prefix {prefix} not found in error codes file", + logger, + "ERROR", + ) + raise KeyError(f"Error prefix {prefix} not found in error codes file") + except Exception as e: + logger_if_able(f"Error loading error codes: {e}", logger, "ERROR") + raise e + + +submission_error_codes = get_error_codes_dict( + FILE_DIR, SUBMISSION_ERROR_PREFIX, None +) + def timing(verbose: bool = True, logger: Union[Logger, None] = None): @wraps(timing) @@ -38,10 +110,7 @@ def wrapper(*args, **kwargs) -> Tuple[T, float]: msg = ( f"{func.__name__} took {execution_time:.3f} seconds to run" ) - if logger: - logger.info(msg) - else: - print(msg) + logger_if_able(msg, logger) return result, execution_time return wrapper @@ -64,31 +133,25 @@ def multiprocess( return results -def logger_if_able( - message: str, logger: Logger | None = None, level: str = "INFO" -): - if logger is not None: - levels_dict = { - "DEBUG": logging.DEBUG, - "INFO": logging.INFO, - "WARNING": logging.WARNING, - "ERROR": logging.ERROR, - "CRITICAL": logging.CRITICAL, - } - - level = level.upper() - - if level not in levels_dict: - raise Exception(f"Invalid log level: {level}") - - log_level = levels_dict[level] - - logger.log(log_level, message) - else: - print(message) +def timeout(seconds: int, logger: Union[Logger, None] 
= None): + def decorator(func: Callable[..., T]): + @wraps(func) + def wrapper(*args, **kwargs) -> T: + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(func, *args, **kwargs) + try: + return future.result(timeout=seconds) + except TimeoutError: + error_code = 1 + raise SubmissionException( + *get_error_by_code( + error_code, submission_error_codes, logger + ) + ) + return wrapper -MEMORY_PER_RUN = 7.0 # in GB + return decorator def set_workers_and_threads( @@ -115,12 +178,13 @@ def handle_exceeded_resources( total_threads: int = 1 if cpu_count is None: - raise Exception("Could not determine number of CPUs.") + raise Exception(10, "Could not determine number of CPUs.") if n_workers is not None and threads_per_worker is not None: if n_workers * threads_per_worker > cpu_count: raise Exception( - f"workers and threads exceed local resources, {cpu_count} cores present" + 9, + f"workers and threads exceed local resources, {cpu_count} cores present", ) handle_exceeded_resources( n_workers, threads_per_worker, memory_per_run, sys_memory @@ -164,7 +228,7 @@ def handle_exceeded_resources( if n_workers is None and threads_per_worker is None: thread_worker_total = math.floor(sys_memory / memory_per_run) - if thread_worker_total < 2: + if thread_worker_total < 1: logger_if_able( "Not enough memory for a worker, defaulting to 1 worker and 1 thread per worker", logger, @@ -198,13 +262,24 @@ def handle_exceeded_resources( "ERROR", ) raise Exception( - "Could not determine number of workers and threads" + 9, "Could not determine number of workers and threads" ) handle_exceeded_resources( n_workers, threads_per_worker, memory_per_run, sys_memory ) total_workers, total_threads = n_workers, threads_per_worker + + while total_workers * total_threads > cpu_count: + if total_workers > 1: + total_workers -= 1 + elif total_threads > 1: + total_threads -= 1 + else: + raise Exception( + 9, "Could not determine number of workers and threads" + ) + return total_workers, total_threads @@ -217,6 +292,9 @@ def dask_multiprocess( logger: Logger | None = None, **kwargs, ) -> list[T]: + + MEMORY_PER_RUN = 7.0 # in GB + memory_per_run = memory_per_run or MEMORY_PER_RUN cpu_count = os.cpu_count() @@ -364,6 +442,23 @@ def pull_from_s3( return target_file_path +def get_error_by_code( + error_code: int, error_codes_dict: dict[str, str], logger: Logger | None +) -> tuple[int, str]: + if error_codes_dict is None: + logger_if_able("Error codes dictionary is None", logger, "ERROR") + raise ValueError("Error codes dictionary is None") + error_code_str = str(error_code) + if error_code_str not in error_codes_dict: + logger_if_able( + f"Error code {error_code} not found in error codes", + logger, + "ERROR", + ) + raise KeyError(f"Error code {error_code} not found in error codes") + return error_code, error_codes_dict[error_code_str] + + if __name__ == "__main__": def expensive_function(x):
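# --- Illustrative usage sketch (editor's addition, not part of the patch) ---
# A minimal example of how the new timeout decorator and the errorcodes.json
# helpers introduced above are meant to be combined. It assumes utility.py and
# errorcodes.json sit next to this script; slow_submission() and the 5-second
# limit are hypothetical stand-ins for a real submission function and
# SUBMISSION_TIMEOUT.
import logging
import os
import time

from utility import (
    SUBMISSION_ERROR_PREFIX,
    SubmissionException,
    get_error_by_code,
    get_error_codes_dict,
    timeout,
)

logger = logging.getLogger(__name__)
FILE_DIR = os.path.dirname(os.path.abspath(__file__))

# Load the "sb" (submission) section of errorcodes.json.
submission_error_codes = get_error_codes_dict(
    FILE_DIR, SUBMISSION_ERROR_PREFIX, logger
)


@timeout(5, logger=logger)  # hypothetical limit; the runner uses SUBMISSION_TIMEOUT
def slow_submission(x: int) -> int:
    time.sleep(10)  # deliberately exceeds the 5-second limit
    return x * 2


try:
    slow_submission(1)
except SubmissionException:
    # Map submission error code 1 back to its message from errorcodes.json
    # ("Submission function exceeded maximum execution time").
    code, message = get_error_by_code(1, submission_error_codes, logger)
    logger.error("Submission error %s: %s", code, message)

# Note: the ThreadPoolExecutor-based timeout only abandons the future; the
# worker thread keeps running in the background. Also, before Python 3.11
# concurrent.futures.TimeoutError is not the builtin TimeoutError, so the
# decorator's `except TimeoutError` may need to catch
# concurrent.futures.TimeoutError explicitly on older interpreters.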