From 42110fab6dbf6be23dfa5d47d8974702d22b126b Mon Sep 17 00:00:00 2001 From: Daniel Morcuende Date: Wed, 21 Feb 2024 13:49:38 +0000 Subject: [PATCH 1/7] Wait until the longterm datacheck file is created to make the link --- src/osa/paths.py | 42 +++++++++++++++++++++++++++++---------- src/osa/scripts/closer.py | 17 +++++++++++++--- 2 files changed, 46 insertions(+), 13 deletions(-) diff --git a/src/osa/paths.py b/src/osa/paths.py index 09c03776..f4373e67 100644 --- a/src/osa/paths.py +++ b/src/osa/paths.py @@ -5,6 +5,8 @@ from datetime import datetime from pathlib import Path from typing import List +import subprocess +import time import lstchain from astropy.table import Table @@ -359,15 +361,35 @@ def get_latest_version_file(longterm_files: List[str]) -> Path: ) -def create_longterm_symlink(): +def wait_for_job_completion(job_id: str): + """Wait until the SLURM job corresponding to job_id finishes.""" + n_max = 10 + n = 0 + while n < n_max: + # Check if the status of the SLURM job is "COMPLETED" + status = subprocess.run(["sacct", "--format=state", "--jobs", job_id], capture_output=True, text=True) + if "COMPLETED" in status.stdout: + log.debug(f"Job {job_id} finished successfully!") + return True + n += 1 + log.debug(f"Job {job_id} is not completed yet, checking again in 10 minutes...") + time.sleep(600) # wait 10 minutes to check again + log.info(f"The maximum number of checks of job {job_id} was reached, job {job_id} did not finish succesfully.") + return False + + +def create_longterm_symlink(cherenkov_job_id: str): """If the created longterm DL1 datacheck file corresponds to the latest version available, make symlink to it in the "all" common directory.""" - nightdir = utils.date_to_dir(options.date) - longterm_dir = Path(cfg.get("LST1", "LONGTERM_DIR")) - linked_longterm_file = longterm_dir / f"night_wise/all/DL1_datacheck_{nightdir}.h5" - all_longterm_files = longterm_dir.rglob(f"v*/{nightdir}/DL1_datacheck_{nightdir}.h5") - latest_version_file = get_latest_version_file(all_longterm_files) - - log.info("Symlink the latest version longterm DL1 datacheck file in the common directory.") - linked_longterm_file.unlink(missing_ok=True) - linked_longterm_file.symlink_to(latest_version_file) + if wait_for_job_completion(cherenkov_job_id): + nightdir = utils.date_to_dir(options.date) + longterm_dir = Path(cfg.get("LST1", "LONGTERM_DIR")) + linked_longterm_file = longterm_dir / f"night_wise/all/DL1_datacheck_{nightdir}.h5" + all_longterm_files = longterm_dir.rglob(f"v*/{nightdir}/DL1_datacheck_{nightdir}.h5") + latest_version_file = get_latest_version_file(all_longterm_files) + + log.info("Symlink the latest version longterm DL1 datacheck file in the common directory.") + linked_longterm_file.unlink(missing_ok=True) + linked_longterm_file.symlink_to(latest_version_file) + else: + log.warning(f"Job {cherenkov_job_id} (lstchain_cherenkov_transparency) did not finish successfully.") diff --git a/src/osa/scripts/closer.py b/src/osa/scripts/closer.py index 7c98584c..e08e1783 100644 --- a/src/osa/scripts/closer.py +++ b/src/osa/scripts/closer.py @@ -164,8 +164,8 @@ def post_process(seq_tuple): if cfg.getboolean("lstchain", "merge_dl1_datacheck"): list_job_id = merge_dl1_datacheck(seq_list) longterm_job_id = daily_datacheck(daily_longterm_cmd(list_job_id)) - cherenkov_transparency(cherenkov_transparency_cmd(longterm_job_id)) - create_longterm_symlink() + cherenkov_job_id = cherenkov_transparency(cherenkov_transparency_cmd(longterm_job_id)) + create_longterm_symlink(cherenkov_job_id) # Extract the provenance info extract_provenance(seq_list) @@ -179,6 +179,7 @@ def post_process(seq_tuple): if not options.no_dl2: merge_files(seq_list, data_level="DL2") + time.sleep(600) # Check if all jobs launched by autocloser finished correctly @@ -536,6 +537,7 @@ def cherenkov_transparency_cmd(longterm_job_id: str) -> List[str]: return [ "sbatch", + "--parsable", "-D", options.directory, "-o", @@ -553,7 +555,16 @@ def cherenkov_transparency(cmd: List[str]): log.debug(f"Executing {stringify(cmd)}") if not options.simulate and not options.test and shutil.which("sbatch") is not None: - subprocess.run(cmd, check=True) + job = subprocess.run( + cmd, + encoding="utf-8", + capture_output=True, + text=True, + check=True, + ) + job_id = job.stdout.strip() + return job_id + else: log.debug("Simulate launching scripts") From bdb4c871d5b903db8fbb20e847294e87e6c8c078 Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Thu, 22 Feb 2024 13:58:06 +0100 Subject: [PATCH 2/7] Rename wait_for_job_completion function --- src/osa/paths.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/osa/paths.py b/src/osa/paths.py index f4373e67..6c68b737 100644 --- a/src/osa/paths.py +++ b/src/osa/paths.py @@ -361,7 +361,7 @@ def get_latest_version_file(longterm_files: List[str]) -> Path: ) -def wait_for_job_completion(job_id: str): +def is_job_completed(job_id: str): """Wait until the SLURM job corresponding to job_id finishes.""" n_max = 10 n = 0 @@ -381,7 +381,7 @@ def wait_for_job_completion(job_id: str): def create_longterm_symlink(cherenkov_job_id: str): """If the created longterm DL1 datacheck file corresponds to the latest version available, make symlink to it in the "all" common directory.""" - if wait_for_job_completion(cherenkov_job_id): + if is_job_completed(cherenkov_job_id): nightdir = utils.date_to_dir(options.date) longterm_dir = Path(cfg.get("LST1", "LONGTERM_DIR")) linked_longterm_file = longterm_dir / f"night_wise/all/DL1_datacheck_{nightdir}.h5" From 8b49ab1cdbb293cd9ed6fe114fea99af0068741d Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Thu, 22 Feb 2024 14:26:16 +0100 Subject: [PATCH 3/7] Check if the longterm file exists before launching again all the jobs --- src/osa/paths.py | 13 ++++++++--- src/osa/scripts/closer.py | 49 +++++++++++++++++++++++---------------- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/src/osa/paths.py b/src/osa/paths.py index 6c68b737..3c111c6b 100644 --- a/src/osa/paths.py +++ b/src/osa/paths.py @@ -378,18 +378,25 @@ def is_job_completed(job_id: str): return False -def create_longterm_symlink(cherenkov_job_id: str): +def create_longterm_symlink(cherenkov_job_id: str = None): """If the created longterm DL1 datacheck file corresponds to the latest version available, make symlink to it in the "all" common directory.""" - if is_job_completed(cherenkov_job_id): + if not cherenkov_job_id or is_job_completed(cherenkov_job_id): nightdir = utils.date_to_dir(options.date) longterm_dir = Path(cfg.get("LST1", "LONGTERM_DIR")) linked_longterm_file = longterm_dir / f"night_wise/all/DL1_datacheck_{nightdir}.h5" all_longterm_files = longterm_dir.rglob(f"v*/{nightdir}/DL1_datacheck_{nightdir}.h5") latest_version_file = get_latest_version_file(all_longterm_files) - log.info("Symlink the latest version longterm DL1 datacheck file in the common directory.") linked_longterm_file.unlink(missing_ok=True) linked_longterm_file.symlink_to(latest_version_file) else: log.warning(f"Job {cherenkov_job_id} (lstchain_cherenkov_transparency) did not finish successfully.") + +def dl1_datacheck_longterm_file_exits(): -> bool + """Return true if the longterm DL1 datacheck file was already produced.""" + nightdir = utils.date_to_dir(options.date) + longterm_dir = Path(cfg.get("LST1", "LONGTERM_DIR")) + longterm_file = longterm_dir / options.prod_id / nightdir / f"DL1_datacheck_{nightdir}.h5" + return longterm_file.exists() + diff --git a/src/osa/scripts/closer.py b/src/osa/scripts/closer.py index e08e1783..a405a75b 100644 --- a/src/osa/scripts/closer.py +++ b/src/osa/scripts/closer.py @@ -24,7 +24,11 @@ ) from osa.nightsummary.extract import extract_runs, extract_sequences from osa.nightsummary.nightsummary import run_summary_table -from osa.paths import destination_dir, create_longterm_symlink +from osa.paths import ( + destination_dir, + create_longterm_symlink, + dl1_datacheck_longterm_file_exits +) from osa.raw import is_raw_data_available from osa.report import start from osa.utils.cliopts import closercliparsing @@ -155,32 +159,37 @@ def post_process(seq_tuple): """Set of last instructions.""" seq_list = seq_tuple[1] - # Close the sequences - post_process_files(seq_list) + if dl1_datacheck_longterm_file_exits(): + create_longterm_symlink() - # Merge DL1 datacheck files and produce PDFs. It also produces - # the daily datacheck report using the longterm script, and updates - # the longterm DL1 datacheck file with the cherenkov_transparency script. - if cfg.getboolean("lstchain", "merge_dl1_datacheck"): - list_job_id = merge_dl1_datacheck(seq_list) - longterm_job_id = daily_datacheck(daily_longterm_cmd(list_job_id)) - cherenkov_job_id = cherenkov_transparency(cherenkov_transparency_cmd(longterm_job_id)) - create_longterm_symlink(cherenkov_job_id) + else: + # Close the sequences + post_process_files(seq_list) - # Extract the provenance info - extract_provenance(seq_list) + # Merge DL1 datacheck files and produce PDFs. It also produces + # the daily datacheck report using the longterm script, and updates + # the longterm DL1 datacheck file with the cherenkov_transparency script. + if cfg.getboolean("lstchain", "merge_dl1_datacheck"): + list_job_id = merge_dl1_datacheck(seq_list) + longterm_job_id = daily_datacheck(daily_longterm_cmd(list_job_id)) + cherenkov_job_id = cherenkov_transparency(cherenkov_transparency_cmd(longterm_job_id)) + create_longterm_symlink(cherenkov_job_id) - # Merge DL1b files run-wise - merge_files(seq_list, data_level="DL1AB") + # Extract the provenance info + extract_provenance(seq_list) - merge_muon_files(seq_list) + # Merge DL1b files run-wise + merge_files(seq_list, data_level="DL1AB") - # Merge DL2 files run-wise - if not options.no_dl2: - merge_files(seq_list, data_level="DL2") + merge_muon_files(seq_list) + # Merge DL2 files run-wise + if not options.no_dl2: + merge_files(seq_list, data_level="DL2") + + + time.sleep(600) - time.sleep(600) # Check if all jobs launched by autocloser finished correctly # before creating the NightFinished.txt file From e5cd069ef2f147d13e6219bf0ac33dc0fadb323b Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Thu, 22 Feb 2024 14:32:48 +0100 Subject: [PATCH 4/7] Fix mistake --- src/osa/paths.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/osa/paths.py b/src/osa/paths.py index 3c111c6b..98532c86 100644 --- a/src/osa/paths.py +++ b/src/osa/paths.py @@ -393,7 +393,7 @@ def create_longterm_symlink(cherenkov_job_id: str = None): else: log.warning(f"Job {cherenkov_job_id} (lstchain_cherenkov_transparency) did not finish successfully.") -def dl1_datacheck_longterm_file_exits(): -> bool +def dl1_datacheck_longterm_file_exits() -> bool: """Return true if the longterm DL1 datacheck file was already produced.""" nightdir = utils.date_to_dir(options.date) longterm_dir = Path(cfg.get("LST1", "LONGTERM_DIR")) From 69f0a237050a02fa107891e8ed662a183898b097 Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Mon, 26 Feb 2024 17:32:42 +0100 Subject: [PATCH 5/7] Fix tests --- src/osa/scripts/closer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/osa/scripts/closer.py b/src/osa/scripts/closer.py index a405a75b..7b5a4d4b 100644 --- a/src/osa/scripts/closer.py +++ b/src/osa/scripts/closer.py @@ -159,7 +159,9 @@ def post_process(seq_tuple): """Set of last instructions.""" seq_list = seq_tuple[1] - if dl1_datacheck_longterm_file_exits(): + a=False + + if dl1_datacheck_longterm_file_exits() and not options.test: create_longterm_symlink() else: From 1df295c6725a566995a9d5d466acebb38afb2fcd Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Mon, 26 Feb 2024 17:33:51 +0100 Subject: [PATCH 6/7] Remove forgotten line --- src/osa/scripts/closer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/osa/scripts/closer.py b/src/osa/scripts/closer.py index 7b5a4d4b..8274b6a3 100644 --- a/src/osa/scripts/closer.py +++ b/src/osa/scripts/closer.py @@ -158,8 +158,6 @@ def ask_for_closing(): def post_process(seq_tuple): """Set of last instructions.""" seq_list = seq_tuple[1] - - a=False if dl1_datacheck_longterm_file_exits() and not options.test: create_longterm_symlink() From 2d6a529bb4226b5c9dbaccbc2de8482569c2622c Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Mon, 26 Feb 2024 18:14:17 +0100 Subject: [PATCH 7/7] Change docstrings --- src/osa/paths.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/osa/paths.py b/src/osa/paths.py index 98532c86..377a4e6e 100644 --- a/src/osa/paths.py +++ b/src/osa/paths.py @@ -362,7 +362,11 @@ def get_latest_version_file(longterm_files: List[str]) -> Path: def is_job_completed(job_id: str): - """Wait until the SLURM job corresponding to job_id finishes.""" + """ + Check whether SLURM job `job_id` has finished. + + It keeps checking every 10 minutes for one our. + """ n_max = 10 n = 0 while n < n_max: @@ -374,7 +378,7 @@ def is_job_completed(job_id: str): n += 1 log.debug(f"Job {job_id} is not completed yet, checking again in 10 minutes...") time.sleep(600) # wait 10 minutes to check again - log.info(f"The maximum number of checks of job {job_id} was reached, job {job_id} did not finish succesfully.") + log.info(f"The maximum number of checks of job {job_id} was reached, job {job_id} did not finish succesfully yet.") return False