Skip to content

Commit

Permalink
Fix Athena sample DAG (#341)
Browse files Browse the repository at this point in the history
Signed-off-by: Tao Feng <[email protected]>
  • Loading branch information
Alagappan authored Aug 19, 2020
1 parent c79935e commit e14b33e
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions example/dags/athena_sample_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def connection_string():
return "awsathena+rest://%s:%s@%s:443/?%s" % (access_key, secret, host, extras)


def create_table_extract_job(**kwargs):
def create_table_extract_job():
where_clause_suffix = textwrap.dedent("""
where table_schema in {schemas}
""").format(schemas=SUPPORTED_SCHEMA_SQL_IN_CLAUSE)
Expand Down Expand Up @@ -162,14 +162,15 @@ def create_es_publisher_sample_job():


with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag:
create_table_extract_job()
# create_table_extract_job = PythonOperator(
# task_id='create_table_extract_job',
# python_callable=create_table_extract_job
# )

create_es_index_job = PythonOperator(
task_id='create_es_publisher_sample_job',
athena_table_extract_job = PythonOperator(
task_id='athena_table_extract_job',
python_callable=create_table_extract_job
)

athena_es_publisher_job = PythonOperator(
task_id='athena_es_publisher_job',
python_callable=create_es_publisher_sample_job
)
create_es_publisher_sample_job()

# elastic search update run after table metadata has been updated
athena_table_extract_job >> athena_es_publisher_job

0 comments on commit e14b33e

Please sign in to comment.