Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for inserts via GCP cloud function and pub/sub #670

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 67 additions & 34 deletions macros/edr/tests/on_run_end/handle_tests_results.sql
Original file line number Diff line number Diff line change
Expand Up @@ -71,40 +71,73 @@
{% endif %}

{%- set temp_relation = elementary.make_temp_view_relation(target_relation) -%}
{% set insert_query %}
INSERT INTO {{ target_relation }} (
id,
full_table_name,
column_name,
metric_name,
metric_value,
source_value,
bucket_start,
bucket_end,
bucket_duration_hours,
updated_at,
dimension,
dimension_value,
metric_properties,
created_at
)
SELECT
id,
full_table_name,
column_name,
metric_name,
metric_value,
source_value,
bucket_start,
bucket_end,
bucket_duration_hours,
updated_at,
dimension,
dimension_value,
metric_properties,
{{ elementary.edr_current_timestamp() }} as created_at
FROM {{ temp_relation }}
{% endset %}

{% if elementary.get_config_var("insert_rows_method") == "gcp-cloud-function" %}
{{ elementary.file_log("Sending to UDF") }}
{% set log_udf_name = "`" + elementary.get_config_var("insert_rows_udf") + "`" %}
{% set insert_query %}
SELECT {{ log_udf_name }} (
'{{ elementary.get_config_var("insert_rows_topics")["data_monitoring_metrics"] }}',
TO_JSON_STRING(
STRUCT(
id,
COALESCE(full_table_name, '') AS full_table_name,
COALESCE(column_name, '') AS column_name,
COALESCE(metric_name, '') AS metric_name,
metric_value,
COALESCE(source_value, '') AS source_value,
bucket_start,
bucket_end,
bucket_duration_hours,
updated_at,
COALESCE(dimension, '') AS dimension,
COALESCE(dimension_value, '') AS dimension_value,
COALESCE(metric_properties, '') AS metric_properties,
{{ elementary.edr_current_timestamp() }} as created_at
)
),
'{}'
)
FROM {{ temp_relation }}
;
{% endset %}

{% else %}
{% set insert_query %}
INSERT INTO {{ target_relation }} (
id,
full_table_name,
column_name,
metric_name,
metric_value,
source_value,
bucket_start,
bucket_end,
bucket_duration_hours,
updated_at,
dimension,
dimension_value,
metric_properties,
created_at
)
SELECT
id,
full_table_name,
column_name,
metric_name,
metric_value,
source_value,
bucket_start,
bucket_end,
bucket_duration_hours,
updated_at,
dimension,
dimension_value,
metric_properties,
{{ elementary.edr_current_timestamp() }} as created_at
FROM {{ temp_relation }}
{% endset %}
{% endif %}

{{ elementary.file_log("Inserting metrics into {}.".format(target_relation)) }}
{%- do elementary.run_query(dbt.create_table_as(True, temp_relation, test_tables_union_query)) %}
Expand Down
102 changes: 101 additions & 1 deletion macros/utils/table_operations/insert_rows.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,30 @@

{{ elementary.file_log('Inserting {} rows to table {}'.format(rows | length, table_relation)) }}
{% set insert_rows_method = elementary.get_config_var('insert_rows_method') %}
{% if insert_rows_method == 'max_query_size' %}

