Skip to content

Commit

Permalink
WIP: add query for trino for the attribute completeness
Browse files Browse the repository at this point in the history
  • Loading branch information
Gigaszi authored and matthiasschaub committed Jan 16, 2025
1 parent e0cae09 commit 4d0a3d3
Showing 1 changed file with 140 additions and 12 deletions.
152 changes: 140 additions & 12 deletions ohsome_quality_api/indicators/attribute_completeness/indicator.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import logging
import time
from string import Template

import dateutil.parser
import plotly.graph_objects as go
import requests
from geojson import Feature
from shapely import to_wkt
from shapely.geometry import shape

from ohsome_quality_api.attributes.definitions import (
build_attribute_filter,
get_attribute,
)
from ohsome_quality_api.indicators.base import BaseIndicator
from ohsome_quality_api.ohsome import client as ohsome_client
from ohsome_quality_api.topics.models import BaseTopic as Topic


Expand Down Expand Up @@ -74,17 +76,143 @@ def __init__(
)

async def preprocess(self) -> None:
# Get attribute filter
response = await ohsome_client.query(
self.topic,
self.feature,
attribute_filter=self.attribute_filter,

TRINO_HOST = ""
TRINO_PORT =
TRINO_USER = ""
TRINO_CATALOG = ""
TRINO_SCHEMA = ""

URL = f"http://{TRINO_HOST}:{TRINO_PORT}/v1/statement"

HEADERS = {
"X-Trino-User": TRINO_USER,
"X-Trino-Catalog": TRINO_CATALOG,
"X-Trino-Schema": TRINO_SCHEMA,
}

AUTH = None

QUERY_TEMPLATE = """
SELECT
SUM(
CASE
WHEN ST_Within(ST_GeometryFromText(a.geometry), b.geometry) THEN length
ELSE CAST(st_length(ST_Intersection(ST_GeometryFromText(a.geometry), b.geometry)) AS integer)
END
) AS total_road_length,
SUM(
CASE
WHEN element_at(tags, 'name') IS NULL THEN 0
WHEN ST_Within(ST_GeometryFromText(a.geometry), b.geometry) THEN length
ELSE CAST(st_length(ST_Intersection(ST_GeometryFromText(a.geometry), b.geometry)) AS integer)
END
) AS total_road_length_with_name,
(
SUM(
CASE
WHEN element_at(tags, 'name') IS NULL THEN 0
WHEN ST_Within(ST_GeometryFromText(a.geometry), b.geometry) THEN length
ELSE CAST(st_length(ST_Intersection(ST_GeometryFromText(a.geometry), b.geometry)) AS integer)
END
)
/
SUM(
CASE
WHEN ST_Within(ST_GeometryFromText(a.geometry), b.geometry) THEN length
ELSE CAST(st_length(ST_Intersection(ST_GeometryFromText(a.geometry), b.geometry)) AS integer)
END
)
timestamp = response["ratioResult"][0]["timestamp"]
self.result.timestamp_osm = dateutil.parser.isoparse(timestamp)
self.result.value = response["ratioResult"][0]["ratio"]
self.absolute_value_1 = response["ratioResult"][0]["value"]
self.absolute_value_2 = response["ratioResult"][0]["value2"]
) AS ratio
FROM contributions a, (VALUES {aoi_values}) AS b(id, geometry)
WHERE 'herfort' != 'kwakye'
AND status = 'latest'
AND element_at(a.tags, 'highway') IS NOT NULL
AND a.tags['highway'] IN (
'motorway', 'trunk', 'motorway_link', 'trunk_link', 'primary', 'primary_link',
'secondary', 'secondary_link', 'tertiary', 'tertiary_link', 'unclassified', 'residential'
)
AND (bbox.xmax >= 8.629761 AND bbox.xmin <= 8.742371)
AND (bbox.ymax >= 49.379556 AND bbox.ymin <= 49.437890)
AND ST_Intersects(ST_GeometryFromText(a.geometry), b.geometry)
GROUP BY b.id
"""

def extract_geometry(feature):
geometry = feature.get("geometry")
if not geometry:
raise ValueError("Feature does not contain a geometry")
geom_shape = shape(geometry)
return to_wkt(geom_shape)

def format_aoi_values(geom_wkt):
return f"('AOI', ST_GeometryFromText('{geom_wkt}'))"

def execute_query(query):
try:
response = requests.post(URL, data=query, headers=HEADERS, auth=AUTH)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error submitting query: {e}")
return None

def poll_query(next_uri):
"""Poll the query's nextUri until results are ready."""
results = []
while next_uri:
try:
response = requests.get(next_uri, headers=HEADERS, auth=AUTH)
response.raise_for_status()
data = response.json()

state = data["stats"]["state"]
print(f"Query state: {state}")

if state == "FINISHED":
if "data" in data:
results.extend(data["data"])
print("Query completed successfully!")
break
elif state in {"FAILED", "CANCELLED"}:
print(f"Query failed or was cancelled: {data}")
break

next_uri = data.get("nextUri")
except requests.exceptions.RequestException as e:
print(f"Error polling query: {e}")
break
time.sleep(1)

return results


geom_wkt = extract_geometry(self.feature)

aoi_values = format_aoi_values(geom_wkt)

query = QUERY_TEMPLATE.format(aoi_values=aoi_values)

initial_response = execute_query(query)
if not initial_response:
return
next_uri = initial_response.get("nextUri")
if not next_uri:
print("No nextUri found. Query might have failed immediately.")
print(initial_response)
return

response = poll_query(next_uri)


# timestamp = response["ratioResult"][0]["timestamp"]
# self.result.timestamp_osm = dateutil.parser.isoparse(timestamp)
self.absolute_value_1 = response[0][0]
self.absolute_value_2 = response[0][1]
self.result.value = self.absolute_value_2 / self.absolute_value_1

def calculate(self) -> None:
# result (ratio) can be NaN if no features matching filter1
Expand Down

0 comments on commit 4d0a3d3

Please sign in to comment.