Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PR 238 and add tests #239

Merged
merged 4 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 69 additions & 56 deletions dbtmetabase/_exposures.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ def extract_exposures(

entity: Mapping
if item["model"] == "card":
entity = self.metabase.get_card(uid=item["id"])
card_entity = self.metabase.find_card(uid=item["id"])
if card_entity is None:
_logger.info("Card '%s' not found, skipping", item["id"])
continue

entity = card_entity
header = (
f"Visualization: {entity.get('display', 'Unknown').title()}"
)
Expand All @@ -106,8 +110,12 @@ def extract_exposures(
native_query = result["native_query"]

elif item["model"] == "dashboard":
entity = self.metabase.get_dashboard(uid=item["id"])
dashboard_entity = self.metabase.find_dashboard(uid=item["id"])
if dashboard_entity is None:
_logger.info("Dashboard '%s' not found, skipping", item["id"])
continue

entity = dashboard_entity
cards = entity.get("ordered_cards", [])
if not cards:
continue
Expand All @@ -121,7 +129,7 @@ def extract_exposures(
depends.update(
self.__extract_card_exposures(
ctx,
card=self.metabase.get_card(uid=card["id"]),
card=self.metabase.find_card(uid=card["id"]),
)["depends"]
)
else:
Expand Down Expand Up @@ -179,81 +187,86 @@ def extract_exposures(
def __extract_card_exposures(
self,
ctx: __Context,
card: Mapping,
card: Optional[Mapping],
) -> Mapping:
"""Extracts exposures from Metabase questions."""

depends = set()
native_query = ""

query = card.get("dataset_query", {})
if query.get("type") == "query":
# Metabase GUI derived query
query_source = query.get("query", {}).get(
"source-table", card.get("table_id")
)

if str(query_source).startswith("card__"):
# Handle questions based on other questions
depends.update(
self.__extract_card_exposures(
ctx,
card=self.metabase.get_card(uid=query_source.split("__")[-1]),
)["depends"]
if card:
query = card.get("dataset_query", {})
if query.get("type") == "query":
# Metabase GUI derived query
query_source = query.get("query", {}).get(
"source-table", card.get("table_id")
)
elif query_source in ctx.table_names:
# Normal question
source_table = ctx.table_names.get(query_source)
_logger.info("Extracted model '%s' from card", source_table)
depends.add(source_table)

# Find models exposed through joins
for join in query.get("query", {}).get("joins", []):
join_source = join.get("source-table")

if str(join_source).startswith("card__"):
if str(query_source).startswith("card__"):
# Handle questions based on other questions
depends.update(
self.__extract_card_exposures(
ctx,
card=self.metabase.get_card(
uid=join_source.split("__")[-1]
card=self.metabase.find_card(
uid=query_source.split("__")[-1]
),
)["depends"]
)
continue
elif query_source in ctx.table_names:
# Normal question
source_table = ctx.table_names.get(query_source)
_logger.info("Extracted model '%s' from card", source_table)
depends.add(source_table)

# Find models exposed through joins
for join in query.get("query", {}).get("joins", []):
join_source = join.get("source-table")

if str(join_source).startswith("card__"):
# Handle questions based on other questions
depends.update(
self.__extract_card_exposures(
ctx,
card=self.metabase.find_card(
uid=join_source.split("__")[-1]
),
)["depends"]
)
continue

# Joined model parsed
joined_table = ctx.table_names.get(join_source)
if joined_table:
_logger.info("Extracted model '%s' from join", joined_table)
depends.add(joined_table)
# Joined model parsed
joined_table = ctx.table_names.get(join_source)
if joined_table:
_logger.info("Extracted model '%s' from join", joined_table)
depends.add(joined_table)

elif query.get("type") == "native":
# Metabase native query
native_query = query["native"].get("query")
ctes: MutableSequence[str] = []
elif query.get("type") == "native":
# Metabase native query
native_query = query["native"].get("query")
ctes: MutableSequence[str] = []

# Parse common table expressions for exclusion
for matched_cte in re.findall(_CTE_PARSER, native_query):
ctes.extend(group.lower() for group in matched_cte if group)
# Parse common table expressions for exclusion
for matched_cte in re.findall(_CTE_PARSER, native_query):
ctes.extend(group.lower() for group in matched_cte if group)

# Parse SQL for exposures through FROM or JOIN clauses
for sql_ref in re.findall(_EXPOSURE_PARSER, native_query):
# Grab just the table / model name
parsed_model = sql_ref.split(".")[-1].strip('"').lower()
# Parse SQL for exposures through FROM or JOIN clauses
for sql_ref in re.findall(_EXPOSURE_PARSER, native_query):
# Grab just the table / model name
parsed_model = sql_ref.split(".")[-1].strip('"').lower()

# Scrub CTEs (qualified sql_refs can not reference CTEs)
if parsed_model in ctes and "." not in sql_ref:
continue
# Scrub CTEs (qualified sql_refs can not reference CTEs)
if parsed_model in ctes and "." not in sql_ref:
continue

# Verify this is one of our parsed refable models so exposures dont break the DAG
if not ctx.model_refs.get(parsed_model):
continue
# Verify this is one of our parsed refable models so exposures dont break the DAG
if not ctx.model_refs.get(parsed_model):
continue

if parsed_model:
_logger.info("Extracted model '%s' from native query", parsed_model)
depends.add(parsed_model)
if parsed_model:
_logger.info(
"Extracted model '%s' from native query", parsed_model
)
depends.add(parsed_model)

return {
"depends": depends,
Expand Down
22 changes: 17 additions & 5 deletions dbtmetabase/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,29 @@ def get_collection_items(
results = list(filter(lambda x: x["model"] in models, results))
return results

def get_card(self, uid: str) -> Mapping:
def find_card(self, uid: str) -> Optional[Mapping]:
"""Retrieves card (known as question in Metabase UI)."""
return dict(self._api("get", f"/api/card/{uid}"))
try:
return dict(self._api("get", f"/api/card/{uid}"))
except requests.exceptions.HTTPError as error:
if error.response.status_code == 404:
_logger.warning("Card '%s' not found", uid)
return None
raise

def format_card_url(self, uid: str) -> str:
"""Formats URL link to a card (known as question in Metabase UI)."""
return f"{self.url}/card/{uid}"

def get_dashboard(self, uid: str) -> Mapping:
def find_dashboard(self, uid: str) -> Optional[Mapping]:
"""Retrieves dashboard."""
return dict(self._api("get", f"/api/dashboard/{uid}"))
try:
return dict(self._api("get", f"/api/dashboard/{uid}"))
except requests.exceptions.HTTPError as error:
if error.response.status_code == 404:
_logger.warning("Dashboard '%s' not found", uid)
return None
raise

def format_dashboard_url(self, uid: str) -> str:
"""Formats URL link to a dashboard."""
Expand All @@ -164,7 +176,7 @@ def find_user(self, uid: str) -> Optional[Mapping]:
try:
return dict(self._api("get", f"/api/user/{uid}"))
except requests.exceptions.HTTPError as error:
if error.response and error.response.status_code == 404:
if error.response.status_code == 404:
_logger.warning("User '%s' not found", uid)
return None
raise
Expand Down
6 changes: 6 additions & 0 deletions tests/_mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from pathlib import Path
from typing import Any, Dict, Mapping, Optional, Sequence

import requests

from dbtmetabase.core import DbtMetabase
from dbtmetabase.manifest import Manifest, Model
from dbtmetabase.metabase import Metabase
Expand Down Expand Up @@ -37,6 +39,10 @@ def _api(
if json_path.exists():
with open(json_path, encoding="utf-8") as f:
return json.load(f)
else:
response = requests.Response()
response.status_code = 404
raise requests.exceptions.HTTPError(response=response)
return {}


Expand Down
18 changes: 17 additions & 1 deletion tests/fixtures/api/collection/3/items.json
Original file line number Diff line number Diff line change
Expand Up @@ -253,5 +253,21 @@
},
"favorite": false,
"model": "card"
},
{
"description": null,
"collection_position": null,
"name": "Missing",
"id": 404,
"display": "scalar",
"last-edit-info": {
"id": 1,
"last_name": "",
"first_name": "dbtmetabase",
"email": "[email protected]",
"timestamp": "2021-07-21T08:01:37.449936Z"
},
"favorite": false,
"model": "card"
}
]
]
Loading