From 1262221a2c4657ff2539b8f2165e2aca8da394d2 Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Fri, 13 Sep 2024 10:47:06 -0700 Subject: [PATCH] Skip any missing table in asset configuration (#2141) --- warehouse/oso_dagster/factories/sql.py | 106 +++++++++++++------------ 1 file changed, 54 insertions(+), 52 deletions(-) diff --git a/warehouse/oso_dagster/factories/sql.py b/warehouse/oso_dagster/factories/sql.py index 187a7e1f6..2d1ad3aeb 100644 --- a/warehouse/oso_dagster/factories/sql.py +++ b/warehouse/oso_dagster/factories/sql.py @@ -1,52 +1,46 @@ -from typing import ( - TypedDict, - Dict, - Any, - cast, - Sequence, - Optional, - List, - Iterable, - NotRequired, - Callable, -) -from dagster import AssetKey +import logging +import typing as t + import dlt -from dlt.pipeline.pipeline import Pipeline -from dlt.extract.resource import DltResource -from dlt.sources.credentials import ConnectionStringCredentials, GcpServiceAccountCredentials +from dagster import AssetExecutionContext, AssetKey +from dagster_embedded_elt.dlt import (DagsterDltResource, DagsterDltTranslator, + dlt_assets) +from dlt.common.schema.typing import TWriteDispositionConfig from dlt.destinations import bigquery +from dlt.extract.resource import DltResource +from dlt.pipeline.pipeline import Pipeline +from dlt.sources.credentials import (ConnectionStringCredentials, + GcpServiceAccountCredentials) from sqlalchemy import MetaData, Table -from dagster import AssetExecutionContext -from dagster_embedded_elt.dlt import ( - DagsterDltTranslator, - dlt_assets, - DagsterDltResource, -) - -from ..dlt_sources.sql_database import sql_table, TableBackend -from .common import early_resources_asset_factory, AssetFactoryResponse, AssetList +from sqlalchemy.exc import NoSuchTableError + +from ..dlt_sources.sql_database import TableBackend, sql_table from ..utils.secrets import SecretReference, SecretResolver +from .common import (AssetFactoryResponse, AssetList, + early_resources_asset_factory) +logger = logging.getLogger(__name__) -class SQLTableOptions(TypedDict): + +class SQLTableOptions(t.TypedDict): """Typed dict for the input to the sql_table function from the sql_database dlt verified source""" table: str - schema: NotRequired[str] - metadata: NotRequired[MetaData] - incremental: NotRequired[dlt.sources.incremental[Any]] - chunk_size: NotRequired[int] - backend: NotRequired[TableBackend] - detect_precision_hints: NotRequired[bool] - defer_table_reflect: NotRequired[bool] - table_adapter_callback: NotRequired[Callable[[Table], None]] - backend_kwargs: NotRequired[Dict[str, Any]] + schema: t.NotRequired[str] + metadata: t.NotRequired[MetaData] + incremental: t.NotRequired[dlt.sources.incremental[t.Any]] + chunk_size: t.NotRequired[int] + backend: t.NotRequired[TableBackend] + detect_precision_hints: t.NotRequired[bool] + defer_table_reflect: t.NotRequired[bool] + table_adapter_callback: t.NotRequired[t.Callable[[Table], None]] + backend_kwargs: t.NotRequired[t.Dict[str, t.Any]] class TopLevelSQLTableOptions(SQLTableOptions): - destination_table_name: NotRequired[str] + destination_table_name: t.NotRequired[str] + write_disposition: t.NotRequired[TWriteDispositionConfig] def _generate_asset_for_table( @@ -61,7 +55,7 @@ def _source(): destination_table_name = table_options.get("destination_table_name") if destination_table_name: del table_options["destination_table_name"] - table = cast(SQLTableOptions, table_options) + table = t.cast(SQLTableOptions, table_options) resource = sql_table(credentials, **table) if destination_table_name: resource.table_name = destination_table_name @@ -74,7 +68,12 @@ def _source(): dlt_dagster_translator=translator, ) def _asset(context: AssetExecutionContext, dlt: DagsterDltResource): - yield from dlt.run(context=context, loader_file_format="jsonl") + kwargs = {} + write_disposition = table_options.get("write_disposition") + if write_disposition: + kwargs['write_disposition'] = write_disposition + + yield from dlt.run(context=context, loader_file_format="jsonl", **kwargs) return _asset @@ -82,7 +81,7 @@ def _asset(context: AssetExecutionContext, dlt: DagsterDltResource): def sql_assets( source_name: str, source_credential_reference: SecretReference, - sql_tables: List[TopLevelSQLTableOptions], + sql_tables: t.List[TopLevelSQLTableOptions], group_name: str = "", environment: str = "production", asset_type: str = "source" @@ -127,13 +126,16 @@ def factory( staging=dlt_staging_destination, progress="log", ) - asset_def = _generate_asset_for_table( - source_name, credentials, pipeline, table, translator - ) - assets.append( - asset_def - ) - + try: + asset_def = _generate_asset_for_table( + source_name, credentials, pipeline, table, translator + ) + assets.append( + asset_def + ) + except NoSuchTableError: + logger.error(f'Failed to load table "{table["table"]}" for source "{source_name}"') + return AssetFactoryResponse(assets=assets) return factory @@ -145,27 +147,27 @@ class PrefixedDltTranslator(DagsterDltTranslator): def __init__( self, source_name: str, - tags: Dict[str, str], - prefix: Optional[Sequence[str]] = None, + tags: t.Dict[str, str], + prefix: t.Optional[t.Sequence[str]] = None, include_deps: bool = False, ): - self._prefix = prefix or cast(Sequence[str], []) + self._prefix = prefix or t.cast(t.Sequence[str], []) self._source_name = source_name self._include_deps = include_deps self._tags = tags.copy() def get_asset_key(self, resource: DltResource) -> AssetKey: - key: List[str] = [] + key: t.List[str] = [] key.extend(self._prefix) key.append(self._source_name) key.append(resource.name) return AssetKey(key) - def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]: + def get_deps_asset_keys(self, resource: DltResource) -> t.Iterable[AssetKey]: """We don't include the source here in the graph. It's not a necessary stub to represent""" if not self._include_deps: return [] - key: List[str] = [] + key: t.List[str] = [] key.extend(self._prefix) key.append(self._source_name) key.append("sources")