From 99b25355d8aba9244cf12f678688b3d7188e21ef Mon Sep 17 00:00:00 2001 From: hannahker Date: Thu, 24 Oct 2024 10:49:32 -0400 Subject: [PATCH 01/23] Add initial floodscan config --- src/config/floodscan.yml | 8 ++++++++ src/utils/cog_utils.py | 8 ++++++-- src/utils/inputs.py | 2 +- 3 files changed, 15 insertions(+), 3 deletions(-) create mode 100644 src/config/floodscan.yml diff --git a/src/config/floodscan.yml b/src/config/floodscan.yml new file mode 100644 index 0000000..70729c6 --- /dev/null +++ b/src/config/floodscan.yml @@ -0,0 +1,8 @@ +blob_prefix: floodscan/daily/v5/processed/aer__area_300s_ +start_date: 1998-01-12 +end_date: Null +forecast: False +test: + start_date: 2023-12-01 + end_date: 2023-12-05 + iso3s: ["ETH"] diff --git a/src/utils/cog_utils.py b/src/utils/cog_utils.py index a7a7a96..e821705 100644 --- a/src/utils/cog_utils.py +++ b/src/utils/cog_utils.py @@ -21,8 +21,10 @@ def parse_date(filename, dataset): date = pd.to_datetime(filename[-14:-4]) elif dataset == "seas5": date = pd.to_datetime(filename[-18:-8]) + elif dataset == "floodscan": + date = pd.to_datetime(filename[-21:-11]) else: - raise Exception("Input `dataset` must be one of: imerg, era5, seas5") + raise Exception("Input `dataset` must be one of: floodscan, imerg, era5, seas5") return pd.to_datetime(date) @@ -161,7 +163,9 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"): config = load_pipeline_config(dataset) prefix = config["blob_prefix"] except Exception: - logger.error("Input `dataset` must be one of `era5`, `seas5`, or `imerg`.") + logger.error( + "Input `dataset` must be one of `floodscan`, `era5`, `seas5`, or `imerg`." + ) cogs_list = [ x.name diff --git a/src/utils/inputs.py b/src/utils/inputs.py index 0ab8585..38480cc 100644 --- a/src/utils/inputs.py +++ b/src/utils/inputs.py @@ -9,7 +9,7 @@ def cli_args(): parser.add_argument( "dataset", help="Dataset for which to calculate raster stats", - choices=["seas5", "era5", "imerg"], + choices=["seas5", "era5", "imerg", "floodscan"], default=None, ) parser.add_argument( From bedea1180dad26e1ddc8b55fc7ffc0c3bb67999d Mon Sep 17 00:00:00 2001 From: hannahker Date: Thu, 24 Oct 2024 13:15:47 -0400 Subject: [PATCH 02/23] Add floodscan processing function --- src/utils/cog_utils.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/utils/cog_utils.py b/src/utils/cog_utils.py index e821705..29d8355 100644 --- a/src/utils/cog_utils.py +++ b/src/utils/cog_utils.py @@ -123,6 +123,23 @@ def process_seas5(cog_name, mode): return da_in +def process_floodscan(cog_name, mode): + cog_url = get_cog_url(mode, cog_name) + da_in = rxr.open_rasterio(cog_url, chunks="auto") + + year_valid = da_in.attrs["year_valid"] + month_valid = str(da_in.attrs["month_valid"]).zfill(2) + date_valid = cog_name[-13:-11] # TODO: Change once attr is updated correctly + date_in = f"{year_valid}-{month_valid}-{date_valid}" + + da_in = da_in.squeeze(drop=True) + da_in["date"] = date_in + da_in = da_in.expand_dims(["date"]) + + da_in = da_in.persist() + return da_in + + def stack_cogs(start_date, end_date, dataset="era5", mode="dev"): """ Stack Cloud Optimized GeoTIFFs (COGs) for a specified date range into an xarray Dataset. @@ -138,7 +155,7 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"): end_date : str or datetime-like The end date of the date range for stacking the COGs. This can be a string or a datetime object. dataset : str, optional - The name of the dataset to retrieve COGs from. 
Options include "era5", "imerg", and "seas5". + The name of the dataset to retrieve COGs from. Options include "floodscan", "era5", "imerg", and "seas5". Default is "era5". mode : str, optional The environment mode to use when accessing the cloud storage container. May be "dev", "prod", or "local". @@ -189,8 +206,11 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"): da_in = process_seas5(cog, mode) elif dataset == "imerg": da_in = process_imerg(cog, mode) + elif dataset == "floodscan": + da_in = process_floodscan(cog, mode) das.append(da_in) # Note that we're dropping all attributes here - ds = xr.combine_by_coords(das, combine_attrs="drop") + # ds = xr.combine_by_coords(das, combine_attrs="drop") + ds = xr.concat(das, dim="date") return ds From 54c782c20f924080c92b7bba638f690a2a757b1f Mon Sep 17 00:00:00 2001 From: hannahker Date: Thu, 24 Oct 2024 15:16:06 -0400 Subject: [PATCH 03/23] Generalize handling of 4th dimension in upsampling --- src/utils/raster_utils.py | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/utils/raster_utils.py b/src/utils/raster_utils.py index 30dfa88..ff83743 100644 --- a/src/utils/raster_utils.py +++ b/src/utils/raster_utils.py @@ -212,7 +212,8 @@ def upsample_raster(ds, resampled_resolution=0.05, logger=None): Parameters ---------- ds : xarray.Dataset - The raster data set to upsample. Must not have >4 dimensions. + The raster data set to upsample. Must have dimensions 'x', 'y', and 'date', + with an optional fourth dimension (e.g., 'band' or 'leadtime'). resampled_resolution : float, optional The desired resolution for the upsampled raster. Default is 0.05. @@ -221,11 +222,16 @@ def upsample_raster(ds, resampled_resolution=0.05, logger=None): xarray.Dataset The upsampled raster as a Dataset with the new resolution. 
""" - if logger is None: logger = logging.getLogger(__name__) logger.addHandler(logging.NullHandler()) + # Verify required dimensions + required_dims = {"x", "y", "date"} + missing_dims = required_dims - set(ds.dims) + if missing_dims: + raise ValueError(f"Dataset missing required dimensions: {missing_dims}") + # Assuming square resolution input_resolution = ds.rio.resolution()[0] upscale_factor = input_resolution / resampled_resolution @@ -247,31 +253,32 @@ def upsample_raster(ds, resampled_resolution=0.05, logger=None): ) ds = ds.rio.write_crs("EPSG:4326") - # Forecast data will have 4 dims, since we have a leadtime - nd = len(list(ds.dims)) - if nd == 4: + # Get the fourth dimension if it exists (not x, y, or date) + dims = list(ds.dims) + fourth_dim = next((dim for dim in dims if dim not in {"x", "y", "date"}), None) + + if fourth_dim: # 4D case resampled_arrays = [] - for lt in ds.leadtime.values: - ds_ = ds.sel(leadtime=lt) + for val in ds[fourth_dim].values: + ds_ = ds.sel({fourth_dim: val}) ds_ = ds_.rio.reproject( ds_.rio.crs, - shape=(ds_.rio.height * 2, ds_.rio.width * 2), + shape=(new_height, new_width), resampling=Resampling.nearest, nodata=np.nan, ) - ds_ = ds_.expand_dims(["leadtime"]) + # Expand along the fourth dimension + ds_ = ds_.expand_dims([fourth_dim]) resampled_arrays.append(ds_) ds_resampled = xr.combine_by_coords(resampled_arrays, combine_attrs="drop") - elif (nd == 2) or (nd == 3): + else: # 3D case (x, y, date) ds_resampled = ds.rio.reproject( ds.rio.crs, shape=(new_height, new_width), resampling=Resampling.nearest, nodata=np.nan, ) - else: - raise Exception("Input Dataset must have 2, 3, or 4 dimensions.") return ds_resampled From ee6029424d6ca519a775faacd9666b794d464217 Mon Sep 17 00:00:00 2001 From: hannahker Date: Thu, 24 Oct 2024 15:29:50 -0400 Subject: [PATCH 04/23] Generalize dimension handling --- run_raster_stats.py | 1 - src/utils/raster_utils.py | 21 ++++++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/run_raster_stats.py b/run_raster_stats.py index ee17e61..4ba9c06 100644 --- a/run_raster_stats.py +++ b/run_raster_stats.py @@ -55,7 +55,6 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url): try: for _, row in df_iso3s.iterrows(): iso3 = row["iso3"] - # shp_url = row["o_shp"] max_adm = row["max_adm_level"] logger.info(f"Processing data for {iso3}...") diff --git a/src/utils/raster_utils.py b/src/utils/raster_utils.py index ff83743..c2f0cc1 100644 --- a/src/utils/raster_utils.py +++ b/src/utils/raster_utils.py @@ -16,6 +16,17 @@ coloredlogs.install(level=LOG_LEVEL, logger=logger) +def validate_dimensions(ds): + required_dims = {"x", "y", "date"} + missing_dims = required_dims - set(ds.dims) + if missing_dims: + raise ValueError(f"Dataset missing required dimensions: {missing_dims}") + # Get the fourth dimension if it exists (not x, y, or date) + dims = list(ds.dims) + fourth_dim = next((dim for dim in dims if dim not in {"x", "y", "date"}), None) + return fourth_dim + + def fast_zonal_stats_runner( ds, gdf, @@ -226,11 +237,7 @@ def upsample_raster(ds, resampled_resolution=0.05, logger=None): logger = logging.getLogger(__name__) logger.addHandler(logging.NullHandler()) - # Verify required dimensions - required_dims = {"x", "y", "date"} - missing_dims = required_dims - set(ds.dims) - if missing_dims: - raise ValueError(f"Dataset missing required dimensions: {missing_dims}") + fourth_dim = validate_dimensions(ds) # Assuming square resolution input_resolution = ds.rio.resolution()[0] @@ -253,10 +260,6 @@ 
def upsample_raster(ds, resampled_resolution=0.05, logger=None): ) ds = ds.rio.write_crs("EPSG:4326") - # Get the fourth dimension if it exists (not x, y, or date) - dims = list(ds.dims) - fourth_dim = next((dim for dim in dims if dim not in {"x", "y", "date"}), None) - if fourth_dim: # 4D case resampled_arrays = [] for val in ds[fourth_dim].values: From 8b1a525a18161a0e613b7ed3497eb6275fc683a1 Mon Sep 17 00:00:00 2001 From: hannahker Date: Thu, 24 Oct 2024 16:38:49 -0400 Subject: [PATCH 05/23] Generalize handling of fourth dim in runner function --- src/utils/cog_utils.py | 3 +-- src/utils/raster_utils.py | 24 ++++++++++++++---------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/utils/cog_utils.py b/src/utils/cog_utils.py index 29d8355..907173d 100644 --- a/src/utils/cog_utils.py +++ b/src/utils/cog_utils.py @@ -211,6 +211,5 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"): das.append(da_in) # Note that we're dropping all attributes here - # ds = xr.combine_by_coords(das, combine_attrs="drop") - ds = xr.concat(das, dim="date") + ds = xr.combine_by_coords(das, combine_attrs="drop") return ds diff --git a/src/utils/raster_utils.py b/src/utils/raster_utils.py index c2f0cc1..9d7785c 100644 --- a/src/utils/raster_utils.py +++ b/src/utils/raster_utils.py @@ -46,7 +46,8 @@ def fast_zonal_stats_runner( Parameters ---------- ds : xarray.Dataset - The input raster dataset. Should have the following dimensions: `x`, `y`, `date`, `leadtime` (optional). + The input raster dataset. Must have dimensions 'x', 'y', and 'date', + with an optional fourth dimension (e.g., 'band' or 'leadtime'). gdf : geopandas.GeoDataFrame A GeoDataFrame containing the administrative boundaries. adm_level : int @@ -76,7 +77,8 @@ def fast_zonal_stats_runner( logger = logging.getLogger(__name__) logger.addHandler(logging.NullHandler()) - # TODO: Pre-compute and save + fourth_dim = validate_dimensions(ds) + # Rasterize the adm bounds src_transform = ds.rio.transform() src_width = ds.rio.width @@ -88,14 +90,14 @@ def fast_zonal_stats_runner( n_adms = len(adm_ids) outputs = [] - # TODO: Can this be vectorized further? 
for date in ds.date.values: logger.debug(f"Calculating for {date}...") ds_sel = ds.sel(date=date) - if "leadtime" in ds_sel.dims: - for lt in ds_sel.leadtime.values: - ds__ = ds_sel.sel(leadtime=lt) - # Some leadtime/date combos are invalid and so don't have any data + + if fourth_dim: # 4D case + for val in ds_sel[fourth_dim].values: + ds__ = ds_sel.sel({fourth_dim: val}) + # Skip if all values are NaN if bool(np.all(np.isnan(ds__.values))): continue results = fast_zonal_stats( @@ -103,12 +105,14 @@ def fast_zonal_stats_runner( ) for i, result in enumerate(results): result["valid_date"] = date - result["issued_date"] = add_months_to_date(date, -lt) + # Special handling for leadtime dimension + if fourth_dim == "leadtime": + result["issued_date"] = add_months_to_date(date, -val) result["pcode"] = adm_ids[i] result["adm_level"] = adm_level - result["leadtime"] = lt + result[fourth_dim] = val # Store the fourth dimension value outputs.extend(results) - else: + else: # 3D case results = fast_zonal_stats( ds_sel.values, admin_raster, n_adms, stats=stats, rast_fill=rast_fill ) From fca4e9879ea092af20624d5237816e9e26ccc4b1 Mon Sep 17 00:00:00 2001 From: hannahker Date: Thu, 24 Oct 2024 16:42:55 -0400 Subject: [PATCH 06/23] Remove upsampling test with only 2 dimensions --- tests/test_raster_stats.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/tests/test_raster_stats.py b/tests/test_raster_stats.py index 81ecf0b..fbfa87c 100644 --- a/tests/test_raster_stats.py +++ b/tests/test_raster_stats.py @@ -288,19 +288,6 @@ def dataset_with_leadtime(dataset_with_time): return ds -def test_basic_upsampling(simple_dataset): - """Test basic upsampling of a 2D dataset.""" - target_res = 0.5 # Upsample from 1.0 to 0.5 degrees - result = upsample_raster(simple_dataset, resampled_resolution=target_res) - - # Check output resolution - assert result.rio.resolution()[0] == target_res - - # Check output dimensions doubled (since resolution halved) - assert result.rio.width == simple_dataset.rio.width * 2 - assert result.rio.height == simple_dataset.rio.height * 2 - - def test_upsampling_with_time(dataset_with_time): """Test upsampling of a 3D dataset with time dimension.""" target_res = 0.5 From ee025ea822f98bdd3dbabd11c29506af901d56ea Mon Sep 17 00:00:00 2001 From: hannahker Date: Thu, 24 Oct 2024 16:59:05 -0400 Subject: [PATCH 07/23] Generalize handling of extra dimensions --- run_raster_stats.py | 4 ++-- src/config/floodscan.yml | 2 ++ src/config/imerg.yml | 4 ++-- src/config/seas5.yml | 8 +++++--- src/config/settings.py | 3 ++- src/utils/database_utils.py | 8 +++++--- 6 files changed, 18 insertions(+), 11 deletions(-) diff --git a/run_raster_stats.py b/run_raster_stats.py index 4ba9c06..6a13465 100644 --- a/run_raster_stats.py +++ b/run_raster_stats.py @@ -117,8 +117,8 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url): create_qa_table(engine) settings = load_pipeline_config(dataset) - start, end, is_forecast = parse_pipeline_config(settings, args.test) - create_dataset_table(dataset, engine, is_forecast) + start, end, is_forecast, extra_dims = parse_pipeline_config(settings, args.test) + create_dataset_table(dataset, engine, is_forecast, extra_dims) if args.build_iso3: logger.info("Creating ISO3 table in Postgres database...") create_iso3_df(engine) diff --git a/src/config/floodscan.yml b/src/config/floodscan.yml index 70729c6..fcaa537 100644 --- a/src/config/floodscan.yml +++ b/src/config/floodscan.yml @@ -2,6 +2,8 @@ blob_prefix: floodscan/daily/v5/processed/aer__area_300s_ 
start_date: 1998-01-12 end_date: Null forecast: False +extra_dims: + - band test: start_date: 2023-12-01 end_date: 2023-12-05 diff --git a/src/config/imerg.yml b/src/config/imerg.yml index 99836ca..eeee069 100644 --- a/src/config/imerg.yml +++ b/src/config/imerg.yml @@ -3,6 +3,6 @@ start_date: 2000-06-01 end_date: 2024-10-30 forecast: False test: - start_date: 2000-06-01 - end_date: 2024-10-01 + start_date: 2020-01-01 + end_date: 2020-01-15 iso3s: ["ETH"] diff --git a/src/config/seas5.yml b/src/config/seas5.yml index 08acb8e..cb727b0 100644 --- a/src/config/seas5.yml +++ b/src/config/seas5.yml @@ -2,7 +2,9 @@ blob_prefix: seas5/monthly/processed/precip_em_i start_date: 1981-01-01 end_date: 2024-10-30 forecast: True +extra_dims: + - leadtime test: - start_date: 2012-01-01 - end_date: 2024-10-01 - iso3s: [] + start_date: 2024-01-01 + end_date: 2024-02-01 + iso3s: ["AFG"] diff --git a/src/config/settings.py b/src/config/settings.py index 2538726..de35422 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -31,4 +31,5 @@ def parse_pipeline_config(config, test): start_date = config["start_date"] end_date = config["end_date"] forecast = config["forecast"] - return start_date, end_date, forecast + extra_dims = config.get("extra_dims", []) + return start_date, end_date, forecast, extra_dims diff --git a/src/utils/database_utils.py b/src/utils/database_utils.py index b4d97a5..6ebebd0 100644 --- a/src/utils/database_utils.py +++ b/src/utils/database_utils.py @@ -35,7 +35,7 @@ def db_engine_url(mode): return DATABASES[mode] -def create_dataset_table(dataset, engine, is_forecast=False): +def create_dataset_table(dataset, engine, is_forecast=False, extra_dims=[]): """ Create a table for storing dataset statistics in the database. @@ -71,8 +71,10 @@ def create_dataset_table(dataset, engine, is_forecast=False): unique_constraint_columns = ["valid_date", "pcode"] if is_forecast: columns.insert(3, Column("issued_date", Date)) - columns.insert(4, Column("leadtime", Integer)) - unique_constraint_columns.append("leadtime") + for idx, dim in enumerate(extra_dims): + # TODO: Support non-integer columns + columns.insert(idx + 4, Column(dim, Integer)) + unique_constraint_columns.append(dim) Table( f"{dataset}", From 1bdcd09a45fc172b24255e3efa0cff31024028aa Mon Sep 17 00:00:00 2001 From: hannahker Date: Fri, 25 Oct 2024 12:49:47 -0400 Subject: [PATCH 08/23] Adjust floodscan filename --- src/config/floodscan.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/config/floodscan.yml b/src/config/floodscan.yml index fcaa537..ea78805 100644 --- a/src/config/floodscan.yml +++ b/src/config/floodscan.yml @@ -1,4 +1,4 @@ -blob_prefix: floodscan/daily/v5/processed/aer__area_300s_ +blob_prefix: floodscan/daily/v5/processed/aer_area_300s_ start_date: 1998-01-12 end_date: Null forecast: False @@ -6,5 +6,5 @@ extra_dims: - band test: start_date: 2023-12-01 - end_date: 2023-12-05 + end_date: 2024-01-31 iso3s: ["ETH"] From 75843d52ccb0a9a62b911f569279f5cdec0704eb Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Wed, 6 Nov 2024 18:03:00 +0000 Subject: [PATCH 09/23] Merging floodscan-stats and main --- run_raster_stats.py | 4 +- src/utils/cog_utils.py | 17 +------- src/utils/database_utils.py | 7 ++- src/utils/raster_utils.py | 85 ++++++++++++++++++------------------- 4 files changed, 51 insertions(+), 62 deletions(-) diff --git a/run_raster_stats.py b/run_raster_stats.py index ed3e69d..6053f4d 100644 --- a/run_raster_stats.py +++ b/run_raster_stats.py @@ -130,10 +130,10 @@ def 
process_chunk(start, end, dataset, mode, df_iso3s, engine_url): logger.info(f"Updating data for {dataset}...") create_qa_table(engine) - start, end, is_forecast, sel_iso3s = parse_pipeline_config( + start, end, is_forecast, sel_iso3s, extra_dims = parse_pipeline_config( dataset, args.test, args.update_stats, args.mode ) - create_dataset_table(dataset, engine, is_forecast) + create_dataset_table(dataset, engine, is_forecast, extra_dims) df_iso3s = get_iso3_data(sel_iso3s, engine) date_ranges = split_date_range(start, end) diff --git a/src/utils/cog_utils.py b/src/utils/cog_utils.py index f5d272c..13ffdc8 100644 --- a/src/utils/cog_utils.py +++ b/src/utils/cog_utils.py @@ -14,19 +14,6 @@ coloredlogs.install(level="DEBUG", logger=logger) -def parse_date(filename, dataset): - """ - Parses the date based on a COG filename. - """ - if (dataset == "era5") or (dataset == "imerg"): - date = pd.to_datetime(filename[-14:-4]) - elif dataset == "seas5": - date = pd.to_datetime(filename[-18:-8]) - else: - raise Exception("Input `dataset` must be one of: imerg, era5, seas5") - return pd.to_datetime(date) - - # TODO: Update now that IMERG data has the right .attrs metadata def process_imerg(cog_name, mode): """ @@ -184,8 +171,8 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"): cogs_list = [ x.name for x in container_client.list_blobs(name_starts_with=prefix) - if (parse_date(x.name, dataset) >= start_date) - & (parse_date(x.name, dataset) <= end_date) # noqa + if (parse_date(x.name) >= start_date) + & (parse_date(x.name) <= end_date) # noqa ] if len(cogs_list) == 0: diff --git a/src/utils/database_utils.py b/src/utils/database_utils.py index 577fda6..a1d0605 100644 --- a/src/utils/database_utils.py +++ b/src/utils/database_utils.py @@ -35,7 +35,7 @@ def db_engine_url(mode): return DATABASES[mode] -def create_dataset_table(dataset, engine, is_forecast=False, extra_dims=[]): +def create_dataset_table(dataset, engine, is_forecast=False, extra_dims=None): """ Create a table for storing dataset statistics in the database. @@ -48,11 +48,14 @@ def create_dataset_table(dataset, engine, is_forecast=False, extra_dims=[]): is_forecast : Bool Whether or not the dataset is a forecast. Will include `leadtime` and `issued_date` columns if so. - + extra_dims: List + List containing the name of the extra dimensions Returns ------- None """ + if extra_dims is None: + extra_dims = [] metadata = MetaData() columns = [ Column("iso3", CHAR(3)), diff --git a/src/utils/raster_utils.py b/src/utils/raster_utils.py index bc05fbd..086cc43 100644 --- a/src/utils/raster_utils.py +++ b/src/utils/raster_utils.py @@ -16,17 +16,6 @@ coloredlogs.install(level=LOG_LEVEL, logger=logger) -def validate_dimensions(ds): - required_dims = {"x", "y", "date"} - missing_dims = required_dims - set(ds.dims) - if missing_dims: - raise ValueError(f"Dataset missing required dimensions: {missing_dims}") - # Get the fourth dimension if it exists (not x, y, or date) - dims = list(ds.dims) - fourth_dim = next((dim for dim in dims if dim not in {"x", "y", "date"}), None) - return fourth_dim - - def fast_zonal_stats_runner( ds, gdf, @@ -46,8 +35,7 @@ def fast_zonal_stats_runner( Parameters ---------- ds : xarray.Dataset - The input raster dataset. Must have dimensions 'x', 'y', and 'date', - with an optional fourth dimension (e.g., 'band' or 'leadtime'). + The input raster dataset. Should have the following dimensions: `x`, `y`, `date`, `leadtime` (optional). 
gdf : geopandas.GeoDataFrame A GeoDataFrame containing the administrative boundaries. adm_level : int @@ -77,8 +65,7 @@ def fast_zonal_stats_runner( logger = logging.getLogger(__name__) logger.addHandler(logging.NullHandler()) - fourth_dim = validate_dimensions(ds) - + # TODO: Pre-compute and save # Rasterize the adm bounds src_transform = ds.rio.transform() src_width = ds.rio.width @@ -90,14 +77,14 @@ def fast_zonal_stats_runner( n_adms = len(adm_ids) outputs = [] + # TODO: Can this be vectorized further? for date in ds.date.values: logger.debug(f"Calculating for {date}...") ds_sel = ds.sel(date=date) - - if fourth_dim: # 4D case - for val in ds_sel[fourth_dim].values: - ds__ = ds_sel.sel({fourth_dim: val}) - # Skip if all values are NaN + if "leadtime" in ds_sel.dims: + for lt in ds_sel.leadtime.values: + ds__ = ds_sel.sel(leadtime=lt) + # Some leadtime/date combos are invalid and so don't have any data if bool(np.all(np.isnan(ds__.values))): continue results = fast_zonal_stats( @@ -105,14 +92,12 @@ def fast_zonal_stats_runner( ) for i, result in enumerate(results): result["valid_date"] = date - # Special handling for leadtime dimension - if fourth_dim == "leadtime": - result["issued_date"] = add_months_to_date(date, -val) + result["issued_date"] = add_months_to_date(date, -lt) result["pcode"] = adm_ids[i] result["adm_level"] = adm_level - result[fourth_dim] = val # Store the fourth dimension value + result["leadtime"] = lt outputs.extend(results) - else: # 3D case + else: results = fast_zonal_stats( ds_sel.values, admin_raster, n_adms, stats=stats, rast_fill=rast_fill ) @@ -230,8 +215,7 @@ def upsample_raster(ds, resampled_resolution=UPSAMPLED_RESOLUTION, logger=None): Parameters ---------- ds : xarray.Dataset - The raster data set to upsample. Must have dimensions 'x', 'y', and 'date', - with an optional fourth dimension (e.g., 'band' or 'leadtime'). + The raster data set to upsample. Must not have >4 dimensions. resampled_resolution : float, optional The desired resolution for the upsampled raster. @@ -244,8 +228,6 @@ def upsample_raster(ds, resampled_resolution=UPSAMPLED_RESOLUTION, logger=None): logger = logging.getLogger(__name__) logger.addHandler(logging.NullHandler()) - fourth_dim = validate_dimensions(ds) - # Assuming square resolution input_resolution = ds.rio.resolution()[0] upscale_factor = input_resolution / resampled_resolution @@ -267,28 +249,45 @@ def upsample_raster(ds, resampled_resolution=UPSAMPLED_RESOLUTION, logger=None): ) ds = ds.rio.write_crs("EPSG:4326") - if fourth_dim: # 4D case + # Forecast data will have 4 dims, since we have a leadtime + nd = len(list(ds.dims)) + if nd == 4: resampled_arrays = [] - for val in ds[fourth_dim].values: - ds_ = ds.sel({fourth_dim: val}) - ds_ = ds_.rio.reproject( - ds_.rio.crs, - shape=(new_height, new_width), - resampling=Resampling.nearest, - nodata=np.nan, - ) - # Expand along the fourth dimension - ds_ = ds_.expand_dims([fourth_dim]) - resampled_arrays.append(ds_) + + # TODO: fix the rioxarray.exceptions.NoDataInBounds: Unable to determine bounds from coordinates. 
here + if 'band' in ds.dims: + for band in ds.band.values: + ds_ = ds.sel(band=band) + ds_ = ds_.rio.reproject( + ds_.rio.crs, + shape=(new_height, new_width), + resampling=Resampling.nearest, + nodata=np.nan, + ) + ds_ = ds_.expand_dims(["band"]) + resampled_arrays.append(ds_) + else: + for lt in ds.leadtime.values: + ds_ = ds.sel(leadtime=lt) + ds_ = ds_.rio.reproject( + ds_.rio.crs, + shape=(new_height, new_width), + resampling=Resampling.nearest, + nodata=np.nan, + ) + ds_ = ds_.expand_dims(["leadtime"]) + resampled_arrays.append(ds_) ds_resampled = xr.combine_by_coords(resampled_arrays, combine_attrs="drop") - else: # 3D case (x, y, date) + elif (nd == 2) or (nd == 3): ds_resampled = ds.rio.reproject( ds.rio.crs, shape=(new_height, new_width), resampling=Resampling.nearest, nodata=np.nan, ) + else: + raise Exception("Input Dataset must have 2, 3, or 4 dimensions.") return ds_resampled @@ -368,4 +367,4 @@ def rasterize_admin( fill=rast_fill, all_touched=all_touched, ) - return admin_raster + return admin_raster \ No newline at end of file From dcb53c01b19e2c3e29bea77a813bbb86a9b74c22 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 7 Nov 2024 15:49:05 +0000 Subject: [PATCH 10/23] Adding capability to have other types of data for the fourth dimension --- src/config/floodscan.yml | 2 +- src/config/seas5.yml | 2 +- src/config/settings.py | 16 +++++++++++++++- src/utils/database_utils.py | 13 ++++++------- src/utils/raster_utils.py | 6 +++++- 5 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/config/floodscan.yml b/src/config/floodscan.yml index ea78805..600409f 100644 --- a/src/config/floodscan.yml +++ b/src/config/floodscan.yml @@ -3,7 +3,7 @@ start_date: 1998-01-12 end_date: Null forecast: False extra_dims: - - band + - band : str test: start_date: 2023-12-01 end_date: 2024-01-31 diff --git a/src/config/seas5.yml b/src/config/seas5.yml index 77baa31..fc0b7c4 100644 --- a/src/config/seas5.yml +++ b/src/config/seas5.yml @@ -3,7 +3,7 @@ start_date: 1981-01-01 end_date: Null forecast: True extra_dims: - - leadtime + - leadtime : int test: start_date: 2024-01-01 end_date: 2024-02-01 diff --git a/src/config/settings.py b/src/config/settings.py index d4f169d..e7eb533 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -3,6 +3,7 @@ import yaml from dotenv import load_dotenv +from sqlalchemy import Integer, VARCHAR from src.utils.general_utils import get_most_recent_date @@ -25,6 +26,19 @@ def load_pipeline_config(pipeline_name): config = yaml.safe_load(config_file) return config + # TODO shift this to some utils? 
+def parse_extra_dims(extra_dims): + + parsed_extra_dims = {} + for extra_dim in extra_dims: + dim = next(iter(extra_dim)) + if extra_dim[dim] == 'str': + parsed_extra_dims[dim] = VARCHAR + else: + parsed_extra_dims[dim] = Integer + + return parsed_extra_dims + def parse_pipeline_config(dataset, test, update, mode): config = load_pipeline_config(dataset) @@ -37,7 +51,7 @@ def parse_pipeline_config(dataset, test, update, mode): end_date = config["end_date"] sel_iso3s = None forecast = config["forecast"] - extra_dims = config.get("extra_dims", []) + extra_dims = parse_extra_dims(config.get("extra_dims")) if not end_date: end_date = date.today() - timedelta(days=1) if update: diff --git a/src/utils/database_utils.py b/src/utils/database_utils.py index 3b148c2..2e1c026 100644 --- a/src/utils/database_utils.py +++ b/src/utils/database_utils.py @@ -35,7 +35,7 @@ def db_engine_url(mode): return DATABASES[mode] -def create_dataset_table(dataset, engine, is_forecast=False, extra_dims=[]): +def create_dataset_table(dataset, engine, is_forecast=False, extra_dims={}): """ Create a table for storing dataset statistics in the database. @@ -48,16 +48,16 @@ def create_dataset_table(dataset, engine, is_forecast=False, extra_dims=[]): is_forecast : Bool Whether or not the dataset is a forecast. Will include `leadtime` and `issued_date` columns if so. - extra_dims : List - A list of the names of any extra dimensions that need to be added to the - dataset table. + extra_dims : dict + Dictionary where the keys are names of any extra dimensions that need to be added to the + dataset table and the values are the type. Returns ------- None """ if extra_dims is None: - extra_dims = [] + extra_dims = {} metadata = MetaData() columns = [ Column("iso3", CHAR(3)), @@ -77,8 +77,7 @@ def create_dataset_table(dataset, engine, is_forecast=False, extra_dims=[]): if is_forecast: columns.insert(3, Column("issued_date", Date)) for idx, dim in enumerate(extra_dims): - # TODO: Support non-integer columns - columns.insert(idx + 4, Column(dim, Integer)) + columns.insert(idx + 4, Column(dim, extra_dims[dim])) unique_constraint_columns.append(dim) Table( diff --git a/src/utils/raster_utils.py b/src/utils/raster_utils.py index d0c86d0..7255ef3 100644 --- a/src/utils/raster_utils.py +++ b/src/utils/raster_utils.py @@ -279,7 +279,11 @@ def upsample_raster(ds, resampled_resolution=UPSAMPLED_RESOLUTION, logger=None): nodata=np.nan, ) # Expand along the fourth dimension - ds_ = ds_.expand_dims([fourth_dim]) + if fourth_dim == "band": + # Falls under different bands, use the long_name instead of integer value + ds_ = ds_.expand_dims({'band': [ds_.long_name[val-1]]}) + else: + ds_ = ds_.expand_dims([fourth_dim]) resampled_arrays.append(ds_) ds_resampled = xr.combine_by_coords(resampled_arrays, combine_attrs="drop") From 6bdfd7c05d9c8f4094d996b78d3c29bfd19435b7 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 7 Nov 2024 17:00:18 +0000 Subject: [PATCH 11/23] Changing a date for testing purposes on databricks --- src/config/floodscan.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config/floodscan.yml b/src/config/floodscan.yml index 600409f..6637b42 100644 --- a/src/config/floodscan.yml +++ b/src/config/floodscan.yml @@ -1,5 +1,5 @@ blob_prefix: floodscan/daily/v5/processed/aer_area_300s_ -start_date: 1998-01-12 +start_date: 2023-01-01 end_date: Null forecast: False extra_dims: From 53b6076ef851c5bbe1f78dff61204bf593ad0792 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 7 Nov 2024 17:29:51 +0000 Subject: [PATCH 
12/23] Changing the band for testing purposes on databricks --- src/utils/raster_utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/utils/raster_utils.py b/src/utils/raster_utils.py index 7255ef3..f5a9553 100644 --- a/src/utils/raster_utils.py +++ b/src/utils/raster_utils.py @@ -281,7 +281,11 @@ def upsample_raster(ds, resampled_resolution=UPSAMPLED_RESOLUTION, logger=None): # Expand along the fourth dimension if fourth_dim == "band": # Falls under different bands, use the long_name instead of integer value - ds_ = ds_.expand_dims({'band': [ds_.long_name[val-1]]}) + #ds_ = ds_.expand_dims({'band': [ds_.long_name[val-1]]}) + if int(ds_['band']) ==1 : + ds_ = ds_.expand_dims({'band': 'SFED'}) + else: + ds_ = ds_.expand_dims({'band': 'MFED'}) else: ds_ = ds_.expand_dims([fourth_dim]) resampled_arrays.append(ds_) From 00d097ee5075565161b84d2ee75397596afd1b08 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 7 Nov 2024 17:38:15 +0000 Subject: [PATCH 13/23] Changing the band array for testing purposes on databricks --- src/utils/raster_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/utils/raster_utils.py b/src/utils/raster_utils.py index f5a9553..f285862 100644 --- a/src/utils/raster_utils.py +++ b/src/utils/raster_utils.py @@ -283,9 +283,9 @@ def upsample_raster(ds, resampled_resolution=UPSAMPLED_RESOLUTION, logger=None): # Falls under different bands, use the long_name instead of integer value #ds_ = ds_.expand_dims({'band': [ds_.long_name[val-1]]}) if int(ds_['band']) ==1 : - ds_ = ds_.expand_dims({'band': 'SFED'}) + ds_ = ds_.expand_dims({'band': ['SFED']}) else: - ds_ = ds_.expand_dims({'band': 'MFED'}) + ds_ = ds_.expand_dims({'band': ['MFED']}) else: ds_ = ds_.expand_dims([fourth_dim]) resampled_arrays.append(ds_) From 170aa5a359f2f0ee5b4e2c3a6b913d4407a782d4 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Fri, 8 Nov 2024 13:55:25 +0000 Subject: [PATCH 14/23] Adding changes to be able to retrieve full cogs for floodscan --- src/config/floodscan.yml | 2 +- src/utils/cloud_utils.py | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/config/floodscan.yml b/src/config/floodscan.yml index 6637b42..600409f 100644 --- a/src/config/floodscan.yml +++ b/src/config/floodscan.yml @@ -1,5 +1,5 @@ blob_prefix: floodscan/daily/v5/processed/aer_area_300s_ -start_date: 2023-01-01 +start_date: 1998-01-12 end_date: Null forecast: False extra_dims: diff --git a/src/utils/cloud_utils.py b/src/utils/cloud_utils.py index 9ad7ce2..6e923ed 100644 --- a/src/utils/cloud_utils.py +++ b/src/utils/cloud_utils.py @@ -24,7 +24,7 @@ def get_container_client(mode, container_name): azure.storage.blob.ContainerClient A `ContainerClient` object that can be used to interact with the specified Azure Blob Storage container - """ + blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") blob_url = ( f"https://imb0chd0{mode}.blob.core.windows.net/" @@ -32,6 +32,14 @@ def get_container_client(mode, container_name): + "?" # noqa + blob_sas # noqa ) + """ + blob_sas = os.getenv(f"DSCI_AZ_SAS_PROD") + blob_url = ( + f"https://imb0chd0prod.blob.core.windows.net/" + + container_name # noqa + + "?" 
# noqa + + blob_sas # noqa + ) return ContainerClient.from_container_url(blob_url) @@ -55,5 +63,7 @@ def get_cog_url(mode, cog_name): """ if mode == "local": return "test_outputs/" + cog_name - blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") - return f"https://imb0chd0{mode}.blob.core.windows.net/raster/{cog_name}?{blob_sas}" + #blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") + #return f"https://imb0chd0{mode}.blob.core.windows.net/raster/{cog_name}?{blob_sas}" + blob_sas = os.getenv(f"DSCI_AZ_SAS_PROD") + return f"https://imb0chd0prod.blob.core.windows.net/raster/{cog_name}?{blob_sas}" From 203ded08f897cc821717e1b93209dbe1a403fc5d Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Mon, 11 Nov 2024 12:32:13 +0000 Subject: [PATCH 15/23] Adding coverage flag for floodscan --- run_raster_stats.py | 6 ++++++ src/utils/cloud_utils.py | 19 ++++++++++++++----- src/utils/database_utils.py | 1 + src/utils/iso3_utils.py | 5 +++++ 4 files changed, 26 insertions(+), 5 deletions(-) diff --git a/run_raster_stats.py b/run_raster_stats.py index 8a76bfe..44dce1a 100644 --- a/run_raster_stats.py +++ b/run_raster_stats.py @@ -57,6 +57,12 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url): for _, row in df_iso3s.iterrows(): iso3 = row["iso3"] max_adm = row["max_adm_level"] + + # Coverage check for specific datasets + if dataset in df_iso3s.keys() : + if not row[dataset]: + logger.info(f"Skipping {iso3}...") + continue logger.info(f"Processing data for {iso3}...") with tempfile.TemporaryDirectory() as td: diff --git a/src/utils/cloud_utils.py b/src/utils/cloud_utils.py index 6e923ed..ee2a3b1 100644 --- a/src/utils/cloud_utils.py +++ b/src/utils/cloud_utils.py @@ -32,10 +32,19 @@ def get_container_client(mode, container_name): + "?" # noqa + blob_sas # noqa ) + + blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") + blob_url = ( + f"https://imb0chd0{mode}.blob.core.windows.net/" + + container_name # noqa + + "?" # noqa + + blob_sas # noqa + ) + TODO add back """ - blob_sas = os.getenv(f"DSCI_AZ_SAS_PROD") + blob_sas = os.getenv(f"DSCI_AZ_SAS_DEV") blob_url = ( - f"https://imb0chd0prod.blob.core.windows.net/" + f"https://imb0chd0dev.blob.core.windows.net/" + container_name # noqa + "?" 
# noqa + blob_sas # noqa @@ -63,7 +72,7 @@ def get_cog_url(mode, cog_name): """ if mode == "local": return "test_outputs/" + cog_name - #blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") + #TODO add back blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") #return f"https://imb0chd0{mode}.blob.core.windows.net/raster/{cog_name}?{blob_sas}" - blob_sas = os.getenv(f"DSCI_AZ_SAS_PROD") - return f"https://imb0chd0prod.blob.core.windows.net/raster/{cog_name}?{blob_sas}" + blob_sas = os.getenv(f"DSCI_AZ_SAS_DEV") + return f"https://imb0chd0dev.blob.core.windows.net/raster/{cog_name}?{blob_sas}" diff --git a/src/utils/database_utils.py b/src/utils/database_utils.py index 2e1c026..11ad9d2 100644 --- a/src/utils/database_utils.py +++ b/src/utils/database_utils.py @@ -133,6 +133,7 @@ def create_iso3_table(engine): Column("max_adm_level", Integer), Column("stats_last_updated", Date), Column("shp_url", String), + Column("floodscan", Boolean), ) metadata.create_all(engine) diff --git a/src/utils/iso3_utils.py b/src/utils/iso3_utils.py index a7989a0..2aebb4a 100644 --- a/src/utils/iso3_utils.py +++ b/src/utils/iso3_utils.py @@ -178,6 +178,10 @@ def create_iso3_df(engine): ) & (df_hrp["endDate"] >= current_date) # noqa ] + + # TODO add info on how to retrieve this file + floodscan_iso3_coverage = pd.read_csv("data/floodscan_iso3s.csv")['iso3'].tolist() + iso3_codes = set() for locations in df_active_hrp["locations"]: iso3_codes.update(locations.split("|")) @@ -186,6 +190,7 @@ def create_iso3_df(engine): df["has_active_hrp"] = df["iso_3"].isin(iso3_codes) df["max_adm_level"] = df.apply(determine_max_adm_level, axis=1) df["stats_last_updated"] = None + df["floodscan"] = df["iso_3"].isin(floodscan_iso3_coverage) # TODO: This list seems to have some inconsistencies when compared against the # contents of all polygons From 54e27d9c079bc6765509b98d705018fa51c8ac03 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Mon, 11 Nov 2024 13:01:00 +0000 Subject: [PATCH 16/23] testing by pointing at the whole historical dataset --- src/utils/cloud_utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/utils/cloud_utils.py b/src/utils/cloud_utils.py index ee2a3b1..5c4c620 100644 --- a/src/utils/cloud_utils.py +++ b/src/utils/cloud_utils.py @@ -42,9 +42,9 @@ def get_container_client(mode, container_name): ) TODO add back """ - blob_sas = os.getenv(f"DSCI_AZ_SAS_DEV") + blob_sas = os.getenv(f"DSCI_AZ_SAS_PROD") blob_url = ( - f"https://imb0chd0dev.blob.core.windows.net/" + f"https://imb0chd0prod.blob.core.windows.net/" + container_name # noqa + "?" 
# noqa + blob_sas # noqa @@ -74,5 +74,5 @@ def get_cog_url(mode, cog_name): return "test_outputs/" + cog_name #TODO add back blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") #return f"https://imb0chd0{mode}.blob.core.windows.net/raster/{cog_name}?{blob_sas}" - blob_sas = os.getenv(f"DSCI_AZ_SAS_DEV") - return f"https://imb0chd0dev.blob.core.windows.net/raster/{cog_name}?{blob_sas}" + blob_sas = os.getenv(f"DSCI_AZ_SAS_PROD") + return f"https://imb0chd0prod.blob.core.windows.net/raster/{cog_name}?{blob_sas}" From 2d819d81c7c35d9ddbf384d8baea1454bb88ac4e Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Mon, 11 Nov 2024 17:41:04 +0000 Subject: [PATCH 17/23] Setting chunksize --- run_raster_stats.py | 1 + 1 file changed, 1 insertion(+) diff --git a/run_raster_stats.py b/run_raster_stats.py index 44dce1a..f776783 100644 --- a/run_raster_stats.py +++ b/run_raster_stats.py @@ -100,6 +100,7 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url): con=engine, if_exists="append", index=False, + chunksize=1000, method=postgres_upsert, ) except Exception as e: From edb7815041d500572ce52f8ecf983ac13e5195f4 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Mon, 11 Nov 2024 18:16:25 +0000 Subject: [PATCH 18/23] Adding a chunksize parameter --- run_raster_stats.py | 8 ++++---- src/utils/inputs.py | 6 ++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/run_raster_stats.py b/run_raster_stats.py index f776783..6595506 100644 --- a/run_raster_stats.py +++ b/run_raster_stats.py @@ -45,7 +45,7 @@ def setup_logger(name, level=logging.INFO): return logger -def process_chunk(start, end, dataset, mode, df_iso3s, engine_url): +def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize): process_name = current_process().name logger = setup_logger(f"{process_name}: {dataset}_{start}") logger.info(f"Starting processing for {dataset} from {start} to {end}") @@ -100,7 +100,7 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url): con=engine, if_exists="append", index=False, - chunksize=1000, + chunksize=chunksize, method=postgres_upsert, ) except Exception as e: @@ -151,7 +151,7 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url): ) process_args = [ - (start, end, dataset, args.mode, df_iso3s, engine_url) + (start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize) for start, end in date_ranges ] @@ -160,6 +160,6 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url): else: logger.info("Processing entire date range in a single chunk") - process_chunk(start, end, dataset, args.mode, df_iso3s, engine_url) + process_chunk(start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize) logger.info("Done calculating and saving stats.") diff --git a/src/utils/inputs.py b/src/utils/inputs.py index 03a59f0..3e41b26 100644 --- a/src/utils/inputs.py +++ b/src/utils/inputs.py @@ -36,4 +36,10 @@ def cli_args(): help="Update the iso3 and polygon metadata tables.", action="store_true", ) + parser.add_argument( + "--chunksize", + help="Limit the SQL insert batches to an specific chunksize.", + type=int, + default=100000, + ) return parser.parse_args() From a0d64610d7ea49b32c79945c06b16a6da5dc877d Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Tue, 12 Nov 2024 13:52:06 +0000 Subject: [PATCH 19/23] Adding mode parameter back --- src/utils/cloud_utils.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/src/utils/cloud_utils.py b/src/utils/cloud_utils.py index 5c4c620..f5b513d 100644 --- 
a/src/utils/cloud_utils.py +++ b/src/utils/cloud_utils.py @@ -32,7 +32,7 @@ def get_container_client(mode, container_name): + "?" # noqa + blob_sas # noqa ) - + """ blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") blob_url = ( f"https://imb0chd0{mode}.blob.core.windows.net/" @@ -40,15 +40,6 @@ def get_container_client(mode, container_name): + "?" # noqa + blob_sas # noqa ) - TODO add back - """ - blob_sas = os.getenv(f"DSCI_AZ_SAS_PROD") - blob_url = ( - f"https://imb0chd0prod.blob.core.windows.net/" - + container_name # noqa - + "?" # noqa - + blob_sas # noqa - ) return ContainerClient.from_container_url(blob_url) @@ -72,7 +63,5 @@ def get_cog_url(mode, cog_name): """ if mode == "local": return "test_outputs/" + cog_name - #TODO add back blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") - #return f"https://imb0chd0{mode}.blob.core.windows.net/raster/{cog_name}?{blob_sas}" - blob_sas = os.getenv(f"DSCI_AZ_SAS_PROD") - return f"https://imb0chd0prod.blob.core.windows.net/raster/{cog_name}?{blob_sas}" + blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") + return f"https://imb0chd0{mode}.blob.core.windows.net/raster/{cog_name}?{blob_sas}" From 1416abdf20411fbd1547257b95298cada0466d14 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Tue, 12 Nov 2024 13:59:52 +0000 Subject: [PATCH 20/23] Pre-commit --- run_raster_stats.py | 6 ++++-- src/config/settings.py | 8 ++++---- src/utils/iso3_utils.py | 2 +- src/utils/raster_utils.py | 9 ++++----- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/run_raster_stats.py b/run_raster_stats.py index 6595506..312faf6 100644 --- a/run_raster_stats.py +++ b/run_raster_stats.py @@ -59,7 +59,7 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize): max_adm = row["max_adm_level"] # Coverage check for specific datasets - if dataset in df_iso3s.keys() : + if dataset in df_iso3s.keys(): if not row[dataset]: logger.info(f"Skipping {iso3}...") continue @@ -160,6 +160,8 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize): else: logger.info("Processing entire date range in a single chunk") - process_chunk(start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize) + process_chunk( + start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize + ) logger.info("Done calculating and saving stats.") diff --git a/src/config/settings.py b/src/config/settings.py index e7eb533..e11ad87 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -3,7 +3,7 @@ import yaml from dotenv import load_dotenv -from sqlalchemy import Integer, VARCHAR +from sqlalchemy import VARCHAR, Integer from src.utils.general_utils import get_most_recent_date @@ -26,13 +26,13 @@ def load_pipeline_config(pipeline_name): config = yaml.safe_load(config_file) return config - # TODO shift this to some utils? -def parse_extra_dims(extra_dims): +# TODO shift this to some utils? 
+def parse_extra_dims(extra_dims): parsed_extra_dims = {} for extra_dim in extra_dims: dim = next(iter(extra_dim)) - if extra_dim[dim] == 'str': + if extra_dim[dim] == "str": parsed_extra_dims[dim] = VARCHAR else: parsed_extra_dims[dim] = Integer diff --git a/src/utils/iso3_utils.py b/src/utils/iso3_utils.py index 2aebb4a..c3363b9 100644 --- a/src/utils/iso3_utils.py +++ b/src/utils/iso3_utils.py @@ -180,7 +180,7 @@ def create_iso3_df(engine): ] # TODO add info on how to retrieve this file - floodscan_iso3_coverage = pd.read_csv("data/floodscan_iso3s.csv")['iso3'].tolist() + floodscan_iso3_coverage = pd.read_csv("data/floodscan_iso3s.csv")["iso3"].tolist() iso3_codes = set() for locations in df_active_hrp["locations"]: diff --git a/src/utils/raster_utils.py b/src/utils/raster_utils.py index f285862..47e75dd 100644 --- a/src/utils/raster_utils.py +++ b/src/utils/raster_utils.py @@ -281,11 +281,10 @@ def upsample_raster(ds, resampled_resolution=UPSAMPLED_RESOLUTION, logger=None): # Expand along the fourth dimension if fourth_dim == "band": # Falls under different bands, use the long_name instead of integer value - #ds_ = ds_.expand_dims({'band': [ds_.long_name[val-1]]}) - if int(ds_['band']) ==1 : - ds_ = ds_.expand_dims({'band': ['SFED']}) + if int(ds_["band"]) == 1: + ds_ = ds_.expand_dims({"band": ["SFED"]}) else: - ds_ = ds_.expand_dims({'band': ['MFED']}) + ds_ = ds_.expand_dims({"band": ["MFED"]}) else: ds_ = ds_.expand_dims([fourth_dim]) resampled_arrays.append(ds_) @@ -377,4 +376,4 @@ def rasterize_admin( fill=rast_fill, all_touched=all_touched, ) - return admin_raster \ No newline at end of file + return admin_raster From 38f2decfd913c75ed6d19429f9221273d962c5c7 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Tue, 12 Nov 2024 16:44:15 +0000 Subject: [PATCH 21/23] Adding flag for floodscan coverage in config --- src/config/floodscan.yml | 1 + src/utils/iso3_utils.py | 22 ++++++++++++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/config/floodscan.yml b/src/config/floodscan.yml index 600409f..6acd7b4 100644 --- a/src/config/floodscan.yml +++ b/src/config/floodscan.yml @@ -8,3 +8,4 @@ test: start_date: 2023-12-01 end_date: 2024-01-31 iso3s: ["ETH"] +coverage: ["DZA", "AGO", "BEN", "BWA", "BFA", "BDI", "CPV", "CMR", "CAF", "TCD", "COM", "COG", "CIV", "CAP", "DJI", "EGY", "GNQ", "ERI", "SWZ", "ETH", "GAB", "GMB", "GHA", "GIN", "GNB", "KEN", "LS0", "LBR", "LBY", "MDG", "MWI", "MLI", "MRT", "MUS", "MAR", "MOZ", "NAM", "NER", "NGA", "RWA", "STP", "SEN", "SYC", "SLE", "SOM", "ZAF", "SSD", "SDN", "TGO", "TUN", "UGA", "TZA", "ZMB", "ZWE"] diff --git a/src/utils/iso3_utils.py b/src/utils/iso3_utils.py index c3363b9..95af203 100644 --- a/src/utils/iso3_utils.py +++ b/src/utils/iso3_utils.py @@ -8,6 +8,7 @@ import requests from sqlalchemy import text +from src.config.settings import load_pipeline_config from src.utils.cloud_utils import get_container_client from src.utils.database_utils import create_iso3_table @@ -144,6 +145,19 @@ def determine_max_adm_level(row): return min(1, row["src_lvl"]) +def load_coverage(): + pipelines = ["seas5", "era5", "imerg", "floodscan"] + coverage = {} + + for dataset in pipelines: + config = load_pipeline_config(dataset) + if "coverage" in config: + dataset_coverage = config["coverage"] + coverage[dataset] = dataset_coverage + + return coverage + + def create_iso3_df(engine): """ Create and populate an ISO3 table in the database with country information. 
@@ -178,9 +192,7 @@ def create_iso3_df(engine): ) & (df_hrp["endDate"] >= current_date) # noqa ] - - # TODO add info on how to retrieve this file - floodscan_iso3_coverage = pd.read_csv("data/floodscan_iso3s.csv")["iso3"].tolist() + dataset_coverage = load_coverage() iso3_codes = set() for locations in df_active_hrp["locations"]: @@ -190,7 +202,9 @@ def create_iso3_df(engine): df["has_active_hrp"] = df["iso_3"].isin(iso3_codes) df["max_adm_level"] = df.apply(determine_max_adm_level, axis=1) df["stats_last_updated"] = None - df["floodscan"] = df["iso_3"].isin(floodscan_iso3_coverage) + + for dataset in dataset_coverage: + df[dataset] = df["iso_3"].isin(dataset_coverage[dataset]) # TODO: This list seems to have some inconsistencies when compared against the # contents of all polygons From 5b3f774ef9f67ac68339e4585d07f09f902a9b10 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 14 Nov 2024 15:42:36 +0000 Subject: [PATCH 22/23] Addressing code review --- src/config/settings.py | 16 +--------------- src/utils/cloud_utils.py | 10 +--------- src/utils/cog_utils.py | 5 ++--- src/utils/general_utils.py | 13 +++++++++++++ src/utils/raster_utils.py | 2 +- tests/test_raster_stats.py | 13 +++++++++++++ 6 files changed, 31 insertions(+), 28 deletions(-) diff --git a/src/config/settings.py b/src/config/settings.py index e11ad87..978e2d7 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -3,9 +3,8 @@ import yaml from dotenv import load_dotenv -from sqlalchemy import VARCHAR, Integer -from src.utils.general_utils import get_most_recent_date +from src.utils.general_utils import get_most_recent_date, parse_extra_dims load_dotenv() @@ -27,19 +26,6 @@ def load_pipeline_config(pipeline_name): return config -# TODO shift this to some utils? -def parse_extra_dims(extra_dims): - parsed_extra_dims = {} - for extra_dim in extra_dims: - dim = next(iter(extra_dim)) - if extra_dim[dim] == "str": - parsed_extra_dims[dim] = VARCHAR - else: - parsed_extra_dims[dim] = Integer - - return parsed_extra_dims - - def parse_pipeline_config(dataset, test, update, mode): config = load_pipeline_config(dataset) if test: diff --git a/src/utils/cloud_utils.py b/src/utils/cloud_utils.py index f5b513d..9ad7ce2 100644 --- a/src/utils/cloud_utils.py +++ b/src/utils/cloud_utils.py @@ -24,7 +24,7 @@ def get_container_client(mode, container_name): azure.storage.blob.ContainerClient A `ContainerClient` object that can be used to interact with the specified Azure Blob Storage container - + """ blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") blob_url = ( f"https://imb0chd0{mode}.blob.core.windows.net/" @@ -32,14 +32,6 @@ def get_container_client(mode, container_name): + "?" # noqa + blob_sas # noqa ) - """ - blob_sas = os.getenv(f"DSCI_AZ_SAS_{mode.upper()}") - blob_url = ( - f"https://imb0chd0{mode}.blob.core.windows.net/" - + container_name # noqa - + "?" 
# noqa - + blob_sas # noqa - ) return ContainerClient.from_container_url(blob_url) diff --git a/src/utils/cog_utils.py b/src/utils/cog_utils.py index cd63ec6..4f28c44 100644 --- a/src/utils/cog_utils.py +++ b/src/utils/cog_utils.py @@ -115,7 +115,7 @@ def process_floodscan(cog_name, mode): year_valid = da_in.attrs["year_valid"] month_valid = str(da_in.attrs["month_valid"]).zfill(2) - date_valid = cog_name[-13:-11] # TODO: Change once attr is updated correctly + date_valid = str(da_in.attrs["date_valid"]).zfill(2) date_in = f"{year_valid}-{month_valid}-{date_valid}" da_in = da_in.squeeze(drop=True) @@ -173,8 +173,7 @@ def stack_cogs(start_date, end_date, dataset="era5", mode="dev"): cogs_list = [ x.name for x in container_client.list_blobs(name_starts_with=prefix) - if (parse_date(x.name) >= start_date) - & (parse_date(x.name) <= end_date) # noqa + if (parse_date(x.name) >= start_date) & (parse_date(x.name) <= end_date) # noqa ] if len(cogs_list) == 0: diff --git a/src/utils/general_utils.py b/src/utils/general_utils.py index e8df317..2f0dbca 100644 --- a/src/utils/general_utils.py +++ b/src/utils/general_utils.py @@ -3,6 +3,7 @@ import pandas as pd from dateutil.relativedelta import relativedelta +from sqlalchemy import VARCHAR, Integer from src.utils.cloud_utils import get_container_client @@ -110,3 +111,15 @@ def parse_date(filename): """ res = re.search("([0-9]{4}-[0-9]{2}-[0-9]{2})", filename) return pd.to_datetime(res[0]) + + +def parse_extra_dims(extra_dims): + parsed_extra_dims = {} + for extra_dim in extra_dims: + dim = next(iter(extra_dim)) + if extra_dim[dim] == "str": + parsed_extra_dims[dim] = VARCHAR + else: + parsed_extra_dims[dim] = Integer + + return parsed_extra_dims diff --git a/src/utils/raster_utils.py b/src/utils/raster_utils.py index 47e75dd..e651539 100644 --- a/src/utils/raster_utils.py +++ b/src/utils/raster_utils.py @@ -269,7 +269,7 @@ def upsample_raster(ds, resampled_resolution=UPSAMPLED_RESOLUTION, logger=None): if fourth_dim: # 4D case resampled_arrays = [] - + for val in ds[fourth_dim].values: ds_ = ds.sel({fourth_dim: val}) ds_ = ds_.rio.reproject( diff --git a/tests/test_raster_stats.py b/tests/test_raster_stats.py index fbfa87c..81ecf0b 100644 --- a/tests/test_raster_stats.py +++ b/tests/test_raster_stats.py @@ -288,6 +288,19 @@ def dataset_with_leadtime(dataset_with_time): return ds +def test_basic_upsampling(simple_dataset): + """Test basic upsampling of a 2D dataset.""" + target_res = 0.5 # Upsample from 1.0 to 0.5 degrees + result = upsample_raster(simple_dataset, resampled_resolution=target_res) + + # Check output resolution + assert result.rio.resolution()[0] == target_res + + # Check output dimensions doubled (since resolution halved) + assert result.rio.width == simple_dataset.rio.width * 2 + assert result.rio.height == simple_dataset.rio.height * 2 + + def test_upsampling_with_time(dataset_with_time): """Test upsampling of a 3D dataset with time dimension.""" target_res = 0.5 From d2299166b8c6fa5e9d8c8ec952a84e04bbea8f87 Mon Sep 17 00:00:00 2001 From: Isa Tot Date: Thu, 14 Nov 2024 16:01:25 +0000 Subject: [PATCH 23/23] Fixing test --- tests/test_raster_stats.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_raster_stats.py b/tests/test_raster_stats.py index 81ecf0b..d84e04e 100644 --- a/tests/test_raster_stats.py +++ b/tests/test_raster_stats.py @@ -288,17 +288,17 @@ def dataset_with_leadtime(dataset_with_time): return ds -def test_basic_upsampling(simple_dataset): +def 
test_basic_upsampling(dataset_with_time): """Test basic upsampling of a 2D dataset.""" target_res = 0.5 # Upsample from 1.0 to 0.5 degrees - result = upsample_raster(simple_dataset, resampled_resolution=target_res) + result = upsample_raster(dataset_with_time, resampled_resolution=target_res) # Check output resolution assert result.rio.resolution()[0] == target_res # Check output dimensions doubled (since resolution halved) - assert result.rio.width == simple_dataset.rio.width * 2 - assert result.rio.height == simple_dataset.rio.height * 2 + assert result.rio.width == dataset_with_time.rio.width * 2 + assert result.rio.height == dataset_with_time.rio.height * 2 def test_upsampling_with_time(dataset_with_time):
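---

The thread running through several of these patches ("Generalize handling of 4th dimension in upsampling", "Generalize dimension handling", "Generalize handling of fourth dim in runner function", "Adding capability to have other types of data for the fourth dimension") is to treat any dimension beyond (x, y, date) — `leadtime` for SEAS5 forecasts, `band` for FloodScan's SFED/MFED layers — as a single generic extra dimension: detect it once, iterate over it slice by slice, and recombine. The sketch below illustrates that pattern in isolation. It is a simplified, hypothetical example (the helper `find_extra_dim` and the toy data are not part of the repository) and assumes only numpy, pandas, and xarray.

    import numpy as np
    import pandas as pd
    import xarray as xr

    CORE_DIMS = {"x", "y", "date"}

    def find_extra_dim(ds):
        # Raise if a core dimension is missing, then return the single remaining
        # dimension name (e.g. "band" or "leadtime"), or None for plain 3D data.
        missing = CORE_DIMS - set(ds.dims)
        if missing:
            raise ValueError(f"Dataset missing required dimensions: {missing}")
        return next((d for d in ds.dims if d not in CORE_DIMS), None)

    # A toy 4D array standing in for FloodScan-style data with two bands
    da = xr.DataArray(
        np.random.rand(2, 3, 4, 4),
        dims=("band", "date", "y", "x"),
        coords={
            "band": ["SFED", "MFED"],
            "date": pd.date_range("2023-12-01", periods=3),
            "y": np.linspace(5.0, 8.0, 4),
            "x": np.linspace(33.0, 36.0, 4),
        },
    )

    extra = find_extra_dim(da)  # -> "band"
    slices = []
    for val in da[extra].values:
        da_slice = da.sel({extra: val})  # a (date, y, x) cube for one band
        # ...per-slice work (reprojection, zonal stats, ...) would go here...
        slices.append(da_slice.expand_dims(extra))  # promote the scalar coord back to a dim
    da_out = xr.concat(slices, dim=extra)  # reassembled as (band, date, y, x)

The per-slice loop is what lets the same upsampling and zonal-stats code serve both the forecast (`leadtime`) and FloodScan (`band`) cases without branching on the dataset name.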
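On the configuration and database side, the same extra dimension is declared per dataset under `extra_dims` (e.g. `- leadtime : int`, `- band : str`) and mapped to a SQLAlchemy column type so it can become an extra column and part of the unique constraint in the stats table. The snippet below is a small, self-contained illustration of how that YAML shape parses and feeds the type mapping; it mirrors the `parse_extra_dims` helper added in this series but the surrounding config literal is made up for the example.

    import yaml
    from sqlalchemy import VARCHAR, Integer

    config = yaml.safe_load("""
    extra_dims:
      - leadtime : int
      - band : str
    """)

    def parse_extra_dims(extra_dims):
        # Map each declared extra dimension to a SQLAlchemy column type:
        # "str" becomes VARCHAR, anything else falls back to Integer.
        parsed = {}
        for extra_dim in extra_dims:
            dim = next(iter(extra_dim))
            parsed[dim] = VARCHAR if extra_dim[dim] == "str" else Integer
        return parsed

    print(parse_extra_dims(config["extra_dims"]))
    # -> {'leadtime': Integer, 'band': VARCHAR}

Each `- name : type` entry is parsed by YAML as a one-key mapping, which is why the helper takes `next(iter(extra_dim))` to recover the dimension name before looking up its declared type.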