Check coverage #33

Merged
merged 7 commits on Dec 20, 2024
3 changes: 3 additions & 0 deletions README.md
@@ -16,6 +16,9 @@ options:
-h, --help show this help message and exit
--mode {local,dev,prod}, -m {local,dev,prod}
Run the pipeline in 'local', 'dev', or 'prod' mode.
--update-stats Calculate stats against the latest COG for a given dataset.
--backfill Check for and backfill any missing dates.
--update-metadata Update the iso3 and polygon metadata tables.
--test Process a smaller subset of the source data. Use to test the pipeline.
```
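For example, a hypothetical invocation combining the new flags might be `python run_raster_stats.py <dataset> --mode dev --backfill`; the positional `<dataset>` argument is an assumption, since the full argument parser is not shown in this diff.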

89 changes: 54 additions & 35 deletions run_raster_stats.py
@@ -9,7 +9,11 @@
import pandas as pd
from sqlalchemy import create_engine

from src.config.settings import LOG_LEVEL, UPSAMPLED_RESOLUTION, parse_pipeline_config
from src.config.settings import (
LOG_LEVEL,
UPSAMPLED_RESOLUTION,
config_pipeline,
)
from src.utils.cog_utils import stack_cogs
from src.utils.database_utils import (
create_dataset_table,
@@ -18,9 +22,12 @@
insert_qa_table,
postgres_upsert,
)
from src.utils.general_utils import split_date_range
from src.utils.inputs import cli_args
from src.utils.iso3_utils import create_iso3_df, get_iso3_data, load_shp_from_azure
from src.utils.iso3_utils import (
create_iso3_df,
get_iso3_data,
load_shp_from_azure,
)
from src.utils.metadata_utils import process_polygon_metadata
from src.utils.raster_utils import fast_zonal_stats_runner, prep_raster

@@ -45,13 +52,18 @@ def setup_logger(name, level=logging.INFO):
return logger


def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
def process_chunk(dates, dataset, mode, df_iso3s, engine_url, chunksize):
process_name = current_process().name
logger = setup_logger(f"{process_name}: {dataset}_{start}")
logger.info(f"Starting processing for {dataset} from {start} to {end}")
logger = setup_logger(f"{process_name}: {dataset}_{dates[0]}")
logger.info(
f"""
Starting processing for {len(dates)} dates for {dataset}
between {dates[0].strftime('%Y-%m-%d')} and {dates[-1].strftime('%Y-%m-%d')}
"""
)

engine = create_engine(engine_url)
ds = stack_cogs(start, end, dataset, mode)
ds = stack_cogs(dates, dataset, mode)

try:
for _, row in df_iso3s.iterrows():
@@ -73,13 +85,17 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
except Exception as e:
logger.error(f"Error preparing raster for {iso3}: {e}")
stack_trace = traceback.format_exc()
insert_qa_table(iso3, None, dataset, e, stack_trace, engine)
insert_qa_table(
iso3, None, dataset, e, stack_trace, engine
)
continue

try:
all_results = []
for adm_level in range(max_adm + 1):
gdf = gpd.read_file(f"{td}/{iso3.lower()}_adm{adm_level}.shp")
gdf = gpd.read_file(
f"{td}/{iso3.lower()}_adm{adm_level}.shp"
)
logger.debug(f"Computing stats for adm{adm_level}...")
df_results = fast_zonal_stats_runner(
ds_clipped,
@@ -94,7 +110,9 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
if df_results is not None:
all_results.append(df_results)
df_all_results = pd.concat(all_results, ignore_index=True)
logger.debug(f"Writing {len(df_all_results)} rows to database...")
logger.debug(
f"Writing {len(df_all_results)} rows to database..."
)
df_all_results.to_sql(
f"{dataset}",
con=engine,
@@ -106,7 +124,9 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
except Exception as e:
logger.error(f"Error calculating stats for {iso3}: {e}")
stack_trace = traceback.format_exc()
insert_qa_table(iso3, adm_level, dataset, e, stack_trace, engine)
insert_qa_table(
iso3, adm_level, dataset, e, stack_trace, engine
)
continue
# Clear memory
del ds_clipped
@@ -133,35 +153,34 @@ def process_chunk(start, end, dataset, mode, df_iso3s, engine_url, chunksize):
sys.exit(0)

dataset = args.dataset
logger.info(f"Updating data for {dataset}...")
logger.info("Determining pipeline configuration...")

create_qa_table(engine)
start, end, is_forecast, sel_iso3s, extra_dims = parse_pipeline_config(
dataset, args.test, args.update_stats, args.mode
config = config_pipeline(
dataset,
args.test,
args.update_stats,
args.mode,
args.backfill,
engine,
)
create_dataset_table(dataset, engine, is_forecast, extra_dims)

df_iso3s = get_iso3_data(sel_iso3s, engine)
date_ranges = split_date_range(start, end)

if len(date_ranges) > 1:
num_processes = 2
logger.info(
f"Processing {len(date_ranges)} chunks with {num_processes} processes"
)
create_dataset_table(
dataset, engine, config["forecast"], config["extra_dims"]
)
df_iso3s = get_iso3_data(config["sel_iso3s"], engine)
date_chunks = config["date_chunks"]

process_args = [
(start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize)
for start, end in date_ranges
]
NUM_PROCESSES = 2
logger.info(
f"Processing {len(date_chunks)} date chunks with {NUM_PROCESSES} processes"
)

with Pool(num_processes) as pool:
pool.starmap(process_chunk, process_args)
process_args = [
(dates, dataset, args.mode, df_iso3s, engine_url, args.chunksize)
for dates in date_chunks
]

else:
logger.info("Processing entire date range in a single chunk")
process_chunk(
start, end, dataset, args.mode, df_iso3s, engine_url, args.chunksize
)
with Pool(NUM_PROCESSES) as pool:
pool.starmap(process_chunk, process_args)

logger.info("Done calculating and saving stats.")
1 change: 1 addition & 0 deletions src/config/era5.yml
@@ -1,6 +1,7 @@
blob_prefix: era5/monthly/processed/precip_reanalysis_v
start_date: 1981-01-01
end_date: Null
frequency: M
forecast: False
test:
start_date: 1981-01-01
1 change: 1 addition & 0 deletions src/config/floodscan.yml
@@ -1,6 +1,7 @@
blob_prefix: floodscan/daily/v5/processed/aer_area_300s_
start_date: 1998-01-12
end_date: Null
frequency: D
forecast: False
extra_dims:
- band : str
1 change: 1 addition & 0 deletions src/config/imerg.yml
@@ -1,6 +1,7 @@
blob_prefix: imerg/daily/late/v7/processed/imerg-daily-late-
start_date: 2000-06-01
end_date: Null
frequency: D
forecast: False
test:
start_date: 2000-06-01
1 change: 1 addition & 0 deletions src/config/seas5.yml
@@ -1,6 +1,7 @@
blob_prefix: seas5/monthly/processed/precip_em_i
start_date: 1981-01-01
end_date: Null
frequency: M
forecast: True
extra_dims:
- leadtime : int
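
Each dataset config now carries an explicit `end_date: Null` and a `frequency` key ('D' daily, 'M' monthly). A short sketch of how a seas5-style file parses, using an inline YAML string instead of the real file (illustrative only; `load_pipeline_config` in settings.py does the equivalent via `yaml.safe_load` on the `.yml` path):

```
import yaml

RAW = """
blob_prefix: seas5/monthly/processed/precip_em_i
start_date: 1981-01-01
end_date: Null
frequency: M
forecast: True
extra_dims:
  - leadtime : int
"""

config = yaml.safe_load(RAW)
print(config["start_date"])  # 1981-01-01 (parsed as a datetime.date)
print(config["end_date"])    # None -> config_pipeline falls back to yesterday
print(config["frequency"])   # M
print(config["forecast"])    # True
print(config["extra_dims"])  # [{'leadtime': 'int'}]
```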
170 changes: 152 additions & 18 deletions src/config/settings.py
@@ -1,15 +1,23 @@
import logging
import os
from datetime import date, timedelta

import coloredlogs
import pandas as pd
import yaml
from dotenv import load_dotenv

from src.utils.general_utils import get_most_recent_date, parse_extra_dims
from src.utils.general_utils import (
get_missing_dates,
get_most_recent_date,
parse_extra_dims,
)

load_dotenv()


UPSAMPLED_RESOLUTION = 0.05
LOG_LEVEL = "INFO"
LOG_LEVEL = "DEBUG"
AZURE_DB_PW_DEV = os.getenv("AZURE_DB_PW_DEV")
AZURE_DB_PW_PROD = os.getenv("AZURE_DB_PW_PROD")
DATABASES = {
@@ -19,29 +27,155 @@
}


logger = logging.getLogger(__name__)
coloredlogs.install(level=LOG_LEVEL, logger=logger)


def load_pipeline_config(pipeline_name):
config_path = os.path.join(os.path.dirname(__file__), f"{pipeline_name}.yml")
config_path = os.path.join(
os.path.dirname(__file__), f"{pipeline_name}.yml"
)
with open(config_path, "r") as config_file:
config = yaml.safe_load(config_file)
return config


def parse_pipeline_config(dataset, test, update, mode):
def config_pipeline(dataset, test, update, mode, backfill, engine):
"""
Configure pipeline parameters based on dataset configuration and runtime flags.
Also logs an overall summary of the pipeline run.

Parameters
----------
dataset : str
Name of the dataset to process
test : bool
If True, use test configuration parameters
update : bool
If True, start from most recent date
mode : str
Pipeline execution mode
backfill : bool
If True, include missing dates in processing
engine : SQLEngine
Database connection for retrieving missing dates

Returns
-------
dict
Dictionary containing
date_chunks : list of list of datetime.date
Chunked list of dates to process
forecast : bool
Whether the dataset is a forecast product
sel_iso3s : list or None
Selected ISO3 country codes, if any
extra_dims : dict
Additional dimension parameters
"""
config = load_pipeline_config(dataset)
if test:
start_date = config["test"]["start_date"]
end_date = config["test"]["end_date"]
sel_iso3s = config["test"]["iso3s"]
else:
start_date = config["start_date"]
end_date = config["end_date"]
sel_iso3s = None
forecast = config["forecast"]
extra_dims = parse_extra_dims(config)
config_section = config["test"] if test else config

output_config = {}
output_config["forecast"] = config["forecast"]
output_config["extra_dims"] = parse_extra_dims(config)
output_config["sel_iso3s"] = config_section.get("iso3s")

start_date = config_section["start_date"]
end_date = config_section.get("end_date")
frequency = config["frequency"]

# Now work on getting the dates to process
if not end_date:
end_date = date.today() - timedelta(days=1)
missing_dates = None
if backfill:
missing_dates = get_missing_dates(
engine,
dataset,
start_date,
end_date,
frequency,
config["forecast"],
)

# TODO: Updating by getting the most recent COG is a bit of a shortcut...
if update:
start_date = get_most_recent_date(mode, config["blob_prefix"])
end_date = None

dates = generate_date_series(
start_date, end_date, frequency, missing_dates
)
output_config["date_chunks"] = dates

# Configuration report
logger.info("=" * 50)
logger.info("Pipeline Configuration Summary:")
logger.info("=" * 50)
logger.info(f"Dataset: {dataset.upper()}")
logger.info(f"Mode: {mode}")
if update:
last_update = get_most_recent_date(mode, config["blob_prefix"])
start_date = last_update
end_date = last_update
return start_date, end_date, forecast, sel_iso3s, extra_dims
logger.info(
f"Run: Updating latest stats -- {start_date.strftime('%Y-%m-%d')}"
)
else:
logger.info(
f"Run: Archival update from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}"
)
logger.info(f"Total Date Chunks: {len(dates)}")
logger.info(f"Total Dates: {sum(len(chunk) for chunk in dates)}")
logger.info(f"Checked for missing dates: {backfill}")
if backfill:
logger.info(f"{len(missing_dates)} missing dates found")
for date_ in missing_dates:
logger.info(f" - {date_.strftime('%Y-%m-%d')}")
if output_config["sel_iso3s"]:
sel_iso3s = output_config["sel_iso3s"]
logger.info(f"Filtering for ISO3 codes: {sel_iso3s}")

logger.info("=" * 50)

return output_config


def generate_date_series(
start_date, end_date, frequency="D", missing_dates=None, chunk_size=100
):
"""Generate a sorted list of dates partitioned into chunks.

Parameters
----------
start_date : str or datetime
Start date in 'YYYY-MM-DD' format if string
end_date : str or datetime
End date in 'YYYY-MM-DD' format if string, or None for single date
frequency : str, default='D'
Date frequency, either 'D' for daily or 'M' for monthly
missing_dates : list, optional
Additional dates to include in the series
chunk_size : int, default=100
Maximum number of dates per chunk

Returns
-------
list of list of datetime.date
List of date chunks, where each chunk contains up to chunk_size dates,
sorted in ascending order with duplicates removed
"""
if not end_date:
dates = [start_date]
else:
dates = list(
pd.date_range(
start_date,
end_date,
freq="MS" if frequency == "M" else frequency,
)
)
if missing_dates:
dates.extend(missing_dates)
dates = sorted(list(set(dates)))
return [
dates[i : i + chunk_size] for i in range(0, len(dates), chunk_size)
]
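
A hypothetical usage of the new `generate_date_series` helper, assuming it can be imported from `src.config.settings` in this repo's environment; the expected counts are simple arithmetic on a monthly range from January 1981 through December 2024 (528 dates):

```
from src.config.settings import generate_date_series

# Monthly series split into chunks of at most 100 dates (the default).
chunks = generate_date_series("1981-01-01", "2024-12-01", frequency="M")

print(len(chunks))                      # 6
print(len(chunks[0]), len(chunks[-1]))  # 100 28
print(type(chunks[0][0]).__name__)      # Timestamp -- pd.date_range yields pandas Timestamps
```

Passing `missing_dates` from `get_missing_dates()` folds backfilled dates into the same chunked series, which is how `config_pipeline` uses it above.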