From 42c4dbbcd7209ef272c1920a6f1c50f0c2c3ae35 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 26 Feb 2023 20:50:07 -0800 Subject: [PATCH 01/10] feat: add dbt_dev environment --- data/.env.template | 5 +-- data/environments/dbt_dev.meltano.yml | 42 +++++++++++++++++++ .../transform/profiles/snowflake/profiles.yml | 10 +++++ 3 files changed, 54 insertions(+), 3 deletions(-) create mode 100644 data/environments/dbt_dev.meltano.yml diff --git a/data/.env.template b/data/.env.template index 50819b03..342d6683 100644 --- a/data/.env.template +++ b/data/.env.template @@ -19,10 +19,9 @@ PERMISSION_BOT_PASSWORD="****" PERMISSION_BOT_ACCOUNT="****" # Snowflake MELTANO User Password +SNOWFLAKE_USER="...." SNOWFLAKE_PASSWORD="****" - -# dbt Snowflake Password -DBT_SNOWFLAKE_PASSWORD="****" +SNOWFLAKE_ROLE="DEVELOPER" # Slack Webhooks TARGET_APPRISE_SINGER_ACTIVITY_URIS=["https://hooks.slack.com/services/{}/{}/{}"] diff --git a/data/environments/dbt_dev.meltano.yml b/data/environments/dbt_dev.meltano.yml new file mode 100644 index 00000000..00fa72c9 --- /dev/null +++ b/data/environments/dbt_dev.meltano.yml @@ -0,0 +1,42 @@ +environments: +- name: dbt_dev + annotations: + docs: + description: > + Environment for developing dbt transforms separately from EL. + config: + plugins: + utilities: + - name: dbt-snowflake + config: + user: ${SNOWFLAKE_USER} + role: ${SNOWFLAKE_USER} + warehouse: CORE + skip_pre_invoke: true + database: USERDEV_PROD + database_prep: USERDEV_PREP + target_schema_prefix: ${SNOWFLAKE_USER}_ + - name: sqlfluff + config: + user: ${SNOWFLAKE_USER} + - name: great_expectations + config: + prod_database: USERDEV_PROD + raw_database: USERDEV_RAW + username: ${SNOWFLAKE_USER} + role: ${SNOWFLAKE_USER} + warehouse: CORE + env: + USER_PREFIX: ${SNOWFLAKE_USER} + SUPERSET_API_URL: http://localhost:8088 + SUPERSET_USER: admin + SUPERSET_PASS: admin + # https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html + AIRFLOW__CORE__PLUGINS_FOLDER: $MELTANO_PROJECT_ROOT/orchestrate/plugins_local + AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: '30' + AIRFLOW_VAR_MELTANO_ENVIRONMENT: userdev + AIRFLOW_VAR_OPERATOR_TYPE: bash + # Secrets via KMS + KMS_PUBLIC_KEY_PATH: utilities/kms/Publickey.pem + KMS_DOTENV_PATH: .env + KMS_SECRETS_PATH: secrets.yml diff --git a/data/transform/profiles/snowflake/profiles.yml b/data/transform/profiles/snowflake/profiles.yml index df7b0fbb..68ecebbe 100644 --- a/data/transform/profiles/snowflake/profiles.yml +++ b/data/transform/profiles/snowflake/profiles.yml @@ -18,6 +18,16 @@ meltano: warehouse: "{{ env_var('DBT_SNOWFLAKE_WAREHOUSE') }}" database: "{{ env_var('DBT_SNOWFLAKE_DATABASE') }}" schema: "DEFAULT" + dbt_dev: + type: snowflake + threads: 6 + account: "{{ env_var('DBT_SNOWFLAKE_ACCOUNT') }}" + user: "{{ env_var('SNOWFLAKE_USER') }}" + password: "{{ env_var('SNOWFLAKE_PASSWORD') }}" + role: "{{ env_var('DBT_SNOWFLAKE_ROLE', env_var('SNOWFLAKE_USER', )) }}" + warehouse: "{{ env_var('DBT_SNOWFLAKE_WAREHOUSE') }}" + database: "{{ env_var('DBT_SNOWFLAKE_DATABASE') }}" + schema: "DEFAULT" userdev: type: snowflake threads: 6 From 1506aa019f3ec92d9cd7d1b0793aae7776f6b252 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 26 Feb 2023 20:50:39 -0800 Subject: [PATCH 02/10] feat: add strict mode --- data/meltano.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/data/meltano.yml b/data/meltano.yml index 6122bd80..829ddcda 100644 --- a/data/meltano.yml +++ b/data/meltano.yml @@ -9,3 +9,5 @@ include_paths: - ./orchestrate/*.meltano.yml - ./transform/*.meltano.yml - ./utilities/*.meltano.yml +ff: + strict_env_var_mode: true From 1fae275ee14153739a0880ff043372ea5e124ef2 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 26 Feb 2023 20:51:34 -0800 Subject: [PATCH 03/10] chore: cleanup userdev environment --- data/environments/userdev.meltano.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/data/environments/userdev.meltano.yml b/data/environments/userdev.meltano.yml index 79e8f760..bbd8ced4 100644 --- a/data/environments/userdev.meltano.yml +++ b/data/environments/userdev.meltano.yml @@ -22,8 +22,8 @@ environments: - name: tap-snowflake config: dbname: USERDEV_PROD - user: ${USER_PREFIX} - role: ${USER_PREFIX} + user: ${SNOWFLAKE_USER} + role: ${SNOWFLAKE_USER} warehouse: CORE - name: tap-snowflake-metrics-legacy config: @@ -85,7 +85,7 @@ environments: role: ${USER_PREFIX} warehouse: CORE env: - USER_PREFIX: PNADOLNY + USER_PREFIX: ${SNOWFLAKE_USER} SUPERSET_API_URL: http://localhost:8088 SUPERSET_USER: admin SUPERSET_PASS: admin @@ -97,4 +97,4 @@ environments: # Secrets via KMS KMS_PUBLIC_KEY_PATH: utilities/kms/Publickey.pem KMS_DOTENV_PATH: .env - KMS_SECTRETS_PATH: secrets.yml + KMS_SECRETS_PATH: secrets.yml From 630e343c92b94fe6c800e6aec7e563b6ad70bedf Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 26 Feb 2023 20:52:03 -0800 Subject: [PATCH 04/10] chore: fix typo: KMS_SECTRETS_PATH>>KMS_SECRETS_PATH --- data/utilities/utilities.meltano.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data/utilities/utilities.meltano.yml b/data/utilities/utilities.meltano.yml index 50ed631f..b1460469 100644 --- a/data/utilities/utilities.meltano.yml +++ b/data/utilities/utilities.meltano.yml @@ -68,9 +68,9 @@ plugins: executable: kms commands: encrypt: encrypt $KMS_PUBLIC_KEY_PATH --dotenv-path $KMS_DOTENV_PATH --output-path - $KMS_SECTRETS_PATH + $KMS_SECRETS_PATH decrypt: decrypt $KMS_KEY_ID --input-path $KMS_SECRETS_PATH --output-path $KMS_DOTENV_PATH env: KMS_PUBLIC_KEY_PATH: utilities/kms/Publickey.pem KMS_DOTENV_PATH: .env - KMS_SECTRETS_PATH: secrets.yml + KMS_SECRETS_PATH: secrets.yml From db5425d4db116e737a1dcb66cb6aace4bfdf5cd5 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 26 Feb 2023 21:11:36 -0800 Subject: [PATCH 05/10] feat: allow override `DBT_THREADS` --- data/transform/profiles/snowflake/profiles.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/transform/profiles/snowflake/profiles.yml b/data/transform/profiles/snowflake/profiles.yml index 68ecebbe..23b47043 100644 --- a/data/transform/profiles/snowflake/profiles.yml +++ b/data/transform/profiles/snowflake/profiles.yml @@ -20,7 +20,7 @@ meltano: schema: "DEFAULT" dbt_dev: type: snowflake - threads: 6 + threads: "{{ env_var('DBT_THREADS', 6) | as_number }}" account: "{{ env_var('DBT_SNOWFLAKE_ACCOUNT') }}" user: "{{ env_var('SNOWFLAKE_USER') }}" password: "{{ env_var('SNOWFLAKE_PASSWORD') }}" From 35e4645447fcae4cd117af9310f8628067c37399 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 26 Feb 2023 23:16:47 -0800 Subject: [PATCH 06/10] docs: add instructions with dbt:build command --- CONTRIBUTING.md | 15 +++++++++++++++ data/transform/transformers.meltano.yml | 1 + 2 files changed, 16 insertions(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 33b2bd11..46eb8873 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -35,3 +35,18 @@ terraform-docs . ``` This will replace the readme file at `deploy/infrastructure/README.md` and `deploy/meltano/README.md` with any changes made to the module and header docs. + +## Developing transforms + +To develop transforms, you'll need to duplicate `.env.template` as `.env` and ensure that at least these env vars are declared: + +- `SNOWFLAKE_USER` +- `SNOWFLAKE_PASSWORD` + +## Incremental build with `--defer` option + +```console +meltano invoke dbt-snowflake:seed +meltano invoke dbt-snowflake:snapshot +meltano invoke dbt-snowflake:build +``` diff --git a/data/transform/transformers.meltano.yml b/data/transform/transformers.meltano.yml index 21e91564..77f7c8e4 100644 --- a/data/transform/transformers.meltano.yml +++ b/data/transform/transformers.meltano.yml @@ -38,6 +38,7 @@ plugins: run_slack_notifications: run --select +publish.slack_notifications.* run_hubspot_publish: run --select publish.hubspot.* test_hubspot_publish: test --select publish.hubspot.* + build: run -x --defer --state=${MELTANO_PROJECT_ROOT}/.meltano/transformers/dbt/target/ config: account: epa06486 From c97129a98ad421d75855c59d59c1ce0a934813bb Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 26 Feb 2023 23:21:31 -0800 Subject: [PATCH 07/10] chore: optimize perf on snowplow events --- .../staging/snowplow/snowplow_bad_parsed.sql | 6 + .../staging/snowplow/stg_snowplow__events.sql | 188 +----------------- .../stg_snowplow__events_union_all.sql | 186 +++++++++++++++++ 3 files changed, 198 insertions(+), 182 deletions(-) create mode 100644 data/transform/models/staging/snowplow/stg_snowplow__events_union_all.sql diff --git a/data/transform/models/staging/snowplow/snowplow_bad_parsed.sql b/data/transform/models/staging/snowplow/snowplow_bad_parsed.sql index 05ecace4..92dcc663 100644 --- a/data/transform/models/staging/snowplow/snowplow_bad_parsed.sql +++ b/data/transform/models/staging/snowplow/snowplow_bad_parsed.sql @@ -1,3 +1,9 @@ +{{ + config( + materialized='table' + ) +}} + WITH reparse_1 AS ( -- Incident: new schema not registered to SnowcatCloud versions 2.14.0 and -- 2.15.0 (partial) diff --git a/data/transform/models/staging/snowplow/stg_snowplow__events.sql b/data/transform/models/staging/snowplow/stg_snowplow__events.sql index ab5672ae..72bc8e3d 100644 --- a/data/transform/models/staging/snowplow/stg_snowplow__events.sql +++ b/data/transform/models/staging/snowplow/stg_snowplow__events.sql @@ -1,39 +1,13 @@ {{ config( - materialized='incremental' + materialized='table' ) }} -WITH blended_source AS ( - - SELECT - *, - FALSE AS snowplow_bad_parsed - {% if env_var("MELTANO_ENVIRONMENT") == "cicd" %} - - FROM raw.snowplow.events - WHERE derived_tstamp::TIMESTAMP >= DATEADD('day', -3, CURRENT_DATE) - {% else %} - - FROM {{ source('snowplow', 'events') }} - - {% if is_incremental() %} - - WHERE UPLOADED_AT >= (SELECT max(UPLOADED_AT) FROM {{ this }} WHERE snowplow_bad_parsed = FALSE) - - {% endif %} - {% endif %} - UNION ALL - - SELECT - *, - TRUE AS snowplow_bad_parsed - FROM {{ ref('snowplow_bad_parsed') }} - {% if is_incremental() %} - - WHERE event_id NOT IN (SELECT DISTINCT event_id FROM {{ this }} WHERE snowplow_bad_parsed = TRUE) +WITH blended_source AS ( - {% endif %} + SELECT * + FROM {{ ref('stg_snowplow__events_union_all') }} ), @@ -44,7 +18,7 @@ source AS ( ROW_NUMBER() OVER ( PARTITION BY event_id - ORDER BY derived_tstamp::TIMESTAMP DESC + ORDER BY event_created_at::TIMESTAMP DESC ) AS row_num FROM blended_source @@ -55,158 +29,8 @@ clean_new_source AS ( SELECT * FROM source WHERE row_num = 1 - {% if is_incremental() %} - - AND event_id NOT IN ( - SELECT DISTINCT event_id FROM {{ this }} - ) - {% endif %} - -), - -renamed AS ( - - SELECT -- noqa: L034 - -- only meltano events. For the first ~6 months no app_id was - -- sent from Meltano. So nulls are from meltano. - COALESCE(app_id, 'meltano') AS app_id, - platform, - etl_tstamp::TIMESTAMP AS etl_enriched_at, - collector_tstamp::TIMESTAMP AS collector_received_at, - dvce_created_tstamp::TIMESTAMP AS device_created_at, - dvce_sent_tstamp::TIMESTAMP AS device_sent_at, - refr_dvce_tstamp::TIMESTAMP AS refr_device_processed_at, - derived_tstamp::TIMESTAMP AS event_created_at, - derived_tstamp::DATE AS event_created_date, - true_tstamp::TIMESTAMP AS true_sent_at, - uploaded_at, - event, - event_id, - txn_id, - name_tracker, - v_tracker, - v_collector, - v_etl, - user_id, - user_ipaddress, - user_fingerprint, - domain_userid, - domain_sessionidx, - network_userid, - geo_country, - geo_region, - geo_city, - geo_zipcode, - geo_latitude, - geo_longitude, - geo_region_name, - ip_isp, - ip_organization, - ip_domain, - ip_netspeed, - page_url, - page_title, - page_referrer, - page_urlscheme, - page_urlhost, - page_urlport, - page_urlpath, - page_urlquery, - page_urlfragment, - refr_urlscheme, - refr_urlhost, - refr_urlport, - refr_urlpath, - refr_urlquery, - refr_urlfragment, - refr_medium, - refr_source, - refr_term, - mkt_medium, - mkt_source, - mkt_term, - mkt_content, - mkt_campaign, - contexts, - se_category, - se_action, - se_label, - se_property, - se_value, - unstruct_event, - tr_orderid, - tr_affiliation, - tr_total, - tr_tax, - tr_shipping, - tr_city, - tr_state, - tr_country, - ti_orderid, - ti_sku, - ti_name, - ti_category, - ti_price, - ti_quantity, - pp_xoffset_min, - pp_xoffset_max, - pp_yoffset_min, - pp_yoffset_max, - useragent, - br_name, - br_family, - br_version, - br_type, - br_renderengine, - br_lang, - br_features_pdf, - br_features_flash, - br_features_java, - br_features_director, - br_features_quicktime, - br_features_realplayer, - br_features_windowsmedia, - br_features_gears, - br_features_silverlight, - br_cookies, - br_colordepth, - br_viewwidth, - br_viewheight, - os_name, - os_family, - os_manufacturer, - os_timezone, - dvce_type, - dvce_ismobile, - dvce_screenwidth, - dvce_screenheight, - doc_charset, - doc_width, - doc_height, - tr_currency, - tr_total_base, - tr_tax_base, - tr_shipping_base, - ti_currency, - ti_price_base, - base_currency, - geo_timezone, - mkt_clickid, - mkt_network, - etl_tags, - refr_domain_userid, - derived_contexts::VARIANT AS derived_contexts, - domain_sessionid, - event_vendor, - event_name, - event_format, - event_version, - event_fingerprint, - MD5(user_ipaddress) AS ip_address_hash, - snowplow_bad_parsed - FROM clean_new_source ) SELECT * -FROM renamed +FROM clean_new_source diff --git a/data/transform/models/staging/snowplow/stg_snowplow__events_union_all.sql b/data/transform/models/staging/snowplow/stg_snowplow__events_union_all.sql new file mode 100644 index 00000000..9ed2b61b --- /dev/null +++ b/data/transform/models/staging/snowplow/stg_snowplow__events_union_all.sql @@ -0,0 +1,186 @@ +{{ + config( + materialized='incremental' + ) +}} + +WITH blended_source AS ( + + SELECT + *, + FALSE AS snowplow_bad_parsed + {% if env_var("MELTANO_ENVIRONMENT") == "cicd" %} + + FROM raw.snowplow.events + WHERE derived_tstamp::TIMESTAMP >= DATEADD('day', -3, CURRENT_DATE) + {% else %} + + FROM {{ source('snowplow', 'events') }} + + {% if is_incremental() %} + + WHERE UPLOADED_AT >= (SELECT max(UPLOADED_AT) FROM {{ this }} WHERE snowplow_bad_parsed = FALSE) + + {% endif %} + {% endif %} + + UNION ALL + + SELECT + *, + TRUE AS snowplow_bad_parsed + FROM {{ ref('snowplow_bad_parsed') }} + {% if is_incremental() %} + + WHERE event_id NOT IN (SELECT DISTINCT event_id FROM {{ this }} WHERE snowplow_bad_parsed = TRUE) + + {% endif %} + +), + +renamed AS ( + + SELECT -- noqa: L034 + -- only meltano events. For the first ~6 months no app_id was + -- sent from Meltano. So nulls are from meltano. + COALESCE(app_id, 'meltano') AS app_id, + platform, + etl_tstamp::TIMESTAMP AS etl_enriched_at, + collector_tstamp::TIMESTAMP AS collector_received_at, + dvce_created_tstamp::TIMESTAMP AS device_created_at, + dvce_sent_tstamp::TIMESTAMP AS device_sent_at, + refr_dvce_tstamp::TIMESTAMP AS refr_device_processed_at, + derived_tstamp::TIMESTAMP AS event_created_at, + derived_tstamp::DATE AS event_created_date, + true_tstamp::TIMESTAMP AS true_sent_at, + uploaded_at, + event, + event_id, + txn_id, + name_tracker, + v_tracker, + v_collector, + v_etl, + user_id, + user_ipaddress, + user_fingerprint, + domain_userid, + domain_sessionidx, + network_userid, + geo_country, + geo_region, + geo_city, + geo_zipcode, + geo_latitude, + geo_longitude, + geo_region_name, + ip_isp, + ip_organization, + ip_domain, + ip_netspeed, + page_url, + page_title, + page_referrer, + page_urlscheme, + page_urlhost, + page_urlport, + page_urlpath, + page_urlquery, + page_urlfragment, + refr_urlscheme, + refr_urlhost, + refr_urlport, + refr_urlpath, + refr_urlquery, + refr_urlfragment, + refr_medium, + refr_source, + refr_term, + mkt_medium, + mkt_source, + mkt_term, + mkt_content, + mkt_campaign, + contexts, + se_category, + se_action, + se_label, + se_property, + se_value, + unstruct_event, + tr_orderid, + tr_affiliation, + tr_total, + tr_tax, + tr_shipping, + tr_city, + tr_state, + tr_country, + ti_orderid, + ti_sku, + ti_name, + ti_category, + ti_price, + ti_quantity, + pp_xoffset_min, + pp_xoffset_max, + pp_yoffset_min, + pp_yoffset_max, + useragent, + br_name, + br_family, + br_version, + br_type, + br_renderengine, + br_lang, + br_features_pdf, + br_features_flash, + br_features_java, + br_features_director, + br_features_quicktime, + br_features_realplayer, + br_features_windowsmedia, + br_features_gears, + br_features_silverlight, + br_cookies, + br_colordepth, + br_viewwidth, + br_viewheight, + os_name, + os_family, + os_manufacturer, + os_timezone, + dvce_type, + dvce_ismobile, + dvce_screenwidth, + dvce_screenheight, + doc_charset, + doc_width, + doc_height, + tr_currency, + tr_total_base, + tr_tax_base, + tr_shipping_base, + ti_currency, + ti_price_base, + base_currency, + geo_timezone, + mkt_clickid, + mkt_network, + etl_tags, + refr_domain_userid, + derived_contexts::VARIANT AS derived_contexts, + domain_sessionid, + event_vendor, + event_name, + event_format, + event_version, + event_fingerprint, + MD5(user_ipaddress) AS ip_address_hash, + snowplow_bad_parsed + FROM blended_source + +) + +SELECT * +FROM renamed From c640961afdea86a0843b4740da8cf4d53ae58aae Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Mon, 27 Feb 2023 09:45:16 -0800 Subject: [PATCH 08/10] chore: handle sqlfluff deprecation warning --- data/.sqlfluff | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/data/.sqlfluff b/data/.sqlfluff index e764e243..d9cf2193 100644 --- a/data/.sqlfluff +++ b/data/.sqlfluff @@ -17,7 +17,9 @@ profile = meltano tab_space_size = 4 max_line_length = 80 indent_unit = space -comma_style = trailing + +[sqlfluff:layout:type:comma] +line_position = trailing [sqlfluff:rules:L010] # Keywords capitalisation_policy = upper From 70e72fd39168958c7f73d2ee427331568f06fd0c Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Mon, 27 Feb 2023 09:45:45 -0800 Subject: [PATCH 09/10] change: default to `dbt_dev` instead of `userdev` --- data/meltano.yml | 2 +- data/transform/transformers.meltano.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/data/meltano.yml b/data/meltano.yml index 829ddcda..9ad0fb0f 100644 --- a/data/meltano.yml +++ b/data/meltano.yml @@ -1,5 +1,5 @@ version: 1 -default_environment: userdev +default_environment: dbt_dev send_anonymous_usage_stats: false project_id: c15e971a-d318-4a9d-979b-1039ce5fd1b1 include_paths: diff --git a/data/transform/transformers.meltano.yml b/data/transform/transformers.meltano.yml index 77f7c8e4..792062cf 100644 --- a/data/transform/transformers.meltano.yml +++ b/data/transform/transformers.meltano.yml @@ -38,7 +38,7 @@ plugins: run_slack_notifications: run --select +publish.slack_notifications.* run_hubspot_publish: run --select publish.hubspot.* test_hubspot_publish: test --select publish.hubspot.* - build: run -x --defer --state=${MELTANO_PROJECT_ROOT}/.meltano/transformers/dbt/target/ + build: run --fail-fast --defer --state=${MELTANO_PROJECT_ROOT}/.meltano/transformers/dbt/target/ config: account: epa06486 From bc58684ce2a1785f47209b1be443d17e243a909e Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Mon, 27 Feb 2023 09:55:17 -0800 Subject: [PATCH 10/10] revert sql changes, moved to #581 --- .../staging/snowplow/snowplow_bad_parsed.sql | 6 - .../staging/snowplow/stg_snowplow__events.sql | 188 +++++++++++++++++- .../stg_snowplow__events_union_all.sql | 186 ----------------- 3 files changed, 182 insertions(+), 198 deletions(-) delete mode 100644 data/transform/models/staging/snowplow/stg_snowplow__events_union_all.sql diff --git a/data/transform/models/staging/snowplow/snowplow_bad_parsed.sql b/data/transform/models/staging/snowplow/snowplow_bad_parsed.sql index 92dcc663..05ecace4 100644 --- a/data/transform/models/staging/snowplow/snowplow_bad_parsed.sql +++ b/data/transform/models/staging/snowplow/snowplow_bad_parsed.sql @@ -1,9 +1,3 @@ -{{ - config( - materialized='table' - ) -}} - WITH reparse_1 AS ( -- Incident: new schema not registered to SnowcatCloud versions 2.14.0 and -- 2.15.0 (partial) diff --git a/data/transform/models/staging/snowplow/stg_snowplow__events.sql b/data/transform/models/staging/snowplow/stg_snowplow__events.sql index 72bc8e3d..ab5672ae 100644 --- a/data/transform/models/staging/snowplow/stg_snowplow__events.sql +++ b/data/transform/models/staging/snowplow/stg_snowplow__events.sql @@ -1,13 +1,39 @@ {{ config( - materialized='table' + materialized='incremental' ) }} - WITH blended_source AS ( - SELECT * - FROM {{ ref('stg_snowplow__events_union_all') }} + SELECT + *, + FALSE AS snowplow_bad_parsed + {% if env_var("MELTANO_ENVIRONMENT") == "cicd" %} + + FROM raw.snowplow.events + WHERE derived_tstamp::TIMESTAMP >= DATEADD('day', -3, CURRENT_DATE) + {% else %} + + FROM {{ source('snowplow', 'events') }} + + {% if is_incremental() %} + + WHERE UPLOADED_AT >= (SELECT max(UPLOADED_AT) FROM {{ this }} WHERE snowplow_bad_parsed = FALSE) + + {% endif %} + {% endif %} + + UNION ALL + + SELECT + *, + TRUE AS snowplow_bad_parsed + FROM {{ ref('snowplow_bad_parsed') }} + {% if is_incremental() %} + + WHERE event_id NOT IN (SELECT DISTINCT event_id FROM {{ this }} WHERE snowplow_bad_parsed = TRUE) + + {% endif %} ), @@ -18,7 +44,7 @@ source AS ( ROW_NUMBER() OVER ( PARTITION BY event_id - ORDER BY event_created_at::TIMESTAMP DESC + ORDER BY derived_tstamp::TIMESTAMP DESC ) AS row_num FROM blended_source @@ -29,8 +55,158 @@ clean_new_source AS ( SELECT * FROM source WHERE row_num = 1 + {% if is_incremental() %} + + AND event_id NOT IN ( + SELECT DISTINCT event_id FROM {{ this }} + ) + {% endif %} + +), + +renamed AS ( + + SELECT -- noqa: L034 + -- only meltano events. For the first ~6 months no app_id was + -- sent from Meltano. So nulls are from meltano. + COALESCE(app_id, 'meltano') AS app_id, + platform, + etl_tstamp::TIMESTAMP AS etl_enriched_at, + collector_tstamp::TIMESTAMP AS collector_received_at, + dvce_created_tstamp::TIMESTAMP AS device_created_at, + dvce_sent_tstamp::TIMESTAMP AS device_sent_at, + refr_dvce_tstamp::TIMESTAMP AS refr_device_processed_at, + derived_tstamp::TIMESTAMP AS event_created_at, + derived_tstamp::DATE AS event_created_date, + true_tstamp::TIMESTAMP AS true_sent_at, + uploaded_at, + event, + event_id, + txn_id, + name_tracker, + v_tracker, + v_collector, + v_etl, + user_id, + user_ipaddress, + user_fingerprint, + domain_userid, + domain_sessionidx, + network_userid, + geo_country, + geo_region, + geo_city, + geo_zipcode, + geo_latitude, + geo_longitude, + geo_region_name, + ip_isp, + ip_organization, + ip_domain, + ip_netspeed, + page_url, + page_title, + page_referrer, + page_urlscheme, + page_urlhost, + page_urlport, + page_urlpath, + page_urlquery, + page_urlfragment, + refr_urlscheme, + refr_urlhost, + refr_urlport, + refr_urlpath, + refr_urlquery, + refr_urlfragment, + refr_medium, + refr_source, + refr_term, + mkt_medium, + mkt_source, + mkt_term, + mkt_content, + mkt_campaign, + contexts, + se_category, + se_action, + se_label, + se_property, + se_value, + unstruct_event, + tr_orderid, + tr_affiliation, + tr_total, + tr_tax, + tr_shipping, + tr_city, + tr_state, + tr_country, + ti_orderid, + ti_sku, + ti_name, + ti_category, + ti_price, + ti_quantity, + pp_xoffset_min, + pp_xoffset_max, + pp_yoffset_min, + pp_yoffset_max, + useragent, + br_name, + br_family, + br_version, + br_type, + br_renderengine, + br_lang, + br_features_pdf, + br_features_flash, + br_features_java, + br_features_director, + br_features_quicktime, + br_features_realplayer, + br_features_windowsmedia, + br_features_gears, + br_features_silverlight, + br_cookies, + br_colordepth, + br_viewwidth, + br_viewheight, + os_name, + os_family, + os_manufacturer, + os_timezone, + dvce_type, + dvce_ismobile, + dvce_screenwidth, + dvce_screenheight, + doc_charset, + doc_width, + doc_height, + tr_currency, + tr_total_base, + tr_tax_base, + tr_shipping_base, + ti_currency, + ti_price_base, + base_currency, + geo_timezone, + mkt_clickid, + mkt_network, + etl_tags, + refr_domain_userid, + derived_contexts::VARIANT AS derived_contexts, + domain_sessionid, + event_vendor, + event_name, + event_format, + event_version, + event_fingerprint, + MD5(user_ipaddress) AS ip_address_hash, + snowplow_bad_parsed + FROM clean_new_source ) SELECT * -FROM clean_new_source +FROM renamed diff --git a/data/transform/models/staging/snowplow/stg_snowplow__events_union_all.sql b/data/transform/models/staging/snowplow/stg_snowplow__events_union_all.sql deleted file mode 100644 index 9ed2b61b..00000000 --- a/data/transform/models/staging/snowplow/stg_snowplow__events_union_all.sql +++ /dev/null @@ -1,186 +0,0 @@ -{{ - config( - materialized='incremental' - ) -}} - -WITH blended_source AS ( - - SELECT - *, - FALSE AS snowplow_bad_parsed - {% if env_var("MELTANO_ENVIRONMENT") == "cicd" %} - - FROM raw.snowplow.events - WHERE derived_tstamp::TIMESTAMP >= DATEADD('day', -3, CURRENT_DATE) - {% else %} - - FROM {{ source('snowplow', 'events') }} - - {% if is_incremental() %} - - WHERE UPLOADED_AT >= (SELECT max(UPLOADED_AT) FROM {{ this }} WHERE snowplow_bad_parsed = FALSE) - - {% endif %} - {% endif %} - - UNION ALL - - SELECT - *, - TRUE AS snowplow_bad_parsed - FROM {{ ref('snowplow_bad_parsed') }} - {% if is_incremental() %} - - WHERE event_id NOT IN (SELECT DISTINCT event_id FROM {{ this }} WHERE snowplow_bad_parsed = TRUE) - - {% endif %} - -), - -renamed AS ( - - SELECT -- noqa: L034 - -- only meltano events. For the first ~6 months no app_id was - -- sent from Meltano. So nulls are from meltano. - COALESCE(app_id, 'meltano') AS app_id, - platform, - etl_tstamp::TIMESTAMP AS etl_enriched_at, - collector_tstamp::TIMESTAMP AS collector_received_at, - dvce_created_tstamp::TIMESTAMP AS device_created_at, - dvce_sent_tstamp::TIMESTAMP AS device_sent_at, - refr_dvce_tstamp::TIMESTAMP AS refr_device_processed_at, - derived_tstamp::TIMESTAMP AS event_created_at, - derived_tstamp::DATE AS event_created_date, - true_tstamp::TIMESTAMP AS true_sent_at, - uploaded_at, - event, - event_id, - txn_id, - name_tracker, - v_tracker, - v_collector, - v_etl, - user_id, - user_ipaddress, - user_fingerprint, - domain_userid, - domain_sessionidx, - network_userid, - geo_country, - geo_region, - geo_city, - geo_zipcode, - geo_latitude, - geo_longitude, - geo_region_name, - ip_isp, - ip_organization, - ip_domain, - ip_netspeed, - page_url, - page_title, - page_referrer, - page_urlscheme, - page_urlhost, - page_urlport, - page_urlpath, - page_urlquery, - page_urlfragment, - refr_urlscheme, - refr_urlhost, - refr_urlport, - refr_urlpath, - refr_urlquery, - refr_urlfragment, - refr_medium, - refr_source, - refr_term, - mkt_medium, - mkt_source, - mkt_term, - mkt_content, - mkt_campaign, - contexts, - se_category, - se_action, - se_label, - se_property, - se_value, - unstruct_event, - tr_orderid, - tr_affiliation, - tr_total, - tr_tax, - tr_shipping, - tr_city, - tr_state, - tr_country, - ti_orderid, - ti_sku, - ti_name, - ti_category, - ti_price, - ti_quantity, - pp_xoffset_min, - pp_xoffset_max, - pp_yoffset_min, - pp_yoffset_max, - useragent, - br_name, - br_family, - br_version, - br_type, - br_renderengine, - br_lang, - br_features_pdf, - br_features_flash, - br_features_java, - br_features_director, - br_features_quicktime, - br_features_realplayer, - br_features_windowsmedia, - br_features_gears, - br_features_silverlight, - br_cookies, - br_colordepth, - br_viewwidth, - br_viewheight, - os_name, - os_family, - os_manufacturer, - os_timezone, - dvce_type, - dvce_ismobile, - dvce_screenwidth, - dvce_screenheight, - doc_charset, - doc_width, - doc_height, - tr_currency, - tr_total_base, - tr_tax_base, - tr_shipping_base, - ti_currency, - ti_price_base, - base_currency, - geo_timezone, - mkt_clickid, - mkt_network, - etl_tags, - refr_domain_userid, - derived_contexts::VARIANT AS derived_contexts, - domain_sessionid, - event_vendor, - event_name, - event_format, - event_version, - event_fingerprint, - MD5(user_ipaddress) AS ip_address_hash, - snowplow_bad_parsed - FROM blended_source - -) - -SELECT * -FROM renamed