Skip to content

Commit

Permalink
Add Trino integration, tests and configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
Emin UZUN committed Dec 18, 2023
1 parent 4220a9c commit 845f11e
Show file tree
Hide file tree
Showing 15 changed files with 497 additions and 7 deletions.
10 changes: 10 additions & 0 deletions integration_test_project/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,13 @@ dbt_artifacts:
dbname: postgres
schema: public
threads: 8
trino:
type: trino
method: none
user: trino
password: password
host: localhost
database: datalake
schema: dbt
port: 8181
threads: 8
5 changes: 5 additions & 0 deletions macros/database_specific_helpers/type_helpers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,8 @@
{% macro bigquery__type_array() %}
array<string>
{% endmacro %}

{% macro trino__type_array() %}
array(varchar)
{% endmacro %}

35 changes: 35 additions & 0 deletions macros/upload_individual_datasets/upload_exposures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,38 @@
{{ return("") }}
{% endif %}
{%- endmacro %}


{% macro trino__get_exposures_dml_sql(exposures) -%}
{% if exposures != [] %}

{% set exposure_values %}
{% for exposure in exposures -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ exposure.unique_id | replace("'","''") }}', {# node_id #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}
'{{ exposure.name | replace("'","''") }}', {# name #}
'{{ exposure.type }}', {# type #}
'{{ tojson(exposure.owner) | replace("'","''") }}', {# owner #}
'{{ exposure.maturity }}', {# maturity #}
'{{ exposure.original_file_path }}', {# path #}
'{{ exposure.description | replace("'","''") }}', {# description #}
'{{ exposure.url }}', {# url #}
'{{ exposure.package_name }}', {# package_name #}
ARRAY {{ exposure.depends_on.nodes}}, {# depends_on_nodes #}
ARRAY {{ exposure.tags}}, {# tags #}
{% if var('dbt_artifacts_exclude_all_results', false) %}
null
{% else %}
'{{ tojson(exposure) | replace("'","''") }}' {# all_results #}
{% endif %}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ exposure_values }}
{% else %}
{{ return("") }}
{% endif %}
{%- endmacro %}
61 changes: 61 additions & 0 deletions macros/upload_individual_datasets/upload_invocations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -219,5 +219,66 @@
)
{% endset %}
{{ invocation_values }}

{% endmacro -%}

{% macro trino__get_invocations_dml_sql() -%}
{% set invocation_values %}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ dbt_version }}', {# dbt_version #}
'{{ project_name }}', {# project_name #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}
'{{ flags.WHICH }}', {# dbt_command #}
{{ flags.FULL_REFRESH }}, {# full_refresh_flag #}
'{{ target.profile_name }}', {# target_profile_name #}
'{{ target.name }}', {# target_name #}
'{{ target.schema }}', {# target_schema #}
{{ target.threads }}, {# target_threads #}

'{{ env_var("DBT_CLOUD_PROJECT_ID", "") }}', {# dbt_cloud_project_id #}
'{{ env_var("DBT_CLOUD_JOB_ID", "") }}', {# dbt_cloud_job_id #}
'{{ env_var("DBT_CLOUD_RUN_ID", "") }}', {# dbt_cloud_run_id #}
'{{ env_var("DBT_CLOUD_RUN_REASON_CATEGORY", "") }}', {# dbt_cloud_run_reason_category #}
'{{ env_var('DBT_CLOUD_RUN_REASON', '') | replace("'","''") }}', {# dbt_cloud_run_reason #}
{% if var('env_vars', none) %}
{% set env_vars_dict = {} %}
{% for env_variable in var('env_vars') %}
{% do env_vars_dict.update({env_variable: (env_var(env_variable, ''))}) %}
{% endfor %}
'{{ tojson(env_vars_dict) | replace("'","''") }}', {# env_vars #}
{% else %}
null, {# env_vars #}
{% endif %}

{% if var('dbt_vars', none) %}
{% set dbt_vars_dict = {} %}
{% for dbt_var in var('dbt_vars') %}
{% do dbt_vars_dict.update({dbt_var: (var(dbt_var, ''))}) %}
{% endfor %}
'{{ tojson(dbt_vars_dict) | replace("'","''") }}', {# dbt_vars #}
{% else %}
null, {# dbt_vars #}
{% endif %}
{% if invocation_args_dict.vars %}
{# vars - different format for pre v1.5 (yaml vs list) #}
{% if invocation_args_dict.vars is string %}
{% set parsed_inv_args_vars = fromyaml(invocation_args_dict.vars) %}
{% do invocation_args_dict.update({'vars': parsed_inv_args_vars}) %}
{% endif %}
{% endif %}
'{{ tojson(invocation_args_dict) | replace("'","''") }}', {# invocation_args #}

{% set metadata_env = {} %}
{% for key, value in dbt_metadata_envs.items() %}
{% do metadata_env.update({key: value}) %}
{% endfor %}
'{{ tojson(metadata_env) | replace("'","''") }}' {# dbt_custom_envs #}
)
{% endset %}
{{ invocation_values }}
{% endmacro -%}
48 changes: 48 additions & 0 deletions macros/upload_individual_datasets/upload_model_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,51 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{% macro trino__get_model_executions_dml_sql(models) -%}
{% if models != [] %}
{% set model_execution_values %}
{% for model in models -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ model.node.unique_id }}', {# node_id #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}

{% set config_full_refresh = model.node.config.full_refresh %}
{% if config_full_refresh is none %}
{% set config_full_refresh = flags.FULL_REFRESH %}
{% endif %}

{{ config_full_refresh }}, {# was_full_refresh #}
'{{ model.thread_id }}', {# thread_id #}
'{{ model.status }}', {# status #}

{% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}TIMESTAMP '{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}TIMESTAMP '{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

{{ model.execution_time }}, {# total_node_runtime #}

{% if model.adapter_response.rows_affected is none %}
null
{% else %}
{{ model.adapter_response.rows_affected }}
{% endif %}
, {# rows_affected #}

'{{ model.node.config.materialized }}', {# materialization #}
'{{ model.node.schema }}', {# schema #}
'{{ model.node.name }}', {# name #}
'{{ model.node.alias }}', {# alias #}
'{{ model.message | replace("'", "''") }}', {# message #}
'{{ tojson(model.adapter_response) | replace("'", "''") }}' {# adapter_response #}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ model_execution_values }}
{% else %}
{{ return("") }}
{% endif %}
{%- endmacro %}
35 changes: 35 additions & 0 deletions macros/upload_individual_datasets/upload_models.sql
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,38 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{% macro trino__get_models_dml_sql(models) -%}
{% if models != [] %}
{% set model_values %}
{% for model in models -%}
{% do model.pop('raw_code', None) %}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ model.unique_id }}', {# node_id #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}
'{{ model.database }}', {# database #}
'{{ model.schema }}', {# schema #}
'{{ model.name }}', {# name #}
ARRAY {{ model.depends_on.nodes }}, {# depends_on_nodes #}
'{{ model.package_name }}', {# package_name #}
'{{ model.original_file_path }}', {# path #}
'{{ model.checksum.checksum }}', {# checksum #}
'{{ model.config.materialized }}', {# materialization #}
ARRAY {{ model.tags }}, {# tags #}
'{{ tojson(model.config.meta) | replace("'", "''") }}', {# meta #}
'{{ model.alias }}', {# alias #}
{% if var('dbt_artifacts_exclude_all_results', false) %}
null
{% else %}
'{{ tojson(model) | replace("'", "''") }}' {# all_results #}
{% endif %}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ model_values }}
{% else %}
{{ return("") }}
{% endif %}
{%- endmacro %}
48 changes: 48 additions & 0 deletions macros/upload_individual_datasets/upload_seed_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,51 @@
{{ return("") }}
{% endif %}
{% endmacro -%}

{% macro trino__get_seed_executions_dml_sql(seeds) -%}
{% if seeds != [] %}
{% set seed_execution_values %}
{% for model in seeds -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ model.node.unique_id }}', {# node_id #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}

{% set config_full_refresh = model.node.config.full_refresh %}
{% if config_full_refresh is none %}
{% set config_full_refresh = flags.FULL_REFRESH %}
{% endif %}
{{ config_full_refresh }}, {# was_full_refresh #}

'{{ model.thread_id }}', {# thread_id #}
'{{ model.status }}', {# status #}

{% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}TIMESTAMP '{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}TIMESTAMP '{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

{{ model.execution_time }}, {# total_node_runtime #}

{% if model.adapter_response.rows_affected is none %}
null
{% else %}
{{ model.adapter_response.rows_affected }}
{% endif %}
, {# rows_affected #}

'{{ model.node.config.materialized }}', {# materialization #}
'{{ model.node.schema }}', {# schema #}
'{{ model.node.name }}', {# name #}
'{{ model.node.alias }}', {# alias #}
'{{ model.message | replace("'", "''") }}', {# message #}
'{{ tojson(model.adapter_response) | replace("'", "''") }}' {# adapter_response #}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ seed_execution_values }}
{% else %}
{{ return("") }}
{% endif %}
{% endmacro -%}
31 changes: 31 additions & 0 deletions macros/upload_individual_datasets/upload_seeds.sql
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,34 @@
{{ return("") }}
{% endif %}
{%- endmacro %}

{% macro trino__get_seeds_dml_sql(seeds) -%}
{% if seeds != [] %}
{% set seed_values %}
{% for seed in seeds -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ seed.unique_id }}', {# node_id #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}
'{{ seed.database }}', {# database #}
'{{ seed.schema }}', {# schema #}
'{{ seed.name }}', {# name #}
'{{ seed.package_name }}', {# package_name #}
'{{ seed.original_file_path }}', {# path #}
'{{ seed.checksum.checksum }}', {# checksum #}
'{{ tojson(seed.config.meta) | replace("'", "''") }}', {# meta #}
'{{ seed.alias }}', {# alias #}
{% if var('dbt_artifacts_exclude_all_results', false) %}
null
{% else %}
'{{ tojson(seed) | replace("'", "''") }}' {# all_results #}
{% endif %}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ seed_values }}
{% else %}
{{ return("") }}
{% endif %}
{%- endmacro %}
62 changes: 55 additions & 7 deletions macros/upload_individual_datasets/upload_snapshot_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
{{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(16)) }}
from values
{% for model in snapshots -%}
(
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ model.node.unique_id }}', {# node_id #}
'{{ run_started_at }}', {# run_started_at #}
Expand All @@ -33,18 +33,18 @@
{% if config_full_refresh is none %}
{% set config_full_refresh = flags.FULL_REFRESH %}
{% endif %}
'{{ config_full_refresh }}', {# was_full_refresh #}

'{{ model.thread_id }}', {# thread_id #}
'{{ config_full_refresh }}', {# was_full_refresh #}
'{{ model.thread_id }}', {# thread_id #}
'{{ model.status }}', {# status #}

{% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

{{ model.execution_time }}, {# total_node_runtime #}
null, -- rows_affected not available {# Only available in Snowflake #}
null, -- rows_affected not available {# Only available in Snowflake #}
'{{ model.node.config.materialized }}', {# materialization #}
'{{ model.node.schema }}', {# schema #}
'{{ model.node.name }}', {# name #}
Expand Down Expand Up @@ -217,3 +217,51 @@
{{ return("") }}
{% endif %}
{% endmacro -%}

{% macro trino__get_snapshot_executions_dml_sql(snapshots) -%}
{% if snapshots != [] %}
{% set snapshot_execution_values %}
{% for model in snapshots -%}
(
'{{ invocation_id }}', {# command_invocation_id #}
'{{ model.node.unique_id }}', {# node_id #}
TIMESTAMP '{{ run_started_at }}', {# run_started_at #}

{% set config_full_refresh = model.node.config.full_refresh %}
{% if config_full_refresh is none %}
{% set config_full_refresh = flags.FULL_REFRESH %}
{% endif %}

{{ config_full_refresh }}, {# was_full_refresh #}
'{{ model.thread_id }}', {# thread_id #}
'{{ model.status }}', {# status #}

{% set compile_started_at = (model.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %}
{% if compile_started_at %}TIMESTAMP '{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #}
{% set query_completed_at = (model.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %}
{% if query_completed_at %}TIMESTAMP '{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #}

{{ model.execution_time }}, {# total_node_runtime #}

{% if model.adapter_response.rows_affected is none %}
null
{% else %}
{{ model.adapter_response.rows_affected }}
{% endif %}
, {# rows_affected #}

'{{ model.node.config.materialized }}', {# materialization #}
'{{ model.node.schema }}', {# schema #}
'{{ model.node.name }}', {# name #}
'{{ model.node.alias }}', {# alias #}
'{{ model.message | replace("'", "''") }}', {# message #}
'{{ tojson(model.adapter_response) | replace("'", "''") }}' {# adapter_response #}
)
{%- if not loop.last %},{%- endif %}
{%- endfor %}
{% endset %}
{{ snapshot_execution_values }}
{% else %}
{{ return("") }}
{% endif %}
{% endmacro -%}
Loading

0 comments on commit 845f11e

Please sign in to comment.