From 839404a0e400b372d7c7fec6e39475e3baca446e Mon Sep 17 00:00:00 2001 From: hannahker Date: Wed, 11 Dec 2024 11:31:30 -0800 Subject: [PATCH 1/7] add frequency to configs --- src/config/era5.yml | 1 + src/config/floodscan.yml | 1 + src/config/imerg.yml | 1 + src/config/seas5.yml | 1 + 4 files changed, 4 insertions(+) diff --git a/src/config/era5.yml b/src/config/era5.yml index 0f6e993..d1ff30c 100644 --- a/src/config/era5.yml +++ b/src/config/era5.yml @@ -1,6 +1,7 @@ blob_prefix: era5/monthly/processed/precip_reanalysis_v start_date: 1981-01-01 end_date: Null +frequency: M forecast: False test: start_date: 1981-01-01 diff --git a/src/config/floodscan.yml b/src/config/floodscan.yml index 3cce4da..eca3907 100644 --- a/src/config/floodscan.yml +++ b/src/config/floodscan.yml @@ -1,6 +1,7 @@ blob_prefix: floodscan/daily/v5/processed/aer_area_300s_ start_date: 1998-01-12 end_date: Null +frequency: D forecast: False extra_dims: - band : str diff --git a/src/config/imerg.yml b/src/config/imerg.yml index 02844a5..dd78049 100644 --- a/src/config/imerg.yml +++ b/src/config/imerg.yml @@ -1,6 +1,7 @@ blob_prefix: imerg/daily/late/v7/processed/imerg-daily-late- start_date: 2000-06-01 end_date: Null +frequency: D forecast: False test: start_date: 2000-06-01 diff --git a/src/config/seas5.yml b/src/config/seas5.yml index 204cdd0..4f128ac 100644 --- a/src/config/seas5.yml +++ b/src/config/seas5.yml @@ -1,6 +1,7 @@ blob_prefix: seas5/monthly/processed/precip_em_i start_date: 1981-01-01 end_date: Null +frequency: M forecast: True extra_dims: - leadtime : int From e4a93e1eebd5a106a34eb9df76203a10843964ce Mon Sep 17 00:00:00 2001 From: hannahker Date: Wed, 11 Dec 2024 11:33:24 -0800 Subject: [PATCH 2/7] functions for identifying missing dates --- src/utils/general_utils.py | 84 +++++++++++++++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 2 deletions(-) diff --git a/src/utils/general_utils.py b/src/utils/general_utils.py index 3766d3e..2212dbd 100644 --- a/src/utils/general_utils.py +++ b/src/utils/general_utils.py @@ -1,5 +1,6 @@ import re from datetime import datetime, timedelta +from typing import List import pandas as pd from dateutil.relativedelta import relativedelta @@ -31,7 +32,9 @@ def split_date_range(start_date, end_date): date_ranges = [] while start < end: year_end = min(datetime(start.year, 12, 31), end) - date_ranges.append((start.strftime("%Y-%m-%d"), year_end.strftime("%Y-%m-%d"))) + date_ranges.append( + (start.strftime("%Y-%m-%d"), year_end.strftime("%Y-%m-%d")) + ) start = year_end + timedelta(days=1) return date_ranges @@ -59,7 +62,9 @@ def add_months_to_date(date_string, months): result_date = start_date + relativedelta(months=months) return result_date.strftime("%Y-%m-%d") except ValueError as e: - raise ValueError("Invalid date format. Please use 'YYYY-MM-DD'.") from e + raise ValueError( + "Invalid date format. Please use 'YYYY-MM-DD'." + ) from e # TODO: Might not scale well as we get more files in the blob @@ -126,3 +131,78 @@ def parse_extra_dims(config): parsed_extra_dims[dim] = Integer return parsed_extra_dims + + +def get_expected_dates( + start_date: str, end_date: str, frequency: str +) -> pd.DatetimeIndex: + """ + Generate a complete list of expected dates between start and end dates. 
+ + Parameters + ---------- + start_date : str + Start date in YYYY-MM-DD format + end_date : str + End date in YYYY-MM-DD format + frequency : str + Frequency of dates, either 'D' for daily or 'M' for monthly + + Returns + ------- + pd.DatetimeIndex + Complete list of expected dates + """ + start = pd.to_datetime(start_date) + end = pd.to_datetime(end_date) + + if frequency == "M": + # For monthly data, always use first day of month + dates = pd.date_range( + start=start.replace(day=1), end=end.replace(day=1), freq="MS" + ) + elif frequency == "D": + dates = pd.date_range(start=start, end=end, freq="D") + else: + raise ValueError("Frequency must be either 'D' or 'M'") + + return dates + + +def get_missing_dates( + engine, dataset: str, start_date: str, end_date: str, frequency: str +) -> List[datetime]: + """ + Find missing dates in the database by comparing against expected dates. + + Parameters + ---------- + engine : sqlalchemy.engine.Engine + Database connection engine + dataset : str + Name of the dataset table in database + start_date : str + Start date in YYYY-MM-DD format + end_date : str + End date in YYYY-MM-DD format + frequency : str + Frequency of dates, either 'D' for daily or 'M' for monthly + + Returns + ------- + List[datetime] + List of missing dates that need to be processed + """ + # Get all expected dates + expected_dates = get_expected_dates(start_date, end_date, frequency) + + # Query existing dates from database + query = f"SELECT DISTINCT valid_date FROM {dataset} ORDER BY valid_date" + existing_dates = pd.read_sql_query(query, engine) + existing_dates["valid_date"] = pd.to_datetime(existing_dates["valid_date"]) + + # Find missing dates + missing_dates = expected_dates[ + ~expected_dates.isin(existing_dates["valid_date"]) + ] + return missing_dates.tolist() From 09ac79cafa222cf4fac59996f518c7fcc85df93f Mon Sep 17 00:00:00 2001 From: hannahker Date: Wed, 11 Dec 2024 13:44:59 -0800 Subject: [PATCH 3/7] switch to stacking cogs given input date list, not date range --- run_raster_stats.py | 86 ++++++++++++++++++++++++++---------------- src/config/settings.py | 77 +++++++++++++++++++++++++++++++++---- src/utils/cog_utils.py | 19 +++++----- src/utils/inputs.py | 5 +++ 4 files changed, 137 insertions(+), 50 deletions(-) diff --git a/run_raster_stats.py b/run_raster_stats.py index 312faf6..eb937df 100644 --- a/run_raster_stats.py +++ b/run_raster_stats.py @@ -9,7 +9,11 @@ import pandas as pd from sqlalchemy import create_engine -from src.config.settings import LOG_LEVEL, UPSAMPLED_RESOLUTION, parse_pipeline_config +from src.config.settings import ( + LOG_LEVEL, + UPSAMPLED_RESOLUTION, + config_pipeline, +) from src.utils.cog_utils import stack_cogs from src.utils.database_utils import ( create_dataset_table, @@ -18,9 +22,12 @@ insert_qa_table, postgres_upsert, ) -from src.utils.general_utils import split_date_range from src.utils.inputs import cli_args -from src.utils.iso3_utils import create_iso3_df, get_iso3_data, load_shp_from_azure +from src.utils.iso3_utils import ( + create_iso3_df, + get_iso3_data, + load_shp_from_azure, +) from src.utils.metadata_utils import process_polygon_metadata from src.utils.raster_utils import fast_zonal_stats_runner, prep_raster @@ -45,13 +52,18 @@ def setup_logger(name, level=logging.INFO): return logger -def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize): +def process_chunk(dates, dataset, mode, df_iso3s, engine_url, chunksize): process_name = current_process().name - logger = setup_logger(f"{process_name}: 
{dataset}_{start}") - logger.info(f"Starting processing for {dataset} from {start} to {end}") + logger = setup_logger(f"{process_name}: {dataset}_{dates[0]}") + logger.info( + f""" + Starting processing for {len(dates)} dates for {dataset} + between {dates[0].strftime('%Y-%m-%d')} to {dates[-1].strftime('%Y-%m-%d')} + """ + ) engine = create_engine(engine_url) - ds = stack_cogs(start, end, dataset, mode) + ds = stack_cogs(dates, dataset, mode) try: for _, row in df_iso3s.iterrows(): @@ -73,13 +85,17 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize): except Exception as e: logger.error(f"Error preparing raster for {iso3}: {e}") stack_trace = traceback.format_exc() - insert_qa_table(iso3, None, dataset, e, stack_trace, engine) + insert_qa_table( + iso3, None, dataset, e, stack_trace, engine + ) continue try: all_results = [] for adm_level in range(max_adm + 1): - gdf = gpd.read_file(f"{td}/{iso3.lower()}_adm{adm_level}.shp") + gdf = gpd.read_file( + f"{td}/{iso3.lower()}_adm{adm_level}.shp" + ) logger.debug(f"Computing stats for adm{adm_level}...") df_results = fast_zonal_stats_runner( ds_clipped, @@ -94,7 +110,9 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize): if df_results is not None: all_results.append(df_results) df_all_results = pd.concat(all_results, ignore_index=True) - logger.debug(f"Writing {len(df_all_results)} rows to database...") + logger.debug( + f"Writing {len(df_all_results)} rows to database..." + ) df_all_results.to_sql( f"{dataset}", con=engine, @@ -106,7 +124,9 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize): except Exception as e: logger.error(f"Error calculating stats for {iso3}: {e}") stack_trace = traceback.format_exc() - insert_qa_table(iso3, adm_level, dataset, e, stack_trace, engine) + insert_qa_table( + iso3, adm_level, dataset, e, stack_trace, engine + ) continue # Clear memory del ds_clipped @@ -136,32 +156,34 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize): logger.info(f"Updating data for {dataset}...") create_qa_table(engine) - start, end, is_forecast, sel_iso3s, extra_dims = parse_pipeline_config( - dataset, args.test, args.update_stats, args.mode + ( + date_chunks, + is_forecast, + sel_iso3s, + extra_dims, + frequency, + ) = config_pipeline( + dataset, + args.test, + args.update_stats, + args.mode, + args.backfill, + engine, ) create_dataset_table(dataset, engine, is_forecast, extra_dims) - df_iso3s = get_iso3_data(sel_iso3s, engine) - date_ranges = split_date_range(start, end) - - if len(date_ranges) > 1: - num_processes = 2 - logger.info( - f"Processing {len(date_ranges)} chunks with {num_processes} processes" - ) - process_args = [ - (start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize) - for start, end in date_ranges - ] + NUM_PROCESSES = 2 + logger.info( + f"Processing {len(date_chunks)} date chunks with {NUM_PROCESSES} processes" + ) - with Pool(num_processes) as pool: - pool.starmap(process_chunk, process_args) + process_args = [ + (dates, dataset, args.mode, df_iso3s, engine_url, args.chunksize) + for dates in date_chunks + ] - else: - logger.info("Processing entire date range in a single chunk") - process_chunk( - start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize - ) + with Pool(NUM_PROCESSES) as pool: + pool.starmap(process_chunk, process_args) logger.info("Done calculating and saving stats.") diff --git a/src/config/settings.py b/src/config/settings.py index 72e82f4..440579f 100644 
--- a/src/config/settings.py +++ b/src/config/settings.py @@ -1,15 +1,23 @@ +import logging import os from datetime import date, timedelta +import coloredlogs +import pandas as pd import yaml from dotenv import load_dotenv -from src.utils.general_utils import get_most_recent_date, parse_extra_dims +from src.utils.general_utils import ( + get_missing_dates, + get_most_recent_date, + parse_extra_dims, +) load_dotenv() + UPSAMPLED_RESOLUTION = 0.05 -LOG_LEVEL = "INFO" +LOG_LEVEL = "DEBUG" AZURE_DB_PW_DEV = os.getenv("AZURE_DB_PW_DEV") AZURE_DB_PW_PROD = os.getenv("AZURE_DB_PW_PROD") DATABASES = { @@ -18,15 +26,20 @@ "prod": f"postgresql+psycopg2://chdadmin:{AZURE_DB_PW_PROD}@chd-rasterstats-prod.postgres.database.azure.com/postgres", # noqa } +logger = logging.getLogger(__name__) +coloredlogs.install(level=LOG_LEVEL, logger=logger) + def load_pipeline_config(pipeline_name): - config_path = os.path.join(os.path.dirname(__file__), f"{pipeline_name}.yml") + config_path = os.path.join( + os.path.dirname(__file__), f"{pipeline_name}.yml" + ) with open(config_path, "r") as config_file: config = yaml.safe_load(config_file) return config -def parse_pipeline_config(dataset, test, update, mode): +def config_pipeline(dataset, test, update, mode, backfill, engine): config = load_pipeline_config(dataset) if test: start_date = config["test"]["start_date"] @@ -36,12 +49,60 @@ def parse_pipeline_config(dataset, test, update, mode): start_date = config["start_date"] end_date = config["end_date"] sel_iso3s = None + forecast = config["forecast"] extra_dims = parse_extra_dims(config) + + frequency = config["frequency"] if not end_date: end_date = date.today() - timedelta(days=1) + + missing_dates = None + if backfill: + missing_dates = get_missing_dates( + engine, dataset, start_date, end_date, frequency + ) + logger.info(f"Filling in {len(missing_dates)} missing dates:") + for date_ in missing_dates: + logger.info(f" - {date_.strftime('%Y-%m-%d')}") + + # TODO: Updating by getting the most recent COG is a bit of a shortcut... if update: - last_update = get_most_recent_date(mode, config["blob_prefix"]) - start_date = last_update - end_date = last_update - return start_date, end_date, forecast, sel_iso3s, extra_dims + start_date = get_most_recent_date(mode, config["blob_prefix"]) + end_date = None + + dates = generate_date_series( + start_date, end_date, frequency, missing_dates + ) + return dates, forecast, sel_iso3s, extra_dims, frequency + + +def generate_date_series( + start_date, end_date, frequency="D", missing_dates=None, chunk_size=100 +): + """ + Generate a sorted list of dates between start and end dates, incorporating missing dates, + partitioned into chunks of specified size. 
+ + Parameters: + start_date (str or datetime): Start date in 'YYYY-MM-DD' format if string + end_date (str or datetime): End date in 'YYYY-MM-DD' format if string + frequency (str): 'D' for daily or 'M' for monthly + missing_dates (list): Optional list of dates to include, in 'YYYY-MM-DD' format if strings + chunk_size (int): Maximum number of dates per partition + + Returns: + list of lists: List of date chunks, where each chunk is a list of datetime.date objects + """ + if not end_date: + dates = [start_date] + else: + dates = pd.date_range( + start_date, end_date, freq="MS" if frequency == "M" else frequency + ) + if missing_dates: + dates.extend(missing_dates) + dates = sorted(list(set(dates))) + return [ + dates[i : i + chunk_size] for i in range(0, len(dates), chunk_size) + ] diff --git a/src/utils/cog_utils.py b/src/utils/cog_utils.py index 66a99a1..345b9a1 100644 --- a/src/utils/cog_utils.py +++ b/src/utils/cog_utils.py @@ -1,7 +1,6 @@ import logging import coloredlogs -import pandas as pd import rioxarray as rxr import tqdm import xarray as xr @@ -126,7 +125,7 @@ def process_floodscan(cog_name, mode): return da_in -def stack_cogs(start_date, end_date, dataset, mode="dev"): +def stack_cogs(dates, dataset, mode="dev"): """ Stack Cloud Optimized GeoTIFFs (COGs) for a specified date range into an xarray Dataset. @@ -142,7 +141,6 @@ def stack_cogs(start_date, end_date, dataset, mode="dev"): The end date of the date range for stacking the COGs. This can be a string or a datetime object. dataset : str, optional The name of the dataset to retrieve COGs from. Options include "floodscan", "era5", "imerg", and "seas5". - Default is "era5". mode : str, optional The environment mode to use when accessing the cloud storage container. May be "dev", "prod", or "local". 
@@ -158,8 +156,6 @@ def stack_cogs(start_date, end_date, dataset, mode="dev"): ) mode = "dev" - start_date = pd.to_datetime(start_date) - end_date = pd.to_datetime(end_date) container_client = get_container_client(mode, "raster") try: @@ -173,14 +169,17 @@ def stack_cogs(start_date, end_date, dataset, mode="dev"): cogs_list = [ x.name for x in container_client.list_blobs(name_starts_with=prefix) - if (parse_date(x.name) >= start_date) - & (parse_date(x.name) <= end_date) # noqa + if (parse_date(x.name) in (dates)) ] + logger.debug(f"Processing {len(cogs_list)} cog(s):") + for cog in cogs_list: + logger.debug(f" - {cog}") + + if len(cogs_list) != len(dates): + logger.warning("Not all COGs available, given input dates") if len(cogs_list) == 0: - raise Exception( - f"No COGs found to process: {start_date} to {end_date}" - ) + raise Exception(f"No COGs found to process for dates: {dates}") das = [] diff --git a/src/utils/inputs.py b/src/utils/inputs.py index 3e41b26..1f7cc03 100644 --- a/src/utils/inputs.py +++ b/src/utils/inputs.py @@ -42,4 +42,9 @@ def cli_args(): type=int, default=100000, ) + parser.add_argument( + "--backfill", + action="store_true", + help="Whether to check and backfill for any missing dates", + ) return parser.parse_args() From 4591806ca3965fbe770ef8ced38286acb13c10fa Mon Sep 17 00:00:00 2001 From: hannahker Date: Wed, 11 Dec 2024 15:15:19 -0800 Subject: [PATCH 4/7] clean up functions and add config summary --- run_raster_stats.py | 17 +++---- src/config/settings.py | 102 +++++++++++++++++++++++++++++++++-------- src/utils/cog_utils.py | 12 ++--- 3 files changed, 96 insertions(+), 35 deletions(-) diff --git a/run_raster_stats.py b/run_raster_stats.py index eb937df..aa35da9 100644 --- a/run_raster_stats.py +++ b/run_raster_stats.py @@ -153,16 +153,10 @@ def process_chunk(dates, dataset, mode, df_iso3s, engine_url, chunksize): sys.exit(0) dataset = args.dataset - logger.info(f"Updating data for {dataset}...") + logger.info("Determining pipeline configuration...") create_qa_table(engine) - ( - date_chunks, - is_forecast, - sel_iso3s, - extra_dims, - frequency, - ) = config_pipeline( + config = config_pipeline( dataset, args.test, args.update_stats, @@ -170,8 +164,11 @@ def process_chunk(dates, dataset, mode, df_iso3s, engine_url, chunksize): args.backfill, engine, ) - create_dataset_table(dataset, engine, is_forecast, extra_dims) - df_iso3s = get_iso3_data(sel_iso3s, engine) + create_dataset_table( + dataset, engine, config["forecast"], config["extra_dims"] + ) + df_iso3s = get_iso3_data(config["sel_iso3s"], engine) + date_chunks = config["date_chunks"] NUM_PROCESSES = 2 logger.info( diff --git a/src/config/settings.py b/src/config/settings.py index 440579f..a01a015 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -26,6 +26,7 @@ "prod": f"postgresql+psycopg2://chdadmin:{AZURE_DB_PW_PROD}@chd-rasterstats-prod.postgres.database.azure.com/postgres", # noqa } + logger = logging.getLogger(__name__) coloredlogs.install(level=LOG_LEVEL, logger=logger) @@ -40,31 +41,63 @@ def load_pipeline_config(pipeline_name): def config_pipeline(dataset, test, update, mode, backfill, engine): + """ + Configure pipeline parameters based on dataset configuration and runtime flags. + Also logs an overall summary of the pipeline run. 
+ + Parameters + ---------- + dataset : str + Name of the dataset to process + test : bool + If True, use test configuration parameters + update : bool + If True, start from most recent date + mode : str + Pipeline execution mode + backfill : bool + If True, include missing dates in processing + engine : SQLEngine + Database connection for retrieving missing dates + + Returns + ------- + dict + Dictionary containing + dates : list of list of datetime.date + Chunked list of dates to process + forecast : dict + Forecast configuration parameters + sel_iso3s : list or None + Selected ISO3 country codes, if any + extra_dims : dict + Additional dimension parameters + """ config = load_pipeline_config(dataset) - if test: - start_date = config["test"]["start_date"] - end_date = config["test"]["end_date"] - sel_iso3s = config["test"]["iso3s"] - else: - start_date = config["start_date"] - end_date = config["end_date"] - sel_iso3s = None + config_section = config["test"] if test else config - forecast = config["forecast"] - extra_dims = parse_extra_dims(config) + output_config = {} + output_config["forecast"] = config["forecast"] + output_config["extra_dims"] = parse_extra_dims(config) + output_config["sel_iso3s"] = config_section.get("iso3s") + start_date = config_section["start_date"] + end_date = config_section.get("end_date") frequency = config["frequency"] + + # Now work on getting the dates to process if not end_date: end_date = date.today() - timedelta(days=1) - missing_dates = None if backfill: missing_dates = get_missing_dates( - engine, dataset, start_date, end_date, frequency + engine, + dataset, + start_date, + end_date, + frequency, + config["forecast"], ) - logger.info(f"Filling in {len(missing_dates)} missing dates:") - for date_ in missing_dates: - logger.info(f" - {date_.strftime('%Y-%m-%d')}") # TODO: Updating by getting the most recent COG is a bit of a shortcut... 
if update: @@ -74,7 +107,36 @@ def config_pipeline(dataset, test, update, mode, backfill, engine): dates = generate_date_series( start_date, end_date, frequency, missing_dates ) - return dates, forecast, sel_iso3s, extra_dims, frequency + output_config["date_chunks"] = dates + + # Configuration report + logger.info("=" * 50) + logger.info("Pipeline Configuration Summary:") + logger.info("=" * 50) + logger.info(f"Dataset: {dataset.upper()}") + logger.info(f"Mode: {mode}") + if update: + logger.info( + f"Run: Updating latest stats -- {start_date.strftime('%Y-%m-%d')}" + ) + else: + logger.info( + f"Run: Archival update from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}" + ) + logger.info(f"Total Date Chunks: {len(dates)}") + logger.info(f"Total Dates: {sum(len(chunk) for chunk in dates)}") + logger.info(f"Checked for missing dates: {backfill}") + if backfill: + logger.info(f"{len(missing_dates)} missing dates found") + for date_ in missing_dates: + logger.info(f" - {date_.strftime('%Y-%m-%d')}") + if output_config["sel_iso3s"]: + sel_iso3s = output_config["sel_iso3s"] + logger.info(f"Filtering for ISO3 codes: {sel_iso3s}") + + logger.info("=" * 50) + + return output_config def generate_date_series( @@ -97,8 +159,12 @@ def generate_date_series( if not end_date: dates = [start_date] else: - dates = pd.date_range( - start_date, end_date, freq="MS" if frequency == "M" else frequency + dates = list( + pd.date_range( + start_date, + end_date, + freq="MS" if frequency == "M" else frequency, + ) ) if missing_dates: dates.extend(missing_dates) diff --git a/src/utils/cog_utils.py b/src/utils/cog_utils.py index 345b9a1..d603074 100644 --- a/src/utils/cog_utils.py +++ b/src/utils/cog_utils.py @@ -5,12 +5,12 @@ import tqdm import xarray as xr -from src.config.settings import load_pipeline_config +from src.config.settings import LOG_LEVEL, load_pipeline_config from src.utils.cloud_utils import get_cog_url, get_container_client from src.utils.general_utils import parse_date logger = logging.getLogger(__name__) -coloredlogs.install(level="DEBUG", logger=logger) +coloredlogs.install(level=LOG_LEVEL, logger=logger) # TODO: Update now that IMERG data has the right .attrs metadata @@ -130,15 +130,13 @@ def stack_cogs(dates, dataset, mode="dev"): Stack Cloud Optimized GeoTIFFs (COGs) for a specified date range into an xarray Dataset. This function retrieves and stacks COGs from a cloud storage container for a given dataset and - date range, and returns the stacked data as an `xarray.Dataset`. The data is accessed remotely + list of dates, and returns the stacked data as an `xarray.Dataset`. The data is accessed remotely and processed into a single `Dataset` with the dimension `date` as the stacking dimension. Parameters ---------- - start_date : str or datetime-like - The start date of the date range for stacking the COGs. This can be a string or a datetime object. - end_date : str or datetime-like - The end date of the date range for stacking the COGs. This can be a string or a datetime object. + dates : list + The list of dates for which we want to load in COGs dataset : str, optional The name of the dataset to retrieve COGs from. Options include "floodscan", "era5", "imerg", and "seas5". 
mode : str, optional From d25d07032494f3340fafafb079238939e4322d96 Mon Sep 17 00:00:00 2001 From: hannahker Date: Wed, 11 Dec 2024 15:16:44 -0800 Subject: [PATCH 5/7] fix column reference for forecasts --- src/utils/general_utils.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/utils/general_utils.py b/src/utils/general_utils.py index 2212dbd..0f5527b 100644 --- a/src/utils/general_utils.py +++ b/src/utils/general_utils.py @@ -170,7 +170,12 @@ def get_expected_dates( def get_missing_dates( - engine, dataset: str, start_date: str, end_date: str, frequency: str + engine, + dataset: str, + start_date: str, + end_date: str, + frequency: str, + forecast: bool, ) -> List[datetime]: """ Find missing dates in the database by comparing against expected dates. @@ -187,6 +192,8 @@ def get_missing_dates( End date in YYYY-MM-DD format frequency : str Frequency of dates, either 'D' for daily or 'M' for monthly + forecast : bool + Whether or not the dataset is a forecast Returns ------- @@ -196,13 +203,17 @@ def get_missing_dates( # Get all expected dates expected_dates = get_expected_dates(start_date, end_date, frequency) + date_column = "issued_date" if forecast else "valid_date" + # Query existing dates from database - query = f"SELECT DISTINCT valid_date FROM {dataset} ORDER BY valid_date" + query = ( + f"SELECT DISTINCT {date_column} FROM {dataset} ORDER BY {date_column}" + ) existing_dates = pd.read_sql_query(query, engine) - existing_dates["valid_date"] = pd.to_datetime(existing_dates["valid_date"]) + existing_dates[date_column] = pd.to_datetime(existing_dates[date_column]) # Find missing dates missing_dates = expected_dates[ - ~expected_dates.isin(existing_dates["valid_date"]) + ~expected_dates.isin(existing_dates[date_column]) ] return missing_dates.tolist() From f1d18fd4ec592a03e2c3f5de0d8256b1a4a25acb Mon Sep 17 00:00:00 2001 From: hannahker Date: Wed, 11 Dec 2024 16:55:41 -0800 Subject: [PATCH 6/7] remove old function and update docstring --- src/config/settings.py | 33 ++++++++++++++++++++------------- src/utils/general_utils.py | 33 +-------------------------------- 2 files changed, 21 insertions(+), 45 deletions(-) diff --git a/src/config/settings.py b/src/config/settings.py index a01a015..3ea0efb 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -142,19 +142,26 @@ def config_pipeline(dataset, test, update, mode, backfill, engine): def generate_date_series( start_date, end_date, frequency="D", missing_dates=None, chunk_size=100 ): - """ - Generate a sorted list of dates between start and end dates, incorporating missing dates, - partitioned into chunks of specified size. - - Parameters: - start_date (str or datetime): Start date in 'YYYY-MM-DD' format if string - end_date (str or datetime): End date in 'YYYY-MM-DD' format if string - frequency (str): 'D' for daily or 'M' for monthly - missing_dates (list): Optional list of dates to include, in 'YYYY-MM-DD' format if strings - chunk_size (int): Maximum number of dates per partition - - Returns: - list of lists: List of date chunks, where each chunk is a list of datetime.date objects + """Generate a sorted list of dates partitioned into chunks. 
+ + Parameters + ---------- + start_date : str or datetime + Start date in 'YYYY-MM-DD' format if string + end_date : str or datetime + End date in 'YYYY-MM-DD' format if string, or None for single date + frequency : str, default='D' + Date frequency, either 'D' for daily or 'M' for monthly + missing_dates : list, optional + Additional dates to include in the series + chunk_size : int, default=100 + Maximum number of dates per chunk + + Returns + ------- + list of list of datetime.date + List of date chunks, where each chunk contains up to chunk_size dates, + sorted in ascending order with duplicates removed """ if not end_date: dates = [start_date] diff --git a/src/utils/general_utils.py b/src/utils/general_utils.py index 0f5527b..03c2395 100644 --- a/src/utils/general_utils.py +++ b/src/utils/general_utils.py @@ -1,5 +1,5 @@ import re -from datetime import datetime, timedelta +from datetime import datetime from typing import List import pandas as pd @@ -9,37 +9,6 @@ from src.utils.cloud_utils import get_container_client -def split_date_range(start_date, end_date): - """ - Split the date range into yearly chunks if the range is greater than a year. - - Parameters - ---------- - start_date (str): Start date in 'YYYY-MM-DD' format - end_date (str): End date in 'YYYY-MM-DD' format - - Returns - ------- - list of tuples: Each tuple contains the start and end date for a chunk - """ - start = pd.to_datetime(start_date) - end = pd.to_datetime(end_date) - - # If the date range is less than or equal to a year, return it as a single chunk - if (end - start).days <= 365: - return [(start_date, end_date)] - - date_ranges = [] - while start < end: - year_end = min(datetime(start.year, 12, 31), end) - date_ranges.append( - (start.strftime("%Y-%m-%d"), year_end.strftime("%Y-%m-%d")) - ) - start = year_end + timedelta(days=1) - - return date_ranges - - def add_months_to_date(date_string, months): """ Add or subtract a number of months to/from a given date string. From 431f1b394741149703bfec23ab14761869fe1f6b Mon Sep 17 00:00:00 2001 From: hannahker Date: Wed, 11 Dec 2024 16:59:43 -0800 Subject: [PATCH 7/7] update docs --- README.md | 3 +++ src/utils/inputs.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f22d3bb..d3e7e70 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,9 @@ options: -h, --help show this help message and exit --mode {local,dev,prod}, -m {local,dev,prod} Run the pipeline in 'local', 'dev', or 'prod' mode. + --update-stats Calculate stats against the latest COG for a given dataset. + --backfill Whether to check and backfill for any missing dates. + --update-metadata Update the iso3 and polygon metadata tables. --test Processes a smaller subset of the source data. Use to test the pipeline. ``` diff --git a/src/utils/inputs.py b/src/utils/inputs.py index 1f7cc03..e63e645 100644 --- a/src/utils/inputs.py +++ b/src/utils/inputs.py @@ -28,7 +28,7 @@ def cli_args(): ) parser.add_argument( "--update-stats", - help="""Calculates stats based on recently updated data""", + help="""Calculate stats against the latest COG for a given dataset.""", action="store_true", ) parser.add_argument(
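The sketch below is not part of the patch series itself; it is an illustrative note on how the pieces introduced here are intended to fit together: expected dates are derived from the new frequency config field, compared against dates already present in the database to find gaps, and the combined series is split into chunks that process_chunk consumes. It mirrors get_expected_dates, get_missing_dates, and generate_date_series from the patches above, but replaces the SQL query with a hard-coded list so it runs standalone; the names expected_dates_for and existing, and the sample dates, are assumptions for illustration only.

import pandas as pd

def expected_dates_for(start_date, end_date, frequency):
    # Monthly data is normalised to the first of each month ("MS"),
    # matching the frequency: M / frequency: D values added to the configs.
    if frequency == "M":
        return pd.date_range(start=start_date, end=end_date, freq="MS")
    if frequency == "D":
        return pd.date_range(start=start_date, end=end_date, freq="D")
    raise ValueError("Frequency must be either 'D' or 'M'")

# Stand-in for the database lookup in get_missing_dates(); the real pipeline
# reads DISTINCT valid_date (or issued_date for forecast datasets, per PATCH 5/7).
existing = pd.to_datetime(["2024-01-01", "2024-03-01", "2024-04-01"])

expected = expected_dates_for("2024-01-01", "2024-06-30", "M")
missing = expected[~expected.isin(existing)].tolist()
print(missing)  # Feb, May and Jun 2024 would be flagged for backfill

# generate_date_series() then merges the regular range with any missing dates,
# de-duplicates, sorts, and partitions into chunks handed to process_chunk().
chunk_size = 2
all_dates = sorted(set(list(expected) + missing))
date_chunks = [
    all_dates[i : i + chunk_size] for i in range(0, len(all_dates), chunk_size)
]
print(len(date_chunks))  # 3 chunks of up to 2 dates each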