diff --git a/src/osa/job.py b/src/osa/job.py index 410e3e7c..b151a69e 100644 --- a/src/osa/job.py +++ b/src/osa/job.py @@ -702,6 +702,37 @@ def get_sacct_output(sacct_output: StringIO) -> pd.DataFrame: return sacct_output +def get_closer_sacct_output(sacct_output) -> pd.DataFrame: + """ + Fetch the information of jobs in the queue launched by AUTOCLOSER using the sacct + SLURM output and store it in a pandas dataframe. + + Returns + ------- + queue_list: pd.DataFrame + """ + sacct_output = pd.read_csv(sacct_output, names=FORMAT_SLURM) + + # Keep only the jobs corresponding to AUTOCLOSER sequences + # Until the merging of muon files is fixed, check all jobs except "lstchain_merge_muon_files" + sacct_output = sacct_output[ + (sacct_output["JobName"].str.contains("lstchain_merge_hdf5_files")) + | (sacct_output["JobName"].str.contains("lstchain_check_dl1")) + | (sacct_output["JobName"].str.contains("lstchain_longterm_dl1_check")) + | (sacct_output["JobName"].str.contains("lstchain_cherenkov_transparency")) + | (sacct_output["JobName"].str.contains("provproces")) + ] + + try: + sacct_output["JobID"] = sacct_output["JobID"].apply(lambda x: x.split("_")[0]) + sacct_output["JobID"] = sacct_output["JobID"].str.strip(".batch").astype(int) + + except AttributeError: + log.debug("No job info could be obtained from sacct") + + return sacct_output + + def filter_jobs(job_info: pd.DataFrame, sequence_list: Iterable): """Filter the job info list to get the values of the jobs in the current queue.""" sequences_info = pd.DataFrame([vars(seq) for seq in sequence_list]) diff --git a/src/osa/scripts/closer.py b/src/osa/scripts/closer.py index d54f0510..7c98584c 100644 --- a/src/osa/scripts/closer.py +++ b/src/osa/scripts/closer.py @@ -8,6 +8,7 @@ import shutil import subprocess import sys +import time from datetime import datetime, timedelta from pathlib import Path from typing import Tuple, Iterable, List @@ -15,7 +16,12 @@ from osa import osadb from osa.configs import options from osa.configs.config import cfg -from osa.job import are_all_jobs_correctly_finished, save_job_information +from osa.job import ( + are_all_jobs_correctly_finished, + save_job_information, + run_sacct, + get_closer_sacct_output +) from osa.nightsummary.extract import extract_runs, extract_sequences from osa.nightsummary.nightsummary import run_summary_table from osa.paths import destination_dir, create_longterm_symlink @@ -24,6 +30,7 @@ from osa.utils.cliopts import closercliparsing from osa.utils.logging import myLogger from osa.utils.register import register_found_pattern +from osa.utils.mail import send_warning_mail from osa.utils.utils import ( night_finished_flag, is_day_closed, @@ -172,6 +179,24 @@ def post_process(seq_tuple): if not options.no_dl2: merge_files(seq_list, data_level="DL2") + time.sleep(600) + + # Check if all jobs launched by autocloser finished correctly + # before creating the NightFinished.txt file + n_max = 6 + n = 0 + while not all_closer_jobs_finished_correctly() & n <= n_max: + log.info( + "All jobs launched by autocloser did not finished correctly yet. " + "Checking again in 10 minutes..." + ) + time.sleep(600) + n += 1 + + if n > n_max: + send_warning_mail(date=options.date) + return False + if options.seqtoclose is None: database = cfg.get("database", "path") if database: @@ -489,7 +514,13 @@ def daily_datacheck(cmd: List[str]): log.debug(f"Executing {stringify(cmd)}") if not options.simulate and not options.test and shutil.which("sbatch") is not None: - job = subprocess.run(cmd, check=True) + job = subprocess.run( + cmd, + encoding="utf-8", + capture_output=True, + text=True, + check=True, + ) job_id = job.stdout.strip() return job_id else: @@ -527,7 +558,14 @@ def cherenkov_transparency(cmd: List[str]): log.debug("Simulate launching scripts") - +def all_closer_jobs_finished_correctly(): + """Check if all the jobs launched by autocloser finished correctly.""" + sacct_output = run_sacct() + jobs_closer = get_closer_sacct_output(sacct_output) + if len(jobs_closer[jobs_closer["State"]!="COMPLETED"])==0: + return True + else: + return False if __name__ == "__main__":