{# If using a GCP Cloud Function, verify that a topic is configured for this table in "insert_rows_topics". #}
{% if insert_rows_method == "gcp-cloud-function" %}
{% set table_name = "{}".format(table_relation).split('.')[-1].replace('`', '') %}
{% if table_name not in elementary.get_config_var("insert_rows_topics") %}
{# Revert to another method. #}
{% set insert_rows_method = 'max_query_size' %}
{% endif %}
{% endif %}

{% if insert_rows_method == "gcp-cloud-function" %}
{{ elementary.file_log("Sending to UDF") }}

{% set topic_name = elementary.get_config_var("insert_rows_topics")[table_name] %}
{% set insert_rows_queries = elementary.get_cloud_function_query(topic_name, columns, rows) %}

{% set queries_len = insert_rows_queries | length %}
{% for insert_query in insert_rows_queries %}
{% do elementary.file_log("[{}/{}] Sending INSERT to UDF.".format(loop.index, queries_len)) %}
{% do elementary.file_log("UDF CALL: {}".format(insert_query)) %}
{% do elementary.run_query(insert_query) %}
{% endfor %}

{% elif insert_rows_method == 'max_query_size' %}
{% set insert_rows_queries = elementary.get_insert_rows_queries(table_relation, columns, rows, on_query_exceed=on_query_exceed) %}
{% set queries_len = insert_rows_queries | length %}
{% for insert_query in insert_rows_queries %}
Expand Down Expand Up @@ -107,6 +130,21 @@
{% do return(row_sql) %}
{% endmacro %}

{% macro render_row_with_name_to_sql(row, columns) %}
    {#
      Render one row as a comma-separated list of "<value> AS <column name>" expressions,
      in the order given by `columns`.

      The column named "created_at" (matched case-insensitively) ignores the row's own value
      and uses elementary.edr_current_timestamp() instead. Every other column is looked up in
      `row` with elementary.insensitive_get_dict_value and rendered with elementary.render_value.
    #}
    {% set aliased_expressions = [] %}
    {% for col in columns %}
        {% if col.name.lower() != "created_at" %}
            {% set raw_value = elementary.insensitive_get_dict_value(row, col.name) %}
            {% set rendered_value = elementary.render_value(raw_value) %}
            {% do aliased_expressions.append("{} AS {}".format(rendered_value, col.name)) %}
        {% else %}
            {# The timestamp is generated at insert time rather than read from the row. #}
            {% do aliased_expressions.append("{} AS created_at".format(elementary.edr_current_timestamp())) %}
        {% endif %}
    {% endfor %}
    {% do return(aliased_expressions | join(",")) %}
{% endmacro %}

{% macro get_chunk_insert_query(table_relation, columns, rows) -%}
{% set insert_rows_query %}
insert into {{ table_relation }}
Expand All @@ -124,6 +162,68 @@
{{ return(insert_rows_query) }}
{%- endmacro %}

{% macro get_cloud_function_query(topic_name, columns, rows, query_max_size=none, on_query_exceed=none) -%}
    {#
      Build the list of queries that pass `rows` to the UDF named by the "insert_rows_udf"
      config var. Each query has the shape

        SELECT <udf>('<topic_name>', TO_JSON_STRING(STRUCT(<column names>)), '{}')
        FROM (SELECT <row> UNION ALL SELECT <row> ...)

      Rows are appended to the current query until adding another one would push the query
      text past `query_max_size` characters; a new query is then started.

      Args:
        topic_name: rendered as the first (single-quoted) argument of the UDF call.
        columns: objects exposing `.name`; the names become the STRUCT fields.
        rows: row dicts, rendered with elementary.render_row_with_name_to_sql.
        query_max_size: optional maximum query length. Falls back to the "query_max_size"
          config var when not provided.
        on_query_exceed: optional callback called with a row that does not fit into a query
          on its own. The row is re-rendered after the call (presumably so the callback can
          shrink the row in place - confirm against the callers).

      Returns: a list of query strings; empty when `rows` is empty.
      Raises: a compiler error when a single row still exceeds `query_max_size`.

      NOTE: `query_max_size` and `on_query_exceed` used to be read without being declared,
      so the callback branch could never run. They are now optional parameters, which keeps
      existing three-argument calls working.
    #}
    {% set udf_name = "`" + elementary.get_config_var("insert_rows_udf") + "`" %}

    {% if not query_max_size %}
        {% set query_max_size = elementary.get_config_var('query_max_size') %}
    {% endif %}

    {% set insert_queries = [] %}
    {# Common prefix of every query; the row SELECTs and the closing ")" are appended below. #}
    {% set base_insert_query %}
        SELECT {{ udf_name }} (
            '{{ topic_name }}',
            TO_JSON_STRING(
                STRUCT(
                    {%- for column in columns -%}
                        {{- column.name -}} {{- "," if not loop.last else "" -}}
                    {%- endfor -%}
                )
            ),
            '{}'
        )
        FROM (
    {% endset %}

    {# A namespace is required so the accumulated query survives across loop iterations. #}
    {% set current_query = namespace(data=base_insert_query) %}
    {% for row in rows %}
        {% set row_sql = "SELECT " + elementary.render_row_with_name_to_sql(row, columns) %}
        {% set query_with_row = current_query.data + (" UNION ALL " if not loop.first else "") + row_sql %}

        {% if query_with_row | length > query_max_size %}
            {% set new_insert_query = base_insert_query + row_sql %}

            {# Check if row is too large to fit into an insert query. #}
            {% if new_insert_query | length > query_max_size %}
                {% if on_query_exceed %}
                    {% do on_query_exceed(row) %}
                    {# Re-render the row after the callback has had a chance to handle it. #}
                    {% set row_sql = "SELECT " + elementary.render_row_with_name_to_sql(row, columns) %}
                    {% set new_insert_query = base_insert_query + row_sql %}
                {% endif %}

                {% if new_insert_query | length > query_max_size %}
                    {% do elementary.file_log("Oversized row for insert_rows: {}".format(query_with_row)) %}
                    {% do exceptions.raise_compiler_error("Row to be inserted exceeds var('query_max_size'). Consider increasing its value.") %}
                {% endif %}
            {% endif %}

            {# Flush the query built so far (unless it holds no rows yet) and start a new one. #}
            {% if current_query.data != base_insert_query %}
                {% do insert_queries.append(current_query.data + ')') %}
            {% endif %}
            {% set current_query.data = new_insert_query %}

        {% else %}
            {% set current_query.data = query_with_row %}
        {% endif %}

        {% if loop.last %}
            {% do insert_queries.append(current_query.data + ')') %}
        {% endif %}
    {% endfor %}

    {{ return(insert_queries) }}
{%- endmacro %}

{% macro escape_special_chars(string_value) %}
    {# Delegates to the 'escape_special_chars' implementation resolved by adapter.dispatch in the 'elementary' namespace. #}
    {% set dispatched_macro = adapter.dispatch('escape_special_chars', 'elementary') %}
    {{ return(dispatched_macro(string_value)) }}
{% endmacro %}
Expand Down