diff --git a/README.md b/README.md index 012b3307..2923c3be 100644 --- a/README.md +++ b/README.md @@ -41,17 +41,18 @@ stored login info. You can configure the AWS profile name to use via `aws_profil A dbt profile can be configured to run against AWS Athena using the following configuration: -| Option | Description | Required? | Example | -|---------------- |-------------------------------------------------------------------------------- |----------- |-------------------- | -| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | -| region_name | AWS region of your Athena instance | Required | `eu-west-1` | -| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | -| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | -| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | -| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` | -| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` | +| Option | Description | Required? 
| Example | +|---------------- |-------------------------------------------------------------------------------- |----------- |-----------------------| +| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | +| region_name | AWS region of your Athena instance | Required | `eu-west-1` | +| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | +| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | +| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | +| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` | +| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` | | num_retries| Number of times to retry a failing query | Optional | `3` | `5` + **Example profiles.yml entry:** ```yaml athena: diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index ce524def..482dc2c6 100644 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -39,7 +39,6 @@ def convert_datetime_type( def s3_uuid_table_location(self): conn = self.connections.get_thread_connection() client = conn.handle - return f"{client.s3_staging_dir}tables/{str(uuid4())}/" @available @@ -68,6 +67,11 @@ def clean_up_partitions( s3_bucket = s3_resource.Bucket(bucket_name) s3_bucket.objects.filter(Prefix=prefix).delete() + @available + def unique_temp_table_suffix(self, suffix_initial="__dbt_tmp", length=8): + """Return suffix_initial + '_' + the first `length` hex digits of a fresh uuid4 (hex form, so the suffix never contains '-').""" + return f"{suffix_initial}_{uuid4().hex[:length]}" + @available def clean_up_table( self, database_name: str, table_name: str @@ -96,4 +100,3 @@ def clean_up_table( s3_resource = boto3.resource('s3', region_name=client.region_name) s3_bucket = s3_resource.Bucket(bucket_name) s3_bucket.objects.filter(Prefix=prefix).delete() - diff --git 
a/dbt/include/athena/macros/materializations/incremental.sql b/dbt/include/athena/macros/materializations/incremental.sql index 8c82d1e9..9eddf48a 100644 --- a/dbt/include/athena/macros/materializations/incremental.sql +++ b/dbt/include/athena/macros/materializations/incremental.sql @@ -13,6 +13,18 @@ {% macro incremental_insert(tmp_relation, target_relation, statement_name="main") %} {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set tmp_columns = adapter.get_columns_in_relation(tmp_relation) -%} {# Guard: the quoted column names of tmp_relation and target_relation are compared as lists (order-sensitive); when they differ an error is raised before the insert statement is rendered. #} + {%- set dest_column_names = dest_columns | map(attribute='quoted') | list -%} + {%- set tmp_columns_names = tmp_columns | map(attribute='quoted') | list -%} + {% if dest_column_names != tmp_columns_names %} + {% set columns_mismatch_err_msg -%} + Schema for incremental model does not match schema for the destination table {{target_relation}}: + Incremental model schema : {{ tmp_columns_names }} + Destination table schema: {{ dest_column_names }} + Please update the destination table schema! 
+ {%- endset %} + {% do exceptions.raise_not_implemented(columns_mismatch_err_msg) %} {# NOTE(review): raises with the mismatch message built above; exceptions.raise_compiler_error may report this more cleanly at model level - confirm before changing. #} + {% endif %} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} insert into {{ target_relation }} ({{ dest_cols_csv }}) @@ -69,7 +81,8 @@ {% set partitioned_by = config.get('partitioned_by', default=none) %} {% set target_relation = this.incorporate(type='table') %} {% set existing_relation = load_relation(this) %} - {% set tmp_relation = make_temp_relation(this) %} + {% set temp_table_suffix = adapter.unique_temp_table_suffix() %} {# Computed once here; the same suffix is passed to every make_temp_relation call below so they all name the same temp relation. #} + {% set tmp_relation = make_temp_relation(this, suffix=temp_table_suffix) %} {{ run_hooks(pre_hooks, inside_transaction=False) }} @@ -83,7 +96,7 @@ {% do adapter.drop_relation(existing_relation) %} {% set build_sql = create_table_as(False, target_relation, sql) %} {% elif partitioned_by is not none and strategy == 'insert_overwrite' %} - {% set tmp_relation = make_temp_relation(target_relation) %} + {% set tmp_relation = make_temp_relation(target_relation, suffix=temp_table_suffix) %} {% if tmp_relation is not none %} {% do adapter.drop_relation(tmp_relation) %} {% endif %} @@ -92,7 +105,7 @@ {% set build_sql = incremental_insert(tmp_relation, target_relation) %} {% do to_drop.append(tmp_relation) %} {% else %} - {% set tmp_relation = make_temp_relation(target_relation) %} + {% set tmp_relation = make_temp_relation(target_relation, suffix=temp_table_suffix) %} {% if tmp_relation is not none %} {% do adapter.drop_relation(tmp_relation) %} {% endif %}