Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bm/add-qualify-statement-to-ensure-uniqueness #169

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 55 additions & 39 deletions models/query_direct_object_access.sql
Original file line number Diff line number Diff line change
@@ -1,49 +1,65 @@
{{ config(
materialized='incremental',
unique_key=['_unique_id', 'query_start_time'],
) }}
{{
config(
materialized="incremental",
unique_key=["_unique_id", "query_start_time"],
)
}}

with
access_history as (
select *
from {{ ref('stg_access_history') }}
access_history as (
select *
from {{ ref("stg_access_history") }}

{% if is_incremental() %}
where query_start_time > (select coalesce(dateadd('day', -1, max(query_start_time)), '1970-01-01') from {{ this }})
{% endif %}
{% if is_incremental() %}
where
query_start_time > (
select
coalesce(
dateadd('day', -1, max(query_start_time)), '1970-01-01'
)
from {{ this }}
)
{% endif %}

),
),

access_history_flattened as (
select
access_history.query_id,
access_history.query_start_time,
access_history.user_name,
objects_accessed.value:objectId::integer as table_id, -- will be null for secured views or tables from a data share
objects_accessed.value:objectName::text as object_name,
objects_accessed.value:objectDomain::text as object_domain,
objects_accessed.value:columns as columns_array
access_history_flattened as (
select
access_history.query_id,
access_history.query_start_time,
access_history.user_name,
objects_accessed.value:objectid::integer as table_id, -- will be null for secured views or tables from a data share
objects_accessed.value:objectname::text as object_name,
objects_accessed.value:objectdomain::text as object_domain,
objects_accessed.value:columns as columns_array

from access_history, lateral flatten(access_history.direct_objects_accessed) as objects_accessed
),
from
access_history,
lateral flatten(access_history.direct_objects_accessed) as objects_accessed
),

access_history_flattened_w_columns as (
select
access_history_flattened.query_id,
access_history_flattened.query_start_time,
access_history_flattened.user_name,
access_history_flattened.table_id,
access_history_flattened.object_name,
access_history_flattened.object_domain,
array_agg(distinct columns.value:columnName::text) as columns_accessed
from access_history_flattened, lateral flatten(access_history_flattened.columns_array) as columns
where
access_history_flattened.object_name is not null
group by 1, 2, 3, 4, 5, 6
)
access_history_flattened_w_columns as (
select
access_history_flattened.query_id,
access_history_flattened.query_start_time,
access_history_flattened.user_name,
access_history_flattened.table_id,
access_history_flattened.object_name,
access_history_flattened.object_domain,
array_agg(distinct columns.value:columnname::text) as columns_accessed
from
access_history_flattened,
lateral flatten(access_history_flattened.columns_array) as columns
where access_history_flattened.object_name is not null
group by 1, 2, 3, 4, 5, 6
)

select
md5(concat(query_id, object_name)) as _unique_id,
*
select md5(concat(query_id, object_name)) as _unique_id, *
from access_history_flattened_w_columns
qualify -- added by affinaquest to ensure uniqueness
row_number() over (
partition by md5(concat(query_id, object_name)), query_start_time
order by query_start_time asc
)
= 1
order by query_start_time asc