Skip to content

Commit

Permalink
Merge pull request dailyerosion#265 from akrherz/240819
Browse files Browse the repository at this point in the history
Omnibus
  • Loading branch information
akrherz authored Aug 23, 2024
2 parents 5652336 + 4712ff3 commit 4d645b7
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: "v0.6.1"
rev: "v0.6.2"
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
116 changes: 69 additions & 47 deletions scripts/cligen/daily_clifile_editor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from tqdm import tqdm

from pydep.io.cli import (
BOUNDS,
check_has_clifiles,
compute_tile_bounds,
daily_formatter,
Expand Down Expand Up @@ -223,7 +224,7 @@ def load_stage4(data, dt: date, tile_affine):
LOG.debug("finished")


def qc_precip(data, dt: date, tile_affine: Affine):
def qc_precip(data, dt: date, tile_bounds: BOUNDS):
"""Make some adjustments to the `precip` grid
Not very sophisticated here, if the hires precip grid is within 33% of
Expand All @@ -232,15 +233,15 @@ def qc_precip(data, dt: date, tile_affine: Affine):
"""
hires_total = np.sum(data["precip"], 2)
# Only do this check for tiles west of -101, likely too crude for east
if dt.year >= 2015 and tile_affine.c < -101:
if dt.year >= 2015 and tile_bounds.west < -101:
# Do an assessment of the hires_total (A2M MRMS), it may have too many
# zeros where stage IV has actual data.
a2m_zeros = hires_total < 0.01
s4 = data["stage4"] > 5 # arb
score = np.sum(np.logical_and(a2m_zeros, s4)) / np.multiply(*s4.shape)
if score > 0.005: # 0.5% is arb, but good enough?
LOG.warning("MRMS had %.2f%% bad zeros, using legacy", score * 100)
load_precip_legacy(data, dt, tile_affine)
load_precip_legacy(data, dt, tile_bounds)
hires_total = np.sum(data["precip"], 2)

# prevent zeros
Expand All @@ -255,32 +256,32 @@ def qc_precip(data, dt: date, tile_affine: Affine):
data["precip"][np.isnan(data["precip"])] = 0.0


def _reader(utcnow: datetime, tidx, shp, tile_affine):
def _reader(utcnow: datetime, tidx, shp, west: float, north: float):
"""Read the legacy N0R data"""
ppath = utcnow.strftime("%Y/%m/%d/GIS/uscomp/n0r_%Y%m%d%H%M.png")
x0 = int((west + 126.0) * 100)
y0 = int((50.0 - north) * 100)
with archive_fetch(ppath) as fn:
if fn is None:
return None, None
# upper right corner is 50N, 126W
imgdata = gdal.Open(fn, 0).ReadAsArray()
with gdal.Open(fn, 0) as ds:
# Get the size of the raster so that we don't go out of bounds
imgshp = ds.RasterYSize, ds.RasterXSize
y1 = min(imgshp[0] - y0, shp[0])
x1 = min(imgshp[1] - x0, shp[1])
imgdata = ds.ReadAsArray(
xoff=x0, yoff=y0, xsize=x1, ysize=y1
).astype(np.float32)
# Convert the image data to dbz
dbz = (imgdata - 7.0) * 5.0
dbz = np.where(dbz < 255, ((10.0 ** (dbz / 10.0)) / 200.0) ** 0.625, 0)
grid = np.zeros(shp, np.float32)
reproject(
dbz,
grid,
src_transform=Affine(0.01, 0.0, -126.0, 0.0, -0.01, 50.0),
src_crs="EPSG:4326",
dst_transform=tile_affine,
dst_crs="EPSG:4326",
)
return tidx, grid
return tidx, np.flipud(dbz)


def load_precip_legacy(data, valid, tile_affine):
def load_precip_legacy(data, valid, tile_bounds: BOUNDS):
"""Compute a Legacy Precip product for dates prior to 1 Jan 2014"""
LOG.debug("called")
LOG.info("called")
ts = 12 * 24 # 5 minute

midnight, tomorrow = get_sts_ets_at_localhour(valid, 0)
Expand All @@ -289,24 +290,37 @@ def load_precip_legacy(data, valid, tile_affine):
# To avoid division by zero below when we have the strip of no-data 23-24N
m5 = np.ones((ts, *data["solar"].shape), np.float32) * 0.01

def _cb(args):
def _cb_pl(args):
"""callback"""
tidx, precip = args
if precip is not None:
m5[tidx] = precip
# sigh
if precip.shape[0] == 400:
m5[tidx, 100:, :] = precip
elif precip.shape[0] == 200:
m5[tidx, :200, :] = precip
else:
m5[tidx] = precip

tidx = 0
shp = data["solar"].shape
with ThreadPool(4) as pool:
with ThreadPool(CPUCOUNT) as pool:
# Load up the n0r data, every 5 minutes
while now < tomorrow:
if tidx >= ts:
# Abort as we are in CST->CDT
break
pool.apply_async(
_reader,
(now.astimezone(UTC), tidx, shp, tile_affine),
callback=_cb,
(
now.astimezone(UTC),
tidx,
shp,
tile_bounds.west,
tile_bounds.north,
),
callback=_cb_pl,
error_callback=LOG.error,
)
now += timedelta(minutes=5)
tidx += 1
Expand Down Expand Up @@ -341,32 +355,32 @@ def _compute(yidx, xidx):
for y in range(data["solar"].shape[0]):
_compute(y, x)

LOG.debug("finished precip calculation")
LOG.info("finished precip calculation")


def _mrms_reader(utcnow: datetime, tidx, shp, tile_affine, a2m_divisor):
def _mrms_reader(
utcnow: datetime, tidx, shp, west: float, north: float, a2m_divisor
):
"""Read the MRMS data."""
ppath = utcnow.strftime("%Y/%m/%d/GIS/mrms/a2m_%Y%m%d%H%M.png")
x0 = int((west + 130.0) * 100)
y0 = int((55.0 - north) * 100)
with archive_fetch(ppath) as fn:
if fn is None:
return None, None
imgdata = gdal.Open(fn, 0).ReadAsArray()
# Here lie dragons, going from uint8 to float32 uses special CPU
# so it is much faster to do a small window/exact extraction
with gdal.Open(fn, 0) as ds:
imgdata = ds.ReadAsArray(
xoff=x0, yoff=y0, xsize=shp[1], ysize=shp[0]
).astype(np.float32)
imgdata = np.where(imgdata < 255, imgdata / a2m_divisor, 0)
grid = np.zeros(shp, np.float32)
reproject(
imgdata,
grid,
src_transform=Affine(0.01, 0.0, -130.0, 0.0, -0.01, 55.0),
src_crs="EPSG:4326",
dst_transform=tile_affine,
dst_crs="EPSG:4326",
)
return tidx, grid
return tidx, np.flipud(imgdata)


def load_precip(data, dt: date, tile_affine: Affine):
def load_precip(data, dt: date, tile_bounds: BOUNDS):
"""Load the 2 minute precipitation data into our ginormus grid"""
LOG.debug("called")
LOG.info("called")
ts = 30 * 24 # 2 minute

midnight, tomorrow = get_sts_ets_at_localhour(dt, 0)
Expand All @@ -378,7 +392,7 @@ def load_precip(data, dt: date, tile_affine: Affine):
# Require at least 75% data coverage, if not, we will abort back to legacy
data["quorum"] = ts * 0.75

def _cb(args):
def _cb_lp(args):
"""callback"""
tidx, grid = args
if grid is not None:
Expand All @@ -387,12 +401,19 @@ def _cb(args):

tidx = 0
shp = data["solar"].shape
with ThreadPool(4) as pool:
with ThreadPool(CPUCOUNT) as pool:
while now < tomorrow:
pool.apply_async(
_mrms_reader,
(now.astimezone(UTC), tidx, shp, tile_affine, a2m_divisor),
callback=_cb,
(
now.astimezone(UTC),
tidx,
shp,
tile_bounds.west,
tile_bounds.north,
a2m_divisor,
),
callback=_cb_lp,
error_callback=LOG.error,
)
now += timedelta(minutes=2)
Expand All @@ -404,7 +425,8 @@ def _cb(args):
"Failed 75%% quorum with MRMS a2m %.1f, loading legacy",
data["quorum"],
)
load_precip_legacy(data, dt, tile_affine)
load_precip_legacy(data, dt, tile_bounds)
LOG.info("finished precip calculation")


