Skip to content

Commit

Permalink
feat: move ot2archive into a more realtime mode
Browse files Browse the repository at this point in the history
refs #603
  • Loading branch information
akrherz committed Dec 15, 2023
1 parent 658bdb3 commit 44f99f7
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 35 deletions.
4 changes: 0 additions & 4 deletions scripts/RUN_12Z.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ cd ../../dbutil
python rwis2archive.py $(date -u --date '1 days ago' +'%Y %m %d')
python rwis2archive.py $(date -u +'%Y %m %d')

cd ../other
python ot2archive.py $(date -u --date '1 days ago' +'%Y %m %d')
python ot2archive.py $(date -u +'%Y %m %d')

cd ../util
csh BACKUP.csh

Expand Down
3 changes: 3 additions & 0 deletions scripts/RUN_20MIN.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@
cd ingestors/madis
python to_iemaccess.py
python extract_metar.py

cd ../../other
python ot2archive.py
2 changes: 0 additions & 2 deletions scripts/RUN_MIDNIGHT.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ cd asos
python compute_daily.py

cd ../other
# Important that this runs first, so that srad is there.
python ot2archive.py $(date --date '1 day ago' +'%Y %m %d')
python update_daily_srad.py $(date --date '1 day ago' +'%Y %m %d')

# Need this done so that IEMRE daily grids are there for DEP
Expand Down
89 changes: 60 additions & 29 deletions scripts/other/ot2archive.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,103 @@
"""
Dump iem database of OT data to archive
Runs at midnight and 12 UTC for previous day
Migrate IEM Access data to the archive database.
Called from RUN_20MIN.sh
"""
import datetime
import sys

# third party
from pyiem.util import get_dbconnc, logger, utc
from pyiem.reference import ISO8601
from pyiem.util import get_dbconnc, get_properties, logger, set_property, utc

LOG = logger()
PROPERTY_NAME = "ot2archive_last"


def dowork(ts, ts2):
def dowork(sts, ets):
"""Process between these two timestamps please"""
# Delete any obs from yesterday
OTHER, ocursor = get_dbconnc("other")
IEM, icursor = get_dbconnc("iem")
ocursor.execute(
"DELETE from alldata WHERE valid >= %s and valid < %s", (ts, ts2)
)

# Get obs from Access
icursor.execute(
"SELECT c.*, t.id from current_log c JOIN stations t on "
"(t.iemid = c.iemid) WHERE valid >= %s and valid < %s "
"and t.network in ('OT', 'WMO_BUFR_SRF')",
(ts, ts2),
"""
SELECT c.*, t.id from current_log c JOIN stations t on
(t.iemid = c.iemid) WHERE t.network in ('OT', 'WMO_BUFR_SRF') and
updated >= %s and updated < %s ORDER by updated ASC
""",
(sts, ets),
)
if icursor.rowcount == 0:
LOG.warning("found no results for ts: %s ts2: %s", ts, ts2)
LOG.warning("found no results for ts: %s ts2: %s", sts, ets)

deletes = 0
inserts = 0
for row in icursor:
pday = 0
if row["pday"] is not None and float(row["pday"]) > 0:
pday = row["pday"]
alti = row["alti"]
if alti is None and row["mslp"] is not None:
alti = row["mslp"] * 0.03
# delete any previous obs
ocursor.execute(
"DELETE from alldata where station = %s and valid = %s",
(row["id"], row["valid"]),
)
deletes += ocursor.rowcount
args = (
row["id"],
row["valid"],
row["tmpf"],
row["dwpf"],
row["drct"],
row["sknt"],
alti,
pday,
row["alti"],
row["pday"],
row["gust"],
row["srad"],
row["relh"],
row["skyl1"],
row["skyl2"],
row["skyl3"],
row["skyl4"],
row["skyc1"],
row["skyc2"],
row["skyc3"],
row["skyc4"],
row["srad_1h_j"],
)
ocursor.execute(
"INSERT into alldata(station, valid, tmpf, dwpf, drct, sknt, "
"alti, pday, gust, srad, relh) values "
"(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
"""
INSERT into alldata(station, valid, tmpf, dwpf, drct, sknt,
alti, pday, gust, srad, relh, skyl1, skyl2, skyl3, skyl4,
skyc1, skyc2, skyc3, skyc4, srad_1h_j) values
(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
""",
args,
)
inserts += 1

LOG.info("%s->%s deletes: %s inserts: %s", sts, ets, deletes, inserts)
ocursor.close()
OTHER.commit()


def main(argv):
def get_first_updated():
"""Figure out which is the last updated timestamp we ran for."""
props = get_properties()
propvalue = props.get(PROPERTY_NAME)
if propvalue is None:
LOG.warning("iem property %s is not set, abort!", PROPERTY_NAME)
sys.exit()

dt = datetime.datetime.strptime(propvalue, ISO8601)
return dt.replace(tzinfo=datetime.timezone.utc)


def main():
"""Run for a given 6z to 6z period."""
ts = utc(int(argv[1]), int(argv[2]), int(argv[3]), 6)
ts2 = ts + datetime.timedelta(hours=24)
dowork(ts, ts2)
last_updated = utc()
first_updated = get_first_updated()

dowork(first_updated, last_updated)
set_property(PROPERTY_NAME, last_updated)


if __name__ == "__main__":
main(sys.argv)
main()

0 comments on commit 44f99f7

Please sign in to comment.