Revert "[ADAP-492] Support partition_by and cluster_by on python models (#680)" #1010

Closed
17 changes: 4 additions & 13 deletions dbt/include/bigquery/macros/materializations/table.sql
@@ -45,11 +45,11 @@

 {% endmaterialization %}

+-- TODO dataproc requires a temp bucket to perform BQ write
+-- this is hard coded to internal testing ATM. need to adjust to render
+-- or find another way around
 {% macro py_write_table(compiled_code, target_relation) %}
 from pyspark.sql import SparkSession
-{%- set raw_partition_by = config.get('partition_by', none) -%}
-{%- set raw_cluster_by = config.get('cluster_by', none) -%}
-{%- set partition_config = adapter.parse_partition_by(raw_partition_by) %}

 spark = SparkSession.builder.appName('smallTest').getOrCreate()

@@ -109,15 +109,6 @@ else:
 df.write \
   .mode("overwrite") \
   .format("bigquery") \
-  .option("writeMethod", "indirect").option("writeDisposition", 'WRITE_TRUNCATE') \
-  {%- if partition_config.data_type | lower in ('date','timestamp','datetime') %}
-  .option("partitionField", "{{- partition_config.field -}}") \
-  {%- if partition_config.granularity is not none %}
-  .option("partitionType", "{{- partition_config.granularity -}}") \
-  {%- endif %}
-  {%- endif %}
-  {%- if raw_cluster_by is not none %}
-  .option("clusteredFields", "{{- raw_cluster_by|join(',') -}}") \
-  {%- endif %}
+  .option("writeMethod", "direct").option("writeDisposition", 'WRITE_TRUNCATE') \
   .save("{{target_relation}}")
 {% endmacro %}
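
For context, py_write_table renders the Python job that Dataproc runs on behalf of a python model. After this revert, the rendered write always uses the connector's direct write path and no longer forwards partition_by or cluster_by. A minimal sketch of the post-revert write, assuming the spark-bigquery connector is available and substituting a hypothetical "my_project.my_dataset.my_table" for {{ target_relation }}:

from pyspark.sql import SparkSession

# Hypothetical stand-in for the code dbt renders from py_write_table.
spark = SparkSession.builder.appName("smallTest").getOrCreate()
df = spark.createDataFrame([("A", 1.0), ("B", 2.0)], schema=["test", "test2"])

# Post-revert: a direct write with no partitionField, partitionType, or
# clusteredFields options, so model-level partition_by/cluster_by configs
# are not applied to the resulting BigQuery table.
df.write \
    .mode("overwrite") \
    .format("bigquery") \
    .option("writeMethod", "direct") \
    .option("writeDisposition", "WRITE_TRUNCATE") \
    .save("my_project.my_dataset.my_table")

The indirect method being removed here stages data in GCS and therefore needs a temporary bucket, which is what the restored TODO comment refers to; the direct method writes straight to BigQuery and avoids that requirement.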
95 changes: 0 additions & 95 deletions tests/functional/adapter/test_python_model.py
@@ -54,101 +54,6 @@ def model(dbt, spark):
return spark.createDataFrame(data, schema=['test', 'test2'])
"""

macro__partition_count_sql = """
{% test number_partitions(model, expected) %}

{%- set result = get_partitions_metadata(model) %}

{% if result %}
{% set partitions = result.columns['partition_id'].values() %}
{% else %}
{% set partitions = () %}
{% endif %}

{% set actual = partitions | length %}
{% set success = 1 if model and actual == expected else 0 %}

select 'Expected {{ expected }}, but got {{ actual }}' as validation_error
from (select true)
where {{ success }} = 0

{% endtest %}
"""

models__partitioned_model_python = """
import pandas as pd

def model(dbt, spark):
dbt.config(
materialized='table',
partition_by={
"field": "C",
"data_type": "timestamp",
"granularity": "day",
},
cluster_by=["A"],
)
random_array = [
["A", -157.9871329592354],
["B", -528.9769041860632],
["B", 941.0504221837489],
["B", 919.5903586746183],
["A", -121.25678519054622],
["A", 254.9985130814921],
["A", 833.2963094260072],
]

df = pd.DataFrame(random_array, columns=["A", "B"])

df["C"] = pd.to_datetime('now')

final_df = df[["A", "B", "C"]]

return final_df
"""

models__partitioned_model_yaml = """
models:
- name: python_partitioned_model
description: A random table with a calculated column defined in python.
config:
batch_id: '{{ run_started_at.strftime("%Y-%m-%d-%H-%M-%S") }}-python-partitioned'
tests:
- number_partitions:
expected: "{{ var('expected', 1) }}"
columns:
- name: A
description: Column A
- name: B
description: Column B
- name: C
description: Column C
"""


class TestPythonPartitionedModels:
@pytest.fixture(scope="class")
def macros(self):
return {"partition_metadata.sql": macro__partition_count_sql}

@pytest.fixture(scope="class")
def models(self):
return {
"python_partitioned_model.py": models__partitioned_model_python,
"python_partitioned_model.yml": models__partitioned_model_yaml,
}

def test_multiple_named_python_models(self, project):
result = run_dbt(["run"])
assert len(result) == 1

test_results = run_dbt(["test"])
for result in test_results:
assert result.status == "pass"
assert not result.skipped
assert result.failures == 0
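
The deleted test relied on var('expected', 1) resolving to its default of 1. As a sketch of a variant (hypothetical test name; assumes the same dbt functional-test helpers the file already uses), the var could also be passed explicitly on the invocation:

from dbt.tests.util import run_dbt

# Hypothetical variant of the deleted test: pass `expected` explicitly
# via --vars instead of relying on its default of 1.
def test_partition_count_explicit_var(project):
    run_dbt(["run"])
    results = run_dbt(["test", "--vars", "{expected: 1}"])
    assert all(r.status == "pass" for r in results)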


models__simple_python_model_v2 = """
import pandas
