From de85846d9b4b2b73885ae9321b92282c72b9a2e0 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Sun, 20 Oct 2024 14:43:23 -0700 Subject: [PATCH] Solving issue #35 as well as file pruning bug discovered during weekend user time. --- orchestration/_tests/test_globus_flow.py | 3 +- orchestration/flows/bl832/alcf.py | 149 +++++++++++++---------- orchestration/flows/bl832/move.py | 13 +- orchestration/flows/bl832/prune.py | 111 ++++++++++++----- orchestration/globus/transfer.py | 59 +++------ orchestration/prefect.py | 10 +- scripts/check_globus_transfer.py | 1 - 7 files changed, 204 insertions(+), 142 deletions(-) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index 2ce07e2..1cb3a1d 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -368,7 +368,8 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture): data832_raw_path=alcf_raw_path, data832_scratch_path_tiff=f"{scratch_path_tiff}", data832_scratch_path_zarr=f"{scratch_path_zarr}", - one_minute=True + one_minute=True, + config=mock_config ) assert isinstance(result, list), "Result should be a list" assert result == [True, True, True, True, True], "Result does not match expected values" diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index e5d16aa..0f5479f 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -180,7 +180,11 @@ def transfer_data_to_data832( @task(name="schedule_prune_task") -def schedule_prune_task(path: str, location: str, schedule_days: datetime.timedelta) -> bool: +def schedule_prune_task(path: str, + location: str, + schedule_days: datetime.timedelta, + source_endpoint=None, + check_endpoint=None) -> bool: """ Schedules a Prefect flow to prune files from a specified location. @@ -192,9 +196,13 @@ def schedule_prune_task(path: str, location: str, schedule_days: datetime.timede try: flow_name = f"delete {location}: {Path(path).name}" schedule_prefect_flow( - deploymnent_name=f"prune_{location}/prune_{location}", + deployment_name=f"prune_{location}/prune_{location}", flow_run_name=flow_name, - parameters={"relative_path": path}, + parameters={ + "relative_path": path, + "source_endpoint": source_endpoint, + "check_endpoint": check_endpoint + }, duration_from_now=schedule_days ) return True @@ -214,7 +222,8 @@ def schedule_pruning( data832_raw_path: str = None, data832_scratch_path_tiff: str = None, data832_scratch_path_zarr: str = None, - one_minute: bool = False) -> bool: + one_minute: bool = False, + config=None) -> bool: """ This function schedules the deletion of files from specified locations on ALCF, NERSC, and data832. @@ -240,21 +249,21 @@ def schedule_pruning( nersc_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"]) data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"]) - # (path, location, days) + # (path, location, days, source_endpoint, check_endpoint) delete_schedules = [ - (alcf_raw_path, "alcf832_raw", alcf_delay), - (alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay), - (alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay), - (nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay), - (nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay), - (data832_raw_path, "data832_raw", data832_delay), - (data832_scratch_path_tiff, "data832_scratch", data832_delay), - (data832_scratch_path_zarr, "data832_scratch", data832_delay) + (alcf_raw_path, "alcf832_raw", alcf_delay, config.alcf832_raw, config.data832_raw), + (alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), + (alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), + (nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), + (nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), + (data832_raw_path, "data832_raw", data832_delay, config.data832_raw, None), + (data832_scratch_path_tiff, "data832_scratch", data832_delay, config.data832_scratch, None), + (data832_scratch_path_zarr, "data832_scratch", data832_delay, config.data832_scratch, None) ] - for path, location, days in delete_schedules: + for path, location, days, source_endpoint, check_endpoint in delete_schedules: if path: - schedule_prune_task(path, location, days) + schedule_prune_task(path, location, days, source_endpoint, check_endpoint) logger.info(f"Scheduled delete from {location} at {days} days") else: logger.info(f"Path not provided for {location}, skipping scheduling of deletion task.") @@ -570,55 +579,66 @@ def process_new_832_ALCF_flow(folder_name: str, logger.info(f"Transfer status: {alcf_transfer_success}") if not alcf_transfer_success: logger.error("Transfer failed due to configuration or authorization issues.") + raise ValueError("Transfer to ALCF Failed") else: - logger.info("Transfer successful.") - - # Step 2A: Run the Tomopy Reconstruction Globus Flow - logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF") - alcf_reconstruction_success = alcf_tomopy_reconstruction_flow(raw_path=alcf_raw_path, - scratch_path=alcf_scratch_path, - folder_name=folder_name, - file_name=h5_file_name) - if not alcf_reconstruction_success: - logger.error("Reconstruction Failed.") - else: - logger.info("Reconstruction Successful.") - - # Step 2B: Run the Tiff to Zarr Globus Flow - logger.info(f"Running Tiff to Zarr on {file_name} at ALCF") - raw_path = f"/eagle/IRIBeta/als/{alcf_raw_path}/{h5_file_name}" - tiff_scratch_path = f"/eagle/IRIBeta/als/bl832/scratch/{folder_name}/rec{file_name}/" - alcf_tiff_to_zarr_success = alcf_tiff_to_zarr_flow(raw_path=raw_path, - tiff_scratch_path=tiff_scratch_path) - if not alcf_tiff_to_zarr_success: - logger.error("Tiff to Zarr Failed.") - else: - logger.info("Tiff to Zarr Successful.") - - # Step 3: Send reconstructed data (tiffs and zarr) to data832 - # Transfer A: Send reconstructed data (tiff) to data832 - logger.info(f"Transferring {file_name} from {alcf_raw_path} at ALCF to {data832_scratch_path} at data832") - logger.info(f"Reconstructed file path: {scratch_path_tiff}") - data832_tiff_transfer_success = transfer_data_to_data832(scratch_path_tiff, - config.tc, - config.alcf832_scratch, - config.data832_scratch) - if not data832_tiff_transfer_success: - logger.error("Transfer failed due to configuration or authorization issues.") - else: - logger.info("Transfer successful.") - - # Transfer B: Send reconstructed data (zarr) to data832 - logger.info(f"Transferring {file_name} from {alcf_raw_path} at ALCF to {data832_scratch_path} at data832") - logger.info(f"Reconstructed file path: {scratch_path_zarr}") - data832_zarr_transfer_success = transfer_data_to_data832(scratch_path_zarr, - config.tc, - config.alcf832_scratch, - config.data832_scratch) - if not data832_zarr_transfer_success: - logger.error("Transfer failed due to configuration or authorization issues.") - else: - logger.info("Transfer successful.") + logger.info("Transfer to ALCF Successful.") + + # Step 2A: Run the Tomopy Reconstruction Globus Flow + logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF") + alcf_reconstruction_success = alcf_tomopy_reconstruction_flow( + raw_path=alcf_raw_path, + scratch_path=alcf_scratch_path, + folder_name=folder_name, + file_name=h5_file_name) + if not alcf_reconstruction_success: + logger.error("Reconstruction Failed.") + raise ValueError("Reconstruction at ALCF Failed") + else: + logger.info("Reconstruction Successful.") + + # Step 2B: Run the Tiff to Zarr Globus Flow + logger.info(f"Running Tiff to Zarr on {file_name} at ALCF") + raw_path = f"/eagle/IRIBeta/als/{alcf_raw_path}/{h5_file_name}" + tiff_scratch_path = f"/eagle/IRIBeta/als/bl832/scratch/{folder_name}/rec{file_name}/" + alcf_tiff_to_zarr_success = alcf_tiff_to_zarr_flow( + raw_path=raw_path, + tiff_scratch_path=tiff_scratch_path) + if not alcf_tiff_to_zarr_success: + logger.error("Tiff to Zarr Failed.") + raise ValueError("Tiff to Zarr at ALCF Failed") + else: + logger.info("Tiff to Zarr Successful.") + + if alcf_reconstruction_success: + # Step 3: Send reconstructed data (tiffs and zarr) to data832 + # Transfer A: Send reconstructed data (tiff) to data832 + logger.info(f"Transferring {file_name} from {alcf_raw_path} " + f"at ALCF to {data832_scratch_path} at data832") + logger.info(f"Reconstructed file path: {scratch_path_tiff}") + data832_tiff_transfer_success = transfer_data_to_data832( + scratch_path_tiff, + config.tc, + config.alcf832_scratch, + config.data832_scratch) + if not data832_tiff_transfer_success: + logger.error("Transfer failed due to configuration or authorization issues.") + else: + logger.info("Transfer successful.") + + if alcf_tiff_to_zarr_success: + # Transfer B: Send reconstructed data (zarr) to data832 + logger.info(f"Transferring {file_name} from {alcf_raw_path} " + f"at ALCF to {data832_scratch_path} at data832") + logger.info(f"Reconstructed file path: {scratch_path_zarr}") + data832_zarr_transfer_success = transfer_data_to_data832( + scratch_path_zarr, + config.tc, + config.alcf832_scratch, + config.data832_scratch) + if not data832_zarr_transfer_success: + logger.error("Transfer failed due to configuration or authorization issues.") + else: + logger.info("Transfer successful.") # Step 4: Schedule deletion of files from ALCF, NERSC, and data832 logger.info("Scheduling deletion of files from ALCF, NERSC, and data832") @@ -633,7 +653,8 @@ def process_new_832_ALCF_flow(folder_name: str, data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None, data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, - one_minute=True # Set to False for production durations + one_minute=True, # Set to False for production durations + config=config ) # Step 5: ingest into scicat ... todo diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index 71c980a..946bbec 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -140,7 +140,12 @@ def process_new_832_file(file_path: str, schedule_prefect_flow( "prune_spot832/prune_spot832", flow_name, - {"relative_path": relative_path}, + { + "relative_path": relative_path, + "source_endpoint": config.spot832, + "check_endpoint": config.data832, + }, + datetime.timedelta(days=schedule_spot832_delete_days), ) logger.info( @@ -151,7 +156,11 @@ def process_new_832_file(file_path: str, schedule_prefect_flow( "prune_data832/prune_data832", flow_name, - {"relative_path": relative_path}, + { + "relative_path": relative_path, + "source_endpoint": config.data832, + "check_endpoint": config.nersc832, + }, datetime.timedelta(days=schedule_data832_delete_days), ) logger.info( diff --git a/orchestration/flows/bl832/prune.py b/orchestration/flows/bl832/prune.py index ac13322..de70751 100644 --- a/orchestration/flows/bl832/prune.py +++ b/orchestration/flows/bl832/prune.py @@ -13,6 +13,7 @@ def prune_files( relative_path: str, source_endpoint: GlobusEndpoint, check_endpoint: GlobusEndpoint = None, + config=None ): """ Prune files from a source endpoint. @@ -23,7 +24,9 @@ def prune_files( check_endpoint (GlobusEndpoint, optional): The Globus target endpoint to check. Defaults to None. """ p_logger = get_run_logger() - config = Config832() + if config is None: + config = Config832() + globus_settings = JSON.load("globus-settings").value max_wait_seconds = globus_settings["max_wait_seconds"] flow_name = f"prune_from_{source_endpoint.name}" @@ -41,52 +44,102 @@ def prune_files( @flow(name="prune_spot832") -def prune_spot832(relative_path: str): - prune_files(relative_path=relative_path, - source_endpoint=Config832().spot832, - check_endpoint=Config832().data832) +def prune_spot832( + relative_path: str, + source_endpoint: GlobusEndpoint, + check_endpoint: GlobusEndpoint, + config=None, +): + prune_files( + relative_path=relative_path, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + config=config + ) @flow(name="prune_data832") -def prune_data832(relative_path: str): - prune_files(relative_path=relative_path, - source_endpoint=Config832().data832, - check_endpoint=Config832().nersc832) +def prune_data832( + relative_path: str, + source_endpoint: GlobusEndpoint, + check_endpoint: GlobusEndpoint, + config=None, +): + prune_files( + relative_path=relative_path, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + config=config) @flow(name="prune_data832_raw") -def prune_data832_raw(relative_path: str): - prune_files(relative_path=relative_path, - source_endpoint=Config832().data832_raw, - check_endpoint=None) +def prune_data832_raw( + relative_path: str, + source_endpoint: GlobusEndpoint, + check_endpoint: GlobusEndpoint, + config=None, +): + prune_files( + relative_path=relative_path, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + config=config) @flow(name="prune_data832_scratch") -def prune_data832_scratch(relative_path: str): - prune_files(relative_path=relative_path, - source_endpoint=Config832().data832_scratch, - check_endpoint=None) +def prune_data832_scratch( + relative_path: str, + source_endpoint: GlobusEndpoint, + check_endpoint: GlobusEndpoint, + config=None, +): + prune_files( + relative_path=relative_path, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + config=config) @flow(name="prune_alcf832_raw") -def prune_alcf832_raw(relative_path: str): - prune_files(relative_path=relative_path, - source_endpoint=Config832().alcf832_raw, - check_endpoint=Config832().data832_raw) +def prune_alcf832_raw( + relative_path: str, + source_endpoint: GlobusEndpoint, + check_endpoint: GlobusEndpoint, + config=None, +): + prune_files( + relative_path=relative_path, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + config=config) @flow(name="prune_alcf832_scratch") -def prune_alcf832_scratch(relative_path: str): - prune_files(relative_path=relative_path, - source_endpoint=Config832().alcf832_scratch, - check_endpoint=Config832().data832_scratch) +def prune_alcf832_scratch( + relative_path: str, + source_endpoint: GlobusEndpoint, + check_endpoint: GlobusEndpoint, + config=None, +): + prune_files( + relative_path=relative_path, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + config=config) @flow(name="prune_nersc832_alsdev_scratch") -def prune_nersc832_alsdev_scratch(relative_path: str): - prune_files(relative_path=relative_path, - source_endpoint=Config832().nersc832_alsdev_scratch, - check_endpoint=None) +def prune_nersc832_alsdev_scratch( + relative_path: str, + source_endpoint: GlobusEndpoint, + check_endpoint: GlobusEndpoint, + config=None, +): + prune_files( + relative_path=relative_path, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + config=config) if __name__ == "__main__": diff --git a/orchestration/globus/transfer.py b/orchestration/globus/transfer.py index a9972cb..de07afe 100644 --- a/orchestration/globus/transfer.py +++ b/orchestration/globus/transfer.py @@ -12,8 +12,7 @@ ConfidentialAppAuthClient, DeleteData, TransferClient, - TransferData, - GlobusAPIError + TransferData ) from prefect import task, get_run_logger from prefect.blocks.system import Secret @@ -189,44 +188,24 @@ def prune_files( logger=logger, ): start_time = time() - print("transfer_client", transfer_client) - print("endpoint", endpoint) - print("endpoint.uuid", endpoint.uuid) - print("files", files) - - try: - ddata = DeleteData(transfer_client=transfer_client, endpoint=endpoint.uuid, recursive=True) - logger.info(f"deleting {len(files)} from endpoint: {endpoint.uri}") - for file in files: - logger.info(f"deleting {file}") - print(file) - file_path = endpoint.full_path(file) - # print("{endpoint.root_path}/{file}") - ddata.add_item(file_path) - delete_result = transfer_client.submit_delete(ddata) - task_id = delete_result["task_id"] - print("delete_result", delete_result) - print("task_id", task_id) - # task_wait( - # transfer_client, task_id, max_wait_seconds=max_wait_seconds, logger=logger - # ) - print("deleted") - logger.info(f"delete_result {delete_result}") - except GlobusAPIError as err: - logger.error(f"Error removing directory {files} in endpoint {endpoint.uri}: {err.message}") - if err.info.consent_required: - logger.error(f"Got a ConsentRequired error with scopes: {err.info.consent_required.required_scopes}") - elif err.code == "PermissionDenied": - logger.error(f"Permission denied for removing directory {files}. Ensure proper permissions are set.") - elif err.http_status == 500: - logger.error(f"Server error when removing directory {files} in endpoint {endpoint.uri}.") - else: - logger.error(f"An unexpected error occurred: {err}") - - finally: - elapsed_time = time() - start_time - logger.info(f"prune_files task took {elapsed_time:.2f} seconds") - return task_id + + ddata = DeleteData(transfer_client=transfer_client, endpoint=endpoint.uuid, recursive=True) + logger.info(f"deleting {len(files)} from endpoint: {endpoint.uri}") + for file in files: + logger.info(f"deleting {file}") + print(file) + file_path = endpoint.full_path(file) + # print("{endpoint.root_path}/{file}") + ddata.add_item(file_path) + delete_result = transfer_client.submit_delete(ddata) + task_id = delete_result["task_id"] + task_wait( + transfer_client, task_id, max_wait_seconds=max_wait_seconds, logger=logger + ) + logger.info(f"delete_result {delete_result}") + elapsed_time = time() - start_time + logger.info(f"prune_files task took {elapsed_time:.2f} seconds") + return task_id def rename( diff --git a/orchestration/prefect.py b/orchestration/prefect.py index dd641d2..92f0535 100644 --- a/orchestration/prefect.py +++ b/orchestration/prefect.py @@ -12,17 +12,17 @@ async def schedule( - deploymnent_name, + deployment_name, flow_run_name, parameters, duration_from_now: datetime.timedelta, logger=logger, ): async with get_client() as client: - deployment = await client.read_deployment_by_name(deploymnent_name) + deployment = await client.read_deployment_by_name(deployment_name) assert ( deployment - ), f"No deployment found in config for deploymnent_name {deploymnent_name}" + ), f"No deployment found in config for deployment_name {deployment_name}" timezone = pytz.timezone("America/Los_Angeles") # Adjust the timezone as needed now = datetime.datetime.now(timezone) date_time_tz = now + duration_from_now @@ -37,11 +37,11 @@ async def schedule( @task(name="Schedule Prefect Flow") def schedule_prefect_flow( - deploymnent_name, flow_run_name, parameters, duration_from_now: datetime.timedelta + deployment_name, flow_run_name, parameters, duration_from_now: datetime.timedelta ): logger = get_run_logger() asyncio.run( - schedule(deploymnent_name, flow_run_name, parameters, duration_from_now, logger) + schedule(deployment_name, flow_run_name, parameters, duration_from_now, logger) ) return diff --git a/scripts/check_globus_transfer.py b/scripts/check_globus_transfer.py index 5e1377c..bfdfd44 100644 --- a/scripts/check_globus_transfer.py +++ b/scripts/check_globus_transfer.py @@ -219,7 +219,6 @@ def check_globus_transfer_permissions(endpoint_id: str, @app.command() def main(endpoint_id: str, - transfer_client: Optional[globus_sdk.TransferClient], list_contents: bool = True, create_test_directory: bool = True, delete_test_directory: bool = True,