diff --git a/requirements.txt b/requirements.txt index 061088d..7ca552f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,5 @@ prefect<3 pytest +pre-commit +requests +pandas diff --git a/src/flows.py b/src/flows.py index 9a390ec..d4ed21d 100644 --- a/src/flows.py +++ b/src/flows.py @@ -1,18 +1,35 @@ -from prefect import flow -from tasks import uppercase_the_text +from datetime import datetime + +from prefect import flow, get_run_logger +from tasks import get_precipitation, post_timeseries + +# Constants +uuid_gpm_ts = "fb384ac4-c8e8-452e-83d3-30f23c522577" +uuid_gpm_rast = "f23e174e-0636-45f1-86fd-e543a7aa49ac" + + +start = datetime(2024, 11, 1, 0, 0, 0) +end = datetime(2024, 11, 6, 0, 0, 0) +coordinates = (74.590958, 42.871773, 0.0) @flow( name="Clear name of your flow", - flow_run_name = "kghub_example_task Flow run", - description= "Short description of what the flow does.", - retries=0, # If wanted, place your retries count here, + flow_run_name="kghub_example_task Flow run", + description="Short description of what the flow does.", + retries=0, # If wanted, place your retries count here, retry_delay_seconds=10, - log_prints=True + log_prints=True, ) -def kghub_example_task_flow(text: str = "Hi"): - uppercase_text = uppercase_the_text(text) - print(f"Turned {text} into {uppercase_text}") +def kghub_example_task_flow(): + logger = get_run_logger() + + logger.info("Downloading precipitation data from Dutch Lizard") + df = get_precipitation(uuid_gpm_rast, start, end, coordinates) + + logger.info("Data downloaded, uploading to KGhub") + + post_timeseries(uuid_gpm_ts, df) if __name__ == "__main__": diff --git a/src/server.py b/src/server.py index 4a80f1e..27307dc 100644 --- a/src/server.py +++ b/src/server.py @@ -2,14 +2,11 @@ from prefect import serve if __name__ == "__main__": - kghub_example_task_deployment = kghub_example_task_flow.to_deployment( - name="Clear name of your deployment", - interval=60, # alternative: cron="5 4 * * *". Check crontab.guru to create the correct schedule expression - parameters={"text": "Python is absolutely fabulous"}, # Input needs to be convertable to json - description="Code found at: https://github.com/nens/prefect-kghub-example-task", # Place this in EACH deployment, so that it its clear which repo serves which deployment - tags=["Your name", "Project keyword"] # Add the name of the first point of contact, and one or more keywords of the project + name="GPM data", + # interval=60, # alternative: cron="5 4 * * *". Check crontab.guru to create the correct schedule expression + description="Code found at: https://github.com/nens/prefect-kghub-example-task", # Place this in EACH deployment, so that it its clear which repo serves which deployment + tags=["KGhub", "example"], ) - - serve(kghub_example_task_deployment) # It is possible to create multiple deployments and serve them all. + serve(kghub_example_task_deployment) diff --git a/src/tasks.py b/src/tasks.py index 907770c..91d2d9e 100644 --- a/src/tasks.py +++ b/src/tasks.py @@ -1,41 +1,33 @@ -from prefect import task, get_run_logger -import requests -import pandas as pd -from datetime import datetime import json -# Constants -uuid_gpm_ts = "fb384ac4-c8e8-452e-83d3-30f23c522577" -uuid_gpm_rast = "f23e174e-0636-45f1-86fd-e543a7aa49ac" - -lizard_api = 'https://weact.lizard.net/api/v4' -kghub_api = 'http://kghub.caiag.kg/api' - - -lizard_headers = { - "username": "__key__", - "password": "yLMMChiw.5inIEGlTVGsebmhAEQWSyFjoTVXsm2ln", - "Content-Type": "application/json", -} - - +import pandas as pd +import requests +from prefect import get_run_logger, task +from prefect.blocks.system import Secret -bearer_token = 'testing' +lizard_api = "https://weact.lizard.net/api/v4" +kghub_api = "http://kghub.caiag.kg/api" -kghub_headers = { - 'Authorization': f'Bearer {bearer_token}', - 'Content-Type': 'application/json' # Specify that the data is in JSON format -} @task(retries=2) def get_precipitation(uuid, start, end, coordinates): + lizard_api_key = Secret.load("lizard-api-key").get() + + lizard_headers = { + "username": "__key__", + "password": lizard_api_key, + "Content-Type": "application/json", + } + logger = get_run_logger() url = f"{lizard_api}/rasters/{uuid}/point/" lat = coordinates[0] lon = coordinates[1] - params = {"geom": 'POINT (74.590958 42.871773)', - "start": start.isoformat() + "Z", - "stop": end.isoformat() + "Z"} + params = { + "geom": f"POINT ({lat} {lon})", + "start": start.isoformat() + "Z", + "stop": end.isoformat() + "Z", + } r = requests.get(url, params=params, headers=lizard_headers) r.raise_for_status() @@ -47,24 +39,21 @@ def get_precipitation(uuid, start, end, coordinates): return df -def post_timeseries(ts_uuid, df_data, json_headers): +@task(retries=2) +def post_timeseries(ts_uuid, df_data): + kghub_token = Secret.load("kghub-token").get() + + kghub_headers = { + "Authorization": f"Bearer {kghub_token}", + "Content-Type": "application/json", # Specify that the data is in JSON format + } + events_url = f"{kghub_api}/timeseries/{ts_uuid}/events/" - events = df_data[['time', 'value']].to_dict(orient='records') - print((events)) - print(json_headers) + events = df_data[["time", "value"]].to_dict(orient="records") + # POST each event to the server response = requests.post( - url=events_url, - data=json.dumps(events), - headers=json_headers + url=events_url, data=json.dumps(events), headers=kghub_headers ) return response.json() - - -start = datetime(2024,11,1,0,0,0) -end = datetime(2024,11,6,0,0,0) -coordinates = (74.590958, 42.871773, 0.0) - -df = get_precipitation(uuid_gpm_rast, start, end, coordinates) -post_timeseries(uuid_gpm_ts, df, kghub_headers)