Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 1.7.latest] Fix Broken Python Models #1015

Merged
merged 1 commit into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20231108-171128.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix broken partition config granularity and batch_id being set to None
time: 2023-11-08T17:11:28.819877-08:00
custom:
Author: colin-rogers-dbt
Issue: "1006"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20231109-095012.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Upgrade spark-bigquery Java deps for serverless to 2.13-0.34.0
time: 2023-11-09T09:50:12.252774-08:00
custom:
Author: colin-rogers-dbt
Issue: "1006"
78 changes: 58 additions & 20 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ jobs:
- 'dbt/**'
- 'tests/**'
- 'dev-requirements.txt'
bigquery-python:
- 'dbt/adapters/bigquery/dataproc/**'
- 'dbt/adapters/bigquery/python_submissions.py'
- 'dbt/include/bigquery/python_model/**'

- name: Generate integration test matrix
id: generate-matrix
Expand Down Expand Up @@ -192,21 +188,6 @@ jobs:
GCS_BUCKET: dbt-ci
run: tox -- --ddtrace

# python models tests are slow so we only want to run them if we're changing them
- name: Run tox (python models)
if: needs.test-metadata.outputs.run-python-tests == 'true'
env:
BIGQUERY_TEST_SERVICE_ACCOUNT_JSON: ${{ secrets.BIGQUERY_TEST_SERVICE_ACCOUNT_JSON }}
BIGQUERY_TEST_ALT_DATABASE: ${{ secrets.BIGQUERY_TEST_ALT_DATABASE }}
BIGQUERY_TEST_NO_ACCESS_DATABASE: ${{ secrets.BIGQUERY_TEST_NO_ACCESS_DATABASE }}
DBT_TEST_USER_1: group:[email protected]
DBT_TEST_USER_2: group:[email protected]
DBT_TEST_USER_3: serviceAccount:[email protected]
DATAPROC_REGION: us-central1
DATAPROC_CLUSTER_NAME: dbt-test-1
GCS_BUCKET: dbt-ci
run: tox -e python-tests -- --ddtrace

- uses: actions/upload-artifact@v3
if: always()
with:
Expand All @@ -225,10 +206,67 @@ jobs:
name: integration_results_${{ matrix.python-version }}_${{ matrix.os }}_${{ matrix.adapter }}-${{ steps.date.outputs.date }}.csv
path: integration_results.csv

# python integration tests are slow so we only run them separately and for a single OS / python version
test-python:
name: "test-python"
needs: test-metadata
runs-on: ubuntu-latest
if: >-
needs.test-metadata.outputs.matrix &&
fromJSON( needs.test-metadata.outputs.matrix ).include[0] &&
(
github.event_name != 'pull_request_target' ||
github.event.pull_request.head.repo.full_name == github.repository ||
contains(github.event.pull_request.labels.*.name, 'ok to test')
)

steps:
- name: Check out the repository
if: github.event_name != 'pull_request_target'
uses: actions/checkout@v3
with:
persist-credentials: false

# explicitly checkout the branch for the PR,
# this is necessary for the `pull_request_target` event
- name: Check out the repository (PR)
if: github.event_name == 'pull_request_target'
uses: actions/checkout@v3
with:
persist-credentials: false
ref: ${{ github.event.pull_request.head.sha }}

- name: Set up Python 3.8
uses: actions/setup-python@v4
with:
python-version: "3.8"

- name: Install python dependencies
run: |
python -m pip install --user --upgrade pip
python -m pip install tox
python -m pip --version
tox --version

- name: Run tox (python models)
env:
BIGQUERY_TEST_SERVICE_ACCOUNT_JSON: ${{ secrets.BIGQUERY_TEST_SERVICE_ACCOUNT_JSON }}
BIGQUERY_TEST_ALT_DATABASE: ${{ secrets.BIGQUERY_TEST_ALT_DATABASE }}
BIGQUERY_TEST_NO_ACCESS_DATABASE: ${{ secrets.BIGQUERY_TEST_NO_ACCESS_DATABASE }}
DBT_TEST_USER_1: group:[email protected]
DBT_TEST_USER_2: group:[email protected]
DBT_TEST_USER_3: serviceAccount:[email protected]
DATAPROC_REGION: us-central1
DATAPROC_CLUSTER_NAME: dbt-test-1
GCS_BUCKET: dbt-ci
run: tox -e python-tests -- --ddtrace

require-label-comment:
runs-on: ubuntu-latest

needs: test
needs:
- test
- test-python

permissions:
pull-requests: write
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/bigquery/dataproc/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from dbt.adapters.bigquery.connections import DataprocBatchConfig

_BATCH_RUNNING_STATES = [Batch.State.PENDING, Batch.State.RUNNING]
DEFAULT_JAR_FILE_URI = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar"
DEFAULT_JAR_FILE_URI = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.34.0.jar"


def create_batch_request(
Expand Down
9 changes: 8 additions & 1 deletion dbt/adapters/bigquery/python_submissions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Dict, Union

from dbt.events import AdapterLogger

from dbt.adapters.base import PythonJobHelper
from google.api_core.future.polling import POLLING_PREDICATE

Expand All @@ -17,6 +19,7 @@
)

OPERATION_RETRY_TIME = 10
logger = AdapterLogger("BigQuery")


class BaseDataProcHelper(PythonJobHelper):
Expand Down Expand Up @@ -122,10 +125,14 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
)

def _get_batch_id(self) -> str:
return self.parsed_model["config"].get("batch_id")
model = self.parsed_model
default_batch_id = model["unique_id"].replace(".", "-").replace("_", "-")
default_batch_id += str(int(model["created_at"]))
return model["config"].get("batch_id", default_batch_id)

def _submit_dataproc_job(self) -> Batch:
batch_id = self._get_batch_id()
logger.info(f"Submitting batch job with id: {batch_id}")
request = create_batch_request(
batch=self._configure_batch(),
batch_id=batch_id,
Expand Down
6 changes: 4 additions & 2 deletions dbt/include/bigquery/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,16 @@ df.write \
.mode("overwrite") \
.format("bigquery") \
.option("writeMethod", "indirect").option("writeDisposition", 'WRITE_TRUNCATE') \
{%- if partition_config is not none %}
{%- if partition_config.data_type | lower in ('date','timestamp','datetime') %}
.option("partitionField", "{{- partition_config.field -}}") \
{%- if partition_config.granularity is not none %}
.option("partitionType", "{{- partition_config.granularity -}}") \
.option("partitionType", "{{- partition_config.granularity| upper -}}") \
{%- endif %}
{%- endif %}
{%- endif %}
{%- if raw_cluster_by is not none %}
.option("clusteredFields", "{{- raw_cluster_by|join(',') -}}") \
.option("clusteredFields", "{{- raw_cluster_by | join(',') -}}") \
{%- endif %}
.save("{{target_relation}}")
{% endmacro %}
1 change: 1 addition & 0 deletions tests/functional/adapter/test_python_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def model(dbt, spark):
"""


@pytest.mark.skip(reason="Currently failing as run_started_at is the same across dbt runs")
class TestPythonBatchIdModels:
@pytest.fixture(scope="class")
def models(self):
Expand Down