Skip to content

Commit

Permalink
separate move current and move concat (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
patrick-troy authored Sep 30, 2024
1 parent 2e7a25d commit 31da621
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 15 deletions.
14 changes: 10 additions & 4 deletions liiatools_pipeline/jobs/common_org.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
create_reports,
create_org_session_folder,
move_error_report,
move_current_and_concat_view,
move_current_view,
move_concat_view,
)

log = get_dagster_logger()
Expand All @@ -16,11 +17,16 @@ def move_error_reports():


@job
def move_current_and_concat():
move_current_and_concat_view()
def move_current():
move_current_view()


@job (
@job
def move_concat():
move_concat_view()


@job(
tags={"dagster/max_runtime": 1800}
)
def reports():
Expand Down
13 changes: 11 additions & 2 deletions liiatools_pipeline/ops/common_org.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

log = get_dagster_logger()


@op()
def move_error_report():
source_folder = incoming_folder().opendir("logs")
Expand All @@ -26,7 +27,7 @@ def move_error_report():


@op()
def move_current_and_concat_view(config: CleanConfig):
def move_current_view():
current_folder = incoming_folder().opendir("current")
destination_folder = shared_folder()

Expand All @@ -39,11 +40,19 @@ def move_current_and_concat_view(config: CleanConfig):
log.info("Moving current files to destination...")
pl.move_files_for_sharing(current_folder, destination_folder)

log.info("Moving concat files to destination...")

@op()
def move_concat_view(config: CleanConfig):
concat_folder = incoming_folder().opendir(f"concatenated/{config.dataset}")
destination_folder = shared_folder()

existing_files = destination_folder.listdir("/")

authority_regex = "|".join(authorities.codes)
concat_files_regex = f"({authority_regex})_{config.dataset}"
pl.remove_files(concat_files_regex, existing_files, destination_folder)

log.info("Moving concat files to destination...")
pl.move_files_for_sharing(concat_folder, destination_folder)


Expand Down
12 changes: 8 additions & 4 deletions liiatools_pipeline/repository_org.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from liiatools_pipeline.jobs.common_org import (
move_error_reports,
move_current_and_concat,
move_current,
move_concat,
reports,
)
from liiatools_pipeline.jobs.ssda903_org import ssda903_sufficiency
Expand All @@ -13,7 +14,8 @@
)
from liiatools_pipeline.sensors.job_success_sensor import (
move_error_reports_sensor,
move_current_and_concat_sensor,
move_current_sensor,
move_concat_sensor,
sufficiency_sensor,
)

Expand All @@ -30,7 +32,8 @@ def sync():
"""
jobs = [
move_error_reports,
move_current_and_concat,
move_current,
move_concat,
reports,
external_incoming,
ssda903_sufficiency,
Expand All @@ -40,7 +43,8 @@ def sync():
]
sensors = [
move_error_reports_sensor,
move_current_and_concat_sensor,
move_current_sensor,
move_concat_sensor,
sufficiency_sensor,
]

Expand Down
34 changes: 29 additions & 5 deletions liiatools_pipeline/sensors/job_success_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from liiatools_pipeline.jobs.ssda903_la import ssda903_fix_episodes
from liiatools_pipeline.jobs.common_org import (
move_error_reports,
move_current_and_concat,
move_current,
move_concat,
reports,
)
from liiatools_pipeline.jobs.ssda903_org import ssda903_sufficiency
Expand Down Expand Up @@ -150,11 +151,34 @@ def move_error_reports_sensor(context):


@sensor(
job=move_current_and_concat,
description="Runs move_current_and_concat job once reports job is complete",
job=move_current,
description="Runs move_current job once reports job is complete",
default_status=DefaultSensorStatus.RUNNING,
)
def move_current_sensor(context):
run_records = context.instance.get_run_records(
filters=RunsFilter(
job_name=reports.name,
statuses=[DagsterRunStatus.SUCCESS],
),
order_by="update_timestamp",
ascending=False,
limit=1000,
)

if run_records: # Ensure there is at least one run record
latest_run_record = run_records[0]
yield RunRequest(
run_key=latest_run_record.dagster_run.run_id,
)


@sensor(
job=move_concat,
description="Runs move_concat job once reports job is complete",
default_status=DefaultSensorStatus.RUNNING,
)
def move_current_and_concat_sensor(context):
def move_concat_sensor(context):
allowed_datasets = env_config("ALLOWED_DATASETS").split(",")

run_records = context.instance.get_run_records(
Expand Down Expand Up @@ -184,7 +208,7 @@ def move_current_and_concat_sensor(context):
run_key=latest_run_id,
run_config=RunConfig(
ops={
"move_current_and_concat_view": clean_config,
"move_concat_view": clean_config,
}
),
)
Expand Down

0 comments on commit 31da621

Please sign in to comment.