Skip to content

Commit

Permalink
Re-enable pre-sync and raise on model export failure (#154)
Browse files Browse the repository at this point in the history
* fix: re-enable pre-sync and return error when something failed

* Update actions

* Fix type annotations
  • Loading branch information
gouline authored Dec 9, 2022
1 parent f71b492 commit 1b8af32
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 38 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ on:

jobs:
build:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- uses: actions/setup-python@v2
- uses: actions/setup-python@v4
with:
python-version: "3.6.x"

Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ on:

jobs:
validate:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- uses: actions/setup-python@v2
- uses: actions/setup-python@v4
with:
python-version: "3.6.x"

Expand Down
81 changes: 54 additions & 27 deletions dbtmetabase/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
Mapping,
MutableMapping,
Optional,
Sequence,
Union,
)

Expand Down Expand Up @@ -120,9 +119,11 @@ def __init__(
host: str,
user: str,
password: str,
verify: Optional[Union[str, bool]] = None,
session_id: Optional[str] = None,
use_http: bool = False,
verify: Union[str, bool] = None,
session_id: str = None,
sync: Optional[bool] = True,
sync_timeout: Optional[int] = None,
exclude_sources: bool = False,
):
"""Constructor.
Expand All @@ -136,6 +137,8 @@ def __init__(
use_http {bool} -- Use HTTP instead of HTTPS. (default: {False})
verify {Union[str, bool]} -- Path to certificate or disable verification. (default: {None})
session_id {str} -- Metabase session ID. (default: {None})
sync {bool} -- Attempt to synchronize Metabase schema with local models. (default: {True})
sync_timeout {int} -- Synchronization timeout in seconds. (default: {None})
exclude_sources {bool} -- Exclude exporting sources. (default: {False})
"""
self.base_url = f"{'http' if use_http else 'https'}://{host}"
Expand All @@ -146,6 +149,8 @@ def __init__(
session_header = session_id or self.get_session_id(user, password)
self.session.headers["X-Metabase-Session"] = session_header

self.sync = sync
self.sync_timeout = sync_timeout
self.exclude_sources = exclude_sources
self.collections: Iterable = []
self.tables: Iterable = []
Expand All @@ -161,6 +166,8 @@ def __init__(
)
self.metadata = self._Metadata()

self._synced_models: Optional[List[MetabaseModel]] = None

logger().info(":ok_hand: Session established successfully")

def get_session_id(self, user: str, password: str) -> str:
Expand All @@ -183,15 +190,13 @@ def get_session_id(self, user: str, password: str) -> str:
def sync_and_wait(
self,
database: str,
models: Sequence,
timeout: Optional[int],
models: List[MetabaseModel],
) -> bool:
"""Synchronize with the database and wait for schema compatibility.
Arguments:
database {str} -- Metabase database name.
models {list} -- List of dbt models read from project.
timeout {int} -- Timeout before giving up in seconds.
Returns:
bool -- True if schema compatible with models, false if still incompatible.
Expand All @@ -202,10 +207,8 @@ def sync_and_wait(
- the database cannot be found
- sync is enabled but the schema could not be aligned with the models before the timeout
"""
allow_sync_failure = False
if not timeout:
timeout = 30
allow_sync_failure = True

timeout = self.sync_timeout if self.sync_timeout else 30

if timeout < self._SYNC_PERIOD_SECS:
raise exceptions.MetabaseUnableToSync(
Expand All @@ -232,13 +235,17 @@ def sync_and_wait(
time.sleep(self._SYNC_PERIOD_SECS)
else:
break
if not sync_successful and not allow_sync_failure:

if sync_successful:
self._synced_models = models.copy()
elif self.sync:
raise exceptions.MetabaseUnableToSync(
"Unable to align models between dbt target models and Metabase"
)

return sync_successful

def models_compatible(self, models: Sequence[MetabaseModel]) -> bool:
def models_compatible(self, models: List[MetabaseModel]) -> bool:
"""Checks if models compatible with the Metabase database schema.
Arguments:
Expand Down Expand Up @@ -281,7 +288,7 @@ def models_compatible(self, models: Sequence[MetabaseModel]) -> bool:
def export_models(
self,
database: str,
models: Sequence[MetabaseModel],
models: List[MetabaseModel],
aliases,
):
"""Exports dbt models to Metabase database schema.
Expand All @@ -292,19 +299,17 @@ def export_models(
aliases {dict} -- Provided by reader class. Shuttled down to column exports to resolve FK refs against relations to aliased source tables
"""

if not self.metadata:
database_id = self.find_database_id(database)
if not database_id:
logger().critical("Cannot find database by name %s", database)
return
self.metadata = self.build_metadata(database_id)
success = True

if not self.metadata or self._synced_models != models:
self.sync_and_wait(database, models)

for model in models:
if model.model_type == ModelType.sources and self.exclude_sources:
logger().info(":fast_forward: Skipping %s source", model.unique_id)
continue

self.export_model(model, aliases)
success &= self.export_model(model, aliases)

for update in self.metadata.pop_updates():
self.api(
Expand All @@ -318,14 +323,24 @@ def export_models(
update["id"],
)

def export_model(self, model: MetabaseModel, aliases: dict):
if not success:
raise exceptions.MetabaseRuntimeError(
"Model export encountered non-critical errors, check output"
)

def export_model(self, model: MetabaseModel, aliases: dict) -> bool:
"""Exports one dbt model to Metabase database schema.
Arguments:
model {MetabaseModel} -- One dbt model read from project.
aliases {dict} -- Provided by reader class. Shuttled down to column exports to resolve FK refs against relations to aliased source tables
Returns:
bool -- True if exported successfully, false if there were errors.
"""

success = True

schema_name = model.schema.upper()
model_name = model.name.upper()

Expand All @@ -336,7 +351,7 @@ def export_model(self, model: MetabaseModel, aliases: dict):
logger().error(
":cross_mark: Table %s does not exist in Metabase", lookup_key
)
return
return False

# Empty strings not accepted by Metabase
model_display_name = model.display_name or None
Expand Down Expand Up @@ -369,32 +384,41 @@ def export_model(self, model: MetabaseModel, aliases: dict):
logger().info(":thumbs_up: Table %s is up-to-date", lookup_key)

for column in model.columns:
self.export_column(schema_name, model_name, column, aliases)
success &= self.export_column(schema_name, model_name, column, aliases)

return success

def export_column(
self,
schema_name: str,
model_name: str,
column: MetabaseColumn,
aliases: dict,
):
) -> bool:
"""Exports one dbt column to Metabase database schema.
Arguments:
model_name {str} -- One dbt model name read from project.
column {MetabaseColumn} -- One dbt column read from project.
aliases {dict} -- Provided by reader class. Used to resolve FK refs against relations to aliased source tables
Returns:
bool -- True if exported successfully, false if there were errors.
"""

success = True

table_lookup_key = f"{schema_name}.{model_name}"
column_name = column.name.upper()

api_field = self.metadata.get_field(table_lookup_key, column_name)
if not api_field:
logger().error(
"Field %s.%s does not exist in Metabase", table_lookup_key, column_name
":cross_mark: Field %s.%s does not exist in Metabase",
table_lookup_key,
column_name,
)
return
return False

if "special_type" in api_field:
semantic_type_key = "special_type"
Expand Down Expand Up @@ -467,6 +491,7 @@ def export_column(
target_table,
target_field,
)
success = False

# Empty strings not accepted by Metabase
column_description = column.description or None
Expand Down Expand Up @@ -526,6 +551,8 @@ def export_column(
":thumbs_up: Field %s.%s is up-to-date", model_name, column_name
)

return success

def find_database_id(self, name: str) -> Optional[str]:
"""Finds Metabase database ID by name.
Expand Down Expand Up @@ -577,7 +604,7 @@ def extract_exposures(
output_path: str = ".",
output_name: str = "metabase_exposures",
include_personal_collections: bool = True,
collection_excludes: Iterable = None,
collection_excludes: Optional[Iterable] = None,
) -> Mapping:
"""Extracts exposures in Metabase downstream of dbt models and sources as parsed by dbt reader
Expand Down
4 changes: 4 additions & 0 deletions dbtmetabase/models/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ class NoDbtSchemaSupplied(Exception):

class MetabaseUnableToSync(Exception):
"""Thrown when Metabase cannot sync / align models with dbt model"""


class MetabaseRuntimeError(Exception):
"""Thrown when Metabase execution failed."""
8 changes: 3 additions & 5 deletions dbtmetabase/models/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,14 @@ def prepare_metabase_client(self, dbt_models: Optional[List[MetabaseModel]] = No
use_http=self.use_http,
verify=self.verify,
session_id=self.session_id,
sync=self.sync,
sync_timeout=self.sync_timeout,
exclude_sources=self.exclude_sources,
)

# Sync and attempt schema alignment prior to execution; when sync is enabled, a failed alignment raises MetabaseUnableToSync (the timeout defaults to 30 seconds if not explicitly set)
if self.sync:
self._client.sync_and_wait(
self.database,
dbt_models,
self.sync_timeout,
)
self._client.sync_and_wait(self.database, dbt_models)


class DbtInterface:
Expand Down

0 comments on commit 1b8af32

Please sign in to comment.