Skip to content

Commit

Permalink
fixes for release in prefect
Browse files Browse the repository at this point in the history
  • Loading branch information
mdkrol committed Nov 6, 2024
1 parent 634c4cf commit 3d45491
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 59 deletions.
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
prefect<3
pytest
pre-commit
requests
pandas
35 changes: 26 additions & 9 deletions src/flows.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
from prefect import flow
from tasks import uppercase_the_text
from datetime import datetime

from prefect import flow, get_run_logger
from tasks import get_precipitation, post_timeseries

# Constants
uuid_gpm_ts = "fb384ac4-c8e8-452e-83d3-30f23c522577"
uuid_gpm_rast = "f23e174e-0636-45f1-86fd-e543a7aa49ac"


start = datetime(2024, 11, 1, 0, 0, 0)
end = datetime(2024, 11, 6, 0, 0, 0)
coordinates = (74.590958, 42.871773, 0.0)


@flow(
name="Clear name of your flow",
flow_run_name = "kghub_example_task Flow run",
description= "Short description of what the flow does.",
retries=0, # If wanted, place your retries count here,
flow_run_name="kghub_example_task Flow run",
description="Short description of what the flow does.",
retries=0, # If wanted, place your retries count here,
retry_delay_seconds=10,
log_prints=True
log_prints=True,
)
def kghub_example_task_flow(text: str = "Hi"):
uppercase_text = uppercase_the_text(text)
print(f"Turned {text} into {uppercase_text}")
def kghub_example_task_flow():
logger = get_run_logger()

logger.info("Downloading precipitation data from Dutch Lizard")
df = get_precipitation(uuid_gpm_rast, start, end, coordinates)

logger.info("Data downloaded, uploading to KGhub")

post_timeseries(uuid_gpm_ts, df)


if __name__ == "__main__":
Expand Down
13 changes: 5 additions & 8 deletions src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@
from prefect import serve

if __name__ == "__main__":

kghub_example_task_deployment = kghub_example_task_flow.to_deployment(
name="Clear name of your deployment",
interval=60, # alternative: cron="5 4 * * *". Check crontab.guru to create the correct schedule expression
parameters={"text": "Python is absolutely fabulous"}, # Input needs to be convertable to json
description="Code found at: https://github.com/nens/prefect-kghub-example-task", # Place this in EACH deployment, so that it its clear which repo serves which deployment
tags=["Your name", "Project keyword"] # Add the name of the first point of contact, and one or more keywords of the project
name="GPM data",
# interval=60, # alternative: cron="5 4 * * *". Check crontab.guru to create the correct schedule expression
description="Code found at: https://github.com/nens/prefect-kghub-example-task", # Place this in EACH deployment, so that it its clear which repo serves which deployment
tags=["KGhub", "example"],
)


serve(kghub_example_task_deployment) # It is possible to create multiple deployments and serve them all.
serve(kghub_example_task_deployment)
73 changes: 31 additions & 42 deletions src/tasks.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,33 @@
from prefect import task, get_run_logger
import requests
import pandas as pd
from datetime import datetime
import json

# Constants
uuid_gpm_ts = "fb384ac4-c8e8-452e-83d3-30f23c522577"
uuid_gpm_rast = "f23e174e-0636-45f1-86fd-e543a7aa49ac"

lizard_api = 'https://weact.lizard.net/api/v4'
kghub_api = 'http://kghub.caiag.kg/api'


lizard_headers = {
"username": "__key__",
"password": "yLMMChiw.5inIEGlTVGsebmhAEQWSyFjoTVXsm2ln",
"Content-Type": "application/json",
}


import pandas as pd
import requests
from prefect import get_run_logger, task
from prefect.blocks.system import Secret

bearer_token = 'testing'
lizard_api = "https://weact.lizard.net/api/v4"
kghub_api = "http://kghub.caiag.kg/api"

kghub_headers = {
'Authorization': f'Bearer {bearer_token}',
'Content-Type': 'application/json' # Specify that the data is in JSON format
}

@task(retries=2)
def get_precipitation(uuid, start, end, coordinates):
lizard_api_key = Secret.load("lizard-api-key").get()

lizard_headers = {
"username": "__key__",
"password": lizard_api_key,
"Content-Type": "application/json",
}

logger = get_run_logger()
url = f"{lizard_api}/rasters/{uuid}/point/"
lat = coordinates[0]
lon = coordinates[1]
params = {"geom": 'POINT (74.590958 42.871773)',
"start": start.isoformat() + "Z",
"stop": end.isoformat() + "Z"}
params = {
"geom": f"POINT ({lat} {lon})",
"start": start.isoformat() + "Z",
"stop": end.isoformat() + "Z",
}

r = requests.get(url, params=params, headers=lizard_headers)
r.raise_for_status()
Expand All @@ -47,24 +39,21 @@ def get_precipitation(uuid, start, end, coordinates):
return df


def post_timeseries(ts_uuid, df_data, json_headers):
@task(retries=2)
def post_timeseries(ts_uuid, df_data):
kghub_token = Secret.load("kghub-token").get()

kghub_headers = {
"Authorization": f"Bearer {kghub_token}",
"Content-Type": "application/json", # Specify that the data is in JSON format
}

events_url = f"{kghub_api}/timeseries/{ts_uuid}/events/"
events = df_data[['time', 'value']].to_dict(orient='records')
print((events))
print(json_headers)
events = df_data[["time", "value"]].to_dict(orient="records")

# POST each event to the server
response = requests.post(
url=events_url,
data=json.dumps(events),
headers=json_headers
url=events_url, data=json.dumps(events), headers=kghub_headers
)

return response.json()


start = datetime(2024,11,1,0,0,0)
end = datetime(2024,11,6,0,0,0)
coordinates = (74.590958, 42.871773, 0.0)

df = get_precipitation(uuid_gpm_rast, start, end, coordinates)
post_timeseries(uuid_gpm_ts, df, kghub_headers)

0 comments on commit 3d45491

Please sign in to comment.