From 1b8af327f46bf34c3f79d6c1890e6ce0d226c46c Mon Sep 17 00:00:00 2001 From: Mike Gouline <1960272+gouline@users.noreply.github.com> Date: Fri, 9 Dec 2022 20:57:43 +1100 Subject: [PATCH] Re-enable pre-sync and raise on model export failure (#154) * fix: re-enable pre-sync and return error when something failed * Update actions * Fix type annotations --- .github/workflows/master.yml | 6 +-- .github/workflows/pull_request.yml | 6 +-- dbtmetabase/metabase.py | 81 ++++++++++++++++++++---------- dbtmetabase/models/exceptions.py | 4 ++ dbtmetabase/models/interface.py | 8 ++- 5 files changed, 67 insertions(+), 38 deletions(-) diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index b70382f7..dbdb4109 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -10,11 +10,11 @@ on: jobs: build: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - - uses: actions/setup-python@v2 + - uses: actions/setup-python@v4 with: python-version: "3.6.x" diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 7e9700a9..0d44a419 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -7,11 +7,11 @@ on: jobs: validate: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - - uses: actions/setup-python@v2 + - uses: actions/setup-python@v4 with: python-version: "3.6.x" diff --git a/dbtmetabase/metabase.py b/dbtmetabase/metabase.py index 3b99eb0a..b8c10ba4 100644 --- a/dbtmetabase/metabase.py +++ b/dbtmetabase/metabase.py @@ -10,7 +10,6 @@ Mapping, MutableMapping, Optional, - Sequence, Union, ) @@ -120,9 +119,11 @@ def __init__( host: str, user: str, password: str, + verify: Optional[Union[str, bool]] = None, + session_id: Optional[str] = None, use_http: bool = False, - verify: Union[str, bool] = None, - session_id: str = None, + sync: Optional[bool] = True, + sync_timeout: Optional[int] = None, exclude_sources: bool = False, ): """Constructor. @@ -136,6 +137,8 @@ def __init__( use_http {bool} -- Use HTTP instead of HTTPS. (default: {False}) verify {Union[str, bool]} -- Path to certificate or disable verification. (default: {None}) session_id {str} -- Metabase session ID. (default: {None}) + sync (bool, optional): Attempt to synchronize Metabase schema with local models. Defaults to True. + sync_timeout (Optional[int], optional): Synchronization timeout (in secs). Defaults to None. exclude_sources {bool} -- Exclude exporting sources. (default: {False}) """ self.base_url = f"{'http' if use_http else 'https'}://{host}" @@ -146,6 +149,8 @@ def __init__( session_header = session_id or self.get_session_id(user, password) self.session.headers["X-Metabase-Session"] = session_header + self.sync = sync + self.sync_timeout = sync_timeout self.exclude_sources = exclude_sources self.collections: Iterable = [] self.tables: Iterable = [] @@ -161,6 +166,8 @@ def __init__( ) self.metadata = self._Metadata() + self._synced_models: Optional[List[MetabaseModel]] = None + logger().info(":ok_hand: Session established successfully") def get_session_id(self, user: str, password: str) -> str: @@ -183,15 +190,13 @@ def get_session_id(self, user: str, password: str) -> str: def sync_and_wait( self, database: str, - models: Sequence, - timeout: Optional[int], + models: List[MetabaseModel], ) -> bool: """Synchronize with the database and wait for schema compatibility. Arguments: database {str} -- Metabase database name. models {list} -- List of dbt models read from project. - timeout {int} -- Timeout before giving up in seconds. Returns: bool -- True if schema compatible with models, false if still incompatible. @@ -202,10 +207,8 @@ def sync_and_wait( - the database cannot be found - a timeout was provided but sync was unsuccessful """ - allow_sync_failure = False - if not timeout: - timeout = 30 - allow_sync_failure = True + + timeout = self.sync_timeout if self.sync_timeout else 30 if timeout < self._SYNC_PERIOD_SECS: raise exceptions.MetabaseUnableToSync( @@ -232,13 +235,17 @@ def sync_and_wait( time.sleep(self._SYNC_PERIOD_SECS) else: break - if not sync_successful and not allow_sync_failure: + + if sync_successful: + self._synced_models = models.copy() + elif self.sync: raise exceptions.MetabaseUnableToSync( "Unable to align models between dbt target models and Metabase" ) + return sync_successful - def models_compatible(self, models: Sequence[MetabaseModel]) -> bool: + def models_compatible(self, models: List[MetabaseModel]) -> bool: """Checks if models compatible with the Metabase database schema. Arguments: @@ -281,7 +288,7 @@ def models_compatible(self, models: Sequence[MetabaseModel]) -> bool: def export_models( self, database: str, - models: Sequence[MetabaseModel], + models: List[MetabaseModel], aliases, ): """Exports dbt models to Metabase database schema. @@ -292,19 +299,17 @@ def export_models( aliases {dict} -- Provided by reader class. Shuttled down to column exports to resolve FK refs against relations to aliased source tables """ - if not self.metadata: - database_id = self.find_database_id(database) - if not database_id: - logger().critical("Cannot find database by name %s", database) - return - self.metadata = self.build_metadata(database_id) + success = True + + if not self.metadata or self._synced_models != models: + self.sync_and_wait(database, models) for model in models: if model.model_type == ModelType.sources and self.exclude_sources: logger().info(":fast_forward: Skipping %s source", model.unique_id) continue - self.export_model(model, aliases) + success &= self.export_model(model, aliases) for update in self.metadata.pop_updates(): self.api( @@ -318,14 +323,24 @@ def export_models( update["id"], ) - def export_model(self, model: MetabaseModel, aliases: dict): + if not success: + raise exceptions.MetabaseRuntimeError( + "Model export encountered non-critical errors, check output" + ) + + def export_model(self, model: MetabaseModel, aliases: dict) -> bool: """Exports one dbt model to Metabase database schema. Arguments: model {dict} -- One dbt model read from project. aliases {dict} -- Provided by reader class. Shuttled down to column exports to resolve FK refs against relations to aliased source tables + + Returns: + bool -- True if exported successfully, false if there were errors. """ + success = True + schema_name = model.schema.upper() model_name = model.name.upper() @@ -336,7 +351,7 @@ def export_model(self, model: MetabaseModel, aliases: dict): logger().error( ":cross_mark: Table %s does not exist in Metabase", lookup_key ) - return + return False # Empty strings not accepted by Metabase model_display_name = model.display_name or None @@ -369,7 +384,9 @@ def export_model(self, model: MetabaseModel, aliases: dict): logger().info(":thumbs_up: Table %s is up-to-date", lookup_key) for column in model.columns: - self.export_column(schema_name, model_name, column, aliases) + success &= self.export_column(schema_name, model_name, column, aliases) + + return success def export_column( self, @@ -377,24 +394,31 @@ def export_column( model_name: str, column: MetabaseColumn, aliases: dict, - ): + ) -> bool: """Exports one dbt column to Metabase database schema. Arguments: model_name {str} -- One dbt model name read from project. column {dict} -- One dbt column read from project. aliases {dict} -- Provided by reader class. Used to resolve FK refs against relations to aliased source tables + + Returns: + bool -- True if exported successfully, false if there were errors. """ + success = True + table_lookup_key = f"{schema_name}.{model_name}" column_name = column.name.upper() api_field = self.metadata.get_field(table_lookup_key, column_name) if not api_field: logger().error( - "Field %s.%s does not exist in Metabase", table_lookup_key, column_name + ":cross_mark: Field %s.%s does not exist in Metabase", + table_lookup_key, + column_name, ) - return + return False if "special_type" in api_field: semantic_type_key = "special_type" @@ -467,6 +491,7 @@ def export_column( target_table, target_field, ) + success = False # Empty strings not accepted by Metabase column_description = column.description or None @@ -526,6 +551,8 @@ def export_column( ":thumbs_up: Field %s.%s is up-to-date", model_name, column_name ) + return success + def find_database_id(self, name: str) -> Optional[str]: """Finds Metabase database ID by name. @@ -577,7 +604,7 @@ def extract_exposures( output_path: str = ".", output_name: str = "metabase_exposures", include_personal_collections: bool = True, - collection_excludes: Iterable = None, + collection_excludes: Optional[Iterable] = None, ) -> Mapping: """Extracts exposures in Metabase downstream of dbt models and sources as parsed by dbt reader diff --git a/dbtmetabase/models/exceptions.py b/dbtmetabase/models/exceptions.py index ced095c8..df805dd3 100644 --- a/dbtmetabase/models/exceptions.py +++ b/dbtmetabase/models/exceptions.py @@ -8,3 +8,7 @@ class NoDbtSchemaSupplied(Exception): class MetabaseUnableToSync(Exception): """Thrown when Metabase cannot sync / align models with dbt model""" + + +class MetabaseRuntimeError(Exception): + """Thrown when Metabase execution failed.""" diff --git a/dbtmetabase/models/interface.py b/dbtmetabase/models/interface.py index 68c73ff1..7a75f3db 100644 --- a/dbtmetabase/models/interface.py +++ b/dbtmetabase/models/interface.py @@ -88,16 +88,14 @@ def prepare_metabase_client(self, dbt_models: Optional[List[MetabaseModel]] = No use_http=self.use_http, verify=self.verify, session_id=self.session_id, + sync=self.sync, + sync_timeout=self.sync_timeout, exclude_sources=self.exclude_sources, ) # Sync and attempt schema alignment prior to execution; if timeout is not explicitly set, proceed regardless of success if self.sync: - self._client.sync_and_wait( - self.database, - dbt_models, - self.sync_timeout, - ) + self._client.sync_and_wait(self.database, dbt_models) class DbtInterface: