Commit

Merge branch '1.10.latest' of https://github.com/databricks/dbt-databricks into testBranch
eric-wang-1990 committed Jan 28, 2025
2 parents e68d4d8 + ea2b52d commit 1529e17
Showing 38 changed files with 1,708 additions and 941 deletions.
27 changes: 25 additions & 2 deletions CHANGELOG.md
@@ -14,7 +14,29 @@

- Introduced use_materialization_v2 flag for gating materialization revamps. ([844](https://github.com/databricks/dbt-databricks/pull/844))

-## dbt-databricks 1.9.0 (TBD)
### Under the Hood

- Implement new constraint logic for use_materialization_v2 flag ([846](https://github.com/databricks/dbt-databricks/pull/846/files)), ([876](https://github.com/databricks/dbt-databricks/pull/876))

## dbt-databricks 1.9.2 (TBD)

### Under the Hood

- Refactor global state reading ([888](https://github.com/databricks/dbt-databricks/pull/888))

## dbt-databricks 1.9.1 (December 16, 2024)

### Features

- Merge strategy now supports the `update set ...` action with an explicit list of updates for `when not matched by source` ([866](https://github.com/databricks/dbt-databricks/pull/866)) (thanks @mi-volodin).

### Under the Hood

- Removed pins for pandas and pydantic to ease user burdens ([874](https://github.com/databricks/dbt-databricks/pull/874))
- Add more relation types to make codegen happy ([875](https://github.com/databricks/dbt-databricks/pull/875))
- Add UP (pyupgrade) ruleset ([865](https://github.com/databricks/dbt-databricks/pull/865))

## dbt-databricks 1.9.0 (December 9, 2024)

### Features

@@ -39,6 +61,7 @@

- Replace array indexing with 'get' in split_part so as not to raise an exception when indexing beyond bounds ([839](https://github.com/databricks/dbt-databricks/pull/839))
- Set queue enabled for Python notebook jobs ([856](https://github.com/databricks/dbt-databricks/pull/856))
- Ensure columns that are added get backticked ([859](https://github.com/databricks/dbt-databricks/pull/859))

### Under the Hood

@@ -49,7 +72,7 @@
- Prepare for python typing deprecations ([837](https://github.com/databricks/dbt-databricks/pull/837))
- Fix behavior flag use in init of DatabricksAdapter (thanks @VersusFacit!) ([836](https://github.com/databricks/dbt-databricks/pull/836))
- Restrict pydantic to V1 per dbt Labs' request ([843](https://github.com/databricks/dbt-databricks/pull/843))
-- Switching to Ruff for formatting and linting ([847](https://github.com/databricks/dbt-databricks/pull/847))
+- Switching to Ruff for formatting and linting ([847](https://github.com/databricks/dbt-databricks/pull/847)
- Switching to Hatch and pyproject.toml for project config ([853](https://github.com/databricks/dbt-databricks/pull/853))

## dbt-databricks 1.8.7 (October 10, 2024)
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/__version__.py
@@ -1 +1 @@
version = "1.10.0a1"
version = "1.10.0-alpha1"
57 changes: 56 additions & 1 deletion dbt/adapters/databricks/api_client.py
@@ -245,7 +245,7 @@ def _poll_api(


@dataclass(frozen=True, eq=True, unsafe_hash=True)
-class CommandExecution(object):
+class CommandExecution:
command_id: str
context_id: str
cluster_id: str
@@ -459,6 +459,60 @@ def run(self, job_id: str, enable_queueing: bool = True) -> str:
return response_json["run_id"]


class DltPipelineApi(PollableApi):
def __init__(self, session: Session, host: str, polling_interval: int):
super().__init__(session, host, "/api/2.0/pipelines", polling_interval, 60 * 60)

def poll_for_completion(self, pipeline_id: str) -> None:
self._poll_api(
url=f"/{pipeline_id}",
params={},
get_state_func=lambda response: response.json()["state"],
terminal_states={"IDLE", "FAILED", "DELETED"},
expected_end_state="IDLE",
unexpected_end_state_func=self._get_exception,
)

def _get_exception(self, response: Response) -> None:
response_json = response.json()
cause = response_json.get("cause")
if cause:
raise DbtRuntimeError(f"Pipeline {response_json.get('pipeline_id')} failed: {cause}")
else:
latest_update = response_json.get("latest_updates")[0]
last_error = self.get_update_error(response_json.get("pipeline_id"), latest_update)
raise DbtRuntimeError(
f"Pipeline {response_json.get('pipeline_id')} failed: {last_error}"
)

def get_update_error(self, pipeline_id: str, update_id: str) -> str:
response = self.session.get(f"/{pipeline_id}/events")
if response.status_code != 200:
raise DbtRuntimeError(
f"Error getting pipeline event info for {pipeline_id}: {response.text}"
)

events = response.json().get("events", [])
update_events = [
e
for e in events
if e.get("event_type", "") == "update_progress"
and e.get("origin", {}).get("update_id") == update_id
]

error_events = [
e
for e in update_events
if e.get("details", {}).get("update_progress", {}).get("state", "") == "FAILED"
]

msg = ""
if error_events:
msg = error_events[0].get("message", "")

return msg
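
`poll_for_completion` leans on `_poll_api`, which this diff only name-checks in the hunk header at line 245. Judging from the keyword arguments alone, its contract presumably looks something like the sketch below — an assumption about `PollableApi`, not the adapter's actual code:

```python
import time
from typing import Callable

from requests import Response, Session


def poll_api_sketch(
    session: Session,
    url: str,
    params: dict,
    get_state_func: Callable[[Response], str],
    terminal_states: set[str],
    expected_end_state: str,
    unexpected_end_state_func: Callable[[Response], None],
    polling_interval: int = 10,
    timeout: int = 60 * 60,
) -> Response:
    """Hypothetical stand-in for PollableApi._poll_api."""
    start = time.time()
    while time.time() - start < timeout:
        response = session.get(url, params=params)
        state = get_state_func(response)
        if state in terminal_states:
            if state != expected_end_state:
                # Expected to raise, e.g. DltPipelineApi._get_exception above.
                unexpected_end_state_func(response)
            return response
        time.sleep(polling_interval)
    raise TimeoutError(f"polling {url} did not reach {expected_end_state} within {timeout}s")
```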


class DatabricksApiClient:
def __init__(
self,
@@ -480,6 +534,7 @@ def __init__(
self.job_runs = JobRunsApi(session, host, polling_interval, timeout)
self.workflows = WorkflowJobApi(session, host)
self.workflow_permissions = JobPermissionsApi(session, host)
self.dlt_pipelines = DltPipelineApi(session, host, polling_interval)

@staticmethod
def create(
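For orientation, a hedged usage sketch of the new `DltPipelineApi`: the session wiring and IDs below are invented, and in the adapter this object is reached as `client.dlt_pipelines` on `DatabricksApiClient` (see `__init__` above) rather than built by hand. The `60 * 60` passed to `PollableApi` in its constructor is presumably a one-hour polling timeout.

```python
from requests import Session

from dbt.adapters.databricks.api_client import DltPipelineApi
from dbt_common.exceptions import DbtRuntimeError

# Hypothetical wiring: the real adapter authenticates the Session through
# its credential machinery, not a hard-coded bearer token.
session = Session()
session.headers["Authorization"] = "Bearer <token>"

pipelines = DltPipelineApi(session, "dbc-example.cloud.databricks.com", polling_interval=10)

try:
    # Polls GET /api/2.0/pipelines/{id} until IDLE, FAILED, or DELETED; on a
    # non-IDLE terminal state, _get_exception raises DbtRuntimeError with the
    # pipeline's cause or its last FAILED update_progress event.
    pipelines.poll_for_completion("0123-456789-pipeline")
except DbtRuntimeError as err:
    print(f"DLT pipeline failed: {err}")
```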
40 changes: 38 additions & 2 deletions dbt/adapters/databricks/column.py
@@ -1,13 +1,15 @@
from dataclasses import dataclass
-from typing import ClassVar, Optional
+from typing import Any, ClassVar, Optional

from dbt.adapters.databricks.utils import quote
from dbt.adapters.spark.column import SparkColumn


@dataclass
class DatabricksColumn(SparkColumn):
table_comment: Optional[str] = None
comment: Optional[str] = None
not_null: Optional[bool] = None

TYPE_LABELS: ClassVar[dict[str, str]] = {
"LONG": "BIGINT",
@@ -26,5 +28,39 @@ def create(cls, name: str, label_or_dtype: str) -> "DatabricksColumn":
def data_type(self) -> str:
return self.translate_type(self.dtype)

def enrich(self, model_column: dict[str, Any], not_null: bool) -> "DatabricksColumn":
"""Create a copy that incorporates model column metadata, including constraints."""

data_type = model_column.get("data_type") or self.dtype
enriched_column = DatabricksColumn.create(self.name, data_type)
if model_column.get("description"):
enriched_column.comment = model_column["description"]

enriched_column.not_null = not_null
return enriched_column

def render_for_create(self) -> str:
"""Renders the column for building a create statement."""
column_str = f"{self.name} {self.dtype}"
if self.not_null:
column_str += " NOT NULL"
if self.comment:
comment = self.comment.replace("'", "\\'")
column_str += f" COMMENT '{comment}'"
return column_str

def __repr__(self) -> str:
return "<DatabricksColumn {} ({})>".format(self.name, self.data_type)
return f"<DatabricksColumn {self.name} ({self.data_type})>"

@staticmethod
def get_name(column: dict[str, Any]) -> str:
name = column["name"]
return quote(name) if column.get("quote", False) else name

@staticmethod
def format_remove_column_list(columns: list["DatabricksColumn"]) -> str:
return ", ".join([quote(c.name) for c in columns])

@staticmethod
def format_add_column_list(columns: list["DatabricksColumn"]) -> str:
return ", ".join([f"{quote(c.name)} {c.data_type}" for c in columns])