From 971ca71418c1061fdb0a8c5fc89d1205fc928af8 Mon Sep 17 00:00:00 2001 From: akrherz Date: Mon, 30 Dec 2024 09:57:47 -0600 Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=90=9B=20Significant=20CLI=20refactor?= =?UTF-8?q?=20to=20fix=20grid=20nav=20issues?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit refs #307 --- scripts/RT/env2database.py | 15 +- scripts/cligen/daily_clifile_editor.py | 355 ++++++++++++------------- scripts/cligen/proctor_tile_edit.py | 93 +++++-- 3 files changed, 247 insertions(+), 216 deletions(-) diff --git a/scripts/RT/env2database.py b/scripts/RT/env2database.py index 7bd6b0f6..b4253cd1 100644 --- a/scripts/RT/env2database.py +++ b/scripts/RT/env2database.py @@ -9,7 +9,6 @@ import argparse import datetime -import gzip import os import re import sys @@ -18,9 +17,8 @@ import geopandas as gpd import numpy as np import pandas as pd -from affine import Affine +import rasterio from pyiem.grid.zs import CachingZonalStats -from pyiem.iemre import NORTH, WEST from pyiem.util import get_dbconn, get_dbconnstr, logger from tqdm import tqdm @@ -28,7 +26,6 @@ from pydep.util import load_scenarios LOG = logger() -PRECIP_AFF = Affine(0.01, 0.0, WEST, 0.0, -0.01, NORTH) CONFIG = {"subset": False} # Maximum precip value allowed, will alert otherwise, see dailyerosion/dep#65 @@ -147,21 +144,23 @@ def load_precip(dates, huc12s): if CONFIG["subset"]: huc12df = huc12df.loc[huc12s] - czs = CachingZonalStats(PRECIP_AFF) + czs = None # 2. Loop over dates res = {} progress = tqdm(dates, disable=(not sys.stdout.isatty())) for date in progress: progress.set_description(date.strftime("%Y-%m-%d")) - fn = date.strftime("/mnt/idep2/data/dailyprecip/%Y/%Y%m%d.npy.gz") + fn = date.strftime("/mnt/idep2/data/dailyprecip/%Y/%Y%m%d.geotiff") if not os.path.isfile(fn): LOG.info("Missing precip: %s", fn) for huc12 in huc12df.index.values: d = res.setdefault(huc12, []) d.append(0) continue - with gzip.GzipFile(fn, "r") as fh: - pcp = np.flipud(np.load(file=fh)) + with rasterio.open(fn, "r") as ds: + if czs is None: + czs = CachingZonalStats(ds.transform) + pcp = ds.read(1) * ds.scales[0] # nodata here represents the value that is set to missing within the # source dataset!, setting to zero has strange side affects pcp[pcp < 0] = np.nan diff --git a/scripts/cligen/daily_clifile_editor.py b/scripts/cligen/daily_clifile_editor.py index 1f0c400c..77111c2d 100644 --- a/scripts/cligen/daily_clifile_editor.py +++ b/scripts/cligen/daily_clifile_editor.py @@ -1,9 +1,10 @@ -"""DEP WEPP cli editor. One "tile" at a time.""" +"""DEP WEPP cli editor. + +This script is given some chunk of work to do. We are presently tied to a +0.01x0.01 degree grid. + +""" -try: - from zoneinfo import ZoneInfo # type: ignore -except ImportError: - from backports.zoneinfo import ZoneInfo # type: ignore import os import sys import traceback @@ -12,24 +13,24 @@ from io import StringIO from multiprocessing import cpu_count from multiprocessing.pool import ThreadPool +from zoneinfo import ZoneInfo import click import numpy as np +import pandas as pd +import rasterio from affine import Affine from osgeo import gdal -from pyiem import iemre +from pyiem.database import get_sqlalchemy_conn +from pyiem.grid.nav import IEMRE, STAGE4 +from pyiem.iemre import get_grids, hourly_offset from pyiem.util import archive_fetch, convert_value, logger, ncopen from rasterio.errors import NotGeoreferencedWarning from rasterio.warp import reproject +from sqlalchemy import text from tqdm import tqdm -from pydep.io.cli import ( - BOUNDS, - check_has_clifiles, - compute_tile_bounds, - daily_formatter, -) -from pydep.util import get_cli_fname +from pydep.io.cli import daily_formatter # NB: This seems to be some thread safety issue with GDAL warnings.simplefilter("ignore", NotGeoreferencedWarning) @@ -47,6 +48,29 @@ ESTIMATED_YEARS = date.today().year - 2007 + 1 +def load_clifiles( + scenario: int, west: int, east: int, south: int, north: int +) -> pd.DataFrame: + """Load the climate files for the given scenario""" + with get_sqlalchemy_conn("idep") as conn: + clidf = pd.read_sql( + text(""" + select st_x(geom) as lon, st_y(geom) as lat, filepath from climate_files + WHERE scenario = :scenario and ST_Contains( + ST_MakeEnvelope(:west, :south, :east, :north, 4326), geom) + """), + conn, + params={ + "scenario": scenario, + "west": west - 0.005, + "east": east, + "north": north, + "south": south - 0.005, + }, + ) + return clidf + + def get_sts_ets_at_localhour(dt: date, local_hour): """Return a Day Interval in UTC for the given date at CST/CDT hour.""" # ZoneInfo is supposed to get this right at instanciation @@ -71,96 +95,47 @@ def get_sts_ets_at_localhour(dt: date, local_hour): ) -def iemre_bounds_check(name, val, lower, upper) -> None: +def iemre_bounds_check(clidf: pd.DataFrame, col: str, lower, upper) -> None: """Make sure our data is within bounds, if not, exit!""" - if np.isnan(val).all(): - LOG.warning("FATAL: iemre %s all NaN", name) - sys.exit(3) - minval = np.nanmin(val) - maxval = np.nanmax(val) - if minval < lower or maxval > upper: + if clidf[col].min() <= lower or clidf[col].max() > upper: LOG.warning( "FATAL: iemre failure %s %.3f to %.3f [%.3f to %.3f]", - name, - minval, - maxval, + col, + clidf[col].min(), + clidf[col].max(), lower, upper, ) sys.exit(3) -def load_iemre(data, valid, tile_affine: Affine): +def load_iemre(clidf: pd.DataFrame, valid: date) -> None: """Use IEM Reanalysis for non-precip data 1/8 degree product is nearest neighbor resampled to the 0.01 degree grid """ - ds = iemre.get_grids( + ds = get_grids( valid, varnames=["high_tmpk", "low_tmpk", "avg_dwpk", "wind_speed", "rsds"], ) - - dom = iemre.DOMAINS[""] - iemre_affine = Affine( - iemre.DX, 0.0, dom["west"], 0.0, iemre.DY, dom["south"] - ) - reproject( + highc = convert_value(ds["high_tmpk"], "degK", "degC") + lowc = convert_value(ds["low_tmpk"], "degK", "degC") + dwpc = convert_value(ds["avg_dwpk"], "degK", "degC") + for idx, row in clidf.iterrows(): + i, j = IEMRE.find_ij(row["lon"], row["lat"]) # Convert W m-2 to langleys per day - ds["rsds"] * 86400.0 / 1_000_000.0 * 23.9, - data["solar"], - src_transform=iemre_affine, - src_crs="EPSG:4326", - dst_transform=tile_affine, - dst_crs="EPSG:4326", - dst_nodata=np.nan, - ) - # What's a reasonable max bounts? Well, 50 MJ/d should be an extreme high - iemre_bounds_check("rsds", data["solar"], 0, 50.0 * 23.9) - - reproject( - convert_value(ds["high_tmpk"], "degK", "degC"), - data["high"], - src_transform=iemre_affine, - src_crs="EPSG:4326", - dst_transform=tile_affine, - dst_crs="EPSG:4326", - dst_nodata=np.nan, - ) - iemre_bounds_check("high", data["high"], -60, 60) - - reproject( - convert_value(ds["low_tmpk"], "degK", "degC"), - data["low"], - src_transform=iemre_affine, - src_crs="EPSG:4326", - dst_transform=tile_affine, - dst_crs="EPSG:4326", - dst_nodata=np.nan, - ) - iemre_bounds_check("low", data["low"], -60, 60) - - reproject( - convert_value(ds["avg_dwpk"], "degK", "degC"), - data["dwpt"], - src_transform=iemre_affine, - src_crs="EPSG:4326", - dst_transform=tile_affine, - dst_crs="EPSG:4326", - dst_nodata=np.nan, - ) - iemre_bounds_check("dwpt", data["dwpt"], -60, 60) + clidf.at[idx, "solar"] = ds["rsds"][j, i] * 86400 / 1_000_000.0 * 23.9 + clidf.at[idx, "high"] = highc[j, i] + clidf.at[idx, "low"] = lowc[j, i] + clidf.at[idx, "dwpt"] = dwpc[j, i] + clidf.at[idx, "wind"] = ds["wind_speed"][j, i] - # Wind is already in m/s, but could be masked - reproject( - ds["wind_speed"], - data["wind"], - src_transform=iemre_affine, - src_crs="EPSG:4326", - dst_transform=tile_affine, - dst_crs="EPSG:4326", - dst_nodata=np.nan, - ) - iemre_bounds_check("wind", data["wind"], 0, 40) + # What's a reasonable max bounts? Well, 50 MJ/d should be an extreme high + iemre_bounds_check(clidf, "solar", 0, 50.0 * 23.9) + iemre_bounds_check(clidf, "high", -60, 60) + iemre_bounds_check(clidf, "low", -60, 60) + iemre_bounds_check(clidf, "dwpt", -60, 60) + iemre_bounds_check(clidf, "wind", 0, 40) def load_stage4(data, dt: date, tile_affine): @@ -173,8 +148,8 @@ def load_stage4(data, dt: date, tile_affine): # The stage4 files store precip in the rears, so compute 1 AM one_am, tomorrow = get_sts_ets_at_localhour(dt, 1) - sts_tidx = iemre.hourly_offset(one_am) - ets_tidx = iemre.hourly_offset(tomorrow) + sts_tidx = hourly_offset(one_am) + ets_tidx = hourly_offset(tomorrow) LOG.debug( "stage4 sts_tidx:%s[%s] ets_tidx:%s[%s]", sts_tidx, @@ -182,16 +157,6 @@ def load_stage4(data, dt: date, tile_affine): ets_tidx, tomorrow, ) - # ll, so no flipping - projparams = { - "a": 6371200.0, - "b": 6371200.0, - "proj": "stere", - "lat_ts": 60.0, - "lat_0": 90.0, - "lon_0": 255.0 - 360.0, - } - stage4_affine = Affine(4762.5, 0.0, -1902531, 0.0, 4762.5, -7617604) with ncopen(f"{ST4PATH}/{dt:%Y}_stage4_hourly.nc", "r") as nc: p01m = nc.variables["p01m"] @@ -216,15 +181,15 @@ def load_stage4(data, dt: date, tile_affine): reproject( totals.filled(np.nan), # rasterio does not like masked arrays data["stage4"], - src_transform=stage4_affine, - src_crs=projparams, + src_transform=STAGE4.affine, + src_crs=STAGE4.crs, dst_transform=tile_affine, dst_crs="EPSG:4326", ) LOG.debug("finished") -def qc_precip(data, dt: date, tile_bounds: BOUNDS): +def qc_precip(data, dt: date, tile_affine: Affine): """Make some adjustments to the `precip` grid Not very sophisticated here, if the hires precip grid is within 33% of @@ -233,7 +198,7 @@ def qc_precip(data, dt: date, tile_bounds: BOUNDS): """ hires_total = np.sum(data["precip"], 2) # Only do this check for tiles west of -101, likely too crude for east - if dt.year >= 2015 and tile_bounds.west < -101: + if dt.year >= 2015 and tile_affine.c < -101: # Do an assessment of the hires_total (A2M MRMS), it may have too many # zeros where stage IV has actual data. a2m_zeros = hires_total < 0.01 @@ -241,7 +206,7 @@ def qc_precip(data, dt: date, tile_bounds: BOUNDS): score = np.sum(np.logical_and(a2m_zeros, s4)) / np.multiply(*s4.shape) if score > 0.005: # 0.5% is arb, but good enough? LOG.warning("MRMS had %.2f%% bad zeros, using legacy", score * 100) - load_precip_legacy(data, dt, tile_bounds) + load_precip_legacy(data, dt, tile_affine) hires_total = np.sum(data["precip"], 2) # prevent zeros @@ -256,10 +221,12 @@ def qc_precip(data, dt: date, tile_bounds: BOUNDS): data["precip"][np.isnan(data["precip"])] = 0.0 -def _reader(utcnow: datetime, tidx, shp, west: float, north: float): +def _reader(utcnow: datetime, tidx, shp, west: float, south: float): """Read the legacy N0R data""" ppath = utcnow.strftime("%Y/%m/%d/GIS/uscomp/n0r_%Y%m%d%H%M.png") x0 = int((west + 126.0) * 100) + ny, nx = shp + north = south + (ny + 1) * 0.01 # meh y0 = int((50.0 - north) * 100) with archive_fetch(ppath) as fn: if fn is None: @@ -268,8 +235,8 @@ def _reader(utcnow: datetime, tidx, shp, west: float, north: float): with gdal.Open(fn, 0) as ds: # Get the size of the raster so that we don't go out of bounds imgshp = ds.RasterYSize, ds.RasterXSize - y1 = min(imgshp[0] - y0, shp[0]) - x1 = min(imgshp[1] - x0, shp[1]) + y1 = min(imgshp[0] - y0, ny) + x1 = min(imgshp[1] - x0, nx) imgdata = ds.ReadAsArray( xoff=x0, yoff=y0, xsize=x1, ysize=y1 ).astype(np.float32) @@ -279,7 +246,7 @@ def _reader(utcnow: datetime, tidx, shp, west: float, north: float): return tidx, np.flipud(dbz) -def load_precip_legacy(data, valid, tile_bounds: BOUNDS): +def load_precip_legacy(data, valid, tile_affine: Affine): """Compute a Legacy Precip product for dates prior to 1 Jan 2014""" LOG.info("called") ts = 12 * 24 # 5 minute @@ -288,7 +255,7 @@ def load_precip_legacy(data, valid, tile_bounds: BOUNDS): now = midnight # To avoid division by zero below when we have the strip of no-data 23-24N - m5 = np.ones((ts, *data["solar"].shape), np.float32) * 0.01 + m5 = np.ones((ts, *data["stage4"].shape), np.float32) * 0.01 def _cb_pl(args): """callback""" @@ -306,7 +273,7 @@ def _cb_pl(args): m5[tidx] = precip tidx = 0 - shp = data["solar"].shape + shp = data["stage4"].shape with ThreadPool(CPUCOUNT) as pool: # Load up the n0r data, every 5 minutes while now < tomorrow: @@ -319,8 +286,8 @@ def _cb_pl(args): now.astimezone(UTC), tidx, shp, - tile_bounds.west, - tile_bounds.north, + tile_affine.c, + tile_affine.f, ), callback=_cb_pl, error_callback=LOG.error, @@ -354,19 +321,28 @@ def _compute(yidx, xidx): # Now apply the weights to the s4total data["precip"][yidx, xidx, :] = weights * s4total - for x in range(data["solar"].shape[1]): - for y in range(data["solar"].shape[0]): + for x in range(data["stage4"].shape[1]): + for y in range(data["stage4"].shape[0]): _compute(y, x) LOG.info("finished precip calculation") def _mrms_reader( - utcnow: datetime, tidx, shp, west: float, north: float, a2m_divisor + utcnow: datetime, + tidx, + shp, + west_edge: float, + south_edge: float, + a2m_divisor, ): """Read the MRMS data.""" ppath = utcnow.strftime("%Y/%m/%d/GIS/mrms/a2m_%Y%m%d%H%M.png") - x0 = int((west + 130.0) * 100) + # MRMS pixels are not centered on 0.01 degree grid, so we are not being + # the most exact here. + x0 = int((west_edge + 130.0) * 100) + ny, nx = shp + north = south_edge + (ny + 1) * 0.01 # meh y0 = int((55.0 - north) * 100) with archive_fetch(ppath) as fn: if fn is None: @@ -375,13 +351,13 @@ def _mrms_reader( # so it is much faster to do a small window/exact extraction with gdal.Open(fn, 0) as ds: imgdata = ds.ReadAsArray( - xoff=x0, yoff=y0, xsize=shp[1], ysize=shp[0] + xoff=x0, yoff=y0, xsize=nx, ysize=ny ).astype(np.float32) imgdata = np.where(imgdata < 255, imgdata / a2m_divisor, 0) return tidx, np.flipud(imgdata) -def load_precip(data, dt: date, tile_bounds: BOUNDS): +def load_precip(data, dt: date, tile_affine: Affine): """Load the 2 minute precipitation data into our ginormus grid""" LOG.info("called") ts = 30 * 24 # 2 minute @@ -404,7 +380,7 @@ def _cb_lp(args): quorum["cnt"] -= 1 tidx = 0 - shp = data["solar"].shape + shp = data["stage4"].shape with ThreadPool(CPUCOUNT) as pool: while now < tomorrow: pool.apply_async( @@ -413,8 +389,8 @@ def _cb_lp(args): now.astimezone(UTC), tidx, shp, - tile_bounds.west, - tile_bounds.north, + tile_affine.c, + tile_affine.f, a2m_divisor, ), callback=_cb_lp, @@ -429,7 +405,7 @@ def _cb_lp(args): "Failed 75%% quorum with MRMS a2m %.1f, loading legacy", quorum["cnt"], ) - load_precip_legacy(data, dt, tile_bounds) + load_precip_legacy(data, dt, tile_affine) LOG.info("finished precip calculation") @@ -491,7 +467,7 @@ def compute_breakpoint(ar, accumThreshold=2.0, intensityThreshold=1.0): return bp -def write_grid(grid, valid, xtile, ytile, fnadd=""): +def write_grid(grid: np.ndarray, valid, tile_affine: Affine, fnadd=""): """Save off the daily precip totals for usage later in computing huc_12""" if fnadd != "" and not sys.stdout.isatty(): LOG.debug("not writting extra grid to disk") @@ -499,10 +475,28 @@ def write_grid(grid, valid, xtile, ytile, fnadd=""): basedir = f"/mnt/idep2/data/dailyprecip/{valid.year}" if not os.path.isdir(basedir): os.makedirs(basedir) - np.save(f"{basedir}/{valid:%Y%m%d}{fnadd}.tile_{xtile}_{ytile}", grid) - - -def precip_workflow(data, valid, tile_affine: Affine, tile_bounds: BOUNDS): + lat_prefix = "N" if tile_affine.f > 0 else "S" + lon_prefix = "E" if tile_affine.c > 0 else "W" + fn = ( + f"{basedir}/{valid:%Y%m%d}{fnadd}." + f"tile_{lon_prefix}{abs(tile_affine.c):.0f}_" + f"{lat_prefix}{abs(tile_affine.f):.0f}.geotiff" + ) + with rasterio.open( + fn, + "w", + driver="GTiff", + height=grid.shape[0], + width=grid.shape[1], + count=1, + dtype=grid.dtype, + crs="EPSG:4326", + transform=tile_affine, + ) as dst: + dst.write(grid, 1) + + +def precip_workflow(data, valid, tile_affine: Affine): """Drive the precipitation workflow""" load_stage4(data, valid, tile_affine) # We have MRMS a2m RASTER files prior to 1 Jan 2015, but these files used @@ -510,27 +504,29 @@ def precip_workflow(data, valid, tile_affine: Affine, tile_bounds: BOUNDS): # to capture low intensity events. Files after 1 Jan 2015 used a better # 0.02mm resolution if valid.year < 2015: - load_precip_legacy(data, valid, tile_bounds) + load_precip_legacy(data, valid, tile_affine) else: - load_precip(data, valid, tile_bounds) - qc_precip(data, valid, tile_bounds) + load_precip(data, valid, tile_affine) + qc_precip(data, valid, tile_affine) -def edit_clifile(xidx, yidx, clifn, data, valid) -> bool: +def edit_clifile( + clirow: dict, xidx: int, yidx: int, data: np.ndarray, valid: date +) -> bool: """Edit the climate file, run from thread.""" # Okay we have work to do - with open(clifn, "r", encoding="utf8") as fh: + with open(clirow["filepath"], "r", encoding="utf8") as fh: clidata = fh.read() pos = clidata.find(valid.strftime("%-d\t%-m\t%Y")) if pos == -1: - LOG.warning("Date find failure for %s", clifn) + LOG.warning("Date find failure for %s", clirow["filepath"]) return False pos2 = clidata[pos:].find( (valid + timedelta(days=1)).strftime("%-d\t%-m\t%Y") ) if pos2 == -1: - LOG.warning("Date2 find failure for %s", clifn) + LOG.warning("Date2 find failure for %s", clirow["filepath"]) return False intensity_threshold = 1.0 @@ -548,11 +544,11 @@ def edit_clifile(xidx, yidx, clifn, data, valid) -> bool: intensityThreshold=intensity_threshold, ) - high = data["high"][yidx, xidx] - low = data["low"][yidx, xidx] - solar = data["solar"][yidx, xidx] - wind = data["wind"][yidx, xidx] - dwpt = data["dwpt"][yidx, xidx] + high = clirow["high"] + low = clirow["low"] + solar = clirow["solar"] + wind = clirow["wind"] + dwpt = clirow["dwpt"] if np.isnan([high, low, solar, wind, dwpt]).any(): msg = [] for v, n in zip( @@ -561,7 +557,9 @@ def edit_clifile(xidx, yidx, clifn, data, valid) -> bool: ): if np.isnan(v): msg.append(f"{n}={v}") - LOG.warning("Missing data[%s] for %s", ",".join(msg), clifn) + LOG.warning( + "Missing data[%s] for %s", ",".join(msg), clirow["filepath"] + ) return False thisday = daily_formatter( valid, @@ -573,67 +571,55 @@ def edit_clifile(xidx, yidx, clifn, data, valid) -> bool: dwpt, ) - with open(clifn, "w", encoding="utf8") as fh: + with open(clirow["filepath"], "w", encoding="utf8") as fh: fh.write(clidata[:pos] + thisday + clidata[(pos + pos2) :]) return True @click.command() @click.option("--scenario", "-s", type=int, default=0, help="Scenario") -@click.option("--xtile", type=int, required=True, help="X Tile") -@click.option("--ytile", type=int, required=True, help="Y Tile") +@click.option("--west", type=int, required=True, help="Left Longitude") +@click.option("--east", type=int, required=True, help="Right Longitude") +@click.option("--south", type=int, required=True, help="Bottom Latitude") +@click.option("--north", type=int, required=True, help="Top Latitude") @click.option( "--date", "dt", type=click.DateTime(), required=True, help="Date" ) -def main(scenario, xtile, ytile, dt: datetime): +def main( + scenario: int, west: int, east: int, south: int, north: int, dt: datetime +): """The workflow to get the weather data variables we want!""" valid = dt.date() - tile_bounds = compute_tile_bounds(xtile, ytile) - tile_affine = Affine( - 0.01, 0.0, tile_bounds.west, 0.0, 0.01, tile_bounds.south - ) - LOG.info("bounds %s", tile_bounds) - shp = ( - int((tile_bounds.north - tile_bounds.south) * 100), - int((tile_bounds.east - tile_bounds.west) * 100), - ) - data = {} - for vname in "high low dwpt wind solar stage4".split(): - data[vname] = np.zeros(shp, np.float32) - - # Optimize for continuous memory - data["precip"] = np.zeros((*shp, 30 * 24), np.float32) - - # We could be outside contiguous US, if so, just write out zeros - if not check_has_clifiles(tile_bounds): - LOG.info("Exiting as tile %s %s is outside CONUS", xtile, ytile) - write_grid(np.sum(data["precip"], 2), valid, xtile, ytile) + clidf = load_clifiles(scenario, west, east, south, north) + if clidf.empty: + LOG.info("No climate files found for scenario %s", scenario) return + # This is the outer edge and our grid is the points + tile_affine = Affine(0.01, 0.0, west - 0.005, 0.0, 0.01, south - 0.005) + + # Fill out clidf with IEMRE values # 1. Max Temp C # 2. Min Temp C # 3. Radiation l/d # 4. wind mps # 6. Mean dewpoint C - load_iemre(data, valid, tile_affine) + load_iemre(clidf, valid) + + shp = (int((north - south) * 100), int((east - west) * 100)) + data = { + "stage4": np.zeros(shp, np.float32), + "precip": np.zeros((*shp, 30 * 24), np.float32), + } + # 5. wind direction (always zero) # 7. breakpoint precip mm - precip_workflow(data, valid, tile_affine, tile_bounds) - write_grid(data["stage4"], valid, xtile, ytile, "stage4") - write_grid(np.sum(data["precip"], 2), valid, xtile, ytile) - - queue = [] - for yidx in range(shp[0]): - for xidx in range(shp[1]): - lon = tile_bounds.west + xidx * 0.01 - lat = tile_bounds.south + yidx * 0.01 - clifn = get_cli_fname(lon, lat, scenario) - if not os.path.isfile(clifn): - continue - queue.append([xidx, yidx, clifn]) + precip_workflow(data, valid, tile_affine) + write_grid(data["stage4"], valid, tile_affine, fnadd="stage4") + write_grid(np.sum(data["precip"], 2), valid, tile_affine) - progress = tqdm(total=len(queue), disable=not sys.stdout.isatty()) + progress = tqdm(total=len(clidf), disable=not sys.stdout.isatty()) errors = {"cnt": 0} with ThreadPool(CPUCOUNT) as pool: @@ -644,12 +630,15 @@ def _callback(res): errors["cnt"] += 1 progress.update(1) - def _proxy(xidx, yidx, clifn, data, valid): + def _proxy(clirow: dict, data, valid): """Proxy.""" if errors["cnt"] > 10: return False + clifn = clirow["filepath"] + xidx = int((clirow["lon"] - west) * 100) + yidx = int((clirow["lat"] - north) * 100) try: - return edit_clifile(xidx, yidx, clifn, data, valid) + return edit_clifile(clirow, xidx, yidx, data, valid) except Exception as _exp: with StringIO() as sio: traceback.print_exc(file=sio) @@ -662,10 +651,13 @@ def _proxy(xidx, yidx, clifn, data, valid): ) return False - for _, (xidx, yidx, clifn) in enumerate(queue): + for _, row in clidf.iterrows(): + if not os.path.isfile(row["filepath"]): + continue + pool.apply_async( _proxy, - (xidx, yidx, clifn, data, valid), + (row.to_dict(), data, valid), callback=_callback, error_callback=LOG.error, ) @@ -676,5 +668,4 @@ def _proxy(xidx, yidx, clifn, data, valid): if __name__ == "__main__": - # Go Main Go main() diff --git a/scripts/cligen/proctor_tile_edit.py b/scripts/cligen/proctor_tile_edit.py index c7f26e10..e1ddbbb6 100644 --- a/scripts/cligen/proctor_tile_edit.py +++ b/scripts/cligen/proctor_tile_edit.py @@ -1,6 +1,6 @@ """Proctor the editing of DEP CLI files.""" -import gzip +import glob import os import stat import subprocess @@ -8,11 +8,14 @@ import time from concurrent.futures import ProcessPoolExecutor from datetime import date, datetime +from math import ceil from multiprocessing import cpu_count import click import numpy as np -from pyiem import iemre +import rasterio +from affine import Affine +from pyiem.grid.nav import IEMRE from pyiem.util import logger LOG = logger() @@ -21,28 +24,61 @@ def get_fn(dt: date): """Return the filename for this date.""" - return f"{DATADIR}/{dt.year}/{dt:%Y%m%d}.npy.gz" + return f"{DATADIR}/{dt.year}/{dt:%Y%m%d}.geotiff" -def assemble_grids(tilesz, dt: date): - """Build back the grid from the tiles.""" - dom = iemre.DOMAINS[""] - YS = int((dom["north"] - dom["south"]) * 100.0) - XS = int((dom["east"] - dom["west"]) * 100.0) +def assemble_geotiffs(dt: date): + """Build back the grid from the tiles. + + Important: The tiles are S-N, but we flip and store as N-S (image) + """ + north = ceil(IEMRE.top_edge) + YS = int((north - IEMRE.bottom) * 100.0) + XS = int((ceil(IEMRE.right) - IEMRE.left) * 100.0) res = np.zeros((YS, XS)) - basedir = f"{DATADIR}/{dt.year}" - for i, _lo in enumerate(np.arange(dom["west"], dom["east"], tilesz)): - for j, _la in enumerate(np.arange(dom["south"], dom["north"], tilesz)): - fn = f"{basedir}/{dt:%Y%m%d}.tile_{i}_{j}.npy" - if not os.path.isfile(fn): - continue - yslice = slice(j * 100 * tilesz, (j + 1) * 100 * tilesz) - xslice = slice(i * 100 * tilesz, (i + 1) * 100 * tilesz) - res[yslice, xslice] = np.load(fn) - os.unlink(fn) + for gfn in glob.glob(f"{DATADIR}/{dt:%Y}/{dt:%Y%m%d}.tile*.geotiff"): + with rasterio.open(gfn) as src: + data = src.read(1) + meta = src.meta + # The raster is stored south to north and has the small offset + height = meta["height"] + width = meta["width"] + xs = int((meta["transform"][2] + 0.005 - IEMRE.left) * 100.0) + ys = int((meta["transform"][5] + 0.005 - IEMRE.bottom) * 100.0) + LOG.info( + "%s %s %s %s %s %s %s", + gfn, + xs, + ys, + width, + height, + data.shape, + res.shape, + ) + res[ys : ys + height, xs : xs + width] = data + os.unlink(gfn) - with gzip.GzipFile(get_fn(dt), "w") as fh: - np.save(file=fh, arr=res) + # Write out a GeoTIFF, we will store as a UInt16 with a scale factor + # of 100, so we can store 0 to 655.35 mm of precipitation + res = (res * 100.0).astype(np.uint16) + with rasterio.open( + get_fn(dt), + "w", + driver="GTiff", + compress="lzw", + height=res.shape[0], + width=res.shape[1], + count=1, + dtype=res.dtype, + crs="EPSG:4326", + # Oh boy. We never actually get to the north edge + transform=Affine( + 0.01, 0.0, IEMRE.left - 0.005, 0.0, -0.01, north - 0.005 + ), + ) as dst: + dst._set_all_scales([0.01]) + dst._set_all_offsets([0.0]) + dst.write(np.flipud(res), 1) def myjob(cmd: list): @@ -85,14 +121,19 @@ def main(scenario, dt: datetime, domain: str): filets = os.stat(fn)[stat.ST_MTIME] LOG.warning("%s was last processed on %s", dt, time.ctime(filets)) jobs = [] - dom = iemre.DOMAINS[domain] - for i, _lo in enumerate(np.arange(dom["west"], dom["east"], tilesz)): - for j, _la in enumerate(np.arange(dom["south"], dom["north"], tilesz)): + # This is more convoluted than it should be, but our IEMRE grid is not + # exactly even. This also needs to assume that the IEMRE corner is an int + east = ceil(IEMRE.right) + north = ceil(IEMRE.top) + for lon in np.arange(IEMRE.left, east, tilesz): + for lat in np.arange(IEMRE.bottom, north, tilesz): cmd = [ "python", "daily_clifile_editor.py", - f"--xtile={i}", - f"--ytile={j}", + f"--west={lon:.0f}", + f"--east={min(east, lon + tilesz):.0f}", + f"--south={lat:.0f}", + f"--north={min(north, lat + tilesz):.0f}", f"--scenario={scenario}", f"--date={dt:%Y-%m-%d}", ] @@ -110,7 +151,7 @@ def main(scenario, dt: datetime, domain: str): LOG.warning("Aborting due to job failures") sys.exit(3) - assemble_grids(tilesz, dt) + assemble_geotiffs(dt) if __name__ == "__main__": From 6fc98b55e0b978b9b622802a1687d97cd45a06e7 Mon Sep 17 00:00:00 2001 From: akrherz Date: Mon, 30 Dec 2024 10:03:13 -0600 Subject: [PATCH 2/2] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=20Address=20lint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/cligen/daily_clifile_editor.py | 2 +- scripts/cligen/proctor_tile_edit.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/scripts/cligen/daily_clifile_editor.py b/scripts/cligen/daily_clifile_editor.py index 77111c2d..dfb992ee 100644 --- a/scripts/cligen/daily_clifile_editor.py +++ b/scripts/cligen/daily_clifile_editor.py @@ -515,7 +515,7 @@ def edit_clifile( ) -> bool: """Edit the climate file, run from thread.""" # Okay we have work to do - with open(clirow["filepath"], "r", encoding="utf8") as fh: + with open(clirow["filepath"], "r", encoding="utf8") as fh: # skipcq clidata = fh.read() pos = clidata.find(valid.strftime("%-d\t%-m\t%Y")) if pos == -1: diff --git a/scripts/cligen/proctor_tile_edit.py b/scripts/cligen/proctor_tile_edit.py index e1ddbbb6..345c7a4d 100644 --- a/scripts/cligen/proctor_tile_edit.py +++ b/scripts/cligen/proctor_tile_edit.py @@ -76,8 +76,8 @@ def assemble_geotiffs(dt: date): 0.01, 0.0, IEMRE.left - 0.005, 0.0, -0.01, north - 0.005 ), ) as dst: - dst._set_all_scales([0.01]) - dst._set_all_offsets([0.0]) + dst._set_all_scales([0.01]) # skipcq + dst._set_all_offsets([0.0]) # skipcq dst.write(np.flipud(res), 1) @@ -119,7 +119,9 @@ def main(scenario, dt: datetime, domain: str): fn = get_fn(dt) if os.path.isfile(fn): filets = os.stat(fn)[stat.ST_MTIME] - LOG.warning("%s was last processed on %s", dt, time.ctime(filets)) + LOG.warning( + "%s[%s] was last processed on %s", dt, domain, time.ctime(filets) + ) jobs = [] # This is more convoluted than it should be, but our IEMRE grid is not # exactly even. This also needs to assume that the IEMRE corner is an int