Merge pull request #33 from OCHA-DAP/check-coverage

Check coverage

hannahker authored Dec 20, 2024
2 parents ca5d895 + 431f1b3 commit ac468e5
Showing 10 changed files with 324 additions and 102 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -16,6 +16,9 @@ options:
-h, --help show this help message and exit
--mode {local,dev,prod}, -m {local,dev,prod}
Run the pipeline in 'local', 'dev', or 'prod' mode.
--update-stats Calculate stats against the latest COG for a given dataset.
--backfill Whether to check and backfill for any missing dates.
--update-metadata Update the iso3 and polygon metadata tables.
--test Processes a smaller subset of the source data. Use to test the pipeline.
```
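
For reference, a minimal argparse sketch that mirrors the flags documented above. The real parser lives in `src/utils/inputs.py` (`cli_args`), which this diff does not show, so the names and defaults below are assumptions rather than the actual implementation.

```python
# Hypothetical sketch only: mirrors the README flags above; the real
# definitions live in src/utils/inputs.cli_args and may differ.
import argparse


def cli_args():
    parser = argparse.ArgumentParser(description="Run the raster stats pipeline.")
    parser.add_argument("dataset", help="Dataset to process, e.g. era5 or seas5.")
    parser.add_argument(
        "--mode",
        "-m",
        choices=["local", "dev", "prod"],
        default="local",
        help="Run the pipeline in 'local', 'dev', or 'prod' mode.",
    )
    parser.add_argument(
        "--update-stats",
        action="store_true",
        help="Calculate stats against the latest COG for a given dataset.",
    )
    parser.add_argument(
        "--backfill",
        action="store_true",
        help="Check and backfill for any missing dates.",
    )
    parser.add_argument(
        "--update-metadata",
        action="store_true",
        help="Update the iso3 and polygon metadata tables.",
    )
    parser.add_argument(
        "--test",
        action="store_true",
        help="Process a smaller subset of the source data.",
    )
    return parser.parse_args()
```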

89 changes: 54 additions & 35 deletions run_raster_stats.py
Expand Up @@ -9,7 +9,11 @@
import pandas as pd
from sqlalchemy import create_engine

from src.config.settings import LOG_LEVEL, UPSAMPLED_RESOLUTION, parse_pipeline_config
from src.config.settings import (
LOG_LEVEL,
UPSAMPLED_RESOLUTION,
config_pipeline,
)
from src.utils.cog_utils import stack_cogs
from src.utils.database_utils import (
create_dataset_table,
@@ -18,9 +22,12 @@
insert_qa_table,
postgres_upsert,
)
from src.utils.general_utils import split_date_range
from src.utils.inputs import cli_args
from src.utils.iso3_utils import create_iso3_df, get_iso3_data, load_shp_from_azure
from src.utils.iso3_utils import (
create_iso3_df,
get_iso3_data,
load_shp_from_azure,
)
from src.utils.metadata_utils import process_polygon_metadata
from src.utils.raster_utils import fast_zonal_stats_runner, prep_raster

@@ -45,13 +52,18 @@ def setup_logger(name, level=logging.INFO):
return logger


def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
def process_chunk(dates, dataset, mode, df_iso3s, engine_url, chunksize):
process_name = current_process().name
logger = setup_logger(f"{process_name}: {dataset}_{start}")
logger.info(f"Starting processing for {dataset} from {start} to {end}")
logger = setup_logger(f"{process_name}: {dataset}_{dates[0]}")
logger.info(
f"""
Starting processing for {len(dates)} dates for {dataset}
between {dates[0].strftime('%Y-%m-%d')} and {dates[-1].strftime('%Y-%m-%d')}
"""
)

engine = create_engine(engine_url)
ds = stack_cogs(start, end, dataset, mode)
ds = stack_cogs(dates, dataset, mode)

try:
for _, row in df_iso3s.iterrows():
@@ -73,13 +85,17 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
except Exception as e:
logger.error(f"Error preparing raster for {iso3}: {e}")
stack_trace = traceback.format_exc()
insert_qa_table(iso3, None, dataset, e, stack_trace, engine)
insert_qa_table(
iso3, None, dataset, e, stack_trace, engine
)
continue

try:
all_results = []
for adm_level in range(max_adm + 1):
gdf = gpd.read_file(f"{td}/{iso3.lower()}_adm{adm_level}.shp")
gdf = gpd.read_file(
f"{td}/{iso3.lower()}_adm{adm_level}.shp"
)
logger.debug(f"Computing stats for adm{adm_level}...")
df_results = fast_zonal_stats_runner(
ds_clipped,
@@ -94,7 +110,9 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
if df_results is not None:
all_results.append(df_results)
df_all_results = pd.concat(all_results, ignore_index=True)
logger.debug(f"Writing {len(df_all_results)} rows to database...")
logger.debug(
f"Writing {len(df_all_results)} rows to database..."
)
df_all_results.to_sql(
f"{dataset}",
con=engine,
@@ -106,7 +124,9 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
except Exception as e:
logger.error(f"Error calculating stats for {iso3}: {e}")
stack_trace = traceback.format_exc()
insert_qa_table(iso3, adm_level, dataset, e, stack_trace, engine)
insert_qa_table(
iso3, adm_level, dataset, e, stack_trace, engine
)
continue
# Clear memory
del ds_clipped
@@ -133,35 +153,34 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
sys.exit(0)

dataset = args.dataset
logger.info(f"Updating data for {dataset}...")
logger.info("Determining pipeline configuration...")

create_qa_table(engine)
start, end, is_forecast, sel_iso3s, extra_dims = parse_pipeline_config(
dataset, args.test, args.update_stats, args.mode
config = config_pipeline(
dataset,
args.test,
args.update_stats,
args.mode,
args.backfill,
engine,
)
create_dataset_table(dataset, engine, is_forecast, extra_dims)

df_iso3s = get_iso3_data(sel_iso3s, engine)
date_ranges = split_date_range(start, end)

if len(date_ranges) > 1:
num_processes = 2
logger.info(
f"Processing {len(date_ranges)} chunks with {num_processes} processes"
)
create_dataset_table(
dataset, engine, config["forecast"], config["extra_dims"]
)
df_iso3s = get_iso3_data(config["sel_iso3s"], engine)
date_chunks = config["date_chunks"]

process_args = [
(start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize)
for start, end in date_ranges
]
NUM_PROCESSES = 2
logger.info(
f"Processing {len(date_chunks)} date chunks with {NUM_PROCESSES} processes"
)

with Pool(num_processes) as pool:
pool.starmap(process_chunk, process_args)
process_args = [
(dates, dataset, args.mode, df_iso3s, engine_url, args.chunksize)
for dates in date_chunks
]

else:
logger.info("Processing entire date range in a single chunk")
process_chunk(
start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize
)
with Pool(NUM_PROCESSES) as pool:
pool.starmap(process_chunk, process_args)

logger.info("Done calculating and saving stats.")
1 change: 1 addition & 0 deletions src/config/era5.yml
@@ -1,6 +1,7 @@
blob_prefix: era5/monthly/processed/precip_reanalysis_v
start_date: 1981-01-01
end_date: Null
frequency: M
forecast: False
test:
start_date: 1981-01-01
1 change: 1 addition & 0 deletions src/config/floodscan.yml
@@ -1,6 +1,7 @@
blob_prefix: floodscan/daily/v5/processed/aer_area_300s_
start_date: 1998-01-12
end_date: Null
frequency: D
forecast: False
extra_dims:
- band : str
1 change: 1 addition & 0 deletions src/config/imerg.yml
@@ -1,6 +1,7 @@
blob_prefix: imerg/daily/late/v7/processed/imerg-daily-late-
start_date: 2000-06-01
end_date: Null
frequency: D
forecast: False
test:
start_date: 2000-06-01
1 change: 1 addition & 0 deletions src/config/seas5.yml
@@ -1,6 +1,7 @@
blob_prefix: seas5/monthly/processed/precip_em_i
start_date: 1981-01-01
end_date: Null
frequency: M
forecast: True
extra_dims:
- leadtime : int
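
Each dataset config now carries a `frequency` key (`D` daily, `M` monthly) and an explicit `end_date: Null`. The sketch below shows how `config_pipeline` in `settings.py` (next file) interprets these two fields, with the era5 values pasted in for illustration; it mirrors the yaml/pandas handling in that module rather than calling it.

```python
# Sketch of how the new YAML keys are interpreted, mirroring settings.py:
# a null end_date falls back to yesterday, and frequency "M" maps to the
# month-start frequency used by pd.date_range.
from datetime import date, timedelta

import pandas as pd
import yaml

raw = """
blob_prefix: era5/monthly/processed/precip_reanalysis_v
start_date: 1981-01-01
end_date: Null
frequency: M
forecast: False
"""

config = yaml.safe_load(raw)
end_date = config.get("end_date") or (date.today() - timedelta(days=1))
freq = "MS" if config["frequency"] == "M" else config["frequency"]
dates = pd.date_range(config["start_date"], end_date, freq=freq)
print(len(dates), dates[0].date(), dates[-1].date())
```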
170 changes: 152 additions & 18 deletions src/config/settings.py
@@ -1,15 +1,23 @@
import logging
import os
from datetime import date, timedelta

import coloredlogs
import pandas as pd
import yaml
from dotenv import load_dotenv

from src.utils.general_utils import get_most_recent_date, parse_extra_dims
from src.utils.general_utils import (
get_missing_dates,
get_most_recent_date,
parse_extra_dims,
)

load_dotenv()


UPSAMPLED_RESOLUTION = 0.05
LOG_LEVEL = "INFO"
LOG_LEVEL = "DEBUG"
AZURE_DB_PW_DEV = os.getenv("AZURE_DB_PW_DEV")
AZURE_DB_PW_PROD = os.getenv("AZURE_DB_PW_PROD")
DATABASES = {
@@ -19,29 +27,155 @@
}


logger = logging.getLogger(__name__)
coloredlogs.install(level=LOG_LEVEL, logger=logger)


def load_pipeline_config(pipeline_name):
config_path = os.path.join(os.path.dirname(__file__), f"{pipeline_name}.yml")
config_path = os.path.join(
os.path.dirname(__file__), f"{pipeline_name}.yml"
)
with open(config_path, "r") as config_file:
config = yaml.safe_load(config_file)
return config


def parse_pipeline_config(dataset, test, update, mode):
def config_pipeline(dataset, test, update, mode, backfill, engine):
"""
Configure pipeline parameters based on dataset configuration and runtime flags.
Also logs an overall summary of the pipeline run.

Parameters
----------
dataset : str
Name of the dataset to process
test : bool
If True, use test configuration parameters
update : bool
If True, start from most recent date
mode : str
Pipeline execution mode
backfill : bool
If True, include missing dates in processing
engine : SQLEngine
Database connection for retrieving missing dates

Returns
-------
dict
Dictionary containing
date_chunks : list of list of datetime.date
Chunked list of dates to process
forecast : bool
Whether the dataset is a forecast product
sel_iso3s : list or None
Selected ISO3 country codes, if any
extra_dims : dict
Additional dimension parameters
"""
config = load_pipeline_config(dataset)
if test:
start_date = config["test"]["start_date"]
end_date = config["test"]["end_date"]
sel_iso3s = config["test"]["iso3s"]
else:
start_date = config["start_date"]
end_date = config["end_date"]
sel_iso3s = None
forecast = config["forecast"]
extra_dims = parse_extra_dims(config)
config_section = config["test"] if test else config

output_config = {}
output_config["forecast"] = config["forecast"]
output_config["extra_dims"] = parse_extra_dims(config)
output_config["sel_iso3s"] = config_section.get("iso3s")

start_date = config_section["start_date"]
end_date = config_section.get("end_date")
frequency = config["frequency"]

# Now work on getting the dates to process
if not end_date:
end_date = date.today() - timedelta(days=1)
missing_dates = None
if backfill:
missing_dates = get_missing_dates(
engine,
dataset,
start_date,
end_date,
frequency,
config["forecast"],
)

# TODO: Updating by getting the most recent COG is a bit of a shortcut...
if update:
start_date = get_most_recent_date(mode, config["blob_prefix"])
end_date = None

dates = generate_date_series(
start_date, end_date, frequency, missing_dates
)
output_config["date_chunks"] = dates

# Configuration report
logger.info("=" * 50)
logger.info("Pipeline Configuration Summary:")
logger.info("=" * 50)
logger.info(f"Dataset: {dataset.upper()}")
logger.info(f"Mode: {mode}")
if update:
last_update = get_most_recent_date(mode, config["blob_prefix"])
start_date = last_update
end_date = last_update
return start_date, end_date, forecast, sel_iso3s, extra_dims
logger.info(
f"Run: Updating latest stats -- {start_date.strftime('%Y-%m-%d')}"
)
else:
logger.info(
f"Run: Archival update from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}"
)
logger.info(f"Total Date Chunks: {len(dates)}")
logger.info(f"Total Dates: {sum(len(chunk) for chunk in dates)}")
logger.info(f"Checked for missing dates: {backfill}")
if backfill:
logger.info(f"{len(missing_dates)} missing dates found")
for date_ in missing_dates:
logger.info(f" - {date_.strftime('%Y-%m-%d')}")
if output_config["sel_iso3s"]:
sel_iso3s = output_config["sel_iso3s"]
logger.info(f"Filtering for ISO3 codes: {sel_iso3s}")

logger.info("=" * 50)

return output_config
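
For illustration, a hedged example of calling `config_pipeline` and reading the returned dictionary; it assumes the repository is importable and avoids `update`/`backfill`, so no blob storage or database connection should be needed (`engine=None`):

```python
# Illustrative call only; with backfill=False and update=False, neither the
# database engine nor Azure blob storage should be touched.
from src.config.settings import config_pipeline

config = config_pipeline(
    dataset="era5",
    test=True,       # use the smaller test date range and iso3 subset
    update=False,    # process the configured range, not just the latest COG
    mode="local",
    backfill=False,  # skip the missing-date check, so engine can be None
    engine=None,
)
print(config["forecast"], config["sel_iso3s"])
print(len(config["date_chunks"]), "date chunks")
```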


def generate_date_series(
start_date, end_date, frequency="D", missing_dates=None, chunk_size=100
):
"""Generate a sorted list of dates partitioned into chunks.
Parameters
----------
start_date : str or datetime
Start date in 'YYYY-MM-DD' format if string
end_date : str or datetime
End date in 'YYYY-MM-DD' format if string, or None for single date
frequency : str, default='D'
Date frequency, either 'D' for daily or 'M' for monthly
missing_dates : list, optional
Additional dates to include in the series
chunk_size : int, default=100
Maximum number of dates per chunk

Returns
-------
list of list of datetime.date
List of date chunks, where each chunk contains up to chunk_size dates,
sorted in ascending order with duplicates removed
"""
if not end_date:
dates = [start_date]
else:
dates = list(
pd.date_range(
start_date,
end_date,
freq="MS" if frequency == "M" else frequency,
)
)
if missing_dates:
dates.extend(missing_dates)
dates = sorted(list(set(dates)))
return [
dates[i : i + chunk_size] for i in range(0, len(dates), chunk_size)
]
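
A minimal illustration of the chunking behaviour, run against the function above:

```python
# Five daily dates with chunk_size=2 -> three chunks of at most two dates.
chunks = generate_date_series(
    "2024-01-01", "2024-01-05", frequency="D", chunk_size=2
)
for chunk in chunks:
    print([d.strftime("%Y-%m-%d") for d in chunk])
# ['2024-01-01', '2024-01-02']
# ['2024-01-03', '2024-01-04']
# ['2024-01-05']
```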