def bpstr(ts, accum):
Expand Down Expand Up @@ -476,18 +498,18 @@ def write_grid(grid, valid, xtile, ytile, fnadd=""):
np.save(f"{basedir}/{valid:%Y%m%d}{fnadd}.tile_{xtile}_{ytile}", grid)


def precip_workflow(data, valid, tile_affine: Affine):
def precip_workflow(data, valid, tile_affine: Affine, tile_bounds: BOUNDS):
"""Drive the precipitation workflow"""
load_stage4(data, valid, tile_affine)
# We have MRMS a2m RASTER files prior to 1 Jan 2015, but these files used
# a very poor choice of data interval of 0.1mm, which is not large enough
# to capture low intensity events. Files after 1 Jan 2015 used a better
# 0.02mm resolution
if valid.year < 2015:
load_precip_legacy(data, valid, tile_affine)
load_precip_legacy(data, valid, tile_bounds)
else:
load_precip(data, valid, tile_affine)
qc_precip(data, valid, tile_affine)
load_precip(data, valid, tile_bounds)
qc_precip(data, valid, tile_bounds)


def edit_clifile(xidx, yidx, clifn, data, valid) -> bool:
Expand Down Expand Up @@ -593,7 +615,7 @@ def main(scenario, xtile, ytile, dt: datetime):
load_iemre(data, valid, tile_affine)
# 5. wind direction (always zero)
# 7. breakpoint precip mm
precip_workflow(data, valid, tile_affine)
precip_workflow(data, valid, tile_affine, tile_bounds)
write_grid(data["stage4"], valid, xtile, ytile, "stage4")
write_grid(np.sum(data["precip"], 2), valid, xtile, ytile)

Expand Down

0 comments on commit 4d645b7

Please sign in to comment.