From a5bfdfa9fbf2fd120b0082147caaf57c1d32a66a Mon Sep 17 00:00:00 2001
From: Mike Gouline <1960272+gouline@users.noreply.github.com>
Date: Fri, 15 Dec 2023 14:42:58 +1100
Subject: [PATCH] Major interface and Metabase client refactor (#196)

* Refactor MetabaseInterface into MetabaseClient
* Refactor interfaces, dbt readers and Metabase client
* Major refactor of interfaces, DbtReader and MetabaseClient
* CLI parameters for MetabaseClient
* Refactor Metabase functionality into jobs
* Fix up tests
---
 dbtmetabase/__init__.py                       |   5 +-
 dbtmetabase/__main__.py                       | 363 +++++++-
 dbtmetabase/cli.py                            | 417 ---------
 .../{parsers/dbt_manifest.py => dbt.py}       | 348 ++++++--
 dbtmetabase/metabase.py                       | 832 ++++++++----------
 dbtmetabase/models/__init__.py                |   0
 dbtmetabase/models/exceptions.py              |  18 -
 dbtmetabase/models/interface.py               | 239 -----
 dbtmetabase/models/metabase.py                |  84 --
 dbtmetabase/parsers/__init__.py               |   0
 dbtmetabase/parsers/dbt.py                    | 125 ---
 dbtmetabase/parsers/dbt_folder.py             | 238 -----
 setup.py                                      |   2 +-
 tests/__init__.py                             |   2 +-
 .../exposure/baseline_test_exposures.yml      |   2 +-
 tests/fixtures/mock_api/api/database/2.json   |  65 +-
 tests/{test_dbt_parsers.py => test_dbt.py}    | 290 +-----
 tests/test_metabase.py                        |  89 +-
 tests/utils_mb_test_suite.py                  |  17 +-
 19 files changed, 1115 insertions(+), 2021 deletions(-)
 delete mode 100644 dbtmetabase/cli.py
 rename dbtmetabase/{parsers/dbt_manifest.py => dbt.py} (59%)
 delete mode 100644 dbtmetabase/models/__init__.py
 delete mode 100644 dbtmetabase/models/exceptions.py
 delete mode 100644 dbtmetabase/models/interface.py
 delete mode 100644 dbtmetabase/models/metabase.py
 delete mode 100644 dbtmetabase/parsers/__init__.py
 delete mode 100644 dbtmetabase/parsers/dbt.py
 delete mode 100644 dbtmetabase/parsers/dbt_folder.py
 rename tests/{test_dbt_parsers.py => test_dbt.py} (50%)

diff --git a/dbtmetabase/__init__.py b/dbtmetabase/__init__.py
index 70b3980e..580eaad4 100644
--- a/dbtmetabase/__init__.py
+++ b/dbtmetabase/__init__.py
@@ -1,9 +1,10 @@
 import importlib.metadata
 import logging
 
-from .models.interface import DbtInterface, MetabaseInterface
+from .dbt import DbtReader
+from .metabase import MetabaseClient
 
-__all__ = ["MetabaseInterface", "DbtInterface"]
+__all__ = ["DbtReader", "MetabaseClient"]
 
 try:
     __version__ = importlib.metadata.version("dbt-metabase")
diff --git a/dbtmetabase/__main__.py b/dbtmetabase/__main__.py
index b745cef1..b06c67a4 100644
--- a/dbtmetabase/__main__.py
+++ b/dbtmetabase/__main__.py
@@ -1,3 +1,362 @@
-from .cli import cli
+import functools
+import logging
+from pathlib import Path
+from typing import Callable, Iterable, List, Optional, Union
 
-cli()  # pylint: disable=no-value-for-parameter
+import click
+import yaml
+from typing_extensions import cast
+
+from .dbt import DbtReader
+from .logger import logging as package_logger
+from .metabase import MetabaseClient
+
+
+def _comma_separated_list_callback(
+    ctx: click.Context,
+    param: click.Option,
+    value: Union[str, List[str]],
+) -> Optional[List[str]]:
+    """Click callback for handling comma-separated lists."""
+
+    if value is None:
+        return None
+
+    assert (
+        param.type == click.UNPROCESSED or param.type.name == "list"
+    ), "comma-separated list options must be of type UNPROCESSED or list"
+
+    if ctx.get_parameter_source(str(param.name)) in (
+        click.core.ParameterSource.DEFAULT,
+        click.core.ParameterSource.DEFAULT_MAP,
+    ) and isinstance(value, list):
+        # Lists in defaults (config or option) should be lists
+        return value
+
+    elif isinstance(value, str):
+        str_value = value
+    elif isinstance(value, list):
+        # When type=list, string value will be a list of chars
+        str_value = "".join(value)
+    else:
+        raise click.BadParameter("must be comma-separated list")
+
+    return str_value.split(",")
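+
+# Illustrative behavior of the callback above (comments only; the values are
+# examples, not part of this refactor): a command-line string "a,b" is split
+# into ["a", "b"], while a list supplied via config.yml defaults passes
+# through unchanged because its parameter source is DEFAULT or DEFAULT_MAP.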
+
+
+@click.group()
+@click.version_option(package_name="dbt-metabase")
+@click.option(
+    "--config-path",
+    default="~/.dbt-metabase/config.yml",
+    show_default=True,
+    type=click.Path(),
+    help="Path to config.yml file with default values.",
+)
+@click.pass_context
+def cli(ctx: click.Context, config_path: str):
+    group = cast(click.Group, ctx.command)
+
+    config_path_expanded = Path(config_path).expanduser()
+    if config_path_expanded.exists():
+        with open(config_path_expanded, "r", encoding="utf-8") as f:
+            config = yaml.safe_load(f).get("config", {})
+            # Propagate root configs to all commands
+            ctx.default_map = {command: config for command in group.commands}
+
+
+def _add_setup(func: Callable) -> Callable:
+    """Add common options and create DbtReader and MetabaseClient."""
+
+    @click.option(
+        "--dbt-manifest-path",
+        envvar="DBT_MANIFEST_PATH",
+        show_envvar=True,
+        required=True,
+        type=click.Path(exists=True, dir_okay=False),
+        help="Path to dbt manifest.json file under /target/ in the dbt project directory.",
+    )
+    @click.option(
+        "--dbt-database",
+        metavar="DATABASE",
+        envvar="DBT_DATABASE",
+        show_envvar=True,
+        required=True,
+        type=click.STRING,
+        help="Target database name in dbt models.",
+    )
+    @click.option(
+        "--dbt-schema",
+        metavar="SCHEMA",
+        envvar="DBT_SCHEMA",
+        show_envvar=True,
+        help="Target dbt schema.",
+        type=click.STRING,
+    )
+    @click.option(
+        "--dbt-schema-excludes",
+        metavar="SCHEMAS",
+        envvar="DBT_SCHEMA_EXCLUDES",
+        show_envvar=True,
+        type=click.UNPROCESSED,
+        callback=_comma_separated_list_callback,
+        help="Target dbt schemas to exclude.",
+    )
+    @click.option(
+        "--dbt-includes",
+        metavar="MODELS",
+        envvar="DBT_INCLUDES",
+        show_envvar=True,
+        type=click.UNPROCESSED,
+        callback=_comma_separated_list_callback,
+        help="Include specific dbt model names.",
+    )
+    @click.option(
+        "--dbt-excludes",
+        metavar="MODELS",
+        envvar="DBT_EXCLUDES",
+        show_envvar=True,
+        type=click.UNPROCESSED,
+        callback=_comma_separated_list_callback,
+        help="Exclude specific dbt model names.",
+    )
+    @click.option(
+        "--metabase-url",
+        metavar="URL",
+        envvar="MB_URL",
+        show_envvar=True,
+        required=True,
+        type=click.STRING,
+        help="Metabase URL, including protocol and excluding trailing slash.",
+    )
+    @click.option(
+        "--metabase-username",
+        metavar="USERNAME",
+        envvar="METABASE_USERNAME",
+        show_envvar=True,
+        type=click.STRING,
+        help="Metabase username.",
+    )
+    @click.option(
+        "--metabase-password",
+        metavar="PASSWORD",
+        envvar="METABASE_PASSWORD",
+        show_envvar=True,
+        type=click.STRING,
+        help="Metabase password.",
+    )
+    @click.option(
+        "--metabase-session-id",
+        metavar="TOKEN",
+        envvar="METABASE_SESSION_ID",
+        show_envvar=True,
+        type=click.STRING,
+        help="Metabase session ID.",
+    )
+    @click.option(
+        "--metabase-verify/--metabase-verify-skip",
+        "metabase_verify",
+        envvar="METABASE_VERIFY",
+        show_envvar=True,
+        default=True,
+        help="Verify the TLS certificate at the Metabase end.",
+    )
+    @click.option(
+        "--metabase-cert",
+        metavar="CERT",
+        envvar="METABASE_CERT",
+        show_envvar=True,
+        type=click.Path(exists=True, dir_okay=False),
+        help="Path to certificate bundle used to connect to Metabase.",
+    )
+    @click.option(
+        "--metabase-http-timeout",
+        metavar="SECS",
+        envvar="METABASE_HTTP_TIMEOUT",
+        show_envvar=True,
+        type=click.INT,
+        default=15,
+        show_default=True,
+        help="Metabase API HTTP timeout in seconds.",
+    )
+    @click.option(
+        "-v",
+        "--verbose",
+        is_flag=True,
+        help="Enable verbose logging.",
+    )
+    @functools.wraps(func)
+    def wrapper(
+        metabase_url: str,
+        metabase_username: str,
+        metabase_password: str,
+        dbt_database: str,
+        dbt_manifest_path: str,
+        dbt_schema: Optional[str],
+        dbt_schema_excludes: Optional[Iterable],
+        dbt_includes: Optional[Iterable],
+        dbt_excludes: Optional[Iterable],
+        metabase_session_id: Optional[str],
+        metabase_verify: bool,
+        metabase_cert: Optional[str],
+        metabase_http_timeout: int,
+        verbose: bool,
+        **kwargs,
+    ):
+        if verbose:
+            package_logger.LOGGING_LEVEL = logging.DEBUG
+
+        return func(
+            dbt_reader=DbtReader(
+                manifest_path=dbt_manifest_path,
+                database=dbt_database,
+                schema=dbt_schema,
+                schema_excludes=dbt_schema_excludes,
+                includes=dbt_includes,
+                excludes=dbt_excludes,
+            ),
+            metabase_client=MetabaseClient(
+                url=metabase_url,
+                username=metabase_username,
+                password=metabase_password,
+                session_id=metabase_session_id,
+                verify=metabase_verify,
+                cert=metabase_cert,
+                http_timeout=metabase_http_timeout,
+            ),
+            **kwargs,
+        )
+
+    return wrapper
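+
+# Example invocation (illustrative; the flag values are assumptions, not part
+# of this patch). Exporting models with username/password auth:
+#
+#   dbt-metabase models \
+#       --dbt-manifest-path target/manifest.json \
+#       --dbt-database analytics \
+#       --metabase-url https://metabase.example.com \
+#       --metabase-username user@example.com \
+#       --metabase-password "$METABASE_PASSWORD" \
+#       --metabase-database analytics
+#
+# The same keys can live under `config:` in ~/.dbt-metabase/config.yml
+# (e.g. `dbt_database: analytics`), which cli() above propagates to every
+# command through ctx.default_map.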
Metabase.", +) +@click.option( + "--metabase-sync/--metabase-sync-skip", + "metabase_sync", + envvar="METABASE_SYNC", + show_envvar=True, + default=True, + show_default=True, + help="Attempt to synchronize Metabase schema with local models.", +) +@click.option( + "--metabase-sync-timeout", + metavar="SECS", + envvar="METABASE_SYNC_TIMEOUT", + show_envvar=True, + default=30, + type=click.INT, + help="Synchronization timeout in secs. When set, command fails on failed synchronization. Otherwise, command proceeds regardless. Only valid if sync is enabled.", +) +@click.option( + "--metabase-exclude-sources", + envvar="METABASE_EXCLUDE_SOURCES", + show_envvar=True, + is_flag=True, + help="Skip exporting sources to Metabase.", +) +def models( + dbt_docs_url: Optional[str], + dbt_include_tags: bool, + metabase_database: str, + metabase_sync_timeout: int, + metabase_exclude_sources: bool, + dbt_reader: DbtReader, + metabase_client: MetabaseClient, +): + dbt_models = dbt_reader.read_models( + include_tags=dbt_include_tags, + docs_url=dbt_docs_url, + ) + metabase_client.export_models( + database=metabase_database, + models=dbt_models, + exclude_sources=metabase_exclude_sources, + sync_timeout=metabase_sync_timeout, + ) + + +@cli.command(help="Export dbt exposures to Metabase.") +@_add_setup +@click.option( + "--output-path", + envvar="OUTPUT_PATH", + show_envvar=True, + type=click.Path(exists=True, file_okay=False), + default=".", + show_default=True, + help="Output path for generated exposure YAML.", +) +@click.option( + "--output-name", + metavar="NAME", + envvar="OUTPUT_NAME", + show_envvar=True, + type=click.STRING, + default="metabase_exposures.yml", + show_default=True, + help="File name for generated exposure YAML.", +) +@click.option( + "--metabase-include-personal-collections", + envvar="METABASE_INCLUDE_PERSONAL_COLLECTIONS", + show_envvar=True, + is_flag=True, + help="Include personal collections when parsing exposures.", +) +@click.option( + "--metabase-collection-excludes", + metavar="COLLECTIONS", + envvar="METABASE_COLLECTION_EXCLUDES", + show_envvar=True, + type=click.UNPROCESSED, + callback=_comma_separated_list_callback, + help="Metabase collection names to exclude.", +) +def exposures( + output_path: str, + output_name: str, + metabase_include_personal_collections: bool, + metabase_collection_excludes: Optional[Iterable], + dbt_reader: DbtReader, + metabase_client: MetabaseClient, +): + dbt_models = dbt_reader.read_models() + metabase_client.extract_exposures( + models=dbt_models, + output_path=output_path, + output_name=output_name, + include_personal_collections=metabase_include_personal_collections, + collection_excludes=metabase_collection_excludes, + ) + + +if __name__ == "__main__": + # Executed when running locally via python3 -m dbtmetabase + cli() # pylint: disable=no-value-for-parameter diff --git a/dbtmetabase/cli.py b/dbtmetabase/cli.py deleted file mode 100644 index 9b4a4e41..00000000 --- a/dbtmetabase/cli.py +++ /dev/null @@ -1,417 +0,0 @@ -import functools -import logging -from pathlib import Path -from typing import Callable, Iterable, List, Optional, Union - -import click -import yaml -from typing_extensions import cast - -from .logger import logging as package_logger -from .models.interface import DbtInterface, MetabaseInterface - - -def _comma_separated_list_callback( - ctx: click.Context, param: click.Option, value: Union[str, List[str]] -) -> List[str]: - """Click callback for handling comma-separated lists.""" - - assert ( - param.type == click.UNPROCESSED or 
param.type.name == "list" - ), "comma-separated list options must be of type UNPROCESSED or list" - - if ctx.get_parameter_source(str(param.name)) in ( - click.core.ParameterSource.DEFAULT, - click.core.ParameterSource.DEFAULT_MAP, - ) and isinstance(value, list): - # lists in defaults (config or option) should be lists - return value - - elif isinstance(value, str): - str_value = value - if isinstance(value, list): - # when type=list, string value will be a list of chars - str_value = "".join(value) - else: - raise click.BadParameter("must be comma-separated list") - - return str_value.split(",") - - -@click.group() -@click.version_option(package_name="dbt-metabase") -@click.option( - "--config-path", - default="~/.dbt-metabase/config.yml", - show_default=True, - type=click.Path(), - help="Path to config.yml file with default values.", -) -@click.pass_context -def cli(ctx: click.Context, config_path: str): - group = cast(click.Group, ctx.command) - - config_path_expanded = Path(config_path).expanduser() - if config_path_expanded.exists(): - with open(config_path_expanded, "r", encoding="utf-8") as f: - config = yaml.safe_load(f).get("config", {}) - # propagate root configs to all commands - ctx.default_map = {command: config for command in group.commands} - - -def common_options(func: Callable) -> Callable: - """Common click options between commands.""" - - @click.option( - "--dbt-database", - metavar="DATABASE", - envvar="DBT_DATABASE", - show_envvar=True, - required=True, - type=click.STRING, - help="Target database name in dbt models.", - ) - @click.option( - "--dbt-manifest-path", - envvar="DBT_MANIFEST_PATH", - show_envvar=True, - type=click.Path(exists=True, dir_okay=False), - help="Path to dbt manifest.json file under /target/ in the dbt project directory. Uses dbt manifest parsing (recommended).", - ) - @click.option( - "--dbt-project-path", - envvar="DBT_PROJECT_PATH", - show_envvar=True, - type=click.Path(exists=True, file_okay=False), - help="Path to dbt project directory containing models. Uses dbt project parsing (not recommended).", - ) - @click.option( - "--dbt-schema", - metavar="SCHEMA", - envvar="DBT_SCHEMA", - show_envvar=True, - help="Target dbt schema. Must be passed if using project parser.", - type=click.STRING, - ) - @click.option( - "--dbt-schema-excludes", - metavar="SCHEMAS", - envvar="DBT_SCHEMA_EXCLUDES", - show_envvar=True, - type=click.UNPROCESSED, - callback=_comma_separated_list_callback, - help="Target dbt schemas to exclude. 
Ignored in project parser.", - ) - @click.option( - "--dbt-includes", - metavar="MODELS", - envvar="DBT_INCLUDES", - show_envvar=True, - type=click.UNPROCESSED, - callback=_comma_separated_list_callback, - help="Include specific dbt models names.", - ) - @click.option( - "--dbt-excludes", - metavar="MODELS", - envvar="DBT_EXCLUDES", - show_envvar=True, - type=click.UNPROCESSED, - callback=_comma_separated_list_callback, - help="Exclude specific dbt model names.", - ) - @click.option( - "--metabase-database", - metavar="DATABASE", - envvar="METABASE_DATABASE", - show_envvar=True, - required=True, - type=click.STRING, - help="Target database name in Metabase.", - ) - @click.option( - "--metabase-host", - metavar="HOST", - envvar="MB_HOST", - show_envvar=True, - required=True, - type=click.STRING, - help="Metabase hostname, excluding protocol.", - ) - @click.option( - "--metabase-user", - metavar="USER", - envvar="METABASE_USER", - show_envvar=True, - type=click.STRING, - help="Metabase username.", - ) - @click.option( - "--metabase-password", - metavar="PASSWORD", - envvar="METABASE_PASSWORD", - show_envvar=True, - type=click.STRING, - help="Metabase password.", - ) - @click.option( - "--metabase-session-id", - metavar="TOKEN", - envvar="METABASE_SESSION_ID", - show_envvar=True, - type=click.STRING, - help="Metabase session ID.", - ) - @click.option( - "--metabase-http/--metabase-https", - "metabase_use_http", - envvar="METABASE_USE_HTTP", - show_envvar=True, - default=False, - help="Force HTTP instead of HTTPS to connect to Metabase.", - ) - @click.option( - "--metabase-verify", - metavar="CERT", - envvar="METABASE_VERIFY", - show_envvar=True, - type=click.Path(exists=True, file_okay=True, dir_okay=False), - help="Path to certificate bundle used to connect to Metabase.", - ) - @click.option( - "--metabase-sync/--metabase-sync-skip", - "metabase_sync", - envvar="METABASE_SYNC", - show_envvar=True, - default=True, - show_default=True, - help="Attempt to synchronize Metabase schema with local models.", - ) - @click.option( - "--metabase-sync-timeout", - metavar="SECS", - envvar="METABASE_SYNC_TIMEOUT", - show_envvar=True, - type=click.INT, - help="Synchronization timeout in secs. When set, command fails on failed synchronization. Otherwise, command proceeds regardless. 
Only valid if sync is enabled.", - ) - @click.option( - "--metabase-http-timeout", - metavar="SECS", - envvar="METABASE_HTTP_TIMEOUT", - show_envvar=True, - type=click.INT, - default=15, - show_default=True, - help="Set the value for single requests timeout.", - ) - @click.option( - "-v", - "--verbose", - is_flag=True, - help="Enable verbose logging.", - ) - @functools.wraps(func) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - - return wrapper - - -@cli.command(help="Export dbt models to Metabase.") -@common_options -@click.option( - "--dbt-docs-url", - metavar="URL", - envvar="DBT_DOCS_URL", - show_envvar=True, - type=click.STRING, - help="URL for dbt docs to be appended to table descriptions in Metabase.", -) -@click.option( - "--dbt-include-tags", - envvar="DBT_INCLUDE_TAGS", - show_envvar=True, - is_flag=True, - help="Append tags to table descriptions in Metabase.", -) -@click.option( - "--metabase-exclude-sources", - envvar="METABASE_EXCLUDE_SOURCES", - show_envvar=True, - is_flag=True, - help="Skip exporting sources to Metabase.", -) -def models( - metabase_host: str, - metabase_user: str, - metabase_password: str, - metabase_database: str, - dbt_database: str, - dbt_path: Optional[str], - dbt_manifest_path: Optional[str], - dbt_schema: Optional[str], - dbt_schema_excludes: Optional[Iterable], - dbt_includes: Optional[Iterable], - dbt_excludes: Optional[Iterable], - metabase_session_id: Optional[str], - metabase_use_http: bool, - metabase_verify: Optional[str], - metabase_sync: bool, - metabase_sync_timeout: Optional[int], - metabase_exclude_sources: bool, - metabase_http_timeout: int, - dbt_include_tags: bool, - dbt_docs_url: Optional[str], - verbose: bool, -): - # Set global logging level if verbose - if verbose: - package_logger.LOGGING_LEVEL = logging.DEBUG - - # Instantiate dbt interface - dbt = DbtInterface( - path=dbt_path, - manifest_path=dbt_manifest_path, - database=dbt_database, - schema=dbt_schema, - schema_excludes=dbt_schema_excludes, - includes=dbt_includes, - excludes=dbt_excludes, - ) - - # Load models - dbt_models, aliases = dbt.read_models( - include_tags=dbt_include_tags, - docs_url=dbt_docs_url, - ) - - # Instantiate Metabase interface - metabase = MetabaseInterface( - host=metabase_host, - user=metabase_user, - password=metabase_password, - session_id=metabase_session_id, - use_http=metabase_use_http, - verify=metabase_verify, - database=metabase_database, - sync=metabase_sync, - sync_timeout=metabase_sync_timeout, - exclude_sources=metabase_exclude_sources, - http_timeout=metabase_http_timeout, - ) - - # Load client - metabase.prepare_metabase_client(dbt_models) - - # Execute model export - metabase.client.export_models( - database=metabase.database, - models=dbt_models, - aliases=aliases, - ) - - -@cli.command(help="Export dbt exposures to Metabase.") -@common_options -@click.option( - "--output-path", - envvar="OUTPUT_PATH", - show_envvar=True, - type=click.Path(exists=True, file_okay=False), - default=".", - show_default=True, - help="Output path for generated exposure YAML.", -) -@click.option( - "--output-name", - metavar="NAME", - envvar="OUTPUT_NAME", - show_envvar=True, - type=click.STRING, - default="metabase_exposures.yml", - show_default=True, - help="File name for generated exposure YAML.", -) -@click.option( - "--metabase-include-personal-collections", - envvar="METABASE_INCLUDE_PERSONAL_COLLECTIONS", - show_envvar=True, - is_flag=True, - help="Include personal collections when parsing exposures.", -) -@click.option( - 
"--metabase-collection-excludes", - metavar="COLLECTIONS", - envvar="METABASE_COLLECTION_EXCLUDES", - show_envvar=True, - type=click.UNPROCESSED, - callback=_comma_separated_list_callback, - help="Metabase collection names to exclude.", -) -def exposures( - metabase_host: str, - metabase_user: str, - metabase_password: str, - metabase_database: str, - dbt_database: str, - dbt_path: Optional[str], - dbt_manifest_path: Optional[str], - dbt_schema: Optional[str], - dbt_schema_excludes: Optional[Iterable], - dbt_includes: Optional[Iterable], - dbt_excludes: Optional[Iterable], - metabase_session_id: Optional[str], - metabase_use_http: bool, - metabase_verify: Optional[str], - metabase_sync: bool, - metabase_sync_timeout: Optional[int], - metabase_http_timeout: int, - output_path: str, - output_name: str, - metabase_include_personal_collections: bool, - metabase_collection_excludes: Optional[Iterable], - verbose: bool, -): - if verbose: - package_logger.LOGGING_LEVEL = logging.DEBUG - - # Instantiate dbt interface - dbt = DbtInterface( - path=dbt_path, - manifest_path=dbt_manifest_path, - database=dbt_database, - schema=dbt_schema, - schema_excludes=dbt_schema_excludes, - includes=dbt_includes, - excludes=dbt_excludes, - ) - - # Load models - dbt_models, _ = dbt.read_models() - - # Instantiate Metabase interface - metabase = MetabaseInterface( - host=metabase_host, - user=metabase_user, - password=metabase_password, - session_id=metabase_session_id, - use_http=metabase_use_http, - verify=metabase_verify, - database=metabase_database, - sync=metabase_sync, - sync_timeout=metabase_sync_timeout, - http_timeout=metabase_http_timeout, - ) - - # Load client - metabase.prepare_metabase_client(dbt_models) - - # Execute exposure extraction - metabase.client.extract_exposures( - models=dbt_models, - output_path=output_path, - output_name=output_name, - include_personal_collections=metabase_include_personal_collections, - collection_excludes=metabase_collection_excludes, - ) diff --git a/dbtmetabase/parsers/dbt_manifest.py b/dbtmetabase/dbt.py similarity index 59% rename from dbtmetabase/parsers/dbt_manifest.py rename to dbtmetabase/dbt.py index 3ea7dd6d..14ef85b4 100644 --- a/dbtmetabase/parsers/dbt_manifest.py +++ b/dbtmetabase/dbt.py @@ -1,26 +1,127 @@ +import dataclasses import json -from typing import List, Mapping, MutableMapping, Optional, Tuple +import re +from enum import Enum +from pathlib import Path +from typing import Iterable, List, Mapping, MutableMapping, Optional, Sequence -from ..logger.logging import logger -from ..models.metabase import ( - METABASE_COLUMN_META_FIELDS, - METABASE_MODEL_DEFAULT_SCHEMA, - METABASE_MODEL_META_FIELDS, - MetabaseColumn, - MetabaseModel, - ModelType, -) -from .dbt import DbtReader +from .logger.logging import logger +# Allowed metabase.* fields +_METABASE_COMMON_META_FIELDS = [ + "display_name", + "visibility_type", +] +# Must be covered by MetabaseColumn attributes +METABASE_COLUMN_META_FIELDS = _METABASE_COMMON_META_FIELDS + [ + "semantic_type", + "has_field_values", + "coercion_strategy", + "number_style", +] +# Must be covered by MetabaseModel attributes +METABASE_MODEL_META_FIELDS = _METABASE_COMMON_META_FIELDS + [ + "points_of_interest", + "caveats", +] -class DbtManifestReader(DbtReader): - """Reader for dbt manifest artifact.""" +# Default model schema (only schema in BigQuery) +METABASE_MODEL_DEFAULT_SCHEMA = "PUBLIC" + + +class ModelType(str, Enum): + nodes = "nodes" + sources = "sources" + + +@dataclasses.dataclass +class MetabaseColumn: + name: 
str
+    description: Optional[str] = None
+
+    display_name: Optional[str] = None
+    visibility_type: Optional[str] = None
+    semantic_type: Optional[str] = None
+    has_field_values: Optional[str] = None
+    coercion_strategy: Optional[str] = None
+    number_style: Optional[str] = None
+
+    fk_target_table: Optional[str] = None
+    fk_target_field: Optional[str] = None
+
+    meta_fields: MutableMapping = dataclasses.field(default_factory=dict)
+
+
+@dataclasses.dataclass
+class MetabaseModel:
+    name: str
+    schema: str
+    description: str = ""
+
+    display_name: Optional[str] = None
+    visibility_type: Optional[str] = None
+    points_of_interest: Optional[str] = None
+    caveats: Optional[str] = None
+
+    model_type: ModelType = ModelType.nodes
+    source: Optional[str] = None
+    unique_id: Optional[str] = None
+
+    columns: Sequence[MetabaseColumn] = dataclasses.field(default_factory=list)
+
+    @property
+    def ref(self) -> Optional[str]:
+        if self.model_type == ModelType.nodes:
+            return f"ref('{self.name}')"
+        elif self.model_type == ModelType.sources:
+            return f"source('{self.source}', '{self.name}')"
+        return None
+
+
+class _NullValue(str):
+    """Explicitly null field value."""
+
+    def __eq__(self, other: object) -> bool:
+        return other is None
+
+
+NullValue = _NullValue()
+
+
+class DbtReader:
+    def __init__(
+        self,
+        manifest_path: str,
+        database: str,
+        schema: Optional[str] = None,
+        schema_excludes: Optional[Iterable] = None,
+        includes: Optional[Iterable] = None,
+        excludes: Optional[Iterable] = None,
+    ):
+        """Reader for compiled dbt manifest.json file.
+
+        Args:
+            manifest_path (str): Path to dbt manifest.json, usually under target/ in the dbt project directory.
+            database (str): Target database name specified in dbt models.
+            schema (str, optional): Target schema. Defaults to None.
+            schema_excludes (Iterable, optional): Target schemas to exclude. Defaults to None.
+            includes (Iterable, optional): Model names to limit selection. Defaults to None.
+            excludes (Iterable, optional): Model names to exclude from selection. Defaults to None.
+        """
+
+        self.manifest_path = Path(manifest_path).expanduser()
+        self.database = database.upper()
+        self.schema = schema.upper() if schema else None
+        self.schema_excludes = [x.upper() for x in schema_excludes or []]
+
+        self.includes = [x.upper() for x in includes or []]
+        self.excludes = [x.upper() for x in excludes or []]
 
     def read_models(
         self,
         include_tags: bool = True,
         docs_url: Optional[str] = None,
-    ) -> Tuple[List[MetabaseModel], MutableMapping]:
+    ) -> List[MetabaseModel]:
         """Reads dbt models in Metabase-friendly format.
 
         Keyword Arguments:
@@ -31,13 +132,11 @@ def read_models(
             list -- List of dbt models in Metabase-friendly format.
         """
 
-        manifest = {}
+        with open(self.manifest_path, "r", encoding="utf-8") as f:
+            manifest = json.load(f)
 
         mb_models: List[MetabaseModel] = []
 
-        with open(self.path, "r", encoding="utf-8") as manifest_file:
-            manifest = json.load(manifest_file)
-
         for _, node in manifest["nodes"].items():
             model_name = node["name"].upper()
             model_schema = node["schema"].upper()
@@ -147,7 +246,7 @@ def read_models(
                 )
             )
 
-        return mb_models, self.alias_mapping
+        return mb_models
 
     def _read_model(
         self,
@@ -158,23 +257,58 @@ def _read_model(
         include_tags: bool = True,
         docs_url: Optional[str] = None,
     ) -> MetabaseModel:
-        """Reads one dbt model in Metabase-friendly format.
-
-        Arguments:
-            model {dict} -- One dbt model to read.
- source {str, optional} -- Name of the source if source - model_type {str} -- The type of the node which can be one of either nodes or sources - include_tags: {bool} -- Flag to append tags to description of model - - Returns: - dict -- One dbt model in Metabase-friendly format. - """ - metabase_columns: List[MetabaseColumn] = [] schema = model["schema"].upper() unique_id = model["unique_id"] + relationships = self._read_model_relationships( + manifest=manifest, + model_type=model_type, + unique_id=unique_id, + ) + + for _, column in model.get("columns", {}).items(): + metabase_columns.append( + self._read_column( + column=column, + schema=schema, + relationship=relationships.get(column["name"]), + ) + ) + + description = model.get("description", "") + + if include_tags: + tags = model.get("tags", []) + if tags: + tags = ", ".join(tags) + if description != "": + description += "\n\n" + description += f"Tags: {tags}" + + if docs_url: + full_path = f"{docs_url}/#!/model/{unique_id}" + if description != "": + description += "\n\n" + description += f"dbt docs link: {full_path}" + + resolved_name = model.get("alias", model.get("identifier", model["name"])) + + return MetabaseModel( + name=resolved_name, + schema=schema, + description=description, + columns=metabase_columns, + model_type=model_type, + unique_id=unique_id, + source=source, + **self.read_meta_fields(model, METABASE_MODEL_META_FIELDS), + ) + + def _read_model_relationships( + self, manifest: Mapping, model_type: ModelType, unique_id: str + ) -> Mapping[str, Mapping[str, str]]: children = manifest["child_map"][unique_id] relationship_tests = {} @@ -182,6 +316,7 @@ def _read_model( child = {} if manifest[model_type]: child = manifest[model_type].get(child_id, {}) + # Only proceed if we are seeing an explicitly declared relationship test if ( child.get("resource_type") == "test" @@ -250,49 +385,7 @@ def _read_model( "fk_target_field": fk_target_field, } - for _, column in model.get("columns", {}).items(): - metabase_columns.append( - self._read_column( - column=column, - schema=schema, - relationship=relationship_tests.get(column["name"]), - ) - ) - - description = model.get("description", "") - - if include_tags: - tags = model.get("tags", []) - if tags: - tags = ", ".join(tags) - if description != "": - description += "\n\n" - description += f"Tags: {tags}" - - if docs_url: - full_path = f"{docs_url}/#!/model/{unique_id}" - if description != "": - description += "\n\n" - description += f"dbt docs link: {full_path}" - - resolved_name = model.get("alias", model.get("identifier")) - dbt_name = None - if not resolved_name: - resolved_name = model["name"] - else: - dbt_name = model["name"] - - return MetabaseModel( - name=resolved_name, - schema=schema, - description=description, - columns=metabase_columns, - model_type=model_type, - unique_id=unique_id, - source=source, - dbt_name=dbt_name, - **self.read_meta_fields(model, METABASE_MODEL_META_FIELDS), - ) + return relationship_tests def _read_column( self, @@ -300,17 +393,6 @@ def _read_column( schema: str, relationship: Optional[Mapping], ) -> MetabaseColumn: - """Reads one dbt column in Metabase-friendly format. - - Arguments: - column {dict} -- One dbt column to read. - schema {str} -- Schema as passed down from CLI args or parsed from `source` - relationship {Mapping, optional} -- Mapping of columns to their foreign key relationships - - Returns: - dict -- One dbt column in Metabase-friendly format. 
- """ - column_name = column.get("name", "").upper().strip('"') column_description = column.get("description") metabase_column = MetabaseColumn( @@ -328,3 +410,99 @@ def _read_column( ) return metabase_column + + def model_selected(self, name: str) -> bool: + """Checks whether model passes inclusion/exclusion criteria. + + Args: + name (str): Model name. + + Returns: + bool: True if included, false otherwise. + """ + n = name.upper() + return n not in self.excludes and (not self.includes or n in self.includes) + + def set_column_foreign_key( + self, + column: Mapping, + metabase_column: MetabaseColumn, + table: Optional[str], + field: Optional[str], + schema: Optional[str], + ): + """Sets foreign key target on a column. + + Args: + column (Mapping): Schema column definition. + metabase_column (MetabaseColumn): Metabase column definition. + table (str): Foreign key target table. + field (str): Foreign key target field. + schema (str): Current schema name. + """ + # Meta fields take precedence + meta = column.get("meta", {}) + table = meta.get("metabase.fk_target_table", table) + field = meta.get("metabase.fk_target_field", field) + + if not table or not field: + if table or field: + logger().warning( + "Foreign key requires table and field for column %s", + metabase_column.name, + ) + return + + table_path = table.split(".") + if len(table_path) == 1 and schema: + table_path.insert(0, schema) + + metabase_column.semantic_type = "type/FK" + metabase_column.fk_target_table = ".".join( + [x.strip('"').upper() for x in table_path] + ) + metabase_column.fk_target_field = field.strip('"').upper() + logger().debug( + "Relation from %s to %s.%s", + metabase_column.name, + metabase_column.fk_target_table, + metabase_column.fk_target_field, + ) + + @staticmethod + def read_meta_fields(obj: Mapping, fields: List) -> Mapping: + """Reads meta fields from a schem object. + + Args: + obj (Mapping): Schema object. + fields (List): List of fields to read. + + Returns: + Mapping: Field values. + """ + + vals = {} + meta = obj.get("meta", {}) + for field in fields: + if f"metabase.{field}" in meta: + value = meta[f"metabase.{field}"] + vals[field] = value if value is not None else NullValue + return vals + + @staticmethod + def parse_ref(text: str) -> Optional[str]: + """Parses dbt ref() or source() statement. + + Arguments: + text {str} -- Full statement in dbt YAML. + + Returns: + str -- Name of the reference. 
+ """ + + # We are catching the rightmost argument of either source or ref which is ultimately the table name + matches = re.findall(r"['\"]([\w\_\-\ ]+)['\"][ ]*\)$", text.strip()) + if matches: + logger().debug("%s -> %s", text, matches[0]) + return matches[0] + return None diff --git a/dbtmetabase/metabase.py b/dbtmetabase/metabase.py index 8ca7ee6a..4ca6bf2a 100644 --- a/dbtmetabase/metabase.py +++ b/dbtmetabase/metabase.py @@ -1,355 +1,157 @@ -import json -import os +from __future__ import annotations + import re import time -from typing import ( - Any, - Dict, - Iterable, - List, - Mapping, - MutableMapping, - Optional, - Tuple, - Union, -) +from pathlib import Path +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union import requests import yaml from requests.adapters import HTTPAdapter, Retry -from .logger.logging import logger -from .models import exceptions -from .models.metabase import ( +from .dbt import ( METABASE_MODEL_DEFAULT_SCHEMA, MetabaseColumn, MetabaseModel, ModelType, NullValue, ) +from .logger.logging import logger -class MetabaseClient: - """Metabase API client.""" - - _SYNC_PERIOD_SECS = 5 - - class _Metadata: - """Mutable state of metadata (tables/fields) for lookups and updates.""" - - def __init__(self, tables: Optional[Iterable[Dict]] = None): - self.tables = {} - self.updates: MutableMapping[str, MutableMapping[str, Any]] = {} - - if tables: - for table in tables: - fields = {} - for field in table.get("fields", []): - new_field = field.copy() - new_field["kind"] = "field" - - field_name = field["name"].upper() - fields[field_name] = new_field - - new_table = table.copy() - new_table["kind"] = "table" - new_table["fields"] = fields - - schema_name = table["schema"].upper() - table_name = table["name"].upper() - self.tables[f"{schema_name}.{table_name}"] = new_table - - def get_table(self, table_key: str) -> Optional[MutableMapping]: - """Looks up table by key. - - Args: - table_key (str): Table key of form SCHEMA.TABLE. - - Returns: - Optional[MutableMapping]: Table description or none. - """ - return self.tables.get(table_key) - - def get_field(self, table_key: str, field_key: str) -> Optional[MutableMapping]: - """Looks up field by table and key. - - Args: - table_key (str): Table key of form SCHEMA.TABLE. - field_key (str): Field key. - - Returns: - Optional[MutableMapping]: Field description or none. - """ - return self.tables.get(table_key, {}).get("fields", {}).get(field_key) +class MetabaseArgumentError(ValueError): + """Invalid Metabase arguments supplied.""" - def update(self, entity: MutableMapping, delta: Mapping): - """Updates entity (table or field) with arguments and stages API update. - Args: - entity (MutableMapping): Current state of entity. - delta (Mapping): Fields that need to change. - """ - entity.update(delta) +class MetabaseRuntimeError(RuntimeError): + """Metabase execution failed.""" - key = f"{entity['kind']}.{entity['id']}" - update = self.updates.get(key, {}) - update["kind"] = entity["kind"] - update["id"] = entity["id"] - body = update.get("body", {}) - body.update(delta) - update["body"] = body +class _MetabaseClientJob: + """Scoped abstraction for jobs depending on the Metabase client.""" - self.updates[key] = update + def __init__(self, client: MetabaseClient): + self.client = client - def pop_updates(self) -> Iterable[MutableMapping]: - """Clears and returns currently staged updates. - Returns: - Iterable[MutableMapping]: List of updates. 
- """ - updates = self.updates.values() - self.updates = {} - return updates +class _ExportModelsJob(_MetabaseClientJob): + """Job abstraction for exporting models.""" - def __bool__(self) -> bool: - return bool(self.tables) + _SYNC_PERIOD = 5 def __init__( self, - host: str, - user: Optional[str], - password: Optional[str], - verify: Optional[Union[str, bool]] = None, - cert: Optional[Union[str, Tuple[str, str]]] = None, - session_id: Optional[str] = None, - use_http: bool = False, - sync: Optional[bool] = True, - sync_timeout: Optional[int] = None, - exclude_sources: bool = False, - http_extra_headers: Optional[dict] = None, - http_timeout: int = 15, - http_adapter: Optional[HTTPAdapter] = None, - ): - """Constructor. - - Arguments: - host {str} -- Metabase hostname. - user {str} -- Metabase username. - password {str} -- Metabase password. - - Keyword Arguments: - use_http {bool} -- Use HTTP instead of HTTPS. (default: {False}) - verify {Union[str, bool]} -- Path to certificate or disable verification. (default: {None}) - cert {Union[str, Tuple[str, str]]} -- Path to a custom certificate to be used by the Metabase client. (default: {None}) - session_id {str} -- Metabase session ID. (default: {None}) - sync (bool, optional): Attempt to synchronize Metabase schema with local models. Defaults to True. - sync_timeout (Optional[int], optional): Synchronization timeout (in secs). Defaults to None. - http_extra_headers {dict} -- HTTP headers to be used by the Metabase client. (default: {None}) - exclude_sources {bool} -- Exclude exporting sources. (default: {False}) - http_adapter: (Optional[HTTPAdapter], optional) Provide custom HTTP adapter implementation for requests to use. Defaults to None. - """ - - self.base_url = f"{'http' if use_http else 'https'}://{host}" - self.http_timeout = http_timeout - self.session = requests.Session() - self.session.verify = verify - self.session.cert = cert - - if http_extra_headers is not None: - self.session.headers.update(http_extra_headers) - - if not http_adapter: - http_adapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=0.5)) - - self.session.mount(self.base_url, http_adapter) - session_header = session_id or self.get_session_id(user, password) - self.session.headers["X-Metabase-Session"] = session_header - - self.sync = sync - self.sync_timeout = sync_timeout - self.exclude_sources = exclude_sources - self.collections: Iterable = [] - self.tables: Iterable = [] - self.table_map: MutableMapping = {} - self.models_exposed: List = [] - self.native_query: str = "" - # This regex is looking for from and join clauses, and extracting the table part. - # It won't recognize some valid sql table references, such as `from "table with spaces"`. - self.exposure_parser = re.compile(r"[FfJj][RrOo][OoIi][MmNn]\s+([\w.\"]+)") - self.cte_parser = re.compile( - r"[Ww][Ii][Tt][Hh]\s+\b(\w+)\b\s+as|[)]\s*[,]\s*\b(\w+)\b\s+as" - ) - self.metadata = self._Metadata() - - self._synced_models: Optional[List[MetabaseModel]] = None - - logger().info(":ok_hand: Session established successfully") - - def get_session_id(self, user: Optional[str], password: Optional[str]) -> str: - """Obtains new session ID from API. - - Arguments: - user {str} -- Metabase username. - password {str} -- Metabase password. - - Returns: - str -- Session ID. 
- """ - - return self.api( - "post", - "/api/session", - json={"username": user, "password": password}, - )["id"] - - def sync_and_wait( - self, + client: MetabaseClient, database: str, models: List[MetabaseModel], - ) -> bool: - """Synchronize with the database and wait for schema compatibility. - - Arguments: - database {str} -- Metabase database name. - models {list} -- List of dbt models read from project. - - Returns: - bool -- True if schema compatible with models, false if still incompatible. - - Raises: - MetabaseUnableToSync if - - the timeout provided is not sufficient - - the database cannot be found - - a timeout was provided but sync was unsuccessful - """ - - timeout = self.sync_timeout if self.sync_timeout else 30 - - if timeout < self._SYNC_PERIOD_SECS: - raise exceptions.MetabaseUnableToSync( - f"Timeout provided {timeout} secs, must be at least {self._SYNC_PERIOD_SECS}" - ) + exclude_sources: bool, + sync_timeout: int, + ): + super().__init__(client) + + self.database = database + self.models = [ + model + for model in models + if model.model_type != ModelType.sources or not exclude_sources + ] + self.sync_timeout = sync_timeout - self.metadata = MetabaseClient._Metadata() + self.tables: Mapping[str, MutableMapping] = {} + self.updates: MutableMapping[str, MutableMapping[str, Any]] = {} - database_id = self.find_database_id(database) - if not database_id: - raise exceptions.MetabaseUnableToSync( - f"Cannot find database by name {database}" - ) - - self.api("post", f"/api/database/{database_id}/sync_schema") + def execute(self): + success = True - deadline = int(time.time()) + timeout - sync_successful = False - while True: - self.metadata = self.build_metadata(database_id) - sync_successful = self.models_compatible(models) - time_after_wait = int(time.time()) + self._SYNC_PERIOD_SECS - if not sync_successful and time_after_wait <= deadline: - time.sleep(self._SYNC_PERIOD_SECS) - else: + database_id = None + for database in self.client.api("get", "/api/database"): + if database["name"].upper() == self.database.upper(): + database_id = database["id"] break + if not database_id: + raise MetabaseRuntimeError(f"Cannot find database by name {self.database}") - if sync_successful: - self._synced_models = models.copy() - elif self.sync: - raise exceptions.MetabaseUnableToSync( - "Unable to align models between dbt target models and Metabase" - ) - - return sync_successful - - def models_compatible(self, models: List[MetabaseModel]) -> bool: - """Checks if models compatible with the Metabase database schema. - - Arguments: - models {list} -- List of dbt models read from project. - - Returns: - bool -- True if schema compatible with models, false otherwise. 
- """ - - are_models_compatible = True - for model in models: - if model.model_type == ModelType.sources and self.exclude_sources: - continue - - schema_name = model.schema.upper() - model_name = model.name.upper() - - lookup_key = f"{schema_name}.{model_name}" + if self.sync_timeout: + self.client.api("post", f"/api/database/{database_id}/sync_schema") + time.sleep(self._SYNC_PERIOD) - table = self.metadata.get_table(lookup_key) - if not table: - logger().warning( - "Model %s not found in %s schema", lookup_key, schema_name - ) - are_models_compatible = False - continue + deadline = int(time.time()) + self.sync_timeout + synced = False + while not synced: + tables = self._load_tables(database_id) - for column in model.columns: - column_name = column.name.upper() + synced = True + for model in self.models: + schema_name = model.schema.upper() + model_name = model.name.upper() + table_key = f"{schema_name}.{model_name}" - field = self.metadata.get_field(lookup_key, column_name) - if not field: + table = tables.get(table_key) + if not table: logger().warning( - "Column %s not found in %s model", column_name, lookup_key + "Model %s not found in %s schema", table_key, schema_name ) - are_models_compatible = False - - return are_models_compatible + synced = False + continue - def export_models( - self, - database: str, - models: List[MetabaseModel], - aliases, - ): - """Exports dbt models to Metabase database schema. + for column in model.columns: + column_name = column.name.upper() - Arguments: - database {str} -- Metabase database name. - models {list} -- List of dbt models read from project. - aliases {dict} -- Provided by reader class. Shuttled down to column exports to resolve FK refs against relations to aliased source tables - """ + field = table.get("fields", {}).get(column_name) + if not field: + logger().warning( + "Column %s not found in %s model", column_name, table_key + ) + synced = False + continue - success = True + self.tables = tables - if not self.metadata or self._synced_models != models: - self.sync_and_wait(database, models) + if int(time.time()) < deadline: + time.sleep(self._SYNC_PERIOD) - for model in models: - if model.model_type == ModelType.sources and self.exclude_sources: - logger().info(":fast_forward: Skipping %s source", model.unique_id) - continue + if not synced and self.sync_timeout: + raise MetabaseRuntimeError("Unable to sync models between dbt and Metabase") - success &= self.export_model(model, aliases) + for model in self.models: + success &= self._export_model(model) - for update in self.metadata.pop_updates(): - self.api( + for update in self.updates.values(): + self.client.api( "put", f"/api/{update['kind']}/{update['id']}", json=update["body"], ) logger().info( - ":satellite: Update to %s %s applied successfully", + "Update to %s %s applied successfully", update["kind"], update["id"], ) if not success: - raise exceptions.MetabaseRuntimeError( + raise MetabaseRuntimeError( "Model export encountered non-critical errors, check output" ) - def export_model(self, model: MetabaseModel, aliases: dict) -> bool: + def queue_update(self, entity: MutableMapping, delta: Mapping): + entity.update(delta) + + key = f"{entity['kind']}.{entity['id']}" + update = self.updates.get(key, {}) + update["kind"] = entity["kind"] + update["id"] = entity["id"] + + body = update.get("body", {}) + body.update(delta) + update["body"] = body + + self.updates[key] = update + + def _export_model(self, model: MetabaseModel) -> bool: """Exports one dbt model to Metabase database 
+
-    def export_model(self, model: MetabaseModel, aliases: dict) -> bool:
+    def _export_model(self, model: MetabaseModel) -> bool:
         """Exports one dbt model to Metabase database schema.
 
         Arguments:
             model {dict} -- One dbt model read from project.
-            aliases {dict} -- Provided by reader class. Shuttled down to column exports to resolve FK refs against relations to aliased source tables
 
         Returns:
             bool -- True if exported successfully, false if there were errors.
@@ -359,14 +161,11 @@ def export_model(self, model: MetabaseModel, aliases: dict) -> bool:
 
         schema_name = model.schema.upper()
         model_name = model.name.upper()
+        table_key = f"{schema_name}.{model_name}"
 
-        lookup_key = f"{schema_name}.{aliases.get(model_name, model_name)}"
-
-        api_table = self.metadata.get_table(lookup_key)
+        api_table = self.tables.get(table_key)
         if not api_table:
-            logger().error(
-                ":cross_mark: Table %s does not exist in Metabase", lookup_key
-            )
+            logger().error("Table %s does not exist in Metabase", table_key)
             return False
 
         # Empty strings not accepted by Metabase
@@ -394,29 +193,28 @@ def export_model(self, model: MetabaseModel, aliases: dict) -> bool:
             body_table["visibility_type"] = model_visibility
 
         if body_table:
-            self.metadata.update(entity=api_table, delta=body_table)
-            logger().info(":raising_hands: Table %s will be updated", lookup_key)
+            self.queue_update(entity=api_table, delta=body_table)
+            logger().info("Table %s will be updated", table_key)
         else:
-            logger().info(":thumbs_up: Table %s is up-to-date", lookup_key)
+            logger().info("Table %s is up-to-date", table_key)
 
         for column in model.columns:
-            success &= self.export_column(schema_name, model_name, column, aliases)
+            success &= self._export_column(schema_name, model_name, column)
 
         return success
 
-    def export_column(
+    def _export_column(
         self,
         schema_name: str,
         model_name: str,
         column: MetabaseColumn,
-        aliases: dict,
     ) -> bool:
         """Exports one dbt column to Metabase database schema.
 
         Arguments:
+            schema_name {str} -- Target schema name.
             model_name {str} -- One dbt model name read from project.
             column {dict} -- One dbt column read from project.
-            aliases {dict} -- Provided by reader class. Used to resolve FK refs against relations to aliased source tables
 
         Returns:
             bool -- True if exported successfully, false if there were errors.
@@ -424,14 +222,14 @@ def export_column(
 
         success = True
 
-        table_lookup_key = f"{schema_name}.{model_name}"
+        table_key = f"{schema_name}.{model_name}"
         column_name = column.name.upper()
 
-        api_field = self.metadata.get_field(table_lookup_key, column_name)
+        api_field = self.tables.get(table_key, {}).get("fields", {}).get(column_name)
         if not api_field:
             logger().error(
-                ":cross_mark: Field %s.%s does not exist in Metabase",
-                table_lookup_key,
+                "Field %s.%s does not exist in Metabase",
+                table_key,
                 column_name,
             )
             return False
@@ ...
         fk_target_field_id = None
 
         if column.semantic_type == "type/FK":
-            # Target table could be aliased if we parse_ref() on a source, so we caught aliases during model parsing
-            # This way we can unpack any alias mapped to fk_target_table when using yml folder parser
+            # fk_target_table was resolved during dbt parsing and may carry a schema prefix; match it against Metabase metadata
             target_table = (
                 column.fk_target_table.upper()
                 if column.fk_target_table is not None
@@ -458,52 +256,46 @@ def export_column(
 
             if not target_table or not target_field:
                 logger().info(
-                    ":bow: Passing on fk resolution for %s. Target field %s was not resolved during dbt model parsing.",
-                    table_lookup_key,
+                    "Passing on fk resolution for %s. 
Target field %s was not resolved during dbt model parsing.", + table_key, target_field, ) else: - was_aliased = ( - aliases.get(target_table.split(".", 1)[-1]) - if target_table - else None - ) - if was_aliased: - target_table = ".".join( - [target_table.split(".", 1)[0], was_aliased] - ) - logger().debug( - ":magnifying_glass_tilted_right: Looking for field %s in table %s", + "Looking for field %s in table %s", target_field, target_table, ) - fk_target_field = self.metadata.get_field(target_table, target_field) + fk_target_field = ( + self.tables.get(target_table, {}) + .get("fields", {}) + .get(target_field) + ) if fk_target_field: fk_target_field_id = fk_target_field.get("id") if fk_target_field.get(semantic_type_key) != "type/PK": logger().info( - ":key: Target field %s will be set to PK for %s column FK", + "Target field %s will be set to PK for %s column FK", fk_target_field_id, column_name, ) body_fk_target_field = { semantic_type_key: "type/PK", } - self.metadata.update( + self.queue_update( entity=fk_target_field, delta=body_fk_target_field ) else: logger().info( - ":thumbs_up: Target field %s is already PK, needed for %s column", + "Target field %s is already PK, needed for %s column", fk_target_field_id, column_name, ) else: logger().error( - ":cross_mark: Unable to find foreign key target %s.%s", + "Unable to find foreign key target %s.%s", target_table, target_field, ) @@ -558,45 +350,17 @@ def export_column( body_field[semantic_type_key] = column.semantic_type or None if body_field: - self.metadata.update(entity=api_field, delta=body_field) - logger().info( - ":sparkles: Field %s.%s will be updated", model_name, column_name - ) + self.queue_update(entity=api_field, delta=body_field) + logger().info("Field %s.%s will be updated", model_name, column_name) else: - logger().info( - ":thumbs_up: Field %s.%s is up-to-date", model_name, column_name - ) + logger().info("Field %s.%s is up-to-date", model_name, column_name) return success - def find_database_id(self, name: str) -> Optional[str]: - """Finds Metabase database ID by name. - - Arguments: - name {str} -- Metabase database name. - - Returns: - str -- Metabase database ID. - """ - - for database in self.api("get", "/api/database"): - if database["name"].upper() == name.upper(): - return database["id"] - return None - - def build_metadata(self, database_id: str) -> _Metadata: - """Builds metadata lookups. - - Arguments: - database_id {str} -- Metabase database ID. - - Returns: - _Metadata -- Metadata lookup object. 
- """ - - tables = [] + def _load_tables(self, database_id: str) -> Mapping[str, MutableMapping]: + tables = {} - metadata = self.api( + metadata = self.client.api( "get", f"/api/database/{database_id}/metadata", params={"include_hidden": True}, @@ -610,64 +374,86 @@ def build_metadata(self, database_id: str) -> _Metadata: table.get("schema") or bigquery_schema or METABASE_MODEL_DEFAULT_SCHEMA ).upper() - tables.append(table) + fields = {} + for field in table.get("fields", []): + new_field = field.copy() + new_field["kind"] = "field" - return MetabaseClient._Metadata(tables) + field_name = field["name"].upper() + fields[field_name] = new_field - def extract_exposures( - self, - models: List[MetabaseModel], - output_path: str = ".", - output_name: str = "metabase_exposures", - include_personal_collections: bool = True, - collection_excludes: Optional[Iterable] = None, - ) -> Mapping: - """Extracts exposures in Metabase downstream of dbt models and sources as parsed by dbt reader + new_table = table.copy() + new_table["kind"] = "table" + new_table["fields"] = fields - Arguments: - models {List[MetabaseModel]} -- List of models as output by dbt reader + schema_name = table["schema"].upper() + table_name = table["name"].upper() + tables[f"{schema_name}.{table_name}"] = new_table - Keyword Arguments: - output_path {str} -- The path to output the generated yaml. (default: ".") - output_name {str} -- The name of the generated yaml. (default: {"metabase_exposures"}) - include_personal_collections {bool} -- Include personal collections in Metabase processing. (default: {True}) - collection_excludes {str} -- List of collections to exclude by name. (default: {None}) + return tables - Returns: - List[Mapping] -- JSON object representation of all exposures parsed. - """ - _RESOURCE_VERSION = 2 +class _ExtractExposuresJob(_MetabaseClientJob): + _RESOURCE_VERSION = 2 - class DbtDumper(yaml.Dumper): - def increase_indent(self, flow=False, indentless=False): - indentless = False - return super(DbtDumper, self).increase_indent(flow, indentless) + # This regex is looking for from and join clauses, and extracting the table part. + # It won't recognize some valid sql table references, such as `from "table with spaces"`. + _EXPOSURE_PARSER = re.compile(r"[FfJj][RrOo][OoIi][MmNn]\s+([\w.\"]+)") + _CTE_PARSER = re.compile( + r"[Ww][Ii][Tt][Hh]\s+\b(\w+)\b\s+as|[)]\s*[,]\s*\b(\w+)\b\s+as" + ) - if collection_excludes is None: - collection_excludes = [] + class DbtDumper(yaml.Dumper): + def increase_indent(self, flow=False, indentless=False): + return super().increase_indent(flow, indentless=False) - refable_models = {node.name.upper(): node.ref for node in models} + def __init__( + self, + client: MetabaseClient, + models: List[MetabaseModel], + output_path: str, + output_name: str, + include_personal_collections: bool, + collection_excludes: Optional[Iterable], + ): + super().__init__(client) + + self.model_refs = {model.name.upper(): model.ref for model in models} + self.output_file = Path(output_path).expanduser() / f"{output_name}.yml" + self.include_personal_collections = include_personal_collections + self.collection_excludes = collection_excludes or [] + + self.table_names: Mapping = {} + self.models_exposed: List = [] + self.native_query: str = "" + + def execute(self) -> Mapping: + """Extracts exposures in Metabase downstream of dbt models and sources as parsed by dbt reader. 
- self.collections = self.api("get", "/api/collection") - self.tables = self.api("get", "/api/table") - self.table_map = {table["id"]: table["name"] for table in self.tables} + Returns: + Mapping: JSON object representation of all exposures parsed. + """ + + self.table_names = { + table["id"]: table["name"] for table in self.client.api("get", "/api/table") + } documented_exposure_names = [] parsed_exposures = [] - for collection in self.collections: - # Exclude collections by name - if collection["name"] in collection_excludes: - continue - - # Optionally exclude personal collections - if not include_personal_collections and collection.get("personal_owner_id"): + for collection in self.client.api("get", "/api/collection"): + # Exclude collections by name or personal collections (unless included) + if collection["name"] in self.collection_excludes or ( + collection.get("personal_owner_id") + and not self.include_personal_collections + ): continue # Iter through collection - logger().info(":sparkles: Exploring collection %s", collection["name"]) - for item in self.api("get", f"/api/collection/{collection['id']}/items"): + logger().info("Exploring collection %s", collection["name"]) + for item in self.client.api( + "get", f"/api/collection/{collection['id']}/items" + ): # Ensure collection item is of parsable type exposure_type = item["model"] exposure_id = item["id"] @@ -679,13 +465,17 @@ def increase_indent(self, flow=False, indentless=False): self.native_query = "" native_query = "" - exposure = self.api("get", f"/api/{exposure_type}/{exposure_id}") + exposure = self.client.api("get", f"/api/{exposure_type}/{exposure_id}") exposure_name = exposure.get("name", "Exposure [Unresolved Name]") logger().info( - "\n:bow_and_arrow: Introspecting exposure: %s", + "Introspecting exposure: %s", exposure_name, ) + header = None + creator_name = None + creator_email = None + # Process exposure if exposure_type == "card": # Build header for card and extract models to self.models_exposed @@ -694,7 +484,7 @@ def increase_indent(self, flow=False, indentless=False): ) # Parse Metabase question - self._extract_card_exposures(exposure_id, exposure, refable_models) + self._extract_card_exposures(exposure_id, exposure) native_query = self.native_query elif exposure_type == "dashboard": @@ -712,14 +502,12 @@ def increase_indent(self, flow=False, indentless=False): dashboard_item_reference = dashboard_item.get("card", {}) if "id" not in dashboard_item_reference: continue + # Parse Metabase question - self._extract_card_exposures( - dashboard_item_reference["id"], - refable_models=refable_models, - ) + self._extract_card_exposures(dashboard_item_reference["id"]) if not self.models_exposed: - logger().info(":bow: No models mapped to exposure") + logger().info("No models mapped to exposure") # Extract creator info if "creator" in exposure: @@ -728,17 +516,19 @@ def increase_indent(self, flow=False, indentless=False): elif "creator_id" in exposure: # If a metabase user is deactivated, the API returns a 404 try: - creator = self.api("get", f"/api/user/{exposure['creator_id']}") + creator = self.client.api( + "get", f"/api/user/{exposure['creator_id']}" + ) except requests.exceptions.HTTPError as error: creator = {} if error.response is None or error.response.status_code != 404: raise - creator_email = creator.get("email") creator_name = creator.get("common_name") + creator_email = creator.get("email") exposure_label = exposure_name - # Only letters, numbers and underscores allowed in model names in dbt docs DAG / No 
duplicate model names + # Only letters, numbers and underscores allowed in model names in dbt docs DAG / no duplicate model names exposure_name = re.sub(r"[^\w]", "_", exposure_name).lower() enumer = 1 while exposure_name in documented_exposure_names: @@ -752,11 +542,10 @@ def increase_indent(self, flow=False, indentless=False): exposure_id=exposure_id, name=exposure_name, label=exposure_label, - header=header, + header=header or "", created_at=exposure["created_at"], - creator_name=creator_name, - creator_email=creator_email, - refable_models=refable_models, + creator_name=creator_name or "", + creator_email=creator_email or "", description=exposure.get("description", ""), native_query=native_query, ) @@ -765,28 +554,25 @@ def increase_indent(self, flow=False, indentless=False): documented_exposure_names.append(exposure_name) # Output dbt YAML - with open( - os.path.expanduser(os.path.join(output_path, f"{output_name}.yml")), - "w", - encoding="utf-8", - ) as docs: + result = { + "version": self._RESOURCE_VERSION, + "exposures": parsed_exposures, + } + with open(self.output_file, "w", encoding="utf-8") as docs: yaml.dump( - {"version": _RESOURCE_VERSION, "exposures": parsed_exposures}, + result, docs, - Dumper=DbtDumper, + Dumper=self.DbtDumper, default_flow_style=False, allow_unicode=True, sort_keys=False, ) - - # Return object - return {"version": _RESOURCE_VERSION, "exposures": parsed_exposures} + return result def _extract_card_exposures( self, card_id: int, exposure: Optional[Mapping] = None, - refable_models: Optional[MutableMapping] = None, ): """Extracts exposures from Metabase questions populating `self.models_exposed` @@ -800,12 +586,9 @@ def _extract_card_exposures( None -- self.models_exposed is populated through this method. """ - if refable_models is None: - refable_models = {} - # If an exposure is not passed, pull from id if not exposure: - exposure = self.api("get", f"/api/card/{card_id}") + exposure = self.client.api("get", f"/api/card/{card_id}") query = exposure.get("dataset_query", {}) @@ -817,15 +600,13 @@ def _extract_card_exposures( if str(source_table_id).startswith("card__"): # Handle questions based on other question in virtual db - self._extract_card_exposures( - int(source_table_id.split("__")[-1]), refable_models=refable_models - ) + self._extract_card_exposures(int(source_table_id.split("__")[-1])) else: # Normal question - source_table = self.table_map.get(source_table_id) + source_table = self.table_names.get(source_table_id) if source_table: logger().info( - ":direct_hit: Model extracted from Metabase question: %s", + "Model extracted from Metabase question: %s", source_table, ) self.models_exposed.append(source_table) @@ -835,31 +616,30 @@ def _extract_card_exposures( # Handle questions based on other question in virtual db if str(query_join.get("source-table", "")).startswith("card__"): self._extract_card_exposures( - int(query_join.get("source-table").split("__")[-1]), - refable_models=refable_models, + int(query_join.get("source-table").split("__")[-1]) ) continue # Joined model parsed - joined_table = self.table_map.get(query_join.get("source-table")) + joined_table = self.table_names.get(query_join.get("source-table")) if joined_table: logger().info( - ":direct_hit: Model extracted from Metabase question join: %s", + "Model extracted from Metabase question join: %s", joined_table, ) self.models_exposed.append(joined_table) elif query.get("type") == "native": # Metabase native query - native_query = query.get("native").get("query") + native_query = 
query["native"].get("query") ctes: List[str] = [] # Parse common table expressions for exclusion - for matched_cte in re.findall(self.cte_parser, native_query): + for matched_cte in re.findall(self._CTE_PARSER, native_query): ctes.extend(group.upper() for group in matched_cte if group) # Parse SQL for exposures through FROM or JOIN clauses - for sql_ref in re.findall(self.exposure_parser, native_query): + for sql_ref in re.findall(self._EXPOSURE_PARSER, native_query): # Grab just the table / model name clean_exposure = sql_ref.split(".")[-1].strip('"').upper() @@ -867,12 +647,12 @@ def _extract_card_exposures( if clean_exposure in ctes and "." not in sql_ref: continue # Verify this is one of our parsed refable models so exposures dont break the DAG - if not refable_models.get(clean_exposure): + if not self.model_refs.get(clean_exposure): continue if clean_exposure: logger().info( - ":direct_hit: Model extracted from native query: %s", + "Model extracted from native query: %s", clean_exposure, ) self.models_exposed.append(clean_exposure) @@ -888,7 +668,6 @@ def _build_exposure( created_at: str, creator_name: str, creator_email: str, - refable_models: Mapping, description: str = "", native_query: str = "", ) -> Mapping: @@ -903,7 +682,6 @@ def _build_exposure( created_at {str} -- Timestamp of exposure creation derived from Metabase creator_name {str} -- Creator name derived from Metabase creator_email {str} -- Creator email derived from Metabase - refable_models {str} -- List of dbt models from dbt parser which can validly be referenced, parsed exposures are always checked against this list to avoid generating invalid yaml Keyword Arguments: description {str} -- The description of the exposure as documented in Metabase. (default: No description provided in Metabase) @@ -945,27 +723,88 @@ def _build_exposure( ) # Output exposure - return { "name": name, "label": label, "description": description, "type": "analysis" if exposure_type == "card" else "dashboard", - "url": f"{self.base_url}/{exposure_type}/{exposure_id}", + "url": f"{self.client.url}/{exposure_type}/{exposure_id}", "maturity": "medium", "owner": { "name": creator_name, - "email": creator_email or "", + "email": creator_email, }, "depends_on": list( { - refable_models[exposure.upper()] + self.model_refs[exposure.upper()] for exposure in list({m for m in self.models_exposed}) - if exposure.upper() in refable_models + if exposure.upper() in self.model_refs } ), } + +class MetabaseClient: + """Metabase API client.""" + + def __init__( + self, + url: str, + username: Optional[str] = None, + password: Optional[str] = None, + session_id: Optional[str] = None, + verify: bool = True, + cert: Optional[Union[str, Tuple[str, str]]] = None, + http_timeout: int = 15, + http_headers: Optional[dict] = None, + http_adapter: Optional[HTTPAdapter] = None, + ): + """New Metabase client. + + Args: + url (str): Metabase URL, e.g. "https://metabase.example.com". + username (Optional[str], optional): Metabase username (required unless providing session_id). Defaults to None. + password (Optional[str], optional): Metabase password (required unless providing session_id). Defaults to None. + session_id (Optional[str], optional): Metabase session ID. Defaults to None. + verify (bool, optional): Verify the TLS certificate at the Metabase end. Defaults to True. + cert (Optional[Union[str, Tuple[str, str]]], optional): Path to a custom certificate. Defaults to None. + http_timeout (int, optional): HTTP request timeout in secs. Defaults to 15. 
+            http_headers (Optional[dict], optional): Additional HTTP headers. Defaults to None.
+            http_adapter (Optional[HTTPAdapter], optional): Custom requests HTTP adapter. Defaults to None.
+        """
+
+        self.url = url
+        if self.url.endswith("/"):
+            self.url = self.url[:-1]
+
+        self.http_timeout = http_timeout
+
+        self.session = requests.Session()
+        self.session.verify = verify
+        self.session.cert = cert
+
+        if http_headers:
+            self.session.headers.update(http_headers)
+
+        self.session.mount(
+            self.url,
+            http_adapter or HTTPAdapter(max_retries=Retry(total=3, backoff_factor=0.5)),
+        )
+
+        if not session_id:
+            if username and password:
+                session = self.api(
+                    "post",
+                    "/api/session",
+                    json={"username": username, "password": password},
+                )
+                session_id = str(session["id"])
+            else:
+                raise MetabaseArgumentError("Credentials or session ID required")
+        self.session.headers["X-Metabase-Session"] = session_id
+
+        logger().info("Session established successfully")
+
     def api(
         self,
         method: str,
@@ -975,45 +814,85 @@ def api(
     ) -> Mapping:
         """Unified way of calling Metabase API.
 
-        Arguments:
-            method {str} -- HTTP verb, e.g. get, post, put.
-            path {str} -- Relative path of endpoint, e.g. /api/database.
-
-        Keyword Arguments:
-            authenticated {bool} -- Includes session ID when true. (default: {True})
-            critical {bool} -- Raise on any HTTP errors. (default: {True})
+        Args:
+            method (str): HTTP verb, e.g. get, post, put.
+            path (str): Relative path of endpoint, e.g. /api/database.
+            critical (bool, optional): Raise on any HTTP errors. Defaults to True.
 
         Returns:
-            Any -- JSON payload of the endpoint.
+            Mapping: JSON payload of the endpoint.
         """
 
         response = self.session.request(
             method,
-            f"{self.base_url}{path}",
+            f"{self.url}{path}",
             timeout=self.http_timeout,
             **kwargs,
         )
 
-        if critical:
-            try:
-                response.raise_for_status()
-            except requests.exceptions.HTTPError:
-                if "json" in kwargs and "password" in kwargs["json"]:
-                    logger().error("HTTP request failed. Response: %s", response.text)
-                else:
-                    logger().error(
-                        "HTTP request failed. Payload: %s. Response: %s",
-                        kwargs.get("json"),
-                        response.text,
-                    )
+        try:
+            response.raise_for_status()
+        except requests.exceptions.HTTPError:
+            if critical:
+                logger().error("HTTP request failed: %s", response.text)
                 raise
-        elif not response.ok:
             return {}
 
-        response_json = json.loads(response.text)
-
-        # Since X.40.0 responses are encapsulated in "data" with pagination parameters
+        response_json = response.json()
         if "data" in response_json:
+            # Since X.40.0 responses are encapsulated in "data" with pagination parameters
             return response_json["data"]
 
         return response_json
+
+    def export_models(
+        self,
+        database: str,
+        models: List[MetabaseModel],
+        exclude_sources: bool = False,
+        sync_timeout: int = 30,
+    ):
+        """Exports dbt models to Metabase database schema.
+
+        Args:
+            database (str): Metabase database name.
+            models (List[MetabaseModel]): List of dbt models read from project.
+            exclude_sources (bool, optional): Exclude dbt sources from export. Defaults to False.
+            sync_timeout (int, optional): Metabase schema sync timeout in seconds. Defaults to 30.
+        """
+        _ExportModelsJob(
+            client=self,
+            database=database,
+            models=models,
+            exclude_sources=exclude_sources,
+            sync_timeout=sync_timeout,
+        ).execute()
+
+    def extract_exposures(
+        self,
+        models: List[MetabaseModel],
+        output_path: str = ".",
+        output_name: str = "metabase_exposures",
+        include_personal_collections: bool = True,
+        collection_excludes: Optional[Iterable] = None,
+    ) -> Mapping:
+        """Extracts exposures in Metabase downstream of dbt models and sources as parsed by dbt reader.
+
+        Args:
+            models (List[MetabaseModel]): List of dbt models.
+            output_path (str, optional): Path for output YAML. Defaults to ".".
+            output_name (str, optional): Name for output YAML. Defaults to "metabase_exposures".
+            include_personal_collections (bool, optional): Include personal Metabase collections. Defaults to True.
+            collection_excludes (Optional[Iterable], optional): Exclude certain Metabase collections. Defaults to None.
+
+        Returns:
+            Mapping: JSON object representation of all exposures parsed.
+        """
+        return _ExtractExposuresJob(
+            client=self,
+            models=models,
+            output_path=output_path,
+            output_name=output_name,
+            include_personal_collections=include_personal_collections,
+            collection_excludes=collection_excludes,
+        ).execute()
diff --git a/dbtmetabase/models/__init__.py b/dbtmetabase/models/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/dbtmetabase/models/exceptions.py b/dbtmetabase/models/exceptions.py
deleted file mode 100644
index 730a78d6..00000000
--- a/dbtmetabase/models/exceptions.py
+++ /dev/null
@@ -1,18 +0,0 @@
-class NoDbtPathSupplied(Exception):
-    """Thrown when no argument for dbt path has been supplied"""
-
-
-class NoDbtSchemaSupplied(Exception):
-    """Thrown when using folder parser without supplying a schema"""
-
-
-class NoMetabaseCredentialsSupplied(Exception):
-    """Thrown when credentials or session id not supplied"""
-
-
-class MetabaseUnableToSync(Exception):
-    """Thrown when Metabase cannot sync / align models with dbt model"""
-
-
-class MetabaseRuntimeError(Exception):
-    """Thrown when Metabase execution failed."""
diff --git a/dbtmetabase/models/interface.py b/dbtmetabase/models/interface.py
deleted file mode 100644
index 8ecee627..00000000
--- a/dbtmetabase/models/interface.py
+++ /dev/null
@@ -1,239 +0,0 @@
-import logging
-from os.path import expandvars
-from typing import Iterable, List, MutableMapping, Optional, Tuple, Union
-
-from requests.adapters import HTTPAdapter
-
-from ..metabase import MetabaseClient
-from ..parsers.dbt import DbtReader
-from ..parsers.dbt_folder import DbtFolderReader
-from ..parsers.dbt_manifest import DbtManifestReader
-from .exceptions import (
-    NoDbtPathSupplied,
-    NoDbtSchemaSupplied,
-    NoMetabaseCredentialsSupplied,
-)
-from .metabase import MetabaseModel
-
-
-class MetabaseInterface:
-    """Interface for interacting with Metabase and preparing a client object."""
-
-    _client: Optional[MetabaseClient] = None
-
-    def __init__(
-        self,
-        database: str,
-        host: str,
-        user: Optional[str] = None,
-        password: Optional[str] = None,
-        session_id: Optional[str] = None,
-        use_http: bool = False,
-        verify: Optional[Union[str, bool]] = True,
-        cert: Optional[Union[str, Tuple[str, str]]] = None,
-        sync: bool = True,
-        sync_timeout: Optional[int] = None,
-        exclude_sources: bool = False,
-        http_extra_headers: Optional[dict] = None,
-        http_timeout: int = 15,
-        http_adapter: Optional[HTTPAdapter] = None,
-    ):
-        """Constructor.
-
-        Args:
-            database (str): Target database name as set in Metabase (typically aliased).
-            host (str): Metabase hostname.
-            user (str): Metabase username.
-            password (str): Metabase password.
-            session_id (Optional[str], optional): Session ID. Defaults to None.
-            use_http (bool, optional): Use HTTP to connect to Metabase.. Defaults to False.
-            verify (Optional[Union[str, bool]], optional): Path to custom certificate bundle to be used by Metabase client. Defaults to True.
- cert (Optional[Union[str, Tuple[str, str]]], optional): Path to a custom certificate to be used by the Metabase client, or a tuple containing the path to the certificate and key. Defaults to None. - sync (bool, optional): Attempt to synchronize Metabase schema with local models. Defaults to True. - sync_timeout (Optional[int], optional): Synchronization timeout (in secs). Defaults to None. - exclude_sources (bool, optional): Exclude exporting sources. Defaults to False. - http_extra_headers (Optional[dict], optional): HTTP headers to be used by the Metabase client. Defaults to None. - http_adapter: (Optional[HTTPAdapter], optional) Provide custom HTTP adapter implementation for requests to use. Defaults to None. - """ - - # Metabase Client - self.database = database - self.host = host - self.user = user - self.password = password - self.session_id = session_id - # Metabase additional connection opts - self.use_http = use_http - self.verify = verify - self.cert = cert - self.http_extra_headers = dict(http_extra_headers) if http_extra_headers else {} - self.http_timeout = http_timeout - self.http_adapter = http_adapter - # Metabase Sync - self.sync = sync - self.sync_timeout = sync_timeout - self.exclude_sources = exclude_sources - - @property - def client(self) -> MetabaseClient: - if self._client is None: - self.prepare_metabase_client() - assert self._client - return self._client - - def prepare_metabase_client(self, dbt_models: Optional[List[MetabaseModel]] = None): - """Prepares the metabase client which can then after be accessed via the `client` property - - Args: - dbt_models (Optional[List[MetabaseModel]]): Used if sync is enabled to verify all dbt models passed exist in Metabase - - Raises: - MetabaseUnableToSync: This error is raised if sync is enabled and a timeout is explicitly set in the `Metabase` object config - NoMetabaseCredentialsSupplied: This error is raised if credentials or session_id is not supplied - """ - - if self._client is not None: - # Already prepared - return - - if dbt_models is None: - dbt_models = [] - - if not (self.user and self.password) and not self.session_id: - raise NoMetabaseCredentialsSupplied( - "Credentials or session ID not supplied" - ) - - self._client = MetabaseClient( - host=self.host, - user=self.user, - password=self.password, - use_http=self.use_http, - verify=self.verify, - cert=self.cert, - http_extra_headers=self.http_extra_headers, - session_id=self.session_id, - sync=self.sync, - sync_timeout=self.sync_timeout, - exclude_sources=self.exclude_sources, - http_timeout=self.http_timeout, - http_adapter=self.http_adapter, - ) - # Sync and attempt schema alignment prior to execution; if timeout is not explicitly set, proceed regardless of success - if self.sync: - self._client.sync_and_wait(self.database, dbt_models) - - -class DbtInterface: - """Interface for interacting with dbt and preparing a validated parser object.""" - - _parser: Optional[Union[DbtManifestReader, DbtFolderReader]] = None - - def __init__( - self, - database: str, - manifest_path: Optional[str] = None, - path: Optional[str] = None, - schema: Optional[str] = None, - schema_excludes: Optional[Iterable] = None, - includes: Optional[Iterable] = None, - excludes: Optional[Iterable] = None, - ): - """Constructor. - - Args: - database (str): Target database name as specified in dbt models to be actioned. - manifest_path (Optional[str], optional): Path to dbt manifest.json file (typically located in the /target/ directory of the dbt project). Defaults to None. 
- path (Optional[str], optional): Path to dbt project. If specified with manifest_path, then the manifest is prioritized. Defaults to None. - schema (Optional[str], optional): Target schema. Should be passed if using folder parser. Defaults to None. - schema_excludes (Optional[Iterable], optional): Target schemas to exclude. Ignored in folder parser. Defaults to None. - includes (Optional[Iterable], optional): Model names to limit processing to. Defaults to None. - excludes (Optional[Iterable], optional): Model names to exclude. Defaults to None. - """ - - if schema_excludes is None: - schema_excludes = [] - if includes is None: - includes = [] - if excludes is None: - excludes = [] - - # dbt Reader - self.database = database - self.manifest_path = manifest_path - self.path = path - # dbt Target Models - self.schema = schema - self._schema_excludes = schema_excludes - self.includes = includes - self.excludes = excludes - - self.validate_config() - - @property - def parser(self) -> DbtReader: - if self._parser is None: - self.prepare_dbt_parser() - assert self._parser - return self._parser - - @property - def schema_excludes(self) -> Iterable: - return self._schema_excludes - - @schema_excludes.setter - def schema_excludes(self, value: Iterable): - self._schema_excludes = list({schema.upper() for schema in value}) - - def validate_config(self): - """Validates a dbt config object - - Raises: - NoDbtPathSupplied: If no path for either manifest or project is supplied, this error is raised - NoDbtSchemaSupplied: If no schema is supplied while using the folder parser, this error is raised - """ - # Check 1 Verify Path - if not (self.path or self.manifest_path): - raise NoDbtPathSupplied( - "One of either dbt_path or dbt_manifest_path is required." - ) - # Check 2 Notify User if Both Paths Are Supplied - if self.path and self.manifest_path: - logging.warning( - "Both dbt path and manifest path were supplied. Prioritizing manifest parser" - ) - # Check 3 Validation for Folder Parser - if self.path and not self.schema: - raise NoDbtSchemaSupplied( - "Must supply a schema if using YAML parser, it is used to resolve foreign key relations and which Metabase models to propagate documentation to" - ) - # ... 
Add checks to interface as needed - - def prepare_dbt_parser(self): - """Resolve dbt reader being either YAML or manifest.json based.""" - - if self._parser is not None: - # Already prepared - return - - kwargs = { - "database": self.database, - "schema": self.schema, - "schema_excludes": self.schema_excludes, - "includes": self.includes, - "excludes": self.excludes, - } - self._parser: DbtReader - if self.manifest_path: - self._parser = DbtManifestReader(expandvars(self.manifest_path), **kwargs) - elif self.path: - self._parser = DbtFolderReader(expandvars(self.path), **kwargs) - else: - raise NoDbtPathSupplied("Either path or path is required.") - - def read_models( - self, - include_tags: bool = True, - docs_url: Optional[str] = None, - ) -> Tuple[List[MetabaseModel], MutableMapping]: - return self.parser.read_models(include_tags, docs_url) diff --git a/dbtmetabase/models/metabase.py b/dbtmetabase/models/metabase.py deleted file mode 100644 index 5775160e..00000000 --- a/dbtmetabase/models/metabase.py +++ /dev/null @@ -1,84 +0,0 @@ -from dataclasses import dataclass, field -from enum import Enum -from typing import MutableMapping, Optional, Sequence - -# Allowed metabase.* fields -_METABASE_COMMON_META_FIELDS = [ - "display_name", - "visibility_type", -] -# Must be covered by MetabaseColumn attributes -METABASE_COLUMN_META_FIELDS = _METABASE_COMMON_META_FIELDS + [ - "semantic_type", - "has_field_values", - "coercion_strategy", - "number_style", -] -# Must be covered by MetabaseModel attributes -METABASE_MODEL_META_FIELDS = _METABASE_COMMON_META_FIELDS + [ - "points_of_interest", - "caveats", -] - -# Default model schema (only schema in BigQuery) -METABASE_MODEL_DEFAULT_SCHEMA = "PUBLIC" - - -class ModelType(str, Enum): - nodes = "nodes" - sources = "sources" - - -@dataclass -class MetabaseColumn: - name: str - description: Optional[str] = None - - display_name: Optional[str] = None - visibility_type: Optional[str] = None - semantic_type: Optional[str] = None - has_field_values: Optional[str] = None - coercion_strategy: Optional[str] = None - number_style: Optional[str] = None - - fk_target_table: Optional[str] = None - fk_target_field: Optional[str] = None - - meta_fields: MutableMapping = field(default_factory=dict) - - -@dataclass -class MetabaseModel: - name: str - schema: str - description: str = "" - - display_name: Optional[str] = None - visibility_type: Optional[str] = None - points_of_interest: Optional[str] = None - caveats: Optional[str] = None - - model_type: ModelType = ModelType.nodes - dbt_name: Optional[str] = None - source: Optional[str] = None - unique_id: Optional[str] = None - - @property - def ref(self) -> Optional[str]: - if self.model_type == ModelType.nodes: - return f"ref('{self.name}')" - elif self.model_type == ModelType.sources: - return f"source('{self.source}', '{self.name if self.dbt_name is None else self.dbt_name}')" - return None - - columns: Sequence[MetabaseColumn] = field(default_factory=list) - - -class _NullValue(str): - """Explicitly null field value.""" - - def __eq__(self, other: object) -> bool: - return other is None - - -NullValue = _NullValue() diff --git a/dbtmetabase/parsers/__init__.py b/dbtmetabase/parsers/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/dbtmetabase/parsers/dbt.py b/dbtmetabase/parsers/dbt.py deleted file mode 100644 index 99509880..00000000 --- a/dbtmetabase/parsers/dbt.py +++ /dev/null @@ -1,125 +0,0 @@ -from abc import ABCMeta, abstractmethod -from os.path import expanduser -from typing import 
Iterable, List, Mapping, MutableMapping, Optional, Tuple - -from ..logger.logging import logger -from ..models.metabase import MetabaseColumn, MetabaseModel, NullValue - - -class DbtReader(metaclass=ABCMeta): - """Base dbt reader.""" - - def __init__( - self, - path: str, - database: str, - schema: Optional[str], - schema_excludes: Optional[Iterable], - includes: Optional[Iterable], - excludes: Optional[Iterable], - ): - """Constructor. - - Args: - path (str): Path to dbt target. - database (str): Target database name as specified in dbt models to be actioned. - path (Optional[str]): Path to dbt project. If specified with manifest_path, then the manifest is prioritized. - schema (Optional[str]): Target schema. Should be passed if using folder parser. - schema_excludes (Optional[Iterable]): Target schemas to exclude. Ignored in folder parser. - includes (Optional[Iterable]): Model names to limit processing to. - excludes (Optional[Iterable]): Model names to exclude. - """ - - self.path = expanduser(path) - self.database = database.upper() if database else None - self.schema = schema.upper() if schema else None - self.schema_excludes = [x.upper() for x in schema_excludes or []] - self.includes = [x.upper() for x in includes or []] - self.excludes = [x.upper() for x in excludes or []] - self.alias_mapping: MutableMapping = {} - - @abstractmethod - def read_models( - self, - include_tags: bool = True, - docs_url: Optional[str] = None, - ) -> Tuple[List[MetabaseModel], MutableMapping]: - pass - - def model_selected(self, name: str) -> bool: - """Checks whether model passes inclusion/exclusion criteria. - - Args: - name (str): Model name. - - Returns: - bool: True if included, false otherwise. - """ - n = name.upper() - return n not in self.excludes and (not self.includes or n in self.includes) - - def set_column_foreign_key( - self, - column: Mapping, - metabase_column: MetabaseColumn, - table: Optional[str], - field: Optional[str], - schema: Optional[str], - ): - """Sets foreign key target on a column. - - Args: - column (Mapping): Schema column definition. - metabase_column (MetabaseColumn): Metabase column definition. - table (str): Foreign key target table. - field (str): Foreign key target field. - schema (str): Current schema name. - """ - # Meta fields take precedence - meta = column.get("meta", {}) - table = meta.get("metabase.fk_target_table", table) - field = meta.get("metabase.fk_target_field", field) - - if not table or not field: - if table or field: - logger().warning( - "Foreign key requires table and field for column %s", - metabase_column.name, - ) - return - - table_path = table.split(".") - if len(table_path) == 1 and schema: - table_path.insert(0, schema) - - metabase_column.semantic_type = "type/FK" - metabase_column.fk_target_table = ".".join( - [x.strip('"').upper() for x in table_path] - ) - metabase_column.fk_target_field = field.strip('"').upper() - logger().debug( - "Relation from %s to %s.%s", - metabase_column.name, - metabase_column.fk_target_table, - metabase_column.fk_target_field, - ) - - @staticmethod - def read_meta_fields(obj: Mapping, fields: List) -> Mapping: - """Reads meta fields from a schem object. - - Args: - obj (Mapping): Schema object. - fields (List): List of fields to read. - - Returns: - Mapping: Field values. 
- """ - - vals = {} - meta = obj.get("meta", []) - for field in fields: - if f"metabase.{field}" in meta: - value = meta[f"metabase.{field}"] - vals[field] = value if value is not None else NullValue - return vals diff --git a/dbtmetabase/parsers/dbt_folder.py b/dbtmetabase/parsers/dbt_folder.py deleted file mode 100644 index be3aca4f..00000000 --- a/dbtmetabase/parsers/dbt_folder.py +++ /dev/null @@ -1,238 +0,0 @@ -import re -from pathlib import Path -from typing import List, Mapping, MutableMapping, Optional, Tuple - -import yaml - -from ..logger.logging import logger -from ..models.metabase import ( - METABASE_COLUMN_META_FIELDS, - METABASE_MODEL_DEFAULT_SCHEMA, - METABASE_MODEL_META_FIELDS, - MetabaseColumn, - MetabaseModel, - ModelType, -) -from .dbt import DbtReader - - -class DbtFolderReader(DbtReader): - """ - Reader for dbt project configuration. - """ - - def read_models( - self, - include_tags: bool = True, - docs_url: Optional[str] = None, - ) -> Tuple[List[MetabaseModel], MutableMapping]: - """Reads dbt models in Metabase-friendly format. - - Keyword Arguments: - include_tags {bool} -- Append dbt model tags to dbt model descriptions. (default: {True}) - docs_url {Optional[str]} -- Append dbt docs url to dbt model description - - Returns: - list -- List of dbt models in Metabase-friendly format. - """ - - mb_models: List[MetabaseModel] = [] - - schema = self.schema or METABASE_MODEL_DEFAULT_SCHEMA - - for path in (Path(self.path) / "models").rglob("*.yml"): - with open(path, "r", encoding="utf-8") as stream: - schema_file = yaml.safe_load(stream) - if not schema_file: - logger().warning("Skipping empty or invalid YAML: %s", path) - continue - - for model in schema_file.get("models", []): - model_name = model.get("alias", model["name"]).upper() - - # Refs will still use file name -- this alias mapping is good for getting the right name in the database - if "alias" in model: - self.alias_mapping[model_name] = model["name"].upper() - - logger().info("Processing model: %s", path) - - if not self.model_selected(model_name): - logger().debug( - "Skipping %s not included in includes or excluded by excludes", - model_name, - ) - continue - - mb_models.append( - self._read_model( - model=model, - schema=schema, - model_type=ModelType.nodes, - include_tags=include_tags, - ) - ) - - for source in schema_file.get("sources", []): - source_schema_name = source.get("schema", source["name"]).upper() - - if "{{" in source_schema_name and "}}" in source_schema_name: - logger().warning( - "dbt folder reader cannot resolve Jinja expressions, defaulting to current schema" - ) - source_schema_name = schema - - elif source_schema_name != schema: - logger().debug( - "Skipping schema %s not in target schema %s", - source_schema_name, - schema, - ) - continue - - for model in source.get("tables", []): - model_name = model.get("identifier", model["name"]).upper() - - # These will be used to resolve our regex parsed source() references - if "identifier" in model: - self.alias_mapping[model_name] = model["name"].upper() - - logger().info( - "Processing source: %s -- table: %s", path, model_name - ) - - if not self.model_selected(model_name): - logger().debug( - "Skipping %s not included in includes or excluded by excludes", - model_name, - ) - continue - - mb_models.append( - self._read_model( - model=model, - source=source["name"], - model_type=ModelType.sources, - schema=source_schema_name, - include_tags=include_tags, - ) - ) - - return mb_models, self.alias_mapping - - def _read_model( - self, - model: 
dict, - schema: str, - source: Optional[str] = None, - model_type: ModelType = ModelType.nodes, - include_tags: bool = True, - ) -> MetabaseModel: - """Reads one dbt model in Metabase-friendly format. - - Arguments: - model {dict} -- One dbt model to read. - schema {str} -- Schema as passed doen from CLI args or parsed from `source` - source {str, optional} -- Name of the source if source - model_type {str} -- The type of the node which can be one of either nodes or sources - include_tags: {bool} -- Flag to append tags to description of model - - Returns: - dict -- One dbt model in Metabase-friendly format. - """ - - metabase_columns: List[MetabaseColumn] = [] - - for column in model.get("columns", []): - metabase_columns.append(self._read_column(column, schema)) - - description = model.get("description", "") - if include_tags: - tags = model.get("tags", []) - if tags: - tags = ", ".join(tags) - if description: - description += "\n\n" - description += f"Tags: {tags}" - - # Resolved name is what the name will be in the database - resolved_name = model.get("alias", model.get("identifier")) - dbt_name = None - if not resolved_name: - resolved_name = model["name"] - else: - dbt_name = model["name"] - - return MetabaseModel( - name=resolved_name, - schema=schema, - description=description, - columns=metabase_columns, - model_type=model_type, - source=source, - dbt_name=dbt_name, - **self.read_meta_fields(model, METABASE_MODEL_META_FIELDS), - ) - - def _read_column(self, column: Mapping, schema: str) -> MetabaseColumn: - """Reads one dbt column in Metabase-friendly format. - - Arguments: - column {dict} -- One dbt column to read. - schema {str} -- Schema as passed down from CLI args or parsed from `source` - - Returns: - dict -- One dbt column in Metabase-friendly format. - """ - - column_name = column.get("name", "").upper().strip('"') - column_description = column.get("description") - - metabase_column = MetabaseColumn( - name=column_name, - description=column_description, - **self.read_meta_fields(column, METABASE_COLUMN_META_FIELDS), - ) - - fk_target_table = None - fk_target_field = None - - for test in column.get("tests") or []: - if isinstance(test, dict): - if "relationships" in test: - relationships = test["relationships"] - fk_target_table = self.parse_ref(relationships["to"]) - if not fk_target_table: - logger().warning( - "Could not resolve foreign key target table for column %s", - metabase_column.name, - ) - continue - fk_target_field = relationships["field"] - - self.set_column_foreign_key( - column=column, - metabase_column=metabase_column, - table=fk_target_table, - field=fk_target_field, - schema=schema, - ) - - return metabase_column - - @staticmethod - def parse_ref(text: str) -> Optional[str]: - """Parses dbt ref() or source() statement. - - Arguments: - text {str} -- Full statement in dbt YAML. - - Returns: - str -- Name of the reference. 
- """ - - # We are catching the rightmost argument of either source or ref which is ultimately the table name - matches = re.findall(r"['\"]([\w\_\-\ ]+)['\"][ ]*\)$", text.strip()) - if matches: - logger().debug("%s -> %s", text, matches[0]) - return matches[0] - return None diff --git a/setup.py b/setup.py index 979cb9fb..9d19cedf 100755 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ def requires_from_file(filename: str) -> list: url="https://github.com/gouline/dbt-metabase", license="MIT License", entry_points={ - "console_scripts": ["dbt-metabase = dbtmetabase.cli:cli"], + "console_scripts": ["dbt-metabase = dbtmetabase.__main__:cli"], }, packages=find_packages(exclude=["tests"]), test_suite="tests", diff --git a/tests/__init__.py b/tests/__init__.py index 67a65a94..0b173941 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,2 +1,2 @@ -from .test_dbt_parsers import * +from .test_dbt import * from .test_metabase import * diff --git a/tests/fixtures/exposure/baseline_test_exposures.yml b/tests/fixtures/exposure/baseline_test_exposures.yml index 7e12877f..d4c5def3 100644 --- a/tests/fixtures/exposure/baseline_test_exposures.yml +++ b/tests/fixtures/exposure/baseline_test_exposures.yml @@ -411,7 +411,7 @@ exposures: url: http://localhost:3000/card/3 maturity: medium owner: - name: null + name: '' email: '' depends_on: - ref('customers') diff --git a/tests/fixtures/mock_api/api/database/2.json b/tests/fixtures/mock_api/api/database/2.json index 4c7202ab..374ac2d5 100644 --- a/tests/fixtures/mock_api/api/database/2.json +++ b/tests/fixtures/mock_api/api/database/2.json @@ -1 +1,64 @@ -{"description": null, "features": ["full-join", "basic-aggregations", "standard-deviation-aggregations", "expression-aggregations", "percentile-aggregations", "foreign-keys", "right-join", "left-join", "native-parameters", "nested-queries", "expressions", "set-timezone", "regex", "case-sensitivity-string-filter-options", "binning", "inner-join", "advanced-math-expressions"], "cache_field_values_schedule": "0 0 22 * * ? *", "timezone": "UTC", "auto_run_queries": true, "metadata_sync_schedule": "0 46 * * * ? *", "name": "unit_testing", "caveats": null, "is_full_sync": true, "updated_at": "2021-07-21T07:05:13.699978Z", "details": {"host": "postgres", "port": null, "dbname": "test", "user": "postgres", "password": "**MetabasePass**", "ssl": false, "additional-options": null, "tunnel-enabled": false}, "is_sample": false, "id": 2, "is_on_demand": false, "options": null, "schedules": {"cache_field_values": {"schedule_minute": 0, "schedule_day": null, "schedule_frame": null, "schedule_hour": 22, "schedule_type": "daily"}, "metadata_sync": {"schedule_minute": 46, "schedule_day": null, "schedule_frame": null, "schedule_hour": null, "schedule_type": "hourly"}}, "engine": "postgres", "refingerprint": null, "created_at": "2021-07-21T05:38:53.637091Z", "points_of_interest": null} \ No newline at end of file +{ + "description": null, + "features": [ + "full-join", + "basic-aggregations", + "standard-deviation-aggregations", + "expression-aggregations", + "percentile-aggregations", + "foreign-keys", + "right-join", + "left-join", + "native-parameters", + "nested-queries", + "expressions", + "set-timezone", + "regex", + "case-sensitivity-string-filter-options", + "binning", + "inner-join", + "advanced-math-expressions" + ], + "cache_field_values_schedule": "0 0 22 * * ? *", + "timezone": "UTC", + "auto_run_queries": true, + "metadata_sync_schedule": "0 46 * * * ? 
*", + "name": "unit_testing", + "caveats": null, + "is_full_sync": true, + "updated_at": "2021-07-21T07:05:13.699978Z", + "details": { + "host": "postgres", + "port": null, + "dbname": "test", + "user": "postgres", + "password": "**MetabasePass**", + "ssl": false, + "additional-options": null, + "tunnel-enabled": false + }, + "is_sample": false, + "id": 2, + "is_on_demand": false, + "options": null, + "schedules": { + "cache_field_values": { + "schedule_minute": 0, + "schedule_day": null, + "schedule_frame": null, + "schedule_hour": 22, + "schedule_type": "daily" + }, + "metadata_sync": { + "schedule_minute": 46, + "schedule_day": null, + "schedule_frame": null, + "schedule_hour": null, + "schedule_type": "hourly" + } + }, + "engine": "postgres", + "refingerprint": null, + "created_at": "2021-07-21T05:38:53.637091Z", + "points_of_interest": null +} diff --git a/tests/test_dbt_parsers.py b/tests/test_dbt.py similarity index 50% rename from tests/test_dbt_parsers.py rename to tests/test_dbt.py index 5be754cb..400d0c6f 100644 --- a/tests/test_dbt_parsers.py +++ b/tests/test_dbt.py @@ -1,298 +1,34 @@ import logging import unittest -from dbtmetabase.models.interface import DbtInterface -from dbtmetabase.models.metabase import ModelType, NullValue -from dbtmetabase.parsers.dbt_folder import MetabaseColumn, MetabaseModel +from dbtmetabase.dbt import ( + DbtReader, + MetabaseColumn, + MetabaseModel, + ModelType, + NullValue, +) -class TestDbtFolderReader(unittest.TestCase): +class TestDbtReader(unittest.TestCase): def setUp(self): """Must specify dbt root dir""" - self.interface = DbtInterface( - database="test", - schema="public", - path="tests/fixtures/sample_project/", - ) - logging.getLogger(__name__) - logging.basicConfig(level=logging.DEBUG) - - def test_read_models(self): - models = self.interface.parser.read_models()[0] - expectation = [ - MetabaseModel( - name="customers", - display_name="clients", - schema="PUBLIC", - description="This table has basic information about a customer, as well as some derived facts based on a customer's orders", - model_type=ModelType.nodes, - dbt_name=None, - source=None, - unique_id=None, - columns=[ - MetabaseColumn( - name="CUSTOMER_ID", - description="This is a unique identifier for a customer", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="FIRST_NAME", - description="Customer's first name. PII.", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="LAST_NAME", - description="Customer's last name. 
PII.", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="FIRST_ORDER", - description="Date (UTC) of a customer's first order", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="MOST_RECENT_ORDER", - description="Date (UTC) of a customer's most recent order", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="NUMBER_OF_ORDERS", - display_name="order_count", - description="Count of the number of orders a customer has placed", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="TOTAL_ORDER_AMOUNT", - description="Total value (AUD) of a customer's orders", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - ], - ), - MetabaseModel( - name="orders", - schema="PUBLIC", - description="This table has basic information about orders, as well as some derived facts based on payments", - points_of_interest="Basic information only", - caveats="Some facts are derived from payments", - model_type=ModelType.nodes, - dbt_name=None, - source=None, - unique_id=None, - columns=[ - MetabaseColumn( - name="ORDER_ID", - description="This is a unique identifier for an order", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="CUSTOMER_ID", - description="Foreign key to the customers table", - meta_fields={}, - semantic_type="type/FK", - visibility_type=None, - fk_target_table="PUBLIC.CUSTOMERS", - fk_target_field="CUSTOMER_ID", - ), - MetabaseColumn( - name="ORDER_DATE", - description="Date (UTC) that the order was placed", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="STATUS", - description='{{ doc("orders_status") }}', - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="AMOUNT", - description="Total amount (AUD) of the order", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="CREDIT_CARD_AMOUNT", - description="Amount of the order (AUD) paid for by credit card", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="COUPON_AMOUNT", - description="Amount of the order (AUD) paid for by coupon", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="BANK_TRANSFER_AMOUNT", - description="Amount of the order (AUD) paid for by bank transfer", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="GIFT_CARD_AMOUNT", - description="Amount of the order (AUD) paid for by gift card", - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - ], - ), - MetabaseModel( - name="stg_customers", - schema="PUBLIC", - description="", - model_type=ModelType.nodes, - dbt_name=None, - source=None, - unique_id=None, - 
columns=[ - MetabaseColumn( - name="CUSTOMER_ID", - description=None, - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ) - ], - ), - MetabaseModel( - name="stg_orders", - schema="PUBLIC", - description="", - model_type=ModelType.nodes, - dbt_name=None, - source=None, - unique_id=None, - columns=[ - MetabaseColumn( - name="ORDER_ID", - description=None, - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="STATUS", - description=None, - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - ], - ), - MetabaseModel( - name="stg_payments", - schema="PUBLIC", - description="", - model_type=ModelType.nodes, - dbt_name=None, - source=None, - unique_id=None, - columns=[ - MetabaseColumn( - name="PAYMENT_ID", - description=None, - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - MetabaseColumn( - name="PAYMENT_METHOD", - description=None, - meta_fields={}, - semantic_type=None, - visibility_type=None, - fk_target_table=None, - fk_target_field=None, - ), - ], - ), - ] - self.assertEqual(models, expectation) - logging.info("Done") - - -class TestDbtManifestReader(unittest.TestCase): - def setUp(self): - """Must specify dbt root dir""" - self.interface = DbtInterface( + self.reader = DbtReader( + manifest_path="tests/fixtures/sample_project/target/manifest.json", database="test", schema="public", - manifest_path="tests/fixtures/sample_project/target/manifest.json", ) logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) def test_read_models(self): - models = self.interface.parser.read_models()[0] + models = self.reader.read_models() expectation = [ MetabaseModel( name="orders", schema="PUBLIC", description="This table has basic information about orders, as well as some derived facts based on payments", model_type=ModelType.nodes, - dbt_name="orders", source=None, unique_id="model.jaffle_shop.orders", columns=[ @@ -384,7 +120,6 @@ def test_read_models(self): schema="PUBLIC", description="This table has basic information about a customer, as well as some derived facts based on a customer's orders", model_type=ModelType.nodes, - dbt_name="customers", source=None, unique_id="model.jaffle_shop.customers", columns=[ @@ -458,7 +193,6 @@ def test_read_models(self): schema="PUBLIC", description="", model_type=ModelType.nodes, - dbt_name="stg_orders", source=None, unique_id="model.jaffle_shop.stg_orders", columns=[ @@ -487,7 +221,6 @@ def test_read_models(self): schema="PUBLIC", description="", model_type=ModelType.nodes, - dbt_name="stg_payments", source=None, unique_id="model.jaffle_shop.stg_payments", columns=[ @@ -516,7 +249,6 @@ def test_read_models(self): schema="PUBLIC", description="", model_type=ModelType.nodes, - dbt_name="stg_customers", source=None, unique_id="model.jaffle_shop.stg_customers", columns=[ diff --git a/tests/test_metabase.py b/tests/test_metabase.py index ebf256a9..ee8d16d6 100644 --- a/tests/test_metabase.py +++ b/tests/test_metabase.py @@ -1,12 +1,17 @@ +# pylint: disable=protected-access + import json import logging -import os import unittest +from operator import itemgetter +from pathlib import Path import yaml -from dbtmetabase.metabase import MetabaseClient -from dbtmetabase.models.metabase import MetabaseColumn, MetabaseModel, ModelType +from dbtmetabase.dbt import MetabaseColumn, 
MetabaseModel, ModelType +from dbtmetabase.metabase import MetabaseClient, _ExportModelsJob, _ExtractExposuresJob + +FIXTURES_PATH = Path("tests") / "fixtures" MODELS = [ MetabaseModel( @@ -242,56 +247,57 @@ class MockMetabaseClient(MetabaseClient): - def get_session_id(self, user: str, password: str) -> str: - return "dummy" - - def api(self, method: str, path: str, **kwargs): - BASE_PATH = "tests/fixtures/mock_api/" + def api(self, method: str, path: str, critical: bool = True, **kwargs): if method == "get": - if os.path.exists(f"{BASE_PATH}/{path.lstrip('/')}.json"): - with open(f"{BASE_PATH}/{path.lstrip('/')}.json") as f: + json_path = FIXTURES_PATH / "mock_api" / f"{path.lstrip('/')}.json" + if json_path.exists(): + with open(json_path, encoding="utf-8") as f: return json.load(f) - else: - return {} + return {} class TestMetabaseClient(unittest.TestCase): def setUp(self): self.client = MockMetabaseClient( - host="localhost:3000", - user="dummy", - password="dummy", - use_http=True, + url="http://localhost:3000", + session_id="dummy", ) logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) def test_exposures(self): - mbc = self.client - mbc.extract_exposures( - MODELS, - output_name="unittest_exposures", - output_path="tests/fixtures/exposure/", + output_path = FIXTURES_PATH / "exposure" + output_name = "unittest_exposures" + job = _ExtractExposuresJob( + client=self.client, + models=MODELS, + output_path=str(output_path), + output_name=output_name, + include_personal_collections=False, + collection_excludes=None, ) - # Baseline in SCM - with open( - "tests/fixtures/exposure/baseline_test_exposures.yml", "r", encoding="utf-8" - ) as f: - baseline = yaml.safe_load(f) - # Load from YAML and tear down - with open( - "tests/fixtures/exposure/unittest_exposures.yml", "r", encoding="utf-8" - ) as f: - sample = yaml.safe_load(f) + job.execute() + + with open(output_path / "baseline_test_exposures.yml", encoding="utf-8") as f: + expected = yaml.safe_load(f) + with open(output_path / f"{output_name}.yml", encoding="utf-8") as f: + actual = yaml.safe_load(f) - baseline_exposures = sorted(baseline["exposures"], key=lambda ele: ele["name"]) - sample_exposures = sorted(sample["exposures"], key=lambda ele: ele["name"]) + expected_exposures = sorted(expected["exposures"], key=itemgetter("name")) + actual_exposures = sorted(actual["exposures"], key=itemgetter("name")) - self.assertEqual(baseline_exposures, sample_exposures) + self.assertEqual(expected_exposures, actual_exposures) def test_build_lookups(self): - mbc = self.client - baseline_tables = [ + job = _ExportModelsJob( + client=self.client, + database="unit_testing", + models=[], + exclude_sources=True, + sync_timeout=0, + ) + + expected_tables = [ "PUBLIC.CUSTOMERS", "PUBLIC.ORDERS", "PUBLIC.RAW_CUSTOMERS", @@ -301,9 +307,10 @@ def test_build_lookups(self): "PUBLIC.STG_ORDERS", "PUBLIC.STG_PAYMENTS", ] - metadata = mbc.build_metadata(database_id=2) - self.assertEqual(baseline_tables, list(metadata.tables.keys())) - baseline_columns = [ + actual_tables = job._load_tables(database_id="2") + self.assertEqual(expected_tables, list(actual_tables.keys())) + + expected_columns = [ [ "CUSTOMER_ID", "FIRST_NAME", @@ -331,5 +338,5 @@ def test_build_lookups(self): ["ORDER_ID", "CUSTOMER_ID", "ORDER_DATE", "STATUS"], ["PAYMENT_ID", "ORDER_ID", "PAYMENT_METHOD", "AMOUNT"], ] - for table, columns in zip(baseline_tables, baseline_columns): - self.assertEqual(columns, list(metadata.tables[table]["fields"].keys())) + for table, columns in 
zip(expected_tables, expected_columns):
+            self.assertEqual(columns, list(actual_tables[table]["fields"].keys()))
diff --git a/tests/utils_mb_test_suite.py b/tests/utils_mb_test_suite.py
index 49b9f23b..710dae78 100644
--- a/tests/utils_mb_test_suite.py
+++ b/tests/utils_mb_test_suite.py
@@ -1,16 +1,14 @@
 import json
 import logging
-import os
+from pathlib import Path
 
+from dbtmetabase.dbt import MetabaseColumn, MetabaseModel, ModelType
 from dbtmetabase.metabase import MetabaseClient
-from dbtmetabase.models.metabase import MetabaseColumn, MetabaseModel, ModelType
 
 mbc = MetabaseClient(
-    host="localhost:3000",
-    user="...",
+    url="http://localhost:3000",
+    username="...",
     password="...",
-    # use http for localhost docker
-    use_http=True,
 )
 
 logging.basicConfig(level=logging.DEBUG)
@@ -19,10 +17,10 @@ def test_mock_api(method: str, path: str):
     BASE_PATH = "tests/fixtures/mock_api/"
     if method == "get":
-        if os.path.exists(f"{BASE_PATH}/{path.lstrip('/')}.json"):
-            return json.load(
-                open(f"{BASE_PATH}/{path.lstrip('/')}.json", encoding="utf-8")
-            )
+        json_path = Path(f"{BASE_PATH}/{path.lstrip('/')}.json")
+        if json_path.exists():
+            with open(json_path, encoding="utf-8") as f:
+                return json.load(f)
     return {}
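
Usage sketch (illustrative only, not part of the patch): the snippet below shows how the
refactored entry points compose end to end, replacing the removed MetabaseInterface and
DbtInterface. The URL, credentials, manifest path, and database names are placeholder
values, and it assumes the dbt project has been compiled so that target/manifest.json exists.

    from dbtmetabase import DbtReader, MetabaseClient

    # Read dbt models from the compiled manifest (placeholder path and names).
    models = DbtReader(
        manifest_path="target/manifest.json",
        database="my_database",
        schema="public",
    ).read_models()

    # Authenticate with username/password, or pass session_id=... instead.
    client = MetabaseClient(
        url="http://localhost:3000",
        username="user@example.com",
        password="...",
    )

    # Export dbt model and column documentation to the Metabase database.
    client.export_models(database="my_database", models=models, sync_timeout=30)

    # Document Metabase questions and dashboards as dbt exposures YAML.
    client.extract_exposures(
        models=models,
        output_path=".",
        output_name="metabase_exposures",
    )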