-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
2 changed files
with
69 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
from datetime import datetime, timedelta | ||
from textwrap import dedent | ||
from airflow.providers.postgres.operators.postgres import PostgresOperator | ||
from airflow import DAG | ||
from airflow.datasets import Dataset | ||
|
||
with DAG( | ||
dag_id="DM_RATING.dm_lecturer_comment_act", | ||
schedule=[Dataset("DWH_RATING.comment"), Dataset("ODS_RATING.lecturer_user_comment")], | ||
tags=["dm", "rating", "comment"], | ||
default_args={ | ||
"owner": "mixx3", | ||
"retries": 3, | ||
"retry_delay": timedelta(minutes=5), | ||
}, | ||
) as dag: | ||
PostgresOperator( | ||
task_id="build_cdm", | ||
postgres_conn_id="postgres_dwh", | ||
sql=dedent( | ||
""" | ||
delete from "DM_RATING".dm_lecturer_comment_act; | ||
insert into "DM_RATING".dm_lecturer_comment_act | ||
select | ||
comment.api_uuid as comment_api_uuid, | ||
lecturer.api_id as lecturer_api_id, | ||
max(lecturer.first_name || ' ' || lecturer.last_name || ' ' || lecturer.middle_name) as lecturer_full_name, | ||
max(lecturer.first_name) as first_name, | ||
max(lecturer.last_name) as last_name, | ||
max(lecturer.middle_name) as middle_name, | ||
max(lecturer.timetable_id) as timetable_id, | ||
(max(lecturer.timetable_id) is distinct from null) as has_timetable_id, | ||
coalesce(max(lecturer.subject), 'No subject') as lecturer_subject, | ||
coalesce(max(comment.subject), 'No subject') as comment_subject, | ||
substring(max(comment.text) for 80) as comment_shortened_text, | ||
max(comment.text) as comment_full_text, | ||
max(comment.create_ts) as comment_create_ts, | ||
max(comment.update_ts) as comment_update_ts, | ||
max(comment.mark_kindness) as comment_mark_kindness, | ||
max(comment.mark_freebie) as comment_mark_freebie, | ||
max(comment.mark_clarity) as comment_mark_clarity, | ||
max(comment.review_status) as comment_review_status, | ||
max(link_user_comment.user_id) as user_id, | ||
max(user_info.full_name) as user_full_name, | ||
max(user_info.email) as user_email | ||
from "DWH_RATING".comment as comment | ||
left join "DWH_RATING".lecturer as lecturer | ||
on comment.lecturer_id = lecturer.api_id | ||
left join "ODS_RATING".lecturer_user_comment as link_user_comment | ||
on link_user_comment.lecturer_id = lecturer.api_id | ||
left join "DWH_USER_INFO".info as user_info | ||
on link_user_comment.user_id = user_info.user_id | ||
where 1=1 | ||
and lecturer.valid_to_dt is null | ||
and comment.valid_to_dt is null | ||
group by | ||
comment.api_uuid, | ||
lecturer.api_id; | ||
""", | ||
), | ||
inlets=[ | ||
Dataset("DWH_RATING.comment"), | ||
Dataset("DWH_RATING.lecturer"), | ||
Dataset("ODS_RATING.lecturer_user_comment"), | ||
Dataset("DWH_USER_INFO.info"), | ||
], | ||
outlets=[Dataset("DM_RATING.dm_lecturer_comment_act")], | ||
) |