SPIKE: Create harvesting workflow using apache airflow. #4422

Closed
rshewitt opened this issue Aug 14, 2023 · 28 comments

rshewitt commented Aug 14, 2023

User Story

  • In order to determine the viability of Apache Airflow as a data harvesting workflow solution, data.gov will stand up an instance of Apache Airflow and run a harvest source through the harvest job lifecycle shown in the Harvest Job Lifecycle diagram.

Acceptance Criteria

  • GIVEN data.gov wants to explore existing workflow solutions for a new harvesting pipeline and has 4 days' time, THEN Apache Airflow and the harvesting repo will be used to determine the solution's viability and its deployability to cloud.gov, along with a discussion of findings.

Background

Sketch

Do the following in order, as time allows given this is a spike:

  • Spin up Apache Airflow locally using Docker
  • Create a DAG that extracts and validates a DCAT-US catalog (use test examples from the harvesting-logic repo; a minimal sketch follows this list)
  • Validate that the above process works
  • Validate that the UI works for exploring jobs
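A minimal sketch of what that extract-and-validate DAG might look like using the TaskFlow API; the dag name, URL, and validation body here are placeholder assumptions, and real schema checks would come from the harvesting-logic repo:

```python
from datetime import datetime

import requests
from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2023, 8, 14), catchup=False)
def dcatus_extract_validate():

    @task
    def extract(url: str) -> dict:
        # download the DCAT-US catalog
        return requests.get(url, timeout=60).json()

    @task
    def validate(catalog: dict) -> int:
        # placeholder: real DCAT-US schema validation would live in harvesting-logic
        assert "dataset" in catalog
        return len(catalog["dataset"])

    validate(extract("https://example.gov/data.json"))


# assigning the invocation is what registers the DAG (see the comments below)
_ = dcatus_extract_validate()
```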
@rshewitt rshewitt converted this from a draft issue Aug 14, 2023
@rshewitt rshewitt added the H2.0/Harvest-General General Harvesting 2.0 Issues label Aug 14, 2023

btylerburton commented Aug 15, 2023

Some assumptions to test...

  • Queue
    • Can we spin up multiple workers?
    • Parallelize harvest processing
  • Processing
    • One to many for workers
  • UI
    • Managed access

@btylerburton btylerburton self-assigned this Aug 15, 2023
@btylerburton btylerburton moved this from New Dev to 🏗 In Progress [8] in data.gov team board Aug 15, 2023
@btylerburton

I'm going to put my name on this as I'll be devoting some time to it, but that shouldn't preclude others from working on it as well, either collaboratively or in private.

rshewitt commented Aug 18, 2023

The Airflow scheduler (Airflow version 2.3.0) won't detect a DAG defined with the TaskFlow API approach (i.e. decorating with task instead of using PythonOperator; see the comment below for an example) unless you assign the DAG function invocation to something. For example, this works...

_ = dag_function()

This doesn't work...

dag_function()

When using the latter approach, the scheduler log reports ...WARNING - No viable dags retrieved from [location of dag script.py]. This seems to conflict with an example found in the TaskFlow documentation.

rshewitt commented Aug 18, 2023

Using the TaskFlow API approach means less code to write. For example,

from airflow.decorators import task

# dag creation has already occurred
@task(task_id="transform")
def transform():
    ...  # do something

transform()  # add the task to the dag workflow

compares to

from airflow.operators.python import PythonOperator

# dag creation has already occurred
def transform():
    ...  # do something

task1 = PythonOperator(
    task_id="transform",
    python_callable=transform,
    dag=dag,  # attach the task to the dag; the operator itself is not called
)

Long story short, it removes the explicit need for the PythonOperator (in this instance). The same may apply to any conventional Airflow operator (e.g. email, python, bash).

@rshewitt rshewitt self-assigned this Aug 18, 2023
@rshewitt

I published my branch airflow-etl-test-reid; it's intended as a WIP. I paired with @jbrown-xentity earlier and went over my current findings. We discussed the possibility of tasks needing to share data between themselves (e.g. extract to transform). Based on what I've read, tasks are meant to execute in isolation, but Airflow offers a cross-task communication mechanism called XComs which allows for this. Testing how much data can be passed before Airflow reacts badly could be valuable.

rshewitt commented Aug 19, 2023

If sharing data between tasks proves unacceptable, one alternative could be having each task pull data from S3, process it, then load it back to S3. I'm unsure if this is better.
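A rough sketch of that alternative using the Amazon provider's S3Hook; the connection id, bucket name, and validation step are placeholder assumptions:

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@task
def validate_from_s3(key: str) -> str:
    s3 = S3Hook(aws_conn_id="aws_default")
    # pull the extracted record from S3 instead of receiving it via XCom
    record = s3.read_key(key=key, bucket_name="harvest-staging")
    validated = record  # placeholder for real validation
    # write the result back to S3; only the key travels between tasks
    out_key = f"validated/{key}"
    s3.load_string(validated, key=out_key, bucket_name="harvest-staging", replace=True)
    return out_key
```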

@rshewitt

It looks like tasks push to XCom by default if they return a value that is not None.
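A sketch of that implicit push/pull with TaskFlow, plus the per-record fan-out via dynamic task mapping; the dag name, URL, and validation check are placeholder assumptions:

```python
from datetime import datetime

import requests
from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2023, 8, 18), catchup=False)
def xcom_fanout_example():

    @task
    def extract(url: str) -> list:
        # the return value is not None, so it is pushed to XCom automatically
        return requests.get(url, timeout=60).json().get("dataset", [])

    @task
    def validate(record: dict) -> bool:
        # placeholder check; real DCAT-US validation would go here
        return "title" in record

    # validate pulls from XCom implicitly; .expand() fans out one mapped
    # validate task instance per extracted record (dynamic task mapping)
    validate.expand(record=extract("https://example.gov/data.json"))


_ = xcom_fanout_example()
```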

rshewitt commented Aug 21, 2023

Some run information from extracting and validating a large spatial dataset; note the Max Run Duration. This could be a miscalculation based on limited information from the task; it's probably something to do with the first run start and the last run start.

Screenshot 2023-08-21 at 12 29 04 PM

@jbrown-xentity

To summarize: we have a working version of a DAG (a harvest) that extracts a full DCAT-US catalog and distributes the individual datasets into a new workflow where they get validated independently.
There are 4 things we would like to investigate further:

@rshewitt

Test case where 2 of the dcatus records in the catalog are invalid.

Screenshot 2023-08-22 at 12 43 19 PM

Followed by only 1 load (the valid record).

Screenshot 2023-08-22 at 12 43 29 PM

@rshewitt

Test case where the extraction failed, causing all downstream tasks (validate & load) to fail.

Screenshot 2023-08-22 at 12 48 40 PM

@rshewitt

The picture above indicates that validate and load were kicked off, which we may want to avoid entirely. However, the validate task duration below shows that, despite being kicked off, no time was spent on it.

Screenshot 2023-08-22 at 12 58 42 PM

@rshewitt

I'm going to use a subset of the current catalog harvest sources, pulled with this query, as the input for dynamically generating DAGs.
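A sketch of how those DAGs could be generated dynamically; HARVEST_SOURCES is a hypothetical stand-in for the query results, and the task bodies are placeholders. The key point is that each generated DAG must end up in the module's globals():

```python
from datetime import datetime

from airflow.decorators import dag, task

# hypothetical stand-in for the harvest sources returned by the query
HARVEST_SOURCES = [
    {"name": "commerce_non_spatial_data.json", "url": "https://example.gov/commerce/data.json"},
    {"name": "bls_data", "url": "https://example.gov/bls/data.json"},
]


def build_harvest_dag(source: dict):
    @dag(
        dag_id=f"{source['name']}_harvest_source_workflow",
        schedule_interval=None,
        start_date=datetime(2023, 8, 26),
        catchup=False,
    )
    def harvest():
        @task
        def extract() -> dict:
            ...  # download source["url"] and return the catalog

        @task
        def validate(catalog: dict):
            ...  # validate against the DCAT-US schema

        validate(extract())

    return harvest()


for src in HARVEST_SOURCES:
    # assigning into globals() is what lets the scheduler pick up each DAG
    globals()[f"{src['name']}_harvest_source_workflow"] = build_harvest_dag(src)
```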

@jbrown-xentity

For the cloud.gov installation, we would use pip installation on the Python runner, probably with the Redis extras: https://airflow.apache.org/docs/apache-airflow/1.10.10/installation.html#extra-packages

@robert-bryson

Breaking the deploy-to-cloud.gov work off into #4434.

rshewitt commented Aug 26, 2023

In response to my previous comment: I've since found that DAGs are identified by whether they exist in the global namespace of the file (i.e. whether they are in globals()) when the file is processed. Function invocation or class instantiation without storing the result in a variable does not add to the file's global namespace; since globals() returns a dictionary, not using a variable means no key gets assigned to the value.

@rshewitt

The size limit for an XCom using a Postgres backend looks to be 1 GB. I've seen this value referenced in other articles as well.

@rshewitt

Docker crashed after attempting to process 16 instances of a dataset on my local machine. It appeared to be a memory issue.

@btylerburton

Let's do some real load testing on cloud.gov after @robert-bryson's work is stood up.

@rshewitt

Something to note on callback execution of tasks. This is in reference to my work on error handling.

rshewitt commented Aug 29, 2023

Callback functions are provided with the associated task context. Here's a list of the variables within a context.
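A sketch of an on_failure_callback that reads a few of those context variables; the callback name and log handling are placeholder assumptions, and the task would be wired into a DAG as usual:

```python
import logging

from airflow.decorators import task

log = logging.getLogger(__name__)


def notify_failure(context: dict):
    # the context dict carries task-run metadata such as ti, dag, run_id, exception
    ti = context["ti"]
    log.error(
        "Task %s in DAG %s failed on try %s: %s",
        ti.task_id,
        ti.dag_id,
        ti.try_number,
        context.get("exception"),
    )


# the callback fires with the task's context when the task fails
@task(task_id="extract", on_failure_callback=notify_failure)
def extract():
    raise ValueError("simulated extraction failure")
```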

rshewitt commented Aug 29, 2023

What techniques do we have to control the flow of information when something goes wrong?

rshewitt commented Aug 30, 2023

Using the TaskFlow API approach requires calling the tasks in the workflow in order for them to work properly.

# TaskFlow API approach
from datetime import datetime

from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator


@dag(schedule_interval=None, start_date=datetime(2023, 8, 30), catchup=False)
def test_pipeline():

    @task(task_id="task1")
    def task1():
        return "example"

    @task(task_id="task2")
    def task2():
        return "another example"

    task1() >> task2()  # calling the functions

_ = test_pipeline()

# Traditional approach
# using a context manager because decorating would be using TaskFlow; a context
# manager works fine with TaskFlow-decorated tasks as well
with DAG(
    dag_id="test_pipeline_traditional",
    schedule_interval=None,
    start_date=datetime(2023, 8, 30),
    catchup=False,
) as dag:

    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")

    task1 >> task2  # not calling the operators

# no final invocation is needed here: the context manager already bound the DAG
# object to the module-level name dag, which is how the scheduler finds it

Issue comment for reference on the TaskFlow approach.

Basically, if you're using operators then you don't need to invoke them; if you're using functions decorated with task then you do need to invoke them.

rshewitt commented Sep 1, 2023

Rule of thumb: anytime branching is implemented, ALWAYS consider the trigger_rule of downstream tasks. By default tasks have a trigger_rule of "all_success", meaning all immediate parent tasks must succeed. In the event of a branch, the successful branch is followed and the failure branch is not, causing all tasks in the failure branch to be skipped. Skipped is a state of a task, and skipped != success. A common pattern is to have branches eventually join back together (e.g. into a bulk load); that load would be skipped if a branch converging into it was skipped as a result of the branching.
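A sketch of that joining pattern, assuming TaskFlow branching and Empty placeholder tasks (all names here are assumptions); without the non-default trigger_rule the join would be skipped along with the untaken branch:

```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule


@dag(schedule_interval=None, start_date=datetime(2023, 9, 1), catchup=False)
def branch_join_example():

    @task.branch
    def choose(records_are_valid: bool) -> str:
        # return the task_id of the branch to follow
        return "load" if records_are_valid else "skip_load"

    load = EmptyOperator(task_id="load")
    skip_load = EmptyOperator(task_id="skip_load")

    # "all_success" (the default) would skip this join whenever one branch is
    # skipped; "none_failed_min_one_success" lets it run after the taken branch
    join = EmptyOperator(
        task_id="join",
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    choose(True) >> [load, skip_load] >> join


_ = branch_join_example()
```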

rshewitt commented Sep 1, 2023

We could potentially store our validation schemas as Variables in Airflow.
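For example, a schema stored as an Airflow Variable could be pulled inside a task; the variable name and the check below are placeholder assumptions:

```python
from airflow.decorators import task
from airflow.models import Variable


@task
def validate(record: dict) -> bool:
    # Variables live in the metadata DB; deserialize_json parses the stored JSON
    schema = Variable.get("dcatus_dataset_schema", deserialize_json=True)
    # placeholder: a real implementation would run jsonschema validation here
    return set(schema.get("required", [])) <= set(record)
```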

rshewitt commented Sep 1, 2023

                     -> task C (true)    \
                    /
 task A -> taskBranch                            -> task F
                    \  ->  task E(Dummy)(false) /

^ this works.

Apparently branching requires at least 2 downstream tasks: one for true and one for false. It seems taskBranch can't create a direct dependency to task F if a false task isn't provided, so a dummy operator needs to be used.

                     -> task C (true)  \
                   /
 task A -> taskBranch   -----            -> task F

^ this doesn't work.

rshewitt commented Sep 6, 2023

Error extracting commerce_non_spatial_data.json_harvest_source_workflow in Airflow. The extracted catalog returns more records than dynamic task mapping accepts by default, so the XCom push fails (airflow.exceptions.UnmappableXComLengthPushed: unmappable return value length: 1609 > 1024, i.e. 1609 records against the default max_map_length of 1024). source

rshewitt commented Sep 6, 2023

Error extracting bls_data_workflow. The Airflow log indicates a JSON decode error. Attempting to download the JSON file manually returns a 403 HTTP status code.

@btylerburton btylerburton removed their assignment Sep 11, 2023
@rshewitt rshewitt moved this from 🏗 In Progress [8] to 👀 Needs Review [2] in data.gov team board Sep 13, 2023
@rshewitt rshewitt moved this from 👀 Needs Review [2] to ✔ Done in data.gov team board Sep 13, 2023
@hkdctol hkdctol closed this as completed Sep 14, 2023
@hkdctol hkdctol moved this from ✔ Done to 🗄 Closed in data.gov team board Sep 14, 2023
@btylerburton btylerburton added H2.0/orchestrator and removed H2.0/Harvest-General General Harvesting 2.0 Issues labels Dec 13, 2023