Floodscan stats #11

Merged: 28 commits, Nov 15, 2024
Commits
99b2535 Add initial floodscan config (hannahker, Oct 24, 2024)
bedea11 Add floodscan processing function (hannahker, Oct 24, 2024)
54c782c Generalize handling of 4th dimension in upsampling (hannahker, Oct 24, 2024)
ee60294 Generalize dimension handling (hannahker, Oct 24, 2024)
79d37a5 Merge branch 'main' into floodscan-stats (hannahker, Oct 24, 2024)
8b1a525 Generalize handling of fourth dim in runner function (hannahker, Oct 24, 2024)
fca4e98 Remove upsampling test with only 2 dimensions (hannahker, Oct 24, 2024)
ee025ea Generalize handling of extra dimensions (hannahker, Oct 24, 2024)
1bdcd09 Adjust floodscan filename (hannahker, Oct 25, 2024)
fcee0ac Merge branch 'main' into floodscan-stats (isatotun, Nov 6, 2024)
75843d5 Merging floodscan-stats and main (isatotun, Nov 6, 2024)
c0610b0 Merge with main (hannahker, Nov 6, 2024)
d1ce782 Merge branch 'floodscan-stats' into floodscan-stats-hannah (isatotun, Nov 6, 2024)
e208fad Merge pull request #14 from OCHA-DAP/floodscan-stats-hannah (isatotun, Nov 6, 2024)
dcb53c0 Adding capability to have other types of data for the fourth dimension (isatotun, Nov 7, 2024)
6bdfd7c Changing a date for testing purposes on databricks (isatotun, Nov 7, 2024)
53b6076 Changing the band for testing purposes on databricks (isatotun, Nov 7, 2024)
00d097e Changing the band array for testing purposes on databricks (isatotun, Nov 7, 2024)
170aa5a Adding changes to be able to retrieve full cogs for floodscan (isatotun, Nov 8, 2024)
203ded0 Adding coverage flag for floodscan (isatotun, Nov 11, 2024)
54e27d9 testing by pointing at the whole historical dataset (isatotun, Nov 11, 2024)
2d819d8 Setting chunksize (isatotun, Nov 11, 2024)
edb7815 Adding a chunksize parameter (isatotun, Nov 11, 2024)
a0d6461 Adding mode parameter back (isatotun, Nov 12, 2024)
1416abd Pre-commit (isatotun, Nov 12, 2024)
38f2dec Adding flag for floodscan coverage in config (isatotun, Nov 12, 2024)
5b3f774 Addressing code review (isatotun, Nov 14, 2024)
d229916 Fixing test (isatotun, Nov 14, 2024)
20 changes: 14 additions & 6 deletions run_raster_stats.py
@@ -45,7 +45,7 @@ def setup_logger(name, level=logging.INFO):
return logger


def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):
def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
process_name = current_process().name
logger = setup_logger(f"{process_name}: {dataset}_{start}")
logger.info(f"Starting processing for {dataset} from {start} to {end}")
@@ -56,8 +56,13 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):
try:
for _, row in df_iso3s.iterrows():
iso3 = row["iso3"]
# shp_url = row["o_shp"]
max_adm = row["max_adm_level"]

# Coverage check for specific datasets
if dataset in df_iso3s.keys():
if not row[dataset]:
logger.info(f"Skipping {iso3}...")
continue
logger.info(f"Processing data for {iso3}...")

with tempfile.TemporaryDirectory() as td:
@@ -95,6 +100,7 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):
con=engine,
if_exists="append",
index=False,
chunksize=chunksize,
method=postgres_upsert,
)
except Exception as e:
@@ -130,10 +136,10 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):
logger.info(f"Updating data for {dataset}...")

create_qa_table(engine)
start, end, is_forecast, sel_iso3s = parse_pipeline_config(
start, end, is_forecast, sel_iso3s, extra_dims = parse_pipeline_config(
dataset, args.test, args.update_stats, args.mode
)
create_dataset_table(dataset, engine, is_forecast)
create_dataset_table(dataset, engine, is_forecast, extra_dims)

df_iso3s = get_iso3_data(sel_iso3s, engine)
date_ranges = split_date_range(start, end)
@@ -145,7 +151,7 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):
)

process_args = [
(start, end, dataset, args.mode, df_iso3s, engine_url)
(start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize)
for start, end in date_ranges
]

@@ -154,6 +160,8 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url):

else:
logger.info("Processing entire date range in a single chunk")
process_chunk(start, end, dataset, args.mode, df_iso3s, engine_url)
process_chunk(
start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize
)

logger.info("Done calculating and saving stats.")
11 changes: 11 additions & 0 deletions src/config/floodscan.yml
@@ -0,0 +1,11 @@
blob_prefix: floodscan/daily/v5/processed/aer_area_300s_
start_date: 1998-01-12
end_date: Null
forecast: False
extra_dims:
- band : str
test:
start_date: 2023-12-01
end_date: 2024-01-31
iso3s: ["ETH"]
coverage: ["DZA", "AGO", "BEN", "BWA", "BFA", "BDI", "CPV", "CMR", "CAF", "TCD", "COM", "COG", "CIV", "CAP", "DJI", "EGY", "GNQ", "ERI", "SWZ", "ETH", "GAB", "GMB", "GHA", "GIN", "GNB", "KEN", "LS0", "LBR", "LBY", "MDG", "MWI", "MLI", "MRT", "MUS", "MAR", "MOZ", "NAM", "NER", "NGA", "RWA", "STP", "SEN", "SYC", "SLE", "SOM", "ZAF", "SSD", "SDN", "TGO", "TUN", "UGA", "TZA", "ZMB", "ZWE"]
Collaborator Author comment: Also a nice way to do this!

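For reference, a short sketch (not part of the diff) of how the extra_dims block above is parsed by PyYAML: each entry becomes a single-key dict, which is why parse_extra_dims in general_utils.py below reads the dimension name with next(iter(...)).

import yaml

snippet = """
extra_dims:
  - band : str
"""
parsed = yaml.safe_load(snippet)
print(parsed["extra_dims"])  # [{'band': 'str'}]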
4 changes: 2 additions & 2 deletions src/config/imerg.yml
@@ -3,6 +3,6 @@ start_date: 2000-06-01
end_date: Null
forecast: False
test:
start_date: 2000-06-01
end_date: 2024-10-01
start_date: 2020-01-01
end_date: 2020-01-15
iso3s: ["ETH"]
4 changes: 3 additions & 1 deletion src/config/seas5.yml
@@ -2,7 +2,9 @@ blob_prefix: seas5/monthly/processed/precip_em_i
start_date: 1981-01-01
end_date: Null
forecast: True
extra_dims:
- leadtime : int
test:
start_date: 2024-01-01
end_date: 2024-10-01
end_date: 2024-02-01
iso3s: ["AFG"]
5 changes: 3 additions & 2 deletions src/config/settings.py
@@ -4,7 +4,7 @@
import yaml
from dotenv import load_dotenv

from src.utils.general_utils import get_most_recent_date
from src.utils.general_utils import get_most_recent_date, parse_extra_dims

load_dotenv()

@@ -37,10 +37,11 @@ def parse_pipeline_config(dataset, test, update, mode):
end_date = config["end_date"]
sel_iso3s = None
forecast = config["forecast"]
extra_dims = parse_extra_dims(config.get("extra_dims"))
if not end_date:
end_date = date.today() - timedelta(days=1)
if update:
last_update = get_most_recent_date(mode, config["blob_prefix"])
start_date = last_update
end_date = last_update
return start_date, end_date, forecast, sel_iso3s
return start_date, end_date, forecast, sel_iso3s, extra_dims
26 changes: 24 additions & 2 deletions src/utils/cog_utils.py
@@ -109,6 +109,23 @@ def process_seas5(cog_name, mode):
return da_in


def process_floodscan(cog_name, mode):
cog_url = get_cog_url(mode, cog_name)
da_in = rxr.open_rasterio(cog_url, chunks="auto")

year_valid = da_in.attrs["year_valid"]
month_valid = str(da_in.attrs["month_valid"]).zfill(2)
date_valid = str(da_in.attrs["date_valid"]).zfill(2)
date_in = f"{year_valid}-{month_valid}-{date_valid}"

da_in = da_in.squeeze(drop=True)
da_in["date"] = date_in
da_in = da_in.expand_dims(["date"])

da_in = da_in.persist()
return da_in


def stack_cogs(start_date, end_date, dataset="era5", mode="dev"):
"""
Stack Cloud Optimized GeoTIFFs (COGs) for a specified date range into an xarray Dataset.
@@ -124,7 +141,7 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"):
end_date : str or datetime-like
The end date of the date range for stacking the COGs. This can be a string or a datetime object.
dataset : str, optional
The name of the dataset to retrieve COGs from. Options include "era5", "imerg", and "seas5".
The name of the dataset to retrieve COGs from. Options include "floodscan", "era5", "imerg", and "seas5".
Default is "era5".
mode : str, optional
The environment mode to use when accessing the cloud storage container. May be "dev", "prod", or "local".
@@ -149,13 +166,16 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"):
config = load_pipeline_config(dataset)
prefix = config["blob_prefix"]
except Exception:
logger.error("Input `dataset` must be one of `era5`, `seas5`, or `imerg`.")
logger.error(
"Input `dataset` must be one of `floodscan`, `era5`, `seas5`, or `imerg`."
)

cogs_list = [
x.name
for x in container_client.list_blobs(name_starts_with=prefix)
if (parse_date(x.name) >= start_date) & (parse_date(x.name) <= end_date) # noqa
]

if len(cogs_list) == 0:
raise Exception("No COGs found to process")

@@ -171,6 +191,8 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"):
da_in = process_seas5(cog, mode)
elif dataset == "imerg":
da_in = process_imerg(cog, mode)
elif dataset == "floodscan":
da_in = process_floodscan(cog, mode)
das.append(da_in)

# Note that we're dropping all attributes here
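A minimal sketch with synthetic data (not the repo's COGs) of the date-stamping pattern used in process_floodscan above: squeeze out the length-1 band axis, attach the date read from the COG attributes as a scalar coordinate, then promote it to a dimension so stack_cogs can concatenate along "date".

import numpy as np
import xarray as xr

da = xr.DataArray(np.random.rand(1, 4, 5), dims=("band", "y", "x"))
da = da.squeeze(drop=True)     # drop the length-1 band axis
da["date"] = "1998-01-12"      # scalar coordinate, as built from the year/month/date_valid attrs
da = da.expand_dims(["date"])  # promote to a dimension of size 1
print(da.dims)                 # ('date', 'y', 'x')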
13 changes: 10 additions & 3 deletions src/utils/database_utils.py
@@ -35,7 +35,7 @@ def db_engine_url(mode):
return DATABASES[mode]


def create_dataset_table(dataset, engine, is_forecast=False):
def create_dataset_table(dataset, engine, is_forecast=False, extra_dims={}):
"""
Create a table for storing dataset statistics in the database.

@@ -48,11 +48,16 @@
is_forecast : Bool
Whether or not the dataset is a forecast. Will include `leadtime` and
`issued_date` columns if so.
extra_dims : dict
Dictionary where the keys are names of any extra dimensions that need to be added to the
dataset table and the values are the type.

Returns
-------
None
"""
if extra_dims is None:
extra_dims = {}
metadata = MetaData()
columns = [
Column("iso3", CHAR(3)),
@@ -71,8 +76,9 @@
unique_constraint_columns = ["valid_date", "pcode"]
if is_forecast:
columns.insert(3, Column("issued_date", Date))
columns.insert(4, Column("leadtime", Integer))
unique_constraint_columns.append("leadtime")
for idx, dim in enumerate(extra_dims):
columns.insert(idx + 4, Column(dim, extra_dims[dim]))
unique_constraint_columns.append(dim)

Table(
f"{dataset}",
@@ -127,6 +133,7 @@ def create_iso3_table(engine):
Column("max_adm_level", Integer),
Column("stats_last_updated", Date),
Column("shp_url", String),
Column("floodscan", Boolean),
)
metadata.create_all(engine)

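A small sketch (simplified column list, not the repo's full schema) of what the new extra_dims loop in create_dataset_table does with a parsed mapping such as {"band": VARCHAR}: the extra column is inserted after the date column and its name is appended to the upsert's unique constraint.

from sqlalchemy import CHAR, VARCHAR, Column, Date, Float

extra_dims = {"band": VARCHAR}  # e.g. the output of parse_extra_dims for floodscan
columns = [
    Column("iso3", CHAR(3)),
    Column("pcode", VARCHAR),
    Column("valid_date", Date),
    Column("mean", Float),
]
unique_constraint_columns = ["valid_date", "pcode"]

for idx, dim in enumerate(extra_dims):
    columns.insert(idx + 3, Column(dim, extra_dims[dim]))  # the repo inserts at idx + 4
    unique_constraint_columns.append(dim)

print([c.name for c in columns])  # ['iso3', 'pcode', 'valid_date', 'band', 'mean']
print(unique_constraint_columns)  # ['valid_date', 'pcode', 'band']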
13 changes: 13 additions & 0 deletions src/utils/general_utils.py
@@ -3,6 +3,7 @@

import pandas as pd
from dateutil.relativedelta import relativedelta
from sqlalchemy import VARCHAR, Integer

from src.utils.cloud_utils import get_container_client

@@ -110,3 +111,15 @@ def parse_date(filename):
"""
res = re.search("([0-9]{4}-[0-9]{2}-[0-9]{2})", filename)
return pd.to_datetime(res[0])


def parse_extra_dims(extra_dims):
parsed_extra_dims = {}
for extra_dim in extra_dims:
dim = next(iter(extra_dim))
if extra_dim[dim] == "str":
parsed_extra_dims[dim] = VARCHAR
else:
Collaborator Author comment: In the future, we'll probably want to have other data types, but can leave as is for now

parsed_extra_dims[dim] = Integer

return parsed_extra_dims
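Following the comment above, one possible future shape for this function (purely illustrative, not in this PR) is an explicit lookup from the config's type strings to SQLAlchemy types, rather than defaulting everything that is not "str" to Integer:

from sqlalchemy import VARCHAR, Float, Integer

DIM_TYPES = {"str": VARCHAR, "int": Integer, "float": Float}  # hypothetical mapping

def parse_extra_dims_sketch(extra_dims):
    parsed = {}
    for extra_dim in extra_dims or []:  # also tolerates configs with no extra_dims
        dim = next(iter(extra_dim))
        parsed[dim] = DIM_TYPES.get(extra_dim[dim], VARCHAR)
    return parsed

print(parse_extra_dims_sketch([{"band": "str"}, {"leadtime": "int"}]))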
8 changes: 7 additions & 1 deletion src/utils/inputs.py
@@ -9,7 +9,7 @@ def cli_args():
parser.add_argument(
"dataset",
help="Dataset for which to calculate raster stats",
choices=["seas5", "era5", "imerg"],
choices=["seas5", "era5", "imerg", "floodscan"],
Collaborator Author comment: Also as discussed, in the future it'd be good to think about how we can avoid the hard-coding of each dataset (since this happens in numerous places throughout the code). Although really more of a note for myself since I set it up this way...

default=None,
nargs="?",
)
@@ -36,4 +36,10 @@ def cli_args():
help="Update the iso3 and polygon metadata tables.",
action="store_true",
)
parser.add_argument(
"--chunksize",
help="Limit the SQL insert batches to an specific chunksize.",
type=int,
default=100000,
)
return parser.parse_args()
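As a usage note, the new flag is passed on the command line alongside the positional dataset argument. A tiny argparse sketch (only the two arguments shown, the rest of the parser is as defined above) of how such an invocation parses:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("dataset", choices=["seas5", "era5", "imerg", "floodscan"], default=None, nargs="?")
parser.add_argument("--chunksize", type=int, default=100000)

args = parser.parse_args(["floodscan", "--chunksize", "50000"])
print(args.dataset, args.chunksize)  # floodscan 50000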
19 changes: 19 additions & 0 deletions src/utils/iso3_utils.py
@@ -8,6 +8,7 @@
import requests
from sqlalchemy import text

from src.config.settings import load_pipeline_config
from src.utils.cloud_utils import get_container_client
from src.utils.database_utils import create_iso3_table

@@ -144,6 +145,19 @@ def determine_max_adm_level(row):
return min(1, row["src_lvl"])


def load_coverage():
pipelines = ["seas5", "era5", "imerg", "floodscan"]
coverage = {}

for dataset in pipelines:
config = load_pipeline_config(dataset)
if "coverage" in config:
Collaborator Author comment: Would be worth adding at least a comment in here to remind ourselves that we're assuming that the coverage is specified in ISO3 values.

dataset_coverage = config["coverage"]
coverage[dataset] = dataset_coverage

return coverage


def create_iso3_df(engine):
"""
Create and populate an ISO3 table in the database with country information.
Expand Down Expand Up @@ -178,6 +192,8 @@ def create_iso3_df(engine):
)
& (df_hrp["endDate"] >= current_date) # noqa
]
dataset_coverage = load_coverage()

iso3_codes = set()
for locations in df_active_hrp["locations"]:
iso3_codes.update(locations.split("|"))
@@ -187,6 +203,9 @@ def create_iso3_df(engine):
df["max_adm_level"] = df.apply(determine_max_adm_level, axis=1)
df["stats_last_updated"] = None

for dataset in dataset_coverage:
df[dataset] = df["iso_3"].isin(dataset_coverage[dataset])

# TODO: This list seems to have some inconsistencies when compared against the
# contents of all polygons
# Also need global p-codes list from https://fieldmaps.io/data/cod
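A sketch with made-up data (not the repo's HRP download) of how the per-dataset coverage lists become boolean columns on the iso3 table, which process_chunk in run_raster_stats.py then checks with "if not row[dataset]: continue":

import pandas as pd

dataset_coverage = {"floodscan": ["ETH", "KEN", "SOM"]}  # shape returned by load_coverage()
df = pd.DataFrame({"iso_3": ["ETH", "AFG", "KEN"]})

for dataset in dataset_coverage:
    df[dataset] = df["iso_3"].isin(dataset_coverage[dataset])

print(df)
#   iso_3  floodscan
# 0   ETH       True
# 1   AFG      False
# 2   KEN       True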