Merge pull request #11 from OCHA-DAP/floodscan-stats
Floodscan stats
isatotun authored Nov 15, 2024
2 parents 48084ab + d229916 commit 8c2c576
Showing 12 changed files with 153 additions and 42 deletions.
20 changes: 14 additions & 6 deletions run_raster_stats.py
@@ -45,7 +45,7 @@ def setup_logger(name, level=logging.INFO):
return logger


def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):
def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
process_name = current_process().name
logger = setup_logger(f"{process_name}: {dataset}_{start}")
logger.info(f"Starting processing for {dataset} from {start} to {end}")
@@ -56,8 +56,13 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):
try:
for _, row in df_iso3s.iterrows():
iso3 = row["iso3"]
# shp_url = row["o_shp"]
max_adm = row["max_adm_level"]

# Coverage check for specific datasets
if dataset in df_iso3s.keys():
if not row[dataset]:
logger.info(f"Skipping {iso3}...")
continue
logger.info(f"Processing data for {iso3}...")

with tempfile.TemporaryDirectory() as td:
@@ -95,6 +100,7 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):
con=engine,
if_exists="append",
index=False,
chunksize=chunksize,
method=postgres_upsert,
)
except Exception as e:
@@ -130,10 +136,10 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):
logger.info(f"Updating data for {dataset}...")

create_qa_table(engine)
start, end, is_forecast, sel_iso3s = parse_pipeline_config(
start, end, is_forecast, sel_iso3s, extra_dims = parse_pipeline_config(
dataset, args.test, args.update_stats, args.mode
)
create_dataset_table(dataset, engine, is_forecast)
create_dataset_table(dataset, engine, is_forecast, extra_dims)

df_iso3s = get_iso3_data(sel_iso3s, engine)
date_ranges = split_date_range(start, end)
@@ -145,7 +151,7 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):
)

process_args = [
(start, end, dataset, args.mode, df_iso3s, engine_url)
(start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize)
for start, end in date_ranges
]

@@ -154,6 +160,8 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):

else:
logger.info("Processing entire date range in a single chunk")
process_chunk(start, end, dataset, args.mode, df_iso3s, engine_url)
process_chunk(
start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize
)

logger.info("Done calculating and saving stats.")
11 changes: 11 additions & 0 deletions src/config/floodscan.yml
@@ -0,0 +1,11 @@
blob_prefix: floodscan/daily/v5/processed/aer_area_300s_
start_date: 1998-01-12
end_date: Null
forecast: False
extra_dims:
- band : str
test:
start_date: 2023-12-01
end_date: 2024-01-31
iso3s: ["ETH"]
coverage: ["DZA", "AGO", "BEN", "BWA", "BFA", "BDI", "CPV", "CMR", "CAF", "TCD", "COM", "COG", "CIV", "CAP", "DJI", "EGY", "GNQ", "ERI", "SWZ", "ETH", "GAB", "GMB", "GHA", "GIN", "GNB", "KEN", "LS0", "LBR", "LBY", "MDG", "MWI", "MLI", "MRT", "MUS", "MAR", "MOZ", "NAM", "NER", "NGA", "RWA", "STP", "SEN", "SYC", "SLE", "SOM", "ZAF", "SSD", "SDN", "TGO", "TUN", "UGA", "TZA", "ZMB", "ZWE"]
4 changes: 2 additions & 2 deletions src/config/imerg.yml
@@ -3,6 +3,6 @@ start_date: 2000-06-01
end_date: Null
forecast: False
test:
start_date: 2000-06-01
end_date: 2024-10-01
start_date: 2020-01-01
end_date: 2020-01-15
iso3s: ["ETH"]
4 changes: 3 additions & 1 deletion src/config/seas5.yml
@@ -2,7 +2,9 @@ blob_prefix: seas5/monthly/processed/precip_em_i
start_date: 1981-01-01
end_date: Null
forecast: True
extra_dims:
- leadtime : int
test:
start_date: 2024-01-01
end_date: 2024-10-01
end_date: 2024-02-01
iso3s: ["AFG"]
5 changes: 3 additions & 2 deletions src/config/settings.py
@@ -4,7 +4,7 @@
import yaml
from dotenv import load_dotenv

from src.utils.general_utils import get_most_recent_date
from src.utils.general_utils import get_most_recent_date, parse_extra_dims

load_dotenv()

@@ -37,10 +37,11 @@ def parse_pipeline_config(dataset, test, update, mode):
end_date = config["end_date"]
sel_iso3s = None
forecast = config["forecast"]
extra_dims = parse_extra_dims(config.get("extra_dims"))
if not end_date:
end_date = date.today() - timedelta(days=1)
if update:
last_update = get_most_recent_date(mode, config["blob_prefix"])
start_date = last_update
end_date = last_update
return start_date, end_date, forecast, sel_iso3s
return start_date, end_date, forecast, sel_iso3s, extra_dims
26 changes: 24 additions & 2 deletions src/utils/cog_utils.py
@@ -109,6 +109,23 @@ def process_seas5(cog_name, mode):
return da_in


def process_floodscan(cog_name, mode):
cog_url = get_cog_url(mode, cog_name)
da_in = rxr.open_rasterio(cog_url, chunks="auto")

year_valid = da_in.attrs["year_valid"]
month_valid = str(da_in.attrs["month_valid"]).zfill(2)
date_valid = str(da_in.attrs["date_valid"]).zfill(2)
date_in = f"{year_valid}-{month_valid}-{date_valid}"

da_in = da_in.squeeze(drop=True)
da_in["date"] = date_in
da_in = da_in.expand_dims(["date"])

da_in = da_in.persist()
return da_in


def stack_cogs(start_date, end_date, dataset="era5", mode="dev"):
"""
Stack Cloud Optimized GeoTIFFs (COGs) for a specified date range into an xarray Dataset.
@@ -124,7 +141,7 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"):
end_date : str or datetime-like
The end date of the date range for stacking the COGs. This can be a string or a datetime object.
dataset : str, optional
The name of the dataset to retrieve COGs from. Options include "era5", "imerg", and "seas5".
The name of the dataset to retrieve COGs from. Options include "floodscan", "era5", "imerg", and "seas5".
Default is "era5".
mode : str, optional
The environment mode to use when accessing the cloud storage container. May be "dev", "prod", or "local".
@@ -149,13 +166,16 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"):
config = load_pipeline_config(dataset)
prefix = config["blob_prefix"]
except Exception:
logger.error("Input `dataset` must be one of `era5`, `seas5`, or `imerg`.")
logger.error(
"Input `dataset` must be one of `floodscan`, `era5`, `seas5`, or `imerg`."
)

cogs_list = [
x.name
for x in container_client.list_blobs(name_starts_with=prefix)
if (parse_date(x.name) >= start_date) & (parse_date(x.name) <= end_date) # noqa
]

if len(cogs_list) == 0:
raise Exception("No COGs found to process")

@@ -171,6 +191,8 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"):
da_in = process_seas5(cog, mode)
elif dataset == "imerg":
da_in = process_imerg(cog, mode)
elif dataset == "floodscan":
da_in = process_floodscan(cog, mode)
das.append(da_in)

# Note that we're dropping all attributes here
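With process_floodscan wired into stack_cogs, FloodScan COGs stack over a date range like the other datasets. An illustrative call, assuming blob-storage access for the dev container is configured; the dates are taken from the test block of floodscan.yml:

from src.utils.cog_utils import stack_cogs

# Each daily COG gets a "date" coordinate built from its year/month/date_valid
# attributes, so the stacked result is indexed by date.
ds = stack_cogs("2023-12-01", "2024-01-31", dataset="floodscan", mode="dev")
print(ds.dims)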
13 changes: 10 additions & 3 deletions src/utils/database_utils.py
@@ -35,7 +35,7 @@ def db_engine_url(mode):
return DATABASES[mode]


def create_dataset_table(dataset, engine, is_forecast=False):
def create_dataset_table(dataset, engine, is_forecast=False, extra_dims={}):
"""
Create a table for storing dataset statistics in the database.
@@ -48,11 +48,16 @@ def create_dataset_table(dataset, engine, is_forecast=False):
is_forecast : Bool
Whether or not the dataset is a forecast. Will include `leadtime` and
`issued_date` columns if so.
extra_dims : dict
Dictionary where the keys are names of any extra dimensions that need to be added to the
dataset table and the values are the types.
Returns
-------
None
"""
if extra_dims is None:
extra_dims = {}
metadata = MetaData()
columns = [
Column("iso3", CHAR(3)),
@@ -71,8 +76,9 @@ def create_dataset_table(dataset, engine, is_forecast=False):
unique_constraint_columns = ["valid_date", "pcode"]
if is_forecast:
columns.insert(3, Column("issued_date", Date))
columns.insert(4, Column("leadtime", Integer))
unique_constraint_columns.append("leadtime")
for idx, dim in enumerate(extra_dims):
columns.insert(idx + 4, Column(dim, extra_dims[dim]))
unique_constraint_columns.append(dim)

Table(
f"{dataset}",
@@ -127,6 +133,7 @@ def create_iso3_table(engine):
Column("max_adm_level", Integer),
Column("stats_last_updated", Date),
Column("shp_url", String),
Column("floodscan", Boolean),
)
metadata.create_all(engine)

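Downstream, the parsed extra_dims dict feeds create_dataset_table, which appends one column per extra dimension and folds it into the unique constraint next to valid_date and pcode. A minimal sketch of the FloodScan call, assuming the dev database connection is configured:

from sqlalchemy import VARCHAR, create_engine

from src.utils.database_utils import create_dataset_table, db_engine_url

# {"band": VARCHAR} is what parse_extra_dims returns for the floodscan config.
engine = create_engine(db_engine_url(mode="dev"))
create_dataset_table(
    "floodscan", engine, is_forecast=False, extra_dims={"band": VARCHAR}
)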
13 changes: 13 additions & 0 deletions src/utils/general_utils.py
@@ -3,6 +3,7 @@

import pandas as pd
from dateutil.relativedelta import relativedelta
from sqlalchemy import VARCHAR, Integer

from src.utils.cloud_utils import get_container_client

@@ -110,3 +111,15 @@ def parse_date(filename):
"""
res = re.search("([0-9]{4}-[0-9]{2}-[0-9]{2})", filename)
return pd.to_datetime(res[0])


def parse_extra_dims(extra_dims):
parsed_extra_dims = {}
for extra_dim in extra_dims:
dim = next(iter(extra_dim))
if extra_dim[dim] == "str":
parsed_extra_dims[dim] = VARCHAR
else:
parsed_extra_dims[dim] = Integer

return parsed_extra_dims
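A quick usage sketch of parse_extra_dims with the two extra dimensions configured in this PR; any type string other than "str" falls back to Integer:

from src.utils.general_utils import parse_extra_dims

# floodscan: "str" maps to VARCHAR
print(parse_extra_dims([{"band": "str"}]))
# seas5: "int" (or any other type string) maps to Integer
print(parse_extra_dims([{"leadtime": "int"}]))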
8 changes: 7 additions & 1 deletion src/utils/inputs.py
@@ -9,7 +9,7 @@ def cli_args():
parser.add_argument(
"dataset",
help="Dataset for which to calculate raster stats",
choices=["seas5", "era5", "imerg"],
choices=["seas5", "era5", "imerg", "floodscan"],
default=None,
nargs="?",
)
@@ -36,4 +36,10 @@ def cli_args():
help="Update the iso3 and polygon metadata tables.",
action="store_true",
)
parser.add_argument(
"--chunksize",
help="Limit the SQL insert batches to an specific chunksize.",
type=int,
default=100000,
)
return parser.parse_args()
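An illustrative invocation of the updated CLI, assuming the remaining flags keep their defaults; the chunksize value here is arbitrary:

import sys

from src.utils.inputs import cli_args

# Equivalent to: python run_raster_stats.py floodscan --chunksize 50000
sys.argv = ["run_raster_stats.py", "floodscan", "--chunksize", "50000"]
args = cli_args()
print(args.dataset, args.chunksize)  # floodscan 50000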
19 changes: 19 additions & 0 deletions src/utils/iso3_utils.py
@@ -8,6 +8,7 @@
import requests
from sqlalchemy import text

from src.config.settings import load_pipeline_config
from src.utils.cloud_utils import get_container_client
from src.utils.database_utils import create_iso3_table

@@ -144,6 +145,19 @@ def determine_max_adm_level(row):
return min(1, row["src_lvl"])


def load_coverage():
pipelines = ["seas5", "era5", "imerg", "floodscan"]
coverage = {}

for dataset in pipelines:
config = load_pipeline_config(dataset)
if "coverage" in config:
dataset_coverage = config["coverage"]
coverage[dataset] = dataset_coverage

return coverage


def create_iso3_df(engine):
"""
Create and populate an ISO3 table in the database with country information.
Expand Down Expand Up @@ -178,6 +192,8 @@ def create_iso3_df(engine):
)
& (df_hrp["endDate"] >= current_date) # noqa
]
dataset_coverage = load_coverage()

iso3_codes = set()
for locations in df_active_hrp["locations"]:
iso3_codes.update(locations.split("|"))
@@ -187,6 +203,9 @@
df["max_adm_level"] = df.apply(determine_max_adm_level, axis=1)
df["stats_last_updated"] = None

for dataset in dataset_coverage:
df[dataset] = df["iso_3"].isin(dataset_coverage[dataset])

# TODO: This list seems to have some inconsistencies when compared against the
# contents of all polygons
# Also need global p-codes list from https://fieldmaps.io/data/cod
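load_coverage gathers the coverage lists from each pipeline config, and create_iso3_df turns them into one boolean column per dataset on the iso3 table. A small standalone sketch of that flagging step with made-up data:

import pandas as pd

# Hypothetical coverage mapping, as load_coverage() would return it when only
# the floodscan config defines a coverage list.
dataset_coverage = {"floodscan": ["ETH", "KEN", "SOM"]}

df = pd.DataFrame({"iso_3": ["ETH", "AFG", "KEN"]})
for dataset in dataset_coverage:
    # True where the country appears in the dataset's coverage list.
    df[dataset] = df["iso_3"].isin(dataset_coverage[dataset])

print(df)
#   iso_3  floodscan
# 0   ETH       True
# 1   AFG      False
# 2   KEN       True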