Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #35 and Pruning Patch #36

Merged
merged 1 commit into from
Oct 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion orchestration/_tests/test_globus_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):
data832_raw_path=alcf_raw_path,
data832_scratch_path_tiff=f"{scratch_path_tiff}",
data832_scratch_path_zarr=f"{scratch_path_zarr}",
one_minute=True
one_minute=True,
config=mock_config
)
assert isinstance(result, list), "Result should be a list"
assert result == [True, True, True, True, True], "Result does not match expected values"
Expand Down
149 changes: 85 additions & 64 deletions orchestration/flows/bl832/alcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ def transfer_data_to_data832(


@task(name="schedule_prune_task")
def schedule_prune_task(path: str, location: str, schedule_days: datetime.timedelta) -> bool:
def schedule_prune_task(path: str,
location: str,
schedule_days: datetime.timedelta,
source_endpoint=None,
check_endpoint=None) -> bool:
"""
Schedules a Prefect flow to prune files from a specified location.

Expand All @@ -192,9 +196,13 @@ def schedule_prune_task(path: str, location: str, schedule_days: datetime.timede
try:
flow_name = f"delete {location}: {Path(path).name}"
schedule_prefect_flow(
deploymnent_name=f"prune_{location}/prune_{location}",
deployment_name=f"prune_{location}/prune_{location}",
flow_run_name=flow_name,
parameters={"relative_path": path},
parameters={
"relative_path": path,
"source_endpoint": source_endpoint,
"check_endpoint": check_endpoint
},
duration_from_now=schedule_days
)
return True
Expand All @@ -214,7 +222,8 @@ def schedule_pruning(
data832_raw_path: str = None,
data832_scratch_path_tiff: str = None,
data832_scratch_path_zarr: str = None,
one_minute: bool = False) -> bool:
one_minute: bool = False,
config=None) -> bool:
"""
This function schedules the deletion of files from specified locations on ALCF, NERSC, and data832.

Expand All @@ -240,21 +249,21 @@ def schedule_pruning(
nersc_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"])
data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"])

# (path, location, days)
# (path, location, days, source_endpoint, check_endpoint)
delete_schedules = [
(alcf_raw_path, "alcf832_raw", alcf_delay),
(alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay),
(alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay),
(nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay),
(nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay),
(data832_raw_path, "data832_raw", data832_delay),
(data832_scratch_path_tiff, "data832_scratch", data832_delay),
(data832_scratch_path_zarr, "data832_scratch", data832_delay)
(alcf_raw_path, "alcf832_raw", alcf_delay, config.alcf832_raw, config.data832_raw),
(alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch),
(alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch),
(nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None),
(nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None),
(data832_raw_path, "data832_raw", data832_delay, config.data832_raw, None),
(data832_scratch_path_tiff, "data832_scratch", data832_delay, config.data832_scratch, None),
(data832_scratch_path_zarr, "data832_scratch", data832_delay, config.data832_scratch, None)
]

for path, location, days in delete_schedules:
for path, location, days, source_endpoint, check_endpoint in delete_schedules:
if path:
schedule_prune_task(path, location, days)
schedule_prune_task(path, location, days, source_endpoint, check_endpoint)
logger.info(f"Scheduled delete from {location} at {days} days")
else:
logger.info(f"Path not provided for {location}, skipping scheduling of deletion task.")
Expand Down Expand Up @@ -570,55 +579,66 @@ def process_new_832_ALCF_flow(folder_name: str,
logger.info(f"Transfer status: {alcf_transfer_success}")
if not alcf_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
raise ValueError("Transfer to ALCF Failed")
else:
logger.info("Transfer successful.")

# Step 2A: Run the Tomopy Reconstruction Globus Flow
logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF")
alcf_reconstruction_success = alcf_tomopy_reconstruction_flow(raw_path=alcf_raw_path,
scratch_path=alcf_scratch_path,
folder_name=folder_name,
file_name=h5_file_name)
if not alcf_reconstruction_success:
logger.error("Reconstruction Failed.")
else:
logger.info("Reconstruction Successful.")

# Step 2B: Run the Tiff to Zarr Globus Flow
logger.info(f"Running Tiff to Zarr on {file_name} at ALCF")
raw_path = f"/eagle/IRIBeta/als/{alcf_raw_path}/{h5_file_name}"
tiff_scratch_path = f"/eagle/IRIBeta/als/bl832/scratch/{folder_name}/rec{file_name}/"
alcf_tiff_to_zarr_success = alcf_tiff_to_zarr_flow(raw_path=raw_path,
tiff_scratch_path=tiff_scratch_path)
if not alcf_tiff_to_zarr_success:
logger.error("Tiff to Zarr Failed.")
else:
logger.info("Tiff to Zarr Successful.")

# Step 3: Send reconstructed data (tiffs and zarr) to data832
# Transfer A: Send reconstructed data (tiff) to data832
logger.info(f"Transferring {file_name} from {alcf_raw_path} at ALCF to {data832_scratch_path} at data832")
logger.info(f"Reconstructed file path: {scratch_path_tiff}")
data832_tiff_transfer_success = transfer_data_to_data832(scratch_path_tiff,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_tiff_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
else:
logger.info("Transfer successful.")

# Transfer B: Send reconstructed data (zarr) to data832
logger.info(f"Transferring {file_name} from {alcf_raw_path} at ALCF to {data832_scratch_path} at data832")
logger.info(f"Reconstructed file path: {scratch_path_zarr}")
data832_zarr_transfer_success = transfer_data_to_data832(scratch_path_zarr,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_zarr_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
else:
logger.info("Transfer successful.")
logger.info("Transfer to ALCF Successful.")

# Step 2A: Run the Tomopy Reconstruction Globus Flow
logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF")
alcf_reconstruction_success = alcf_tomopy_reconstruction_flow(
raw_path=alcf_raw_path,
scratch_path=alcf_scratch_path,
folder_name=folder_name,
file_name=h5_file_name)
if not alcf_reconstruction_success:
logger.error("Reconstruction Failed.")
raise ValueError("Reconstruction at ALCF Failed")
else:
logger.info("Reconstruction Successful.")

# Step 2B: Run the Tiff to Zarr Globus Flow
logger.info(f"Running Tiff to Zarr on {file_name} at ALCF")
raw_path = f"/eagle/IRIBeta/als/{alcf_raw_path}/{h5_file_name}"
tiff_scratch_path = f"/eagle/IRIBeta/als/bl832/scratch/{folder_name}/rec{file_name}/"
alcf_tiff_to_zarr_success = alcf_tiff_to_zarr_flow(
raw_path=raw_path,
tiff_scratch_path=tiff_scratch_path)
if not alcf_tiff_to_zarr_success:
logger.error("Tiff to Zarr Failed.")
raise ValueError("Tiff to Zarr at ALCF Failed")
else:
logger.info("Tiff to Zarr Successful.")

if alcf_reconstruction_success:
# Step 3: Send reconstructed data (tiffs and zarr) to data832
# Transfer A: Send reconstructed data (tiff) to data832
logger.info(f"Transferring {file_name} from {alcf_raw_path} "
f"at ALCF to {data832_scratch_path} at data832")
logger.info(f"Reconstructed file path: {scratch_path_tiff}")
data832_tiff_transfer_success = transfer_data_to_data832(
scratch_path_tiff,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_tiff_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
else:
logger.info("Transfer successful.")

if alcf_tiff_to_zarr_success:
# Transfer B: Send reconstructed data (zarr) to data832
logger.info(f"Transferring {file_name} from {alcf_raw_path} "
f"at ALCF to {data832_scratch_path} at data832")
logger.info(f"Reconstructed file path: {scratch_path_zarr}")
data832_zarr_transfer_success = transfer_data_to_data832(
scratch_path_zarr,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_zarr_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
else:
logger.info("Transfer successful.")

# Step 4: Schedule deletion of files from ALCF, NERSC, and data832
logger.info("Scheduling deletion of files from ALCF, NERSC, and data832")
Expand All @@ -633,7 +653,8 @@ def process_new_832_ALCF_flow(folder_name: str,
data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None,
data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None,
data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None,
one_minute=True # Set to False for production durations
one_minute=True, # Set to False for production durations
config=config
)

# Step 5: ingest into scicat ... todo
Expand Down
13 changes: 11 additions & 2 deletions orchestration/flows/bl832/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ def process_new_832_file(file_path: str,
schedule_prefect_flow(
"prune_spot832/prune_spot832",
flow_name,
{"relative_path": relative_path},
{
"relative_path": relative_path,
"source_endpoint": config.spot832,
"check_endpoint": config.data832,
},

datetime.timedelta(days=schedule_spot832_delete_days),
)
logger.info(
Expand All @@ -151,7 +156,11 @@ def process_new_832_file(file_path: str,
schedule_prefect_flow(
"prune_data832/prune_data832",
flow_name,
{"relative_path": relative_path},
{
"relative_path": relative_path,
"source_endpoint": config.data832,
"check_endpoint": config.nersc832,
},
datetime.timedelta(days=schedule_data832_delete_days),
)
logger.info(
Expand Down
111 changes: 82 additions & 29 deletions orchestration/flows/bl832/prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def prune_files(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint = None,
config=None
):
"""
Prune files from a source endpoint.
Expand All @@ -23,7 +24,9 @@ def prune_files(
check_endpoint (GlobusEndpoint, optional): The Globus target endpoint to check. Defaults to None.
"""
p_logger = get_run_logger()
config = Config832()
if config is None:
config = Config832()

globus_settings = JSON.load("globus-settings").value
max_wait_seconds = globus_settings["max_wait_seconds"]
flow_name = f"prune_from_{source_endpoint.name}"
Expand All @@ -41,52 +44,102 @@ def prune_files(


@flow(name="prune_spot832")
def prune_spot832(relative_path: str):
    """Prune `relative_path` from spot832, using data832 as the check endpoint.

    Delegates to prune_files with spot832 as the source endpoint.
    """
    # Build the config once instead of instantiating Config832 twice per call.
    config = Config832()
    prune_files(relative_path=relative_path,
                source_endpoint=config.spot832,
                check_endpoint=config.data832)
def prune_spot832(relative_path: str,
                  source_endpoint: GlobusEndpoint,
                  check_endpoint: GlobusEndpoint,
                  config=None):
    """Prune `relative_path` from the given spot832 source endpoint.

    All arguments are forwarded unchanged to prune_files.
    """
    prune_files(relative_path, source_endpoint, check_endpoint, config)


@flow(name="prune_data832")
def prune_data832(relative_path: str):
    """Prune `relative_path` from data832, using nersc832 as the check endpoint.

    Delegates to prune_files with data832 as the source endpoint.
    """
    # Build the config once instead of instantiating Config832 twice per call.
    config = Config832()
    prune_files(relative_path=relative_path,
                source_endpoint=config.data832,
                check_endpoint=config.nersc832)
def prune_data832(relative_path: str,
                  source_endpoint: GlobusEndpoint,
                  check_endpoint: GlobusEndpoint,
                  config=None):
    """Prune `relative_path` from the given data832 source endpoint.

    All arguments are forwarded unchanged to prune_files.
    """
    prune_files(relative_path, source_endpoint, check_endpoint, config)


@flow(name="prune_data832_raw")
def prune_data832_raw(relative_path: str):
    """Prune `relative_path` from data832_raw; no check endpoint is consulted."""
    cfg = Config832()
    prune_files(relative_path=relative_path,
                source_endpoint=cfg.data832_raw,
                check_endpoint=None)
def prune_data832_raw(relative_path: str,
                      source_endpoint: GlobusEndpoint,
                      check_endpoint: GlobusEndpoint,
                      config=None):
    """Prune `relative_path` from the given data832_raw source endpoint.

    All arguments are forwarded unchanged to prune_files.
    """
    prune_files(relative_path, source_endpoint, check_endpoint, config)


@flow(name="prune_data832_scratch")
def prune_data832_scratch(relative_path: str):
    """Prune `relative_path` from data832_scratch; no check endpoint is consulted."""
    cfg = Config832()
    prune_files(relative_path=relative_path,
                source_endpoint=cfg.data832_scratch,
                check_endpoint=None)
def prune_data832_scratch(relative_path: str,
                          source_endpoint: GlobusEndpoint,
                          check_endpoint: GlobusEndpoint,
                          config=None):
    """Prune `relative_path` from the given data832_scratch source endpoint.

    All arguments are forwarded unchanged to prune_files.
    """
    prune_files(relative_path, source_endpoint, check_endpoint, config)


@flow(name="prune_alcf832_raw")
def prune_alcf832_raw(relative_path: str):
    """Prune `relative_path` from alcf832_raw, using data832_raw as the check endpoint.

    Delegates to prune_files with alcf832_raw as the source endpoint.
    """
    # Build the config once instead of instantiating Config832 twice per call.
    config = Config832()
    prune_files(relative_path=relative_path,
                source_endpoint=config.alcf832_raw,
                check_endpoint=config.data832_raw)
def prune_alcf832_raw(relative_path: str,
                      source_endpoint: GlobusEndpoint,
                      check_endpoint: GlobusEndpoint,
                      config=None):
    """Prune `relative_path` from the given alcf832_raw source endpoint.

    All arguments are forwarded unchanged to prune_files.
    """
    prune_files(relative_path, source_endpoint, check_endpoint, config)


@flow(name="prune_alcf832_scratch")
def prune_alcf832_scratch(relative_path: str):
    """Prune `relative_path` from alcf832_scratch, using data832_scratch as the check endpoint.

    Delegates to prune_files with alcf832_scratch as the source endpoint.
    """
    # Build the config once instead of instantiating Config832 twice per call.
    config = Config832()
    prune_files(relative_path=relative_path,
                source_endpoint=config.alcf832_scratch,
                check_endpoint=config.data832_scratch)
def prune_alcf832_scratch(relative_path: str,
                          source_endpoint: GlobusEndpoint,
                          check_endpoint: GlobusEndpoint,
                          config=None):
    """Prune `relative_path` from the given alcf832_scratch source endpoint.

    All arguments are forwarded unchanged to prune_files.
    """
    prune_files(relative_path, source_endpoint, check_endpoint, config)


@flow(name="prune_nersc832_alsdev_scratch")
def prune_nersc832_alsdev_scratch(relative_path: str):
    """Prune `relative_path` from nersc832_alsdev_scratch; no check endpoint is consulted."""
    cfg = Config832()
    prune_files(relative_path=relative_path,
                source_endpoint=cfg.nersc832_alsdev_scratch,
                check_endpoint=None)
def prune_nersc832_alsdev_scratch(relative_path: str,
                                  source_endpoint: GlobusEndpoint,
                                  check_endpoint: GlobusEndpoint,
                                  config=None):
    """Prune `relative_path` from the given nersc832_alsdev_scratch source endpoint.

    All arguments are forwarded unchanged to prune_files.
    """
    prune_files(relative_path, source_endpoint, check_endpoint, config)


if __name__ == "__main__":
Expand Down
Loading
Loading