SPIKE: Create harvesting workflow using Apache Airflow #4422
Comments
Some assumptions to test...
I'm going to put my name on this as I'll be devoting some time to it, but that shouldn't preclude others from working on it as well, either collaboratively or in private.
The airflow scheduler (airflow version …) …

```python
_ = dag_function()
```

This doesn't work:

```python
dag_function()
```

When using the latter approach the scheduler log indicates …
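A toy model (not Airflow itself) of why the assignment matters: the scheduler parses each DAG file and collects DAG objects found in the module's global namespace, and a `@dag`-decorated function only builds a DAG when it is called, so the call result has to land in a module-level name. The `DAG` class and `dag` decorator below are simplified stand-ins for the real Airflow ones.

```python
# Toy model of Airflow's DAG discovery -- stand-ins, not real Airflow.

class DAG:
    """Stand-in for airflow.DAG."""
    def __init__(self, dag_id):
        self.dag_id = dag_id

def dag(dag_id):
    """Simplified stand-in for the @dag decorator from airflow.decorators."""
    def decorator(func):
        def wrapper():
            func()               # run the task-definition body
            return DAG(dag_id)   # only a *call* produces a DAG object
        return wrapper
    return decorator

@dag("harvest")
def dag_function():
    pass  # task definitions would go here

_ = dag_function()  # the DAG now lives in a module-level name

# what a scheduler-style scan of module globals would find
discovered = [v for v in globals().values() if isinstance(v, DAG)]
print([d.dag_id for d in discovered])
```

Calling `dag_function()` without binding the result still builds the DAG object, but nothing in the module's globals references it, which is consistent with the scheduler failing to pick it up.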
using the TaskFlow API approach means less code to write. For example,

```python
# dag creation has already occurred
@task(task_id="transform")
def transform():
    # do something
    ...

transform()  # dag workflow
```

compares to

```python
# dag creation has already occurred
def transform():
    # do something
    ...

task1 = PythonOperator(
    task_id="transform",
    python_callable=transform,
    dag=dag,  # attach task to dag
)
task1()  # dag workflow
```

Long story short, it removes the explicit need for the …
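A rough sketch of what the `@task` decorator buys you, using stand-in classes rather than real Airflow: the decorator wraps a plain function so that calling it builds the operator-like node, which is why no hand-written `PythonOperator(...)` construction is needed.

```python
# Toy stand-ins, not real Airflow internals.

class PythonOperator:
    """Stand-in for airflow.operators.python.PythonOperator."""
    def __init__(self, task_id, python_callable):
        self.task_id = task_id
        self.python_callable = python_callable

def task(task_id):
    """Simplified stand-in for the @task decorator."""
    def decorator(func):
        def make_node():
            # calling the decorated function creates the operator node
            return PythonOperator(task_id=task_id, python_callable=func)
        return make_node
    return decorator

@task(task_id="transform")
def transform():
    return "transformed"

node = transform()  # replaces the manual PythonOperator(...) boilerplate
print(node.task_id, node.python_callable())
```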
I published my branch …
If sharing data between tasks proves unacceptable, one alternative could be having each task pull data from S3, process it, then load it back to S3. I'm unsure if this is better.
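A runnable sketch of that staging pattern, using an in-memory dict as a stand-in for the S3 bucket so no boto3 or credentials are needed; with real S3 the reads and writes would be `get_object`/`put_object` calls, and the key names here are purely illustrative.

```python
import json

bucket = {}  # stand-in for an S3 bucket: key -> bytes

def extract(key):
    # stand-in for fetching a dataset from a harvest source
    record = {"identifier": "dataset-1", "title": "Example"}
    bucket[key] = json.dumps(record).encode()

def transform(in_key, out_key):
    record = json.loads(bucket[in_key])             # pull from "S3"
    record["title"] = record["title"].upper()       # process
    bucket[out_key] = json.dumps(record).encode()   # load back to "S3"

extract("raw/dataset-1.json")
transform("raw/dataset-1.json", "transformed/dataset-1.json")
print(json.loads(bucket["transformed/dataset-1.json"])["title"])
```

Each task only needs the object key, so nothing large ever passes between tasks directly; the tradeoff is extra round trips to object storage per task.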
Looks like tasks push to XCom by default if they return a value that is not …
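Assuming the truncated word is `None` (which matches the behavior described), here is a toy model of that default push behavior; the dict-backed `xcom` store and `run_task` runner are stand-ins, not Airflow code.

```python
xcom = {}  # stand-in for the XCom table: (task_id, key) -> value

def run_task(task_id, func):
    result = func()
    if result is not None:  # only non-None return values get pushed
        xcom[(task_id, "return_value")] = result
    return result

run_task("extract", lambda: {"datasets": 3})  # pushed automatically
run_task("log_only", lambda: None)            # nothing pushed

print(sorted(task_id for task_id, _ in xcom))
```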
Some run information when extracting and validating a large spatial dataset. Notably the … [screenshot of run information]
To summarize: we have a working version of a DAG (a harvest) that does the extract of a full DCAT-US catalog and distributes the individual datasets into a new workflow where they get validated independently.
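The fan-out described above can be sketched in plain Python: one extract of the full catalog, then each dataset validated independently. In Airflow this corresponds to dynamic task mapping (a mapped validate task expanded over the extract output); the sample records and the validation check here are hypothetical stand-ins for real DCAT-US data and schema validation.

```python
def extract_catalog():
    # stand-in for fetching and parsing a full DCAT-US catalog
    return [
        {"identifier": "a", "title": "Dataset A"},
        {"identifier": "b", "title": ""},  # invalid: empty title
    ]

def validate(dataset):
    # stand-in for independent per-dataset schema validation
    return bool(dataset.get("identifier")) and bool(dataset.get("title"))

# fan-out: each dataset is validated on its own
results = {d["identifier"]: validate(d) for d in extract_catalog()}
print(results)
```

The key property is that one bad dataset fails its own validation step without blocking the others.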
I'm going to use a subset of the current catalog harvest sources, using this query as the input for dynamically generating DAGs.
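The usual shape of dynamic DAG generation: at parse time, loop over the source list and assign each DAG into the module's globals so the scheduler can discover it. The `DAG` class is a stand-in for `airflow.DAG`, and the `harvest_sources` list is a hypothetical placeholder for the query results.

```python
class DAG:
    """Stand-in for airflow.DAG."""
    def __init__(self, dag_id):
        self.dag_id = dag_id

harvest_sources = ["doi-open-data", "usda-json"]  # hypothetical query output

generated = {}
for source in harvest_sources:
    dag_id = f"harvest_{source.replace('-', '_')}"
    generated[dag_id] = DAG(dag_id)

# module-level names are what make the DAGs discoverable by the scheduler
globals().update(generated)
print(sorted(generated))
```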
For the cloud.gov installation, we would use a pip installation on the Python runner, probably with the Redis extras: https://airflow.apache.org/docs/apache-airflow/1.10.10/installation.html#extra-packages
Breaking off the deploy-onto-cloud.gov work into #4434
In response to my previous comment: I've since found that DAGs are identified by whether they exist in the global namespace of the file (i.e. if they are in …
The size limit for an XCom using a Postgres backend looks to be 1 GB. I've seen this value referenced in other articles as well.
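Given that ceiling (and the local memory crash below), a defensive sketch: measure a payload's serialized size before pushing it and refuse anything over a much smaller threshold, staging it in object storage instead. The 48 KB threshold and the `safe_push` helper are illustrative choices, not Airflow settings.

```python
import json

MAX_XCOM_BYTES = 48 * 1024  # arbitrary illustrative threshold

def safe_push(xcom, key, value):
    """Push to a dict-backed XCom stand-in only if the payload is small."""
    payload = json.dumps(value).encode()
    if len(payload) > MAX_XCOM_BYTES:
        raise ValueError(
            f"payload is {len(payload)} bytes; stage it in object storage"
        )
    xcom[key] = payload

store = {}
safe_push(store, "small", {"datasets": 3})        # fits, gets pushed
try:
    safe_push(store, "big", ["x" * 1024] * 1024)  # ~1 MB, rejected
except ValueError as err:
    print(err)
```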
Docker crashed after attempting to process 16 instances of a dataset on my local machine. It appeared to be a memory issue.
Let's do some real load testing on cloud.gov after @robert-bryson's work is stood up.
Something to note on callback execution of tasks. This is in reference to my work on error handling. |
Callback functions are provided with the associated task context. Here's a list of the variables within a context: …
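A toy illustration of the callback/context relationship. In Airflow, `on_failure_callback` is invoked with a context dict (task instance, execution date, the exception, and so on); here a tiny runner builds an analogous two-key dict so the callback signature can be exercised without Airflow installed.

```python
failures = []

def on_failure(context):
    # a real callback could alert, clean up, or branch on context values
    failures.append((context["task_id"], str(context["exception"])))

def run_task(task_id, func, on_failure_callback=None):
    """Stand-in runner: calls the callback with a minimal context dict."""
    try:
        func()
    except Exception as exc:
        if on_failure_callback:
            on_failure_callback({"task_id": task_id, "exception": exc})

def bad_transform():
    raise ValueError("schema mismatch")

run_task("transform", bad_transform, on_failure_callback=on_failure)
print(failures)
```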
What kinds of techniques do we have to control the flow of information when something goes wrong?
Using the TaskFlow API approach requires calling the tasks in the workflow in order for them to work properly.

```python
# TaskFlow API approach
@dag(*args, **kwargs)
def test_pipeline():
    @task(task_id="task1")
    def task1():
        return "example"

    @task(task_id="task2")
    def task2():
        return "another example"

    task1() >> task2()  # calling the functions

_ = test_pipeline()
```

```python
# Traditional approach
# using a context manager because decorating would be using TaskFlow.
# i'm sure you can use a context manager with TaskFlow and it would work fine.
with DAG(**kwargs) as dag:
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task1 >> task2  # not calling the operators

_ = dag()  # i figure this is still applicable??
```

Issue comment for reference on the TaskFlow approach. Basically, if you're using operators then you don't need to invoke them; if you're using functions decorated with `@task`, you do.
Rule of thumb: anytime branching is implemented, ALWAYS consider the …
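The comment above is truncated, but a common branching pitfall is the join task's trigger rule: with Airflow's default `all_success`, a task downstream of a branch never runs, because the branch not taken is skipped. Airflow's `none_failed_min_one_success` trigger rule handles this; the tiny evaluator below (plain Python, not Airflow) shows the difference on a post-branch state.

```python
def all_success(upstream_states):
    """Airflow's default: every upstream task must have succeeded."""
    return all(state == "success" for state in upstream_states)

def none_failed_min_one_success(upstream_states):
    """No upstream failed, and at least one succeeded (skips are OK)."""
    return ("failed" not in upstream_states
            and "success" in upstream_states)

# after branching: one branch ran, the other was skipped
states = ["success", "skipped"]
print(all_success(states))                  # join would NOT run
print(none_failed_min_one_success(states))  # join runs
```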
We could potentially store our validation schemas as Variables in Airflow.
^ This works. Apparently branching requires at least 2 tasks: one for …

^ This doesn't work.
Error extracting …

Error extracting …
## User Story

## Acceptance Criteria

## Background

## Sketch
Do the following in order, as time allows given this is a spike: