Skip to content

Commit

Permalink
add rating ods+dwh (#158)
Browse files Browse the repository at this point in the history
протягиваю таблички рейтинга + строю историю в scd2 чтобы смотреть
апдейты по лекторам и комментам

Витрины после этого PR-а сделаю
  • Loading branch information
parfenovma authored Feb 22, 2025
1 parent 1994992 commit 60757af
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 0 deletions.
Empty file.
75 changes: 75 additions & 0 deletions pipelines/dwh/rating/rating/comment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import logging
from airflow.decorators import dag, task
# FIX: the original import path was a garbled "airflow.providers.poodsres.
# operators.poodsres" / "PoodsresOperator" (a broken find-replace of
# "postgres") which does not exist in any Airflow provider.
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.datasets import Dataset

from textwrap import dedent
from datetime import datetime
from airflow import DAG


# SCD2 history build for rating comments.
# Step 1 closes the currently-open DWH versions whose attributes no longer
# match the ODS snapshot; step 2 inserts the fresh ODS rows as new open
# versions (valid_from_dt = {{ ds }}, valid_to_dt = NULL).
with DAG(
    dag_id='DWH_RATING.comment',
    # Data-aware scheduling: runs whenever the ODS comment dataset is updated.
    schedule=[Dataset("ODS_RATING.comment")],
    tags=["dwh", "core", "rating", "comment"],
    description='scd2_comment_hist',
    default_args={
        'retries': 1,
        'owner': 'mixx3',
    },
):
    PostgresOperator(
        task_id='comment_hist',
        # FIX: was "poodsres_conn_id" — PostgresOperator's parameter is
        # postgres_conn_id; the connection name itself was already correct.
        postgres_conn_id="postgres_dwh",
        sql=dedent("""
            -- close records
            update "DWH_RATING".comment as comment
            set valid_to_dt = '{{ ds }}'::Date
            where comment.uuid NOT IN(
                select dwh.uuid from
                (select
                    uuid,
                    api_uuid,
                    create_ts,
                    update_ts,
                    subject,
                    text,
                    mark_kindness,
                    mark_freebie,
                    mark_clarity,
                    lecturer_id,
                    review_status
                from "DWH_RATING".comment
                ) as dwh
                join "ODS_RATING".comment as ods
                on dwh.uuid = ods.uuid
                and dwh.api_uuid = ods.api_uuid
                and dwh.create_ts = ods.create_ts
                and dwh.update_ts = ods.update_ts
                and dwh.subject = ods.subject
                and dwh.text = ods.text
                and dwh.mark_kindness = ods.mark_kindness
                and dwh.mark_freebie = ods.mark_freebie
                and dwh.mark_clarity = ods.mark_clarity
                and dwh.lecturer_id = ods.lecturer_id
                and dwh.review_status = ods.review_status
            );
            --evaluate increment
            insert into "DWH_RATING".comment
            select
                ods.*,
                '{{ ds }}'::Date,
                null
            from "ODS_RATING".comment as ods
            full outer join "DWH_RATING".comment as dwh
            on ods.uuid = dwh.uuid
            where
                dwh.uuid is NULL
                or dwh.valid_to_dt='{{ ds }}'::Date
            LIMIT 10000000; -- чтобы не раздуло (cap so the table doesn't blow up)
        """),
        # NOTE(review): the "full outer join" can also emit dwh-only rows with
        # all-NULL ods.* columns; a left join from ods would be safer — confirm
        # intent before changing the SQL.
        inlets=[Dataset("ODS_RATING.comment")],
        outlets=[Dataset("DWH_RATING.comment")],
    )
69 changes: 69 additions & 0 deletions pipelines/dwh/rating/rating/lecturer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging
from airflow.decorators import dag, task
# FIX: the original import path was a garbled "airflow.providers.poodsres.
# operators.poodsres" / "PoodsresOperator" (a broken find-replace of
# "postgres") which does not exist in any Airflow provider.
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.datasets import Dataset

from textwrap import dedent
from datetime import datetime
from airflow import DAG


# SCD2 history build for lecturers: close the open DWH versions that differ
# from the ODS snapshot, then insert the fresh ODS rows as new open versions.
with DAG(
    dag_id='DWH_RATING.lecturer',
    # Data-aware scheduling: runs whenever the ODS lecturer dataset is updated.
    schedule=[Dataset("ODS_RATING.lecturer")],
    # NOTE(review): tag says "comment" although this is the lecturer DAG —
    # likely copy-paste; kept as-is since tags may be used for filtering.
    tags=["dwh", "core", "rating", "comment"],
    description='scd2_lecturer_hist',
    default_args={
        'retries': 1,
        'owner': 'mixx3',
    },
):
    PostgresOperator(
        task_id='lecturer_hist',
        # FIX: was poodsres_conn_id="poodsres_dwh" — both the parameter name
        # and the connection id were mangled; every sibling DAG in this commit
        # uses the "postgres_dwh" connection.
        postgres_conn_id="postgres_dwh",
        sql=dedent("""
            -- close records
            update "DWH_RATING".lecturer as lecturer
            set valid_to_dt = '{{ ds }}'::Date
            where lecturer.uuid NOT IN(
                select dwh.uuid from
                (select
                    uuid,
                    api_id,
                    first_name,
                    last_name,
                    middle_name,
                    subject,
                    avatar_link,
                    timetable_id
                from "DWH_RATING".lecturer
                ) as dwh
                join "ODS_RATING".lecturer as ods
                on dwh.uuid = ods.uuid
                and dwh.api_id = ods.api_id
                and dwh.first_name = ods.first_name
                and dwh.last_name = ods.last_name
                and dwh.middle_name = ods.middle_name
                and dwh.subject = ods.subject
                and dwh.avatar_link = ods.avatar_link
                and dwh.timetable_id = ods.timetable_id
            );
            --evaluate increment
            insert into "DWH_RATING".lecturer
            select
                ods.*,
                '{{ ds }}'::Date,
                null
            from "ODS_RATING".lecturer as ods
            full outer join "DWH_RATING".lecturer as dwh
            on ods.uuid = dwh.uuid
            where
                dwh.uuid is NULL
                or dwh.valid_to_dt='{{ ds }}'::Date
            LIMIT 1000000; -- чтобы не раздуло (cap so the table doesn't blow up)
        """),
        inlets=[Dataset("ODS_RATING.lecturer")],
        outlets=[Dataset("DWH_RATING.lecturer")],
    )
Empty file.
53 changes: 53 additions & 0 deletions pipelines/ods/rating/rating/comment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from datetime import datetime
from textwrap import dedent

from airflow import DAG, Dataset
from airflow.decorators import task
from airflow.providers.postgres.operators.postgres import PostgresOperator


# Full-refresh load of rating comments from staging into ODS:
# wipe the ODS table, then copy the STG snapshot with a generated surrogate
# uuid and timestamps shifted from UTC to Europe/Moscow.
_SQL = dedent(r"""
    -- truncate old state
    delete from "ODS_RATING".comment;
    insert into "ODS_RATING".comment (
        uuid,
        api_uuid,
        create_ts,
        update_ts,
        subject,
        text,
        mark_kindness,
        mark_freebie,
        mark_clarity,
        lecturer_id,
        review_status
    )
    select
        gen_random_uuid() as uuid,
        uuid as api_uuid,
        create_ts at time zone 'utc' at time zone 'Europe/Moscow' as create_ts,
        update_ts at time zone 'utc' at time zone 'Europe/Moscow' as update_ts,
        subject,
        text,
        mark_kindness,
        mark_freebie,
        mark_clarity,
        lecturer_id,
        review_status
    from "STG_RATING".comment
    limit 1000001;
""")

# NOTE(review): dag_id is "STG_RATING.comment" although this DAG writes the
# ODS layer — presumably named after its source dataset; confirm there is no
# clash with a staging DAG of the same id.
with DAG(
    dag_id="STG_RATING.comment",
    schedule=[Dataset("STG_RATING.comment")],
    catchup=False,
    tags=["ods", "core", "rating", "comment"],
    default_args={"owner": "mixx3"},
):
    PostgresOperator(
        task_id="execute_query",
        postgres_conn_id="postgres_dwh",
        sql=_SQL,
        inlets=[Dataset("STG_RATING.comment")],
        outlets=[Dataset("ODS_RATING.comment")],
    )
47 changes: 47 additions & 0 deletions pipelines/ods/rating/rating/lecturer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from datetime import datetime
from textwrap import dedent

from airflow import DAG, Dataset
from airflow.decorators import task
from airflow.providers.postgres.operators.postgres import PostgresOperator


# Full-refresh load of lecturers from staging into ODS:
# wipe the ODS table, then copy the STG snapshot, generating a surrogate uuid
# and keeping the source id as api_id.
_SQL = dedent(r"""
    -- truncate old state
    delete from "ODS_RATING".lecturer;
    insert into "ODS_RATING".lecturer (
        uuid,
        api_id,
        first_name,
        last_name,
        middle_name,
        subject,
        avatar_link,
        timetable_id
    )
    select
        gen_random_uuid() as uuid,
        id as api_id,
        first_name,
        last_name,
        middle_name,
        subject,
        avatar_link,
        timetable_id
    from "STG_RATING".lecturer
    limit 1000001;
""")

# NOTE(review): dag_id is "STG_RATING.lecturer" although this DAG writes the
# ODS layer — presumably named after its source dataset; verify intent.
with DAG(
    dag_id="STG_RATING.lecturer",
    schedule=[Dataset("STG_RATING.lecturer")],
    catchup=False,
    tags=["ods", "core", "rating", "lecturer"],
    default_args={"owner": "mixx3"},
):
    PostgresOperator(
        task_id="execute_query",
        postgres_conn_id="postgres_dwh",
        sql=_SQL,
        inlets=[Dataset("STG_RATING.lecturer")],
        outlets=[Dataset("ODS_RATING.lecturer")],
    )
43 changes: 43 additions & 0 deletions pipelines/ods/rating/rating/lecturer_user_comment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from datetime import datetime
from textwrap import dedent

from airflow import DAG, Dataset
from airflow.decorators import task
from airflow.providers.postgres.operators.postgres import PostgresOperator


# Full-refresh load of the lecturer/user/comment link table from staging into
# ODS: wipe the ODS table, then copy the STG snapshot with a generated
# surrogate uuid and timestamps shifted from UTC to Europe/Moscow.
_SQL = dedent(r"""
    -- truncate old state
    delete from "ODS_RATING".lecturer_user_comment;
    insert into "ODS_RATING".lecturer_user_comment (
        uuid,
        api_id,
        lecturer_id,
        user_id,
        create_ts,
        update_ts
    )
    select
        gen_random_uuid() as uuid,
        id as api_id,
        lecturer_id,
        user_id,
        create_ts at time zone 'utc' at time zone 'Europe/Moscow' as create_ts,
        update_ts at time zone 'utc' at time zone 'Europe/Moscow' as update_ts
    from "STG_RATING".lecturer_user_comment
    limit 1000001;
""")

# NOTE(review): dag_id is "STG_RATING.lecturer_user_comment" although this DAG
# writes the ODS layer — presumably named after its source dataset.
with DAG(
    dag_id="STG_RATING.lecturer_user_comment",
    schedule=[Dataset("STG_RATING.lecturer_user_comment")],
    catchup=False,
    tags=["ods", "core", "rating", "lecturer_user_comment"],
    default_args={"owner": "mixx3"},
):
    PostgresOperator(
        task_id="execute_query",
        postgres_conn_id="postgres_dwh",
        sql=_SQL,
        inlets=[Dataset("STG_RATING.lecturer_user_comment")],
        outlets=[Dataset("ODS_RATING.lecturer_user_comment")],
    )

0 comments on commit 60757af

Please sign in to comment.