diff --git a/example/dags/athena_sample_dag.py b/example/dags/athena_sample_dag.py index 000528252..2f64490c6 100644 --- a/example/dags/athena_sample_dag.py +++ b/example/dags/athena_sample_dag.py @@ -77,7 +77,7 @@ def connection_string(): return "awsathena+rest://%s:%s@%s:443/?%s" % (access_key, secret, host, extras) -def create_table_extract_job(**kwargs): +def create_table_extract_job(): where_clause_suffix = textwrap.dedent(""" where table_schema in {schemas} """).format(schemas=SUPPORTED_SCHEMA_SQL_IN_CLAUSE) @@ -162,14 +162,15 @@ def create_es_publisher_sample_job(): with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag: - create_table_extract_job() - # create_table_extract_job = PythonOperator( - # task_id='create_table_extract_job', - # python_callable=create_table_extract_job - # ) - - create_es_index_job = PythonOperator( - task_id='create_es_publisher_sample_job', + athena_table_extract_job = PythonOperator( + task_id='athena_table_extract_job', + python_callable=create_table_extract_job + ) + + athena_es_publisher_job = PythonOperator( + task_id='athena_es_publisher_job', python_callable=create_es_publisher_sample_job ) - create_es_publisher_sample_job() + + # elastic search update run after table metadata has been updated + athena_table_extract_job >> athena_es_publisher_job