
Commit

Merge pull request #279 from cta-observatory/link_datacheck
Wait until the longterm datacheck file is created to make the link
morcuended authored Feb 26, 2024
2 parents 5eb79d5 + 2d6a529 commit 100685b
Showing 2 changed files with 81 additions and 28 deletions.
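The flow introduced here, in brief: closer.py now submits the lstchain_cherenkov_transparency job with sbatch --parsable so it can capture the SLURM job ID, and passes that ID to create_longterm_symlink, which polls sacct until the job is COMPLETED before linking the longterm datacheck file. A minimal stand-alone sketch of that flow follows; the script path, night paths and helper names are illustrative placeholders, not the repository code:

from pathlib import Path
from typing import List
import subprocess
import time


def submit_job(cmd: List[str]) -> str:
    """Submit a batch script; with --parsable, stdout is just the job ID."""
    job = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return job.stdout.strip()


def is_job_completed(job_id: str, n_max: int = 10, interval: int = 600) -> bool:
    """Poll sacct until the job state contains COMPLETED, giving up after n_max checks."""
    for _ in range(n_max):
        status = subprocess.run(
            ["sacct", "--format=state", "--jobs", job_id],
            capture_output=True, text=True,
        )
        if "COMPLETED" in status.stdout:
            return True
        time.sleep(interval)
    return False


def link_latest(latest_file: Path, link: Path) -> None:
    """Replace the night-wise 'all' symlink with the latest-version datacheck file."""
    link.unlink(missing_ok=True)
    link.symlink_to(latest_file)


if __name__ == "__main__":
    # Illustrative script name and paths only
    job_id = submit_job(["sbatch", "--parsable", "cherenkov_transparency.sh"])
    if is_job_completed(job_id):
        link_latest(
            Path("longterm/v0.10/20240226/DL1_datacheck_20240226.h5"),
            Path("longterm/night_wise/all/DL1_datacheck_20240226.h5"),
        )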
47 changes: 40 additions & 7 deletions src/osa/paths.py
@@ -5,6 +5,8 @@
from datetime import datetime
from pathlib import Path
from typing import List
import subprocess
import time

import lstchain
from astropy.table import Table
@@ -359,15 +361,46 @@ def get_latest_version_file(longterm_files: List[str]) -> Path:
)


def create_longterm_symlink():
def is_job_completed(job_id: str):
"""
    Check whether the SLURM job `job_id` has finished.
    It checks the job status every 10 minutes, up to a maximum of 10 times.
"""
n_max = 10
n = 0
while n < n_max:
# Check if the status of the SLURM job is "COMPLETED"
status = subprocess.run(["sacct", "--format=state", "--jobs", job_id], capture_output=True, text=True)
if "COMPLETED" in status.stdout:
log.debug(f"Job {job_id} finished successfully!")
return True
n += 1
log.debug(f"Job {job_id} is not completed yet, checking again in 10 minutes...")
time.sleep(600) # wait 10 minutes to check again
log.info(f"The maximum number of checks of job {job_id} was reached, job {job_id} did not finish succesfully yet.")
return False


def create_longterm_symlink(cherenkov_job_id: str = None):
"""If the created longterm DL1 datacheck file corresponds to the latest
version available, make symlink to it in the "all" common directory."""
if not cherenkov_job_id or is_job_completed(cherenkov_job_id):
nightdir = utils.date_to_dir(options.date)
longterm_dir = Path(cfg.get("LST1", "LONGTERM_DIR"))
linked_longterm_file = longterm_dir / f"night_wise/all/DL1_datacheck_{nightdir}.h5"
all_longterm_files = longterm_dir.rglob(f"v*/{nightdir}/DL1_datacheck_{nightdir}.h5")
latest_version_file = get_latest_version_file(all_longterm_files)
log.info("Symlink the latest version longterm DL1 datacheck file in the common directory.")
linked_longterm_file.unlink(missing_ok=True)
linked_longterm_file.symlink_to(latest_version_file)
else:
log.warning(f"Job {cherenkov_job_id} (lstchain_cherenkov_transparency) did not finish successfully.")

def dl1_datacheck_longterm_file_exits() -> bool:
"""Return true if the longterm DL1 datacheck file was already produced."""
nightdir = utils.date_to_dir(options.date)
longterm_dir = Path(cfg.get("LST1", "LONGTERM_DIR"))
linked_longterm_file = longterm_dir / f"night_wise/all/DL1_datacheck_{nightdir}.h5"
all_longterm_files = longterm_dir.rglob(f"v*/{nightdir}/DL1_datacheck_{nightdir}.h5")
latest_version_file = get_latest_version_file(all_longterm_files)
longterm_file = longterm_dir / options.prod_id / nightdir / f"DL1_datacheck_{nightdir}.h5"
return longterm_file.exists()

log.info("Symlink the latest version longterm DL1 datacheck file in the common directory.")
linked_longterm_file.unlink(missing_ok=True)
linked_longterm_file.symlink_to(latest_version_file)
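A side note, not part of the diff: checking "COMPLETED" in status.stdout matches the state of any job step that sacct lists, not just the allocation. If a stricter check were ever wanted, sacct can be limited to the allocation line; a small illustrative helper (the -X, -n and -o flags are standard sacct options, but the function itself is not from the repository):

import subprocess

def allocation_state(job_id: str) -> str:
    """Return the SLURM state of the job allocation only, without individual steps."""
    # -X: allocation line only, -n: no header, -o State: print just the State column
    result = subprocess.run(
        ["sacct", "-X", "-n", "-o", "State", "-j", job_id],
        capture_output=True, text=True,
    )
    return result.stdout.strip()

# e.g. allocation_state("123456") might return "COMPLETED", "FAILED" or "TIMEOUT"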
62 changes: 41 additions & 21 deletions src/osa/scripts/closer.py
@@ -24,7 +24,11 @@
)
from osa.nightsummary.extract import extract_runs, extract_sequences
from osa.nightsummary.nightsummary import run_summary_table
from osa.paths import destination_dir, create_longterm_symlink
from osa.paths import (
destination_dir,
create_longterm_symlink,
dl1_datacheck_longterm_file_exits
)
from osa.raw import is_raw_data_available
from osa.report import start
from osa.utils.cliopts import closercliparsing
@@ -154,32 +158,38 @@ def ask_for_closing():
def post_process(seq_tuple):
"""Set of last instructions."""
seq_list = seq_tuple[1]

if dl1_datacheck_longterm_file_exits() and not options.test:
create_longterm_symlink()

# Close the sequences
post_process_files(seq_list)
else:
# Close the sequences
post_process_files(seq_list)

# Merge DL1 datacheck files and produce PDFs. It also produces
# the daily datacheck report using the longterm script, and updates
# the longterm DL1 datacheck file with the cherenkov_transparency script.
if cfg.getboolean("lstchain", "merge_dl1_datacheck"):
list_job_id = merge_dl1_datacheck(seq_list)
longterm_job_id = daily_datacheck(daily_longterm_cmd(list_job_id))
cherenkov_transparency(cherenkov_transparency_cmd(longterm_job_id))
create_longterm_symlink()
# Merge DL1 datacheck files and produce PDFs. It also produces
# the daily datacheck report using the longterm script, and updates
# the longterm DL1 datacheck file with the cherenkov_transparency script.
if cfg.getboolean("lstchain", "merge_dl1_datacheck"):
list_job_id = merge_dl1_datacheck(seq_list)
longterm_job_id = daily_datacheck(daily_longterm_cmd(list_job_id))
cherenkov_job_id = cherenkov_transparency(cherenkov_transparency_cmd(longterm_job_id))
create_longterm_symlink(cherenkov_job_id)

# Extract the provenance info
extract_provenance(seq_list)
# Extract the provenance info
extract_provenance(seq_list)

# Merge DL1b files run-wise
merge_files(seq_list, data_level="DL1AB")
# Merge DL1b files run-wise
merge_files(seq_list, data_level="DL1AB")

merge_muon_files(seq_list)
merge_muon_files(seq_list)

# Merge DL2 files run-wise
if not options.no_dl2:
merge_files(seq_list, data_level="DL2")
# Merge DL2 files run-wise
if not options.no_dl2:
merge_files(seq_list, data_level="DL2")


time.sleep(600)

time.sleep(600)

# Check if all jobs launched by autocloser finished correctly
# before creating the NightFinished.txt file
@@ -536,6 +546,7 @@ def cherenkov_transparency_cmd(longterm_job_id: str) -> List[str]:

return [
"sbatch",
"--parsable",
"-D",
options.directory,
"-o",
@@ -553,7 +564,16 @@ def cherenkov_transparency(cmd: List[str]):
log.debug(f"Executing {stringify(cmd)}")

if not options.simulate and not options.test and shutil.which("sbatch") is not None:
subprocess.run(cmd, check=True)
job = subprocess.run(
cmd,
encoding="utf-8",
capture_output=True,
text=True,
check=True,
)
job_id = job.stdout.strip()
return job_id

else:
log.debug("Simulate launching scripts")

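Worth noting about the new --parsable flag: sbatch then prints only the job ID, optionally followed by ";<cluster>" when a cluster name is configured, which is why job.stdout.strip() suffices above. Should clusters ever be involved, splitting on the semicolon keeps just the numeric ID; an illustrative snippet (the cluster name below is made up):

def parse_sbatch_output(stdout: str) -> str:
    """Extract the job ID from `sbatch --parsable` output."""
    # --parsable prints "<jobid>" or "<jobid>;<cluster>" on a single line
    return stdout.strip().split(";")[0]

assert parse_sbatch_output("123456\n") == "123456"
assert parse_sbatch_output("123456;lst-cluster\n") == "123456"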
