Skip to content

Commit

Permalink
Skip any missing table in asset configuration (#2141)
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 authored Sep 13, 2024
1 parent 135608d commit 1262221
Showing 1 changed file with 54 additions and 52 deletions.
106 changes: 54 additions & 52 deletions warehouse/oso_dagster/factories/sql.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,46 @@
from typing import (
TypedDict,
Dict,
Any,
cast,
Sequence,
Optional,
List,
Iterable,
NotRequired,
Callable,
)
from dagster import AssetKey
import logging
import typing as t

import dlt
from dlt.pipeline.pipeline import Pipeline
from dlt.extract.resource import DltResource
from dlt.sources.credentials import ConnectionStringCredentials, GcpServiceAccountCredentials
from dagster import AssetExecutionContext, AssetKey
from dagster_embedded_elt.dlt import (DagsterDltResource, DagsterDltTranslator,
dlt_assets)
from dlt.common.schema.typing import TWriteDispositionConfig
from dlt.destinations import bigquery
from dlt.extract.resource import DltResource
from dlt.pipeline.pipeline import Pipeline
from dlt.sources.credentials import (ConnectionStringCredentials,
GcpServiceAccountCredentials)
from sqlalchemy import MetaData, Table
from dagster import AssetExecutionContext
from dagster_embedded_elt.dlt import (
DagsterDltTranslator,
dlt_assets,
DagsterDltResource,
)

from ..dlt_sources.sql_database import sql_table, TableBackend
from .common import early_resources_asset_factory, AssetFactoryResponse, AssetList
from sqlalchemy.exc import NoSuchTableError

from ..dlt_sources.sql_database import TableBackend, sql_table
from ..utils.secrets import SecretReference, SecretResolver
from .common import (AssetFactoryResponse, AssetList,
early_resources_asset_factory)

logger = logging.getLogger(__name__)

class SQLTableOptions(TypedDict):

class SQLTableOptions(t.TypedDict):
"""Typed dict for the input to the sql_table function from the sql_database
dlt verified source"""

table: str
schema: NotRequired[str]
metadata: NotRequired[MetaData]
incremental: NotRequired[dlt.sources.incremental[Any]]
chunk_size: NotRequired[int]
backend: NotRequired[TableBackend]
detect_precision_hints: NotRequired[bool]
defer_table_reflect: NotRequired[bool]
table_adapter_callback: NotRequired[Callable[[Table], None]]
backend_kwargs: NotRequired[Dict[str, Any]]
schema: t.NotRequired[str]
metadata: t.NotRequired[MetaData]
incremental: t.NotRequired[dlt.sources.incremental[t.Any]]
chunk_size: t.NotRequired[int]
backend: t.NotRequired[TableBackend]
detect_precision_hints: t.NotRequired[bool]
defer_table_reflect: t.NotRequired[bool]
table_adapter_callback: t.NotRequired[t.Callable[[Table], None]]
backend_kwargs: t.NotRequired[t.Dict[str, t.Any]]


class TopLevelSQLTableOptions(SQLTableOptions):
destination_table_name: NotRequired[str]
destination_table_name: t.NotRequired[str]
write_disposition: t.NotRequired[TWriteDispositionConfig]


def _generate_asset_for_table(
Expand All @@ -61,7 +55,7 @@ def _source():
destination_table_name = table_options.get("destination_table_name")
if destination_table_name:
del table_options["destination_table_name"]
table = cast(SQLTableOptions, table_options)
table = t.cast(SQLTableOptions, table_options)
resource = sql_table(credentials, **table)
if destination_table_name:
resource.table_name = destination_table_name
Expand All @@ -74,15 +68,20 @@ def _source():
dlt_dagster_translator=translator,
)
def _asset(context: AssetExecutionContext, dlt: DagsterDltResource):
yield from dlt.run(context=context, loader_file_format="jsonl")
kwargs = {}
write_disposition = table_options.get("write_disposition")
if write_disposition:
kwargs['write_disposition'] = write_disposition

yield from dlt.run(context=context, loader_file_format="jsonl", **kwargs)

return _asset


def sql_assets(
source_name: str,
source_credential_reference: SecretReference,
sql_tables: List[TopLevelSQLTableOptions],
sql_tables: t.List[TopLevelSQLTableOptions],
group_name: str = "",
environment: str = "production",
asset_type: str = "source"
Expand Down Expand Up @@ -127,13 +126,16 @@ def factory(
staging=dlt_staging_destination,
progress="log",
)
asset_def = _generate_asset_for_table(
source_name, credentials, pipeline, table, translator
)
assets.append(
asset_def
)

try:
asset_def = _generate_asset_for_table(
source_name, credentials, pipeline, table, translator
)
assets.append(
asset_def
)
except NoSuchTableError:
logger.error(f'Failed to load table "{table["table"]}" for source "{source_name}"')

return AssetFactoryResponse(assets=assets)

return factory
Expand All @@ -145,27 +147,27 @@ class PrefixedDltTranslator(DagsterDltTranslator):
def __init__(
self,
source_name: str,
tags: Dict[str, str],
prefix: Optional[Sequence[str]] = None,
tags: t.Dict[str, str],
prefix: t.Optional[t.Sequence[str]] = None,
include_deps: bool = False,
):
self._prefix = prefix or cast(Sequence[str], [])
self._prefix = prefix or t.cast(t.Sequence[str], [])
self._source_name = source_name
self._include_deps = include_deps
self._tags = tags.copy()

def get_asset_key(self, resource: DltResource) -> AssetKey:
key: List[str] = []
key: t.List[str] = []
key.extend(self._prefix)
key.append(self._source_name)
key.append(resource.name)
return AssetKey(key)

def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]:
def get_deps_asset_keys(self, resource: DltResource) -> t.Iterable[AssetKey]:
"""We don't include the source here in the graph. It's not a necessary stub to represent"""
if not self._include_deps:
return []
key: List[str] = []
key: t.List[str] = []
key.extend(self._prefix)
key.append(self._source_name)
key.append("sources")
Expand Down

0 comments on commit 1262221

Please sign in to comment.