diff --git a/env/HERA.env b/env/HERA.env
index 8ae3a147bc..ec49724842 100755
--- a/env/HERA.env
+++ b/env/HERA.env
@@ -115,14 +115,16 @@ elif [[ "${step}" = "snowanl" ]]; then
     export APRUN_CALCFIMS="${launcher} -n 1"
 
     export NTHREADS_SNOWANL=${NTHREADSmax}
-    export APRUN_SNOWANL="${APRUN_default} --cpus-per-task=${NTHREADS_SNOWANL}"
+    export APRUN_SNOWANL="${APRUN_default} --mem=0 --cpus-per-task=${NTHREADS_SNOWANL}"
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
 
-elif [[ "${step}" = "esnowrecen" ]]; then
+elif [[ "${step}" = "esnowanl" ]]; then
 
-    export NTHREADS_ESNOWRECEN=${NTHREADSmax}
-    export APRUN_ESNOWRECEN="${APRUN_default} --cpus-per-task=${NTHREADS_ESNOWRECEN}"
+    export APRUN_CALCFIMS="${launcher} -n 1"
+
+    export NTHREADS_ESNOWANL=${NTHREADSmax}
+    export APRUN_ESNOWANL="${APRUN_default} --mem=0 --cpus-per-task=${NTHREADS_ESNOWANL}"
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
 
diff --git a/env/HERCULES.env b/env/HERCULES.env
index 3a59b1992d..acfbe438ef 100755
--- a/env/HERCULES.env
+++ b/env/HERCULES.env
@@ -118,10 +118,12 @@ case ${step} in
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
 ;;
-"esnowrecen")
+"esnowanl")
 
-    export NTHREADS_ESNOWRECEN=${NTHREADSmax}
-    export APRUN_ESNOWRECEN="${APRUN_default} --cpus-per-task=${NTHREADS_ESNOWRECEN}"
+    export APRUN_CALCFIMS="${launcher} -n 1"
+
+    export NTHREADS_ESNOWANL=${NTHREADSmax}
+    export APRUN_ESNOWANL="${APRUN_default} --cpus-per-task=${NTHREADS_ESNOWANL}"
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
 ;;
diff --git a/env/JET.env b/env/JET.env
index 6465b69acd..7bfd912062 100755
--- a/env/JET.env
+++ b/env/JET.env
@@ -102,10 +102,12 @@ elif [[ "${step}" = "snowanl" ]]; then
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
 
-elif [[ "${step}" = "esnowrecen" ]]; then
+elif [[ "${step}" = "esnowanl" ]]; then
 
-    export NTHREADS_ESNOWRECEN=${NTHREADSmax}
-    export APRUN_ESNOWRECEN="${APRUN_default} --cpus-per-task=${NTHREADS_ESNOWRECEN}"
+    export APRUN_CALCFIMS="${launcher} -n 1"
+
+    export NTHREADS_ESNOWANL=${NTHREADSmax}
+    export APRUN_ESNOWANL="${APRUN_default} --cpus-per-task=${NTHREADS_ESNOWANL}"
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
 
diff --git a/env/ORION.env b/env/ORION.env
index 1dc49e9362..fbe00c153c 100755
--- a/env/ORION.env
+++ b/env/ORION.env
@@ -109,10 +109,12 @@ elif [[ "${step}" = "snowanl" ]]; then
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
 
-elif [[ "${step}" = "esnowrecen" ]]; then
+elif [[ "${step}" = "esnowanl" ]]; then
 
-    export NTHREADS_ESNOWRECEN=${NTHREADSmax}
-    export APRUN_ESNOWRECEN="${APRUN_default} --cpus-per-task=${NTHREADS_ESNOWRECEN}"
+    export APRUN_CALCFIMS="${launcher} -n 1"
+
+    export NTHREADS_ESNOWANL=${NTHREADSmax}
+    export APRUN_ESNOWANL="${APRUN_default} --cpus-per-task=${NTHREADS_ESNOWANL}"
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
 
diff --git a/env/S4.env b/env/S4.env
index 9a5baf29ed..39d24e19ec 100755
--- a/env/S4.env
+++ b/env/S4.env
@@ -102,10 +102,12 @@ elif [[ "${step}" = "snowanl" ]]; then
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
 
-elif [[ "${step}" = "esnowrecen" ]]; then
+elif [[ "${step}" = "esnowanl" ]]; then
 
-    export NTHREADS_ESNOWRECEN=${NTHREADSmax}
-    export APRUN_ESNOWRECEN="${APRUN_default} --cpus-per-task=${NTHREADS_ESNOWRECEN}"
+    export APRUN_CALCFIMS="${launcher} -n 1"
+
+    export NTHREADS_ESNOWANL=${NTHREADSmax}
+    export APRUN_ESNOWANL="${APRUN_default} --cpus-per-task=${NTHREADS_ESNOWANL}"
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
 
diff --git a/env/WCOSS2.env b/env/WCOSS2.env
index 4e8d1ddfea..e787202d66 100755
--- a/env/WCOSS2.env
+++ b/env/WCOSS2.env
@@ -95,10 +95,12 @@ elif [[ "${step}" = "snowanl" ]]; then
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
 
-elif [[ "${step}" = "esnowrecen" ]]; then
+elif [[ "${step}" = "esnowanl" ]]; then
 
-    export NTHREADS_ESNOWRECEN=${NTHREADSmax}
-    export APRUN_ESNOWRECEN="${APRUN_default}"
+    export APRUN_CALCFIMS="${launcher} -n 1"
+
+    export NTHREADS_ESNOWANL=${NTHREADSmax}
+    export APRUN_ESNOWANL="${APRUN_default}"
 
     export APRUN_APPLY_INCR="${launcher} -n 6"
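Note on the env changes above: these files only assemble launcher command strings (on Slurm machines, `--mem=0` requests all memory on the node). A minimal sketch of how such a string is consumed downstream, mirroring the pattern in the `SnowAnalysis.execute()` method deleted later in this PR; the expanded launcher string and arguments below are illustrative, not taken from a real experiment:

```python
# Sketch: an APRUN_* string becomes the front of the command line via wxflow's Executable
from wxflow import Executable

aprun_esnowanl = "srun -l --export=ALL --mem=0 --cpus-per-task=2"  # hypothetical expansion
exec_cmd = Executable(aprun_esnowanl)   # launcher and its flags form the base command
exec_cmd.add_default_arg("./gdas.x")    # JEDI executable staged in the run directory
exec_cmd.add_default_arg("fv3jedi")     # positional args, as in the jedi_args lists below
exec_cmd.add_default_arg("variational")
exec_cmd()                              # raises on non-zero exit status
```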
= "snowanl" ]]; then export APRUN_APPLY_INCR="${launcher} -n 6" -elif [[ "${step}" = "esnowrecen" ]]; then +elif [[ "${step}" = "esnowanl" ]]; then - export NTHREADS_ESNOWRECEN=${NTHREADSmax} - export APRUN_ESNOWRECEN="${APRUN_default}" + export APRUN_CALCFIMS="${launcher} -n 1" + + export NTHREADS_ESNOWANL=${NTHREADSmax} + export APRUN_ESNOWANL="${APRUN_default}" export APRUN_APPLY_INCR="${launcher} -n 6" diff --git a/jobs/JGDAS_ENKF_ARCHIVE b/jobs/JGDAS_ENKF_ARCHIVE index 29ef9c1812..021c454afc 100755 --- a/jobs/JGDAS_ENKF_ARCHIVE +++ b/jobs/JGDAS_ENKF_ARCHIVE @@ -10,7 +10,8 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "earc" -c "base earc" YMD=${PDY} HH=${cyc} declare_from_tmpl -rx COM_TOP MEMDIR="ensstat" YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ COMIN_ATMOS_ANALYSIS_ENSSTAT:COM_ATMOS_ANALYSIS_TMPL \ - COMIN_ATMOS_HISTORY_ENSSTAT:COM_ATMOS_HISTORY_TMPL + COMIN_ATMOS_HISTORY_ENSSTAT:COM_ATMOS_HISTORY_TMPL \ + COMIN_SNOW_ANALYSIS_ENSSTAT:COM_SNOW_ANALYSIS_TMPL ############################################################### # Run archive script diff --git a/jobs/JGDAS_ENKF_SNOW_RECENTER b/jobs/JGLOBAL_SNOWENS_ANALYSIS similarity index 83% rename from jobs/JGDAS_ENKF_SNOW_RECENTER rename to jobs/JGLOBAL_SNOWENS_ANALYSIS index 05d46cffc2..ca23347bca 100755 --- a/jobs/JGDAS_ENKF_SNOW_RECENTER +++ b/jobs/JGLOBAL_SNOWENS_ANALYSIS @@ -1,7 +1,7 @@ #! /usr/bin/env bash source "${HOMEgfs}/ush/preamble.sh" -source "${HOMEgfs}/ush/jjob_header.sh" -e "esnowrecen" -c "base esnowrecen" +source "${HOMEgfs}/ush/jjob_header.sh" -e "esnowanl" -c "base esnowanl" ############################################## # Set variables used in the script @@ -10,19 +10,18 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "esnowrecen" -c "base esnowrecen" # shellcheck disable=SC2153 GDUMP="gdas" export GDUMP +CDUMP=${RUN/enkf} +export CDUMP ############################################## # Begin JOB SPECIFIC work ############################################## # Generate COM variables from templates +RUN=${CDUMP} YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ + COMIN_OBS:COM_OBS_TMPL YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ - COMIN_OBS:COM_OBS_TMPL \ COMOUT_ATMOS_ANALYSIS:COM_ATMOS_ANALYSIS_TMPL \ COMOUT_CONF:COM_CONF_TMPL -MEMDIR="ensstat" YMD=${PDY} HH=${cyc} declare_from_tmpl \ - COMOUT_SNOW_ANALYSIS:COM_SNOW_ANALYSIS_TMPL - -mkdir -p "${COMOUT_SNOW_ANALYSIS}" "${COMOUT_CONF}" for imem in $(seq 1 "${NMEM_ENS}"); do memchar="mem$(printf %03i "${imem}")" @@ -31,10 +30,15 @@ for imem in $(seq 1 "${NMEM_ENS}"); do mkdir -p "${COMOUT_SNOW_ANALYSIS}" done +MEMDIR="ensstat" YMD=${PDY} HH=${cyc} declare_from_tmpl -x\ + COMOUT_SNOW_ANALYSIS:COM_SNOW_ANALYSIS_TMPL + +mkdir -p "${COMOUT_SNOW_ANALYSIS}" "${COMOUT_CONF}" + ############################################################### # Run relevant script -EXSCRIPT=${SNOWANLPY:-${SCRgfs}/exgdas_enkf_snow_recenter.py} +EXSCRIPT=${SNOWANLPY:-${SCRgfs}/exglobal_snowens_analysis.py} ${EXSCRIPT} status=$? (( status != 0 )) && exit "${status}" diff --git a/jobs/JGLOBAL_SNOW_ANALYSIS b/jobs/JGLOBAL_SNOW_ANALYSIS index e0f24fa624..1642042b89 100755 --- a/jobs/JGLOBAL_SNOW_ANALYSIS +++ b/jobs/JGLOBAL_SNOW_ANALYSIS @@ -1,7 +1,6 @@ #! 
diff --git a/jobs/JGLOBAL_SNOW_ANALYSIS b/jobs/JGLOBAL_SNOW_ANALYSIS
index e0f24fa624..1642042b89 100755
--- a/jobs/JGLOBAL_SNOW_ANALYSIS
+++ b/jobs/JGLOBAL_SNOW_ANALYSIS
@@ -1,7 +1,6 @@
 #! /usr/bin/env bash
 
 source "${HOMEgfs}/ush/preamble.sh"
-export DATA=${DATA:-${DATAROOT}/${RUN}snowanl_${cyc}}
 source "${HOMEgfs}/ush/jjob_header.sh" -e "snowanl" -c "base snowanl"
 
 ##############################################
@@ -18,12 +17,15 @@ GDUMP="gdas"
 # Begin JOB SPECIFIC work
 ##############################################
 # Generate COM variables from templates
-YMD=${PDY} HH=${cyc} declare_from_tmpl -rx COM_OBS COM_SNOW_ANALYSIS COM_CONF
+YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \
+    COMIN_OBS:COM_OBS_TMPL \
+    COMOUT_SNOW_ANALYSIS:COM_SNOW_ANALYSIS_TMPL \
+    COMOUT_CONF:COM_CONF_TMPL
 
 RUN=${GDUMP} YMD=${gPDY} HH=${gcyc} declare_from_tmpl -rx \
-    COM_ATMOS_RESTART_PREV:COM_ATMOS_RESTART_TMPL
+    COMIN_ATMOS_RESTART_PREV:COM_ATMOS_RESTART_TMPL
 
-mkdir -m 775 -p "${COM_SNOW_ANALYSIS}" "${COM_CONF}"
+mkdir -m 775 -p "${COMOUT_SNOW_ANALYSIS}" "${COMOUT_CONF}"
 
 ###############################################################
 # Run relevant script
diff --git a/jobs/rocoto/esnowanl.sh b/jobs/rocoto/esnowanl.sh
new file mode 100755
index 0000000000..a6a87f8492
--- /dev/null
+++ b/jobs/rocoto/esnowanl.sh
@@ -0,0 +1,26 @@
+#! /usr/bin/env bash
+
+source "${HOMEgfs}/ush/preamble.sh"
+
+###############################################################
+# Source UFSDA workflow modules
+. "${HOMEgfs}/ush/load_ufsda_modules.sh"
+status=$?
+[[ ${status} -ne 0 ]] && exit "${status}"
+
+export job="esnowanl"
+export jobid="${job}.$$"
+
+###############################################################
+# setup python path for ioda utilities
+# shellcheck disable=SC2311
+pyiodaPATH="${HOMEgfs}/sorc/gdas.cd/build/lib/python$(detect_py_ver)/"
+gdasappPATH="${HOMEgfs}/sorc/gdas.cd/sorc/iodaconv/src:${pyiodaPATH}"
+PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${gdasappPATH}"
+export PYTHONPATH
+
+###############################################################
+# Execute the JJOB
+"${HOMEgfs}/jobs/JGLOBAL_SNOWENS_ANALYSIS"
+status=$?
+exit "${status}"
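Why the `${PYTHONPATH:+${PYTHONPATH}:}` guard in the new rocoto script matters: an empty component in PYTHONPATH (from a stray leading or doubled `:`) is typically treated by CPython as "the current directory", which can shadow real packages. A minimal, self-contained check of that behavior:

```python
# Demonstrate that an empty PYTHONPATH component shows up on sys.path
import subprocess
import sys

out = subprocess.run(
    [sys.executable, "-c", "import sys; print(sys.path)"],
    env={"PYTHONPATH": ":/tmp/gdasapp"},  # note the empty first entry
    capture_output=True, text=True,
)
print(out.stdout)  # the empty entry appears as '' (interpreted as the current directory)
```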
diff --git a/jobs/rocoto/esnowrecen.sh b/jobs/rocoto/esnowrecen.sh
deleted file mode 100755
index f8c4f8f7fc..0000000000
--- a/jobs/rocoto/esnowrecen.sh
+++ /dev/null
@@ -1,18 +0,0 @@
-#! /usr/bin/env bash
-
-source "${HOMEgfs}/ush/preamble.sh"
-
-###############################################################
-# Source UFSDA workflow modules
-. "${HOMEgfs}/ush/load_ufsda_modules.sh"
-status=$?
-[[ ${status} -ne 0 ]] && exit "${status}"
-
-export job="esnowrecen"
-export jobid="${job}.$$"
-
-###############################################################
-# Execute the JJOB
-"${HOMEgfs}/jobs/JGDAS_ENKF_SNOW_RECENTER"
-status=$?
-exit "${status}"
diff --git a/parm/archive/enkf.yaml.j2 b/parm/archive/enkf.yaml.j2
index f5662bc687..9f9ad296f8 100644
--- a/parm/archive/enkf.yaml.j2
+++ b/parm/archive/enkf.yaml.j2
@@ -70,6 +70,14 @@ enkf:
         - "{{ COMIN_ATMOS_ANALYSIS_ENSSTAT | relpath(ROTDIR) }}/{{ head }}{{ file }}"
         {% endfor %}
 
+        {% if DO_JEDISNOWDA %}
+        - "{{ COMIN_SNOW_ANALYSIS_ENSSTAT | relpath(ROTDIR) }}/{{ head }}snowstat.tgz"
+        {% for itile in range(1,7) %}
+        # Snow analysis is 3dvar
+        - "{{ COMIN_SNOW_ANALYSIS_ENSSTAT | relpath(ROTDIR) }}/snowinc.{{ cycle_YMD }}.{{ cycle_HH }}0000.sfc_data.tile{{ itile }}.nc"
+        {% endfor %}
+        {% endif %}
+
         # Ensemble mean analyses/increments
         # 6-hr analysis/increment
         {% if do_calc_increment %}
diff --git a/parm/archive/gdas_restarta.yaml.j2 b/parm/archive/gdas_restarta.yaml.j2
index fc5ce9478d..824010a0ee 100644
--- a/parm/archive/gdas_restarta.yaml.j2
+++ b/parm/archive/gdas_restarta.yaml.j2
@@ -47,7 +47,7 @@ gdas_restarta:
 
     # Snow configuration yaml
     {% if DO_JEDISNOWDA %}
-    - "{{ COMIN_CONF | relpath(ROTDIR) }}/{{ head }}letkfoi.yaml"
+    - "{{ COMIN_CONF | relpath(ROTDIR) }}/{{ head }}snowanlvar.yaml"
    {% endif %}
 
    # Input BUFR files
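The archive additions above gate the ensemble snow products on `DO_JEDISNOWDA`. A hedged sketch rendering a block like the `enkf.yaml.j2` addition with plain jinja2, to show which file names land in the archive list; `relpath` is supplied by wxflow at run time, so a stand-in filter and illustrative paths are used here:

```python
# Render a DO_JEDISNOWDA-gated archive block with a stand-in relpath filter
import os
from jinja2 import Environment

tmpl = """{% if DO_JEDISNOWDA %}
{% for itile in range(1, 7) %}
- {{ COMIN_SNOW_ANALYSIS_ENSSTAT | relpath(ROTDIR) }}/snowinc.{{ cycle_YMD }}.{{ cycle_HH }}0000.sfc_data.tile{{ itile }}.nc
{% endfor %}
{% endif %}"""

env = Environment()
env.filters["relpath"] = lambda path, start: os.path.relpath(path, start)  # stand-in
print(env.from_string(tmpl).render(
    DO_JEDISNOWDA=True,
    ROTDIR="/comroot",
    COMIN_SNOW_ANALYSIS_ENSSTAT="/comroot/enkfgdas.20240101/00/ensstat/analysis/snow",
    cycle_YMD="20240101", cycle_HH="00",
))
```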
diff --git a/parm/config/gefs/config.ufs b/parm/config/gefs/config.ufs
index 5b7ba4c0af..c46023aff6 100644
--- a/parm/config/gefs/config.ufs
+++ b/parm/config/gefs/config.ufs
@@ -83,6 +83,7 @@ case "${fv3_res}" in
        export xr_cnvcld=".true."  # Pass conv. clouds to Xu-Randall cloud fraction
        export cdmbgwd="0.071,2.1,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="40.0,1.77,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=6.0
        export k_split=1
        export n_split=4
        export tau=10.0
@@ -107,6 +108,7 @@ case "${fv3_res}" in
        export xr_cnvcld=".true."  # Pass conv. clouds to Xu-Randall cloud fraction
        export cdmbgwd="0.14,1.8,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="20.0,2.5,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=6.0
        export knob_ugwp_tauamp=3.0e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=1
        export n_split=4
@@ -130,6 +132,7 @@ case "${fv3_res}" in
        export nthreads_ufs_gfs=2
        export cdmbgwd="0.23,1.5,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="10.0,3.5,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=6.0
        export knob_ugwp_tauamp=1.5e-3  # setting for UGWPv1 non-stationary GWD
        export xr_cnvcld=".true."  # Pass conv. clouds to Xu-Randall cloud fraction
        export k_split=2
@@ -154,6 +157,7 @@ case "${fv3_res}" in
        export nthreads_ufs_gfs=2
        export cdmbgwd="1.1,0.72,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="5.0,5.0,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=6.0
        export knob_ugwp_tauamp=0.8e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=2
        export n_split=4
@@ -177,6 +181,7 @@ case "${fv3_res}" in
        export nthreads_ufs_gfs=4
        export cdmbgwd="4.0,0.15,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="2.5,7.5,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=6.0
        export knob_ugwp_tauamp=0.5e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=2
        export n_split=4
@@ -200,6 +205,7 @@ case "${fv3_res}" in
        export nthreads_ufs_gfs=4
        export cdmbgwd="4.0,0.10,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="1.67,8.8,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=2.0
        export knob_ugwp_tauamp=0.35e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=2
        export n_split=6
@@ -223,6 +229,7 @@ case "${fv3_res}" in
        export nthreads_ufs_gfs=4
        export cdmbgwd="4.0,0.05,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="0.625,14.1,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=2.0
        export knob_ugwp_tauamp=0.13e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=4
        export n_split=5
@@ -268,7 +275,7 @@ case ${fv3_res} in
    "C384" | "C768" | "C1152" | "C3072")
        zstandard_level=0
        ideflate=1
-       quantize_nsd=5
+       quantize_nsd=14
        OUTPUT_FILETYPE_ATM="netcdf_parallel"
        if [[ "${fv3_res}" == "C384" ]]; then
            OUTPUT_FILETYPE_SFC="netcdf"  # For C384, the write grid component is better off with serial netcdf
diff --git a/parm/config/gfs/config.esnowanl b/parm/config/gfs/config.esnowanl
new file mode 100644
index 0000000000..dde8970482
--- /dev/null
+++ b/parm/config/gfs/config.esnowanl
@@ -0,0 +1,38 @@
+#! /usr/bin/env bash
+
+########## config.esnowanl ##########
+# configuration common to snow ensemble analysis tasks
+
+echo "BEGIN: config.esnowanl"
+
+# Get task specific resources
+source "${EXPDIR}/config.resources" esnowanl
+
+export OBS_LIST="${PARMgfs}/gdas/snow/obs/lists/gdas_snow.yaml.j2"
+export GTS_SNOW_STAGE_YAML="${PARMgfs}/gdas/snow/obs/config/bufr2ioda_mapping.yaml.j2"
+
+export JCB_BASE_YAML="${PARMgfs}/gdas/snow/jcb-base.yaml.j2"
+export JCB_ALGO_YAML_VAR="${PARMgfs}/gdas/snow/jcb-prototype_2dvar.yaml.j2"
+
+# Process IMS snowcover into snow depth
+export IMS_OBS_LIST="${PARMgfs}/gdas/snow/prep/prep_ims.yaml.j2"
+export CALCFIMSEXE="${EXECgfs}/calcfIMS.exe"
+export FIMS_NML_TMPL="${PARMgfs}/gdas/snow/prep/fims.nml.j2"
+export IMS2IODACONV="${USHgfs}/imsfv3_scf2ioda.py"
+
+export JEDI_FIX_YAML="${PARMgfs}/gdas/snow_jedi_fix.yaml.j2"
+export BERROR_STAGING_YAML="${PARMgfs}/gdas/staging/snow_berror.yaml.j2"
+export SNOW_ENS_STAGE_TMPL="${PARMgfs}/gdas/snow_stage_ens_update.yaml.j2"
+export SNOW_OROG_STAGE_TMPL="${PARMgfs}/gdas/snow_stage_orog.yaml.j2"
+export SNOW_ENS_FINALIZE_TMPL="${PARMgfs}/gdas/snow_finalize_ens_update.yaml.j2"
+
+# Name of the executable that applies increment to bkg and its namelist template
+export APPLY_INCR_EXE="${EXECgfs}/apply_incr.exe"
+export ENS_APPLY_INCR_NML_TMPL="${PARMgfs}/gdas/snow/ens_apply_incr_nml.j2"
+
+export JEDI_CONFIG_YAML="${PARMgfs}/gdas/esnowanl_jedi_config.yaml.j2"
+
+export io_layout_x=@IO_LAYOUT_X@
+export io_layout_y=@IO_LAYOUT_Y@
+
+echo "END: config.esnowanl"
diff --git a/parm/config/gfs/config.esnowrecen b/parm/config/gfs/config.esnowrecen
deleted file mode 100644
index adb039559a..0000000000
--- a/parm/config/gfs/config.esnowrecen
+++ /dev/null
@@ -1,29 +0,0 @@
-#! /usr/bin/env bash
-
-########## config.esnowrecen ##########
-# configuration common to snow ensemble analysis tasks
-
-echo "BEGIN: config.esnowrecen"
-
-# Get task specific resources
-source "${EXPDIR}/config.resources" esnowrecen
-
-export JCB_BASE_YAML="${PARMgfs}/gdas/snow/jcb-base.yaml.j2"
-export JCB_ALGO_YAML="${PARMgfs}/gdas/snow/jcb-fv3jedi_land_ensrecenter.yaml.j2"
-
-export JEDI_FIX_YAML="${PARMgfs}/gdas/atm_jedi_fix.yaml.j2"
-export SNOW_ENS_STAGE_TMPL="${PARMgfs}/gdas/snow_stage_ens_update.yaml.j2"
-export SNOW_OROG_STAGE_TMPL="${PARMgfs}/gdas/snow_stage_orog.yaml.j2"
-export SNOW_ENS_FINALIZE_TMPL="${PARMgfs}/gdas/snow_finalize_ens_update.yaml.j2"
-
-# Name of the executable that applies increment to bkg and its namelist template
-export APPLY_INCR_EXE="${EXECgfs}/apply_incr.exe"
-export ENS_APPLY_INCR_NML_TMPL="${PARMgfs}/gdas/snow/letkfoi/ens_apply_incr_nml.j2"
-
-export io_layout_x=@IO_LAYOUT_X@
-export io_layout_y=@IO_LAYOUT_Y@
-
-export JEDIEXE=${EXECgfs}/gdasapp_land_ensrecenter.x
-export FREGRID=${EXECgfs}/fregrid.x
-
-echo "END: config.esnowrecen"
diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources
index e642082290..230872b8f3 100644
--- a/parm/config/gfs/config.resources
+++ b/parm/config/gfs/config.resources
@@ -15,7 +15,7 @@ if (( $# != 1 )); then
    echo "prep prepatmiodaobs"
    echo "atmanlinit atmanlvar atmanlfv3inc atmanlfinal"
    echo "atmensanlinit atmensanlobs atmensanlsol atmensanlletkf atmensanlfv3inc atmensanlfinal"
-   echo "snowanl esnowrecen"
+   echo "snowanl esnowanl"
    echo "prepobsaero aeroanlinit aeroanlvar aeroanlfinal aeroanlgenb"
    echo "anal sfcanl analcalc analdiag fcst echgres"
    echo "upp atmos_products"
@@ -346,11 +346,11 @@ case ${step} in
        walltime="00:15:00"
        ntasks=$(( layout_x * layout_y * 6 ))
-       threads_per_task=1
+       threads_per_task=2
        tasks_per_node=$(( max_tasks_per_node / threads_per_task ))
        ;;
 
-    "esnowrecen")
+    "esnowanl")
        # below lines are for creating JEDI YAML
        case ${CASE} in
            "C768")
@@ -373,9 +373,9 @@ case ${step} in
        export layout_x
        export layout_y
 
-       walltime="00:15:00"
+       walltime="00:30:00"
        ntasks=$(( layout_x * layout_y * 6 ))
-       threads_per_task=1
+       threads_per_task=2
        tasks_per_node=$(( max_tasks_per_node / threads_per_task ))
        ;;
@@ -1213,7 +1213,7 @@ case ${step} in
    ;;
 
    "esfc")
-       walltime="00:15:00"
+       walltime="00:25:00"
        ntasks=80
        threads_per_task=1
        tasks_per_node=$(( max_tasks_per_node / threads_per_task ))
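A worked example of the resource arithmetic in the `config.resources` hunks above, with assumed values (`layout_x`/`layout_y` come from the CASE-dependent block, which is only partially shown, and `max_tasks_per_node` is machine-specific):

```python
# Resource arithmetic for the snowanl/esnowanl steps, illustrative values only
layout_x, layout_y = 6, 6          # hypothetical JEDI layout per cube face
max_tasks_per_node = 40            # hypothetical 40-core node
threads_per_task = 2               # the new value in this PR (was 1)

ntasks = layout_x * layout_y * 6   # 6 cube-sphere tiles -> 216 MPI tasks
tasks_per_node = max_tasks_per_node // threads_per_task  # -> 20
nodes = -(-ntasks // tasks_per_node)                     # ceiling division -> 11 nodes
print(ntasks, tasks_per_node, nodes)
```

Doubling `threads_per_task` halves `tasks_per_node`, so the same `ntasks` spreads over twice as many nodes, which is consistent with the longer walltimes set above.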
diff --git a/parm/config/gfs/config.snowanl b/parm/config/gfs/config.snowanl
index 1aeaf58e46..67a4fc012f 100644
--- a/parm/config/gfs/config.snowanl
+++ b/parm/config/gfs/config.snowanl
@@ -11,13 +11,8 @@ source "${EXPDIR}/config.resources" snowanl
 export OBS_LIST="${PARMgfs}/gdas/snow/obs/lists/gdas_snow.yaml.j2"
 export GTS_SNOW_STAGE_YAML="${PARMgfs}/gdas/snow/obs/config/bufr2ioda_mapping.yaml.j2"
 
-# Name of the JEDI executable and its yaml template
-export JEDIEXE="${EXECgfs}/gdas.x"
-export JEDIYAML="${PARMgfs}/gdas/snow/letkfoi/letkfoi.yaml.j2"
-
-# Ensemble member properties
-export SNOWDEPTHVAR="snodl"
-export BESTDDEV="30."  # Background Error Std. Dev. for LETKFOI
+export JCB_BASE_YAML="${PARMgfs}/gdas/snow/jcb-base.yaml.j2"
+export JCB_ALGO_YAML_VAR="${PARMgfs}/gdas/snow/jcb-prototype_2dvar.yaml.j2"
 
 # Process IMS snowcover into snow depth
 export IMS_OBS_LIST="${PARMgfs}/gdas/snow/prep/prep_ims.yaml.j2"
@@ -27,9 +22,15 @@ export IMS2IODACONV="${USHgfs}/imsfv3_scf2ioda.py"
 
 # Name of the executable that applies increment to bkg and its namelist template
 export APPLY_INCR_EXE="${EXECgfs}/apply_incr.exe"
-export APPLY_INCR_NML_TMPL="${PARMgfs}/gdas/snow/letkfoi/apply_incr_nml.j2"
+export APPLY_INCR_NML_TMPL="${PARMgfs}/gdas/snow/apply_incr_nml.j2"
 
 export JEDI_FIX_YAML="${PARMgfs}/gdas/snow_jedi_fix.yaml.j2"
+export VAR_BKG_STAGING_YAML="${PARMgfs}/gdas/staging/snow_var_bkg.yaml.j2"
+export BERROR_STAGING_YAML="${PARMgfs}/gdas/staging/snow_berror.yaml.j2"
+
+export JEDI_CONFIG_YAML="${PARMgfs}/gdas/snowanl_jedi_config.yaml.j2"
+
+export JEDIEXE=${EXECgfs}/gdas.x
 
 export io_layout_x=@IO_LAYOUT_X@
 export io_layout_y=@IO_LAYOUT_Y@
diff --git a/parm/config/gfs/config.ufs b/parm/config/gfs/config.ufs
index 3f8e7022fa..9737404dd1 100644
--- a/parm/config/gfs/config.ufs
+++ b/parm/config/gfs/config.ufs
@@ -99,6 +99,7 @@ case "${fv3_res}" in
        export xr_cnvcld=".false."  # Do not pass conv. clouds to Xu-Randall cloud fraction
        export cdmbgwd="0.071,2.1,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="40.0,1.77,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=6.0
        export knob_ugwp_tauamp=6.0e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=1
        export n_split=4
@@ -124,6 +125,7 @@ case "${fv3_res}" in
        export npy_nest=241
        export NEST_DLON=0.25
        export NEST_DLAT=0.25
+       export psl_gwd_dx_factor=6.0
        export WRITE_GROUP_GDAS=2
        export WRTTASK_PER_GROUP_PER_THREAD_PER_TILE_GDAS=2
        export WRITE_GROUP_GFS=2
@@ -141,6 +143,7 @@ case "${fv3_res}" in
        export xr_cnvcld=.false.  # Do not pass conv. clouds to Xu-Randall cloud fraction
        export cdmbgwd="0.14,1.8,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="20.0,2.5,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=6.0
        export knob_ugwp_tauamp=3.0e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=1
        export n_split=4
@@ -167,6 +170,7 @@ case "${fv3_res}" in
        export npy_nest=481
        export NEST_DLON=0.125
        export NEST_DLAT=0.125
+       export psl_gwd_dx_factor=6.0
        export WRITE_GROUP_GDAS=2
        export WRTTASK_PER_GROUP_PER_THREAD_PER_TILE_GDAS=15
        export WRITE_GROUP_GFS=2
@@ -183,6 +187,7 @@ case "${fv3_res}" in
        export nthreads_ufs_gfs=2
        export cdmbgwd="0.23,1.5,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="10.0,3.5,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=6.0
        export knob_ugwp_tauamp=1.5e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=2
        export n_split=4
@@ -211,6 +216,7 @@ case "${fv3_res}" in
        export npy_nest=961
        export NEST_DLON=0.0625
        export NEST_DLAT=0.0625
+       export psl_gwd_dx_factor=2.0
        export WRITE_GROUP_GDAS=2
        export WRTTASK_PER_GROUP_PER_THREAD_PER_TILE_GDAS=20
        export WRITE_GROUP_GFS=2
@@ -227,6 +233,7 @@ case "${fv3_res}" in
        export nthreads_ufs_gfs=2
        export cdmbgwd="1.1,0.72,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="5.0,5.0,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=6.0
        export knob_ugwp_tauamp=0.8e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=2
        export n_split=4
@@ -258,6 +265,7 @@ case "${fv3_res}" in
        export npy_nest=1921
        export NEST_DLON=0.0325
        export NEST_DLAT=0.0325
+       export psl_gwd_dx_factor=2.0
        export WRITE_GROUP_GDAS=2
        export WRTTASK_PER_GROUP_PER_THREAD_PER_TILE_GDAS=90
        export WRITE_GROUP_GFS=2
@@ -274,6 +282,7 @@ case "${fv3_res}" in
        export nthreads_ufs_gfs=4
        export cdmbgwd="4.0,0.15,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="2.5,7.5,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=6.0
        export knob_ugwp_tauamp=0.5e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=2
        export n_split=4
@@ -298,6 +307,7 @@ case "${fv3_res}" in
        export nthreads_ufs_gfs=4
        export cdmbgwd="4.0,0.10,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="1.67,8.8,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=2.0
        export knob_ugwp_tauamp=0.35e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=2
        export n_split=6
@@ -321,6 +331,7 @@ case "${fv3_res}" in
        export nthreads_ufs_gfs=4
        export cdmbgwd="4.0,0.05,1.0,1.0"  # mountain blocking, ogwd, cgwd, cgwd src scaling
        export cdmbgwd_gsl="0.625,14.1,1.0,1.0"  # settings for GSL drag suite
+       export psl_gwd_dx_factor=2.0
        export knob_ugwp_tauamp=0.13e-3  # setting for UGWPv1 non-stationary GWD
        export k_split=4
        export n_split=5
diff --git a/parm/gdas/esnowanl_jedi_config.yaml.j2 b/parm/gdas/esnowanl_jedi_config.yaml.j2
new file mode 100644
index 0000000000..ee0909f6db
--- /dev/null
+++ b/parm/gdas/esnowanl_jedi_config.yaml.j2
@@ -0,0 +1,14 @@
+esnowanlensmean:
+  rundir: '{{ DATA }}'
+  exe_src: '{{ EXECgfs }}/gdas.x'
+  mpi_cmd: '{{ APRUN_ESNOWANL }}'
+  jedi_args: ['fv3jedi', 'ensmean']
+  jcb_base_yaml: '{{ PARMgfs }}/gdas/snow/jcb-base.yaml.j2'
+  jcb_algo: 'fv3jedi_snow_ensmean'
+snowanlvar:
+  rundir: '{{ DATA }}'
+  exe_src: '{{ EXECgfs }}/gdas.x'
+  mpi_cmd: '{{ APRUN_ESNOWANL }}'
+  jedi_args: ['fv3jedi', 'variational']
+  jcb_base_yaml: '{{ PARMgfs }}/gdas/snow/jcb-base.yaml.j2'
+  jcb_algo_yaml: '{{ JCB_ALGO_YAML_VAR }}'
\ No newline at end of file
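Each top-level key in the `esnowanl_jedi_config.yaml.j2` file above becomes one JEDI application object in the task code. A sketch of how that YAML is consumed, following the calls visible in the `pygfs` task diffs later in this PR (the config object here is a stand-in for the real `task_config` AttrDict):

```python
# Sketch: JEDI_CONFIG_YAML keys -> Jedi objects, as in SnowEnsAnalysis below
import os
from wxflow import cast_strdict_as_dtypedict
from pygfs.jedi import Jedi

task_config = cast_strdict_as_dtypedict(os.environ)  # real tasks build this from the env

# get_jedi_dict validates that the YAML provides exactly these applications
expected_keys = ['snowanlvar', 'esnowanlensmean']
jedi_dict = Jedi.get_jedi_dict(task_config.JEDI_CONFIG_YAML, task_config, expected_keys)

jedi_dict['esnowanlensmean'].initialize(task_config)  # render JCB algo into a run YAML
jedi_dict['esnowanlensmean'].execute()                # presumably runs: <mpi_cmd> <exe> fv3jedi ensmean <yaml>
```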
diff --git a/parm/gdas/snow_stage_ens_update.yaml.j2 b/parm/gdas/snow_stage_ens_update.yaml.j2
index 4ad5499751..d8b1d42d00 100644
--- a/parm/gdas/snow_stage_ens_update.yaml.j2
+++ b/parm/gdas/snow_stage_ens_update.yaml.j2
@@ -10,45 +10,15 @@
 # create working directories
 ######################################
 mkdir:
-- "{{ DATA }}/bkg/det"
-- "{{ DATA }}/bkg/det_ensres"
-- "{{ DATA }}/inc/det"
-- "{{ DATA }}/inc/det_ensres"
-- "{{ DATA }}//inc/ensmean"
+- "{{ DATA }}/obs"
+- "{{ DATA }}/bkg/ensmean"
+- "{{ DATA }}/anl/ensmean"
 {% for mem in range(1, NMEM_ENS + 1) %}
 - "{{ DATA }}/bkg/mem{{ '%03d' % mem }}"
 - "{{ DATA }}/anl/mem{{ '%03d' % mem }}"
 {% endfor %}
 copy:
 ######################################
-# copy deterministic background files
-######################################
-# define variables
-# Declare a dict of search and replace terms to run on each template
-{% set tmpl_dict = {'${ROTDIR}':ROTDIR,
-                    '${RUN}':GDUMP,
-                    '${YMD}':previous_cycle | to_YMD,
-                    '${HH}':previous_cycle | strftime("%H"),
-                    '${MEMDIR}':""} %}
-
-{% for tile in range(1, ntiles+1) %}
-- ["{{ COM_ATMOS_RESTART_TMPL | replace_tmpl(tmpl_dict) }}/{{ bkg_time }}.sfc_data.tile{{ tile }}.nc", "{{ DATA }}/bkg/det/{{ bkg_time }}.sfc_data.tile{{ tile }}.nc"]
-{% endfor %}
-######################################
-# copy deterministic increment files
-######################################
-# define variables
-# Declare a dict of search and replace terms to run on each template
-{% set tmpl_dict = {'${ROTDIR}':ROTDIR,
-                    '${RUN}':GDUMP,
-                    '${YMD}':current_cycle | to_YMD,
-                    '${HH}':current_cycle | strftime("%H"),
-                    '${MEMDIR}':""} %}
-
-{% for tile in range(1, ntiles+1) %}
-- ["{{ COM_SNOW_ANALYSIS_TMPL | replace_tmpl(tmpl_dict) }}/snowinc.{{ current_cycle | to_fv3time }}.sfc_data.tile{{ tile }}.nc", "{{ DATA }}/inc/det/snowinc.{{ bkg_time }}.sfc_data.tile{{ tile }}.nc"]
-{% endfor %}
-######################################
 # copy ensemble background files
 ######################################
 {% for mem in range(1, NMEM_ENS + 1) %}
@@ -60,6 +30,8 @@ copy:
                     '${HH}':previous_cycle | strftime("%H"),
                     '${MEMDIR}':"mem" + '%03d' % mem} %}
 
+  # copy coupler file
+- ["{{ COM_ATMOS_RESTART_TMPL | replace_tmpl(tmpl_dict) }}/{{ current_cycle | to_fv3time }}.coupler.res", "{{ DATA }}/bkg/mem{{ '%03d' % mem }}/{{ current_cycle | to_fv3time }}.coupler.res"]
 # we need to copy them to two places, one serves as the basis for the analysis
 {% for tile in range(1, ntiles+1) %}
 - ["{{ COM_ATMOS_RESTART_TMPL | replace_tmpl(tmpl_dict) }}/{{ current_cycle | to_fv3time }}.sfc_data.tile{{ tile }}.nc", "{{ DATA }}/bkg/mem{{ '%03d' % mem }}/{{ current_cycle | to_fv3time }}.sfc_data.tile{{ tile }}.nc"]
diff --git a/parm/gdas/snow_stage_orog.yaml.j2 b/parm/gdas/snow_stage_orog.yaml.j2
index 3cd7d5c327..f915b36d1f 100644
--- a/parm/gdas/snow_stage_orog.yaml.j2
+++ b/parm/gdas/snow_stage_orog.yaml.j2
@@ -1,12 +1,8 @@
 mkdir:
-- "{{ DATA }}/orog/det"
 - "{{ DATA }}/orog/ens"
 copy:
-- ["{{ FIXorog }}/{{ CASE }}/{{ CASE }}_mosaic.nc", "{{ DATA }}/orog/det/{{ CASE }}_mosaic.nc"]
 - ["{{ FIXorog }}/{{ CASE_ENS }}/{{ CASE_ENS }}_mosaic.nc", "{{ DATA }}/orog/ens/{{ CASE_ENS }}_mosaic.nc"]
 {% for tile in range(1, ntiles+1) %}
-- ["{{ FIXorog }}/{{ CASE }}/{{ CASE }}_grid.tile{{ tile }}.nc", "{{ DATA }}/orog/det/{{ CASE }}_grid.tile{{ tile }}.nc"]
 - ["{{ FIXorog }}/{{ CASE_ENS }}/{{ CASE_ENS }}_grid.tile{{ tile }}.nc", "{{ DATA }}/orog/ens/{{ CASE_ENS }}_grid.tile{{ tile }}.nc"]
-- ["{{ FIXorog }}/{{ CASE }}/{{ CASE }}.mx{{ OCNRES }}_oro_data.tile{{ tile }}.nc", "{{ DATA }}/orog/det/{{ CASE }}.mx{{ OCNRES }}_oro_data.tile{{ tile }}.nc" ]
 - ["{{ FIXorog }}/{{ CASE_ENS }}/{{ CASE_ENS }}.mx{{ OCNRES }}_oro_data.tile{{ tile }}.nc", "{{ DATA }}/orog/ens/{{ CASE_ENS }}.mx{{ OCNRES }}_oro_data.tile{{ tile }}.nc" ]
 {% endfor %}
diff --git a/parm/gdas/snowanl_jedi_config.yaml.j2 b/parm/gdas/snowanl_jedi_config.yaml.j2
new file mode 100644
index 0000000000..c599787592
--- /dev/null
+++ b/parm/gdas/snowanl_jedi_config.yaml.j2
@@ -0,0 +1,7 @@
+snowanlvar:
+  rundir: '{{ DATA }}'
+  exe_src: '{{ EXECgfs }}/gdas.x'
+  mpi_cmd: '{{ APRUN_SNOWANL }}'
+  jedi_args: ['fv3jedi', 'variational']
+  jcb_base_yaml: '{{ PARMgfs }}/gdas/snow/jcb-base.yaml.j2'
+  jcb_algo_yaml: '{{ JCB_ALGO_YAML_VAR }}'
\ No newline at end of file
diff --git a/parm/gdas/staging/snow_berror.yaml.j2 b/parm/gdas/staging/snow_berror.yaml.j2
new file mode 100644
index 0000000000..e230217300
--- /dev/null
+++ b/parm/gdas/staging/snow_berror.yaml.j2
@@ -0,0 +1,4 @@
+mkdir:
+- '{{ DATA }}/berror'
+copy:
+- ['{{ HOMEgfs }}/fix/gdas/snow/snow_bump_nicas_250km_shadowlevels_nicas.nc', '{{ DATA }}/berror']
diff --git a/parm/gdas/staging/snow_var_bkg.yaml.j2 b/parm/gdas/staging/snow_var_bkg.yaml.j2
new file mode 100644
index 0000000000..164fb3945e
--- /dev/null
+++ b/parm/gdas/staging/snow_var_bkg.yaml.j2
@@ -0,0 +1,8 @@
+mkdir:
+- '{{ DATA }}/bkg'
+copy:
+- ['{{ COMIN_ATMOS_RESTART_PREV }}/{{ current_cycle | to_fv3time }}.coupler.res', '{{ DATA }}/bkg/']
+{% for tile in range(1, ntiles+1) %}
+- ['{{ COMIN_ATMOS_RESTART_PREV }}/{{ current_cycle | to_fv3time }}.sfc_data.tile{{ tile }}.nc', '{{ DATA }}/bkg/']
+- ["{{ FIXorog }}/{{ CASE }}/{{ CASE }}.mx{{ OCNRES }}_oro_data.tile{{ tile }}.nc", "{{ DATA }}/bkg/{{ CASE }}.mx{{ OCNRES }}_oro_data.tile{{ tile }}.nc" ]
+{% endfor %}
\ No newline at end of file
diff --git a/scripts/exgdas_enkf_earc.py b/scripts/exgdas_enkf_earc.py
index 467cfa88dc..535dd2ea37 100755
--- a/scripts/exgdas_enkf_earc.py
+++ b/scripts/exgdas_enkf_earc.py
@@ -26,7 +26,7 @@ def main():
                 'FHOUT_ENKF_GFS', 'FHMAX_ENKF', 'FHOUT_ENKF', 'ENKF_SPREAD',
                 'restart_interval_enkfgdas', 'restart_interval_enkfgfs',
                 'DOHYBVAR', 'DOIAU_ENKF', 'IAU_OFFSET', 'DOIAU', 'DO_CA',
-                'DO_CALC_INCREMENT', 'assim_freq', 'ARCH_CYC',
+                'DO_CALC_INCREMENT', 'assim_freq', 'ARCH_CYC', 'DO_JEDISNOWDA',
                 'ARCH_WARMICFREQ', 'ARCH_FCSTICFREQ',
                 'IAUFHRS_ENKF', 'NET']
diff --git a/scripts/exgdas_enkf_snow_recenter.py b/scripts/exgdas_enkf_snow_recenter.py
deleted file mode 100755
index fcd501860c..0000000000
--- a/scripts/exgdas_enkf_snow_recenter.py
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/usr/bin/env python3
-# exgdas_enkf_snow_recenter.py
-# This script creates an SnowEnsAnalysis class
-# and will recenter the ensemble mean to the
-# deterministic analysis and provide increments
-# to create an ensemble of snow analyses
-import os
-
-from wxflow import Logger, cast_strdict_as_dtypedict
-from pygfs.task.snowens_analysis import SnowEnsAnalysis
-
-# Initialize root logger
-logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True)
-
-
-if __name__ == '__main__':
-
-    # Take configuration from environment and cast it as python dictionary
-    config = cast_strdict_as_dtypedict(os.environ)
-
-    # Instantiate the snow ensemble analysis task
-    anl = SnowEnsAnalysis(config)
-    anl.initialize()
-    anl.genWeights()
-    anl.genMask()
-    anl.regridDetBkg()
-    anl.regridDetInc()
-    anl.recenterEns()
-    anl.addEnsIncrements()
-    anl.finalize()
diff --git a/scripts/exglobal_snow_analysis.py b/scripts/exglobal_snow_analysis.py
index dd52b699dc..df2c17530d 100755
--- a/scripts/exglobal_snow_analysis.py
+++ b/scripts/exglobal_snow_analysis.py
@@ -18,9 +18,20 @@
     config = cast_strdict_as_dtypedict(os.environ)
 
     # Instantiate the snow analysis task
-    anl = SnowAnalysis(config)
-    if anl.task_config.cyc == 0:
-        anl.prepare_IMS()
-    anl.initialize()
-    anl.execute()
-    anl.finalize()
+    snow_anl = SnowAnalysis(config)
+
+    # Initialize JEDI 2DVar snow analysis
+    snow_anl.initialize()
+
+    # Process IMS snow cover (if applicable)
+    if snow_anl.task_config.cyc == 0:
+        snow_anl.prepare_IMS()
+
+    # Execute JEDI snow analysis
+    snow_anl.execute('snowanlvar')
+
+    # Add increments
+    snow_anl.add_increments()
+
+    # Finalize JEDI snow analysis
+    snow_anl.finalize()
diff --git a/scripts/exglobal_snowens_analysis.py b/scripts/exglobal_snowens_analysis.py
new file mode 100755
index 0000000000..6e02fd0a16
--- /dev/null
+++ b/scripts/exglobal_snowens_analysis.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python3
+# exglobal_snowens_analysis.py
+# This script creates a SnowEnsAnalysis class,
+# which will compute the ensemble mean of the snow forecast,
+# run a 2DVar analysis, and provide increments
+# to create an ensemble of snow analyses
+import os
+
+from wxflow import Logger, cast_strdict_as_dtypedict
+from pygfs.task.snowens_analysis import SnowEnsAnalysis
+
+# Initialize root logger
+logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True)
+
+
+if __name__ == '__main__':
+
+    # Take configuration from environment and cast it as python dictionary
+    config = cast_strdict_as_dtypedict(os.environ)
+
+    # Instantiate the snow ensemble analysis task
+    snow_ens_anl = SnowEnsAnalysis(config)
+
+    # Initialize JEDI 2DVar snow analysis
+    snow_ens_anl.initialize()
+
+    # Calculate ensemble mean
+    snow_ens_anl.execute('esnowanlensmean')
+
+    # stage ensemble mean backgrounds
+
+    # Process IMS snow cover (if applicable)
+    if snow_ens_anl.task_config.cyc == 0:
+        snow_ens_anl.prepare_IMS()
+
+    # Execute JEDI snow analysis
+    snow_ens_anl.execute('snowanlvar')
+
+    # Add increments
+    snow_ens_anl.add_increments()
+
+    # Finalize JEDI snow analysis
+    snow_ens_anl.finalize()
diff --git a/sorc/gdas.cd b/sorc/gdas.cd
index a2ea3770ae..d6097afdd4 160000
--- a/sorc/gdas.cd
+++ b/sorc/gdas.cd
@@ -1 +1 @@
-Subproject commit a2ea3770aeb9d4308bde51bb1d8c9c94cc9534c8
+Subproject commit d6097afdd435fe73cc99d8ddb594c3143b72820a
diff --git a/sorc/link_workflow.sh b/sorc/link_workflow.sh
index a89f070d41..b70b9e894f 100755
--- a/sorc/link_workflow.sh
+++ b/sorc/link_workflow.sh
@@ -209,7 +209,7 @@ if [[ -d "${HOMEgfs}/sorc/gdas.cd" ]]; then
    cd "${HOMEgfs}/fix" || exit 1
    [[ ! -d gdas ]] && mkdir -p gdas
    cd gdas || exit 1
-   for gdas_sub in fv3jedi gsibec obs soca aero; do
+   for gdas_sub in fv3jedi gsibec obs soca aero snow; do
        if [[ -d "${gdas_sub}" ]]; then
            rm -rf "${gdas_sub}"
        fi
diff --git a/ush/python/pygfs/task/snow_analysis.py b/ush/python/pygfs/task/snow_analysis.py
index 4b991d2b34..4e04799f3f 100644
--- a/ush/python/pygfs/task/snow_analysis.py
+++ b/ush/python/pygfs/task/snow_analysis.py
@@ -2,39 +2,59 @@
 
 import os
 from logging import getLogger
-from typing import Dict, List
+from typing import Dict, List, Optional, Any
 from pprint import pformat
+import glob
+import gzip
+import tarfile
 import numpy as np
 from netCDF4 import Dataset
 
 from wxflow import (AttrDict,
                     FileHandler,
                     to_fv3time, to_YMD, to_YMDH, to_timedelta, add_to_datetime,
-                    rm_p,
+                    rm_p, cp,
                     parse_j2yaml, save_as_yaml,
                     Jinja,
+                    Task,
                     logit,
                     Executable,
                     WorkflowException)
-from pygfs.task.analysis import Analysis
+from pygfs.jedi import Jedi
 
 logger = getLogger(__name__.split('.')[-1])
 
 
-class SnowAnalysis(Analysis):
+class SnowAnalysis(Task):
     """
-    Class for global snow analysis tasks
+    Class for JEDI-based global snow analysis tasks
     """
 
-    NMEM_SNOWENS = 2
-
     @logit(logger, name="SnowAnalysis")
-    def __init__(self, config):
+    def __init__(self, config: Dict[str, Any]):
+        """Constructor for the global snow analysis task
+
+        This method will construct a global snow analysis task.
+        This includes:
+        - extending the task_config attribute AttrDict to include parameters required for this task
+        - instantiate the Jedi attribute object
+
+        Parameters
+        ----------
+        config: Dict
+            dictionary object containing task configuration
+
+        Returns
+        ----------
+        None
+        """
         super().__init__(config)
 
         _res = int(self.task_config['CASE'][1:])
         _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2)
-        _letkfoi_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config['cyc']:02d}z.letkfoi.yaml")
+
+        # fix ocnres
+        self.task_config.OCNRES = f"{self.task_config.OCNRES:03d}"
 
         # Create a local dictionary that is repeatedly used across this class
         local_dict = AttrDict(
@@ -47,13 +67,82 @@ def __init__(self, config):
                 'SNOW_WINDOW_LENGTH': f"PT{self.task_config['assim_freq']}H",
                 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.",
                 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.",
-                'jedi_yaml': _letkfoi_yaml
+                'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.",
+                'snow_obsdatain_path': os.path.join(self.task_config.DATA, 'obs'),
+                'snow_obsdataout_path': os.path.join(self.task_config.DATA, 'diags'),
+                'snow_bkg_path': os.path.join('.', 'bkg/'),
             }
         )
 
         # Extend task_config with local_dict
         self.task_config = AttrDict(**self.task_config, **local_dict)
 
+        # Create JEDI object dictionary
+        expected_keys = ['snowanlvar']
+        self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys)
+
+    @logit(logger)
+    def initialize(self) -> None:
+        """Initialize a global snow analysis
+
+        This method will initialize a global snow analysis.
+        This includes:
+        - initialize JEDI application
+        - staging model backgrounds
+        - staging observation files
+        - staging FV3-JEDI fix files
+        - staging B error files
+        - creating output directories
+
+        Parameters
+        ----------
+        None
+
+        Returns
+        ----------
+        None
+        """
+        # initialize JEDI variational application
+        logger.info(f"Initializing JEDI variational DA application")
+        self.jedi_dict['snowanlvar'].initialize(self.task_config)
+
+        # stage backgrounds
+        logger.info(f"Staging background files from {self.task_config.VAR_BKG_STAGING_YAML}")
+        bkg_staging_dict = parse_j2yaml(self.task_config.VAR_BKG_STAGING_YAML, self.task_config)
+        FileHandler(bkg_staging_dict).sync()
+        logger.debug(f"Background files:\n{pformat(bkg_staging_dict)}")
+
+        # stage observations
+        logger.info(f"Staging list of observation files generated from JEDI config")
+        obs_dict = self.jedi_dict['snowanlvar'].render_jcb(self.task_config, 'snow_obs_staging')
+        FileHandler(obs_dict).sync()
+        logger.debug(f"Observation files:\n{pformat(obs_dict)}")
+
+        # stage GTS bufr2ioda mapping YAML files
+        logger.info(f"Staging GTS bufr2ioda mapping YAML files from {self.task_config.GTS_SNOW_STAGE_YAML}")
+        gts_mapping_list = parse_j2yaml(self.task_config.GTS_SNOW_STAGE_YAML, self.task_config)
+        FileHandler(gts_mapping_list).sync()
+
+        # stage FV3-JEDI fix files
+        logger.info(f"Staging JEDI fix files from {self.task_config.JEDI_FIX_YAML}")
+        jedi_fix_dict = parse_j2yaml(self.task_config.JEDI_FIX_YAML, self.task_config)
+        FileHandler(jedi_fix_dict).sync()
+        logger.debug(f"JEDI fix files:\n{pformat(jedi_fix_dict)}")
+
+        # staging B error files
+        logger.info("Stage files for static background error")
+        berror_staging_dict = parse_j2yaml(self.task_config.BERROR_STAGING_YAML, self.task_config)
+        FileHandler(berror_staging_dict).sync()
+        logger.debug(f"Background error files:\n{pformat(berror_staging_dict)}")
+
+        # need output dir for diags and anl
+        logger.debug("Create empty output [anl, diags] directories to receive output from executable")
+        newdirs = [
+            os.path.join(self.task_config.DATA, 'anl'),
+            os.path.join(self.task_config.DATA, 'diags'),
+        ]
+        FileHandler({'mkdir': newdirs}).sync()
+
     @logit(logger)
     def prepare_IMS(self) -> None:
         """Prepare the IMS data for a global snow analysis
@@ -75,21 +164,19 @@ def prepare_IMS(self) -> None:
 
         # create a temporary dict of all keys needed in this method
         localconf = AttrDict()
-        keys = ['DATA', 'current_cycle', 'COM_OBS', 'COM_ATMOS_RESTART_PREV',
+        keys = ['DATA', 'current_cycle', 'COMIN_OBS', 'COMIN_ATMOS_RESTART_PREV',
                 'OPREFIX', 'CASE', 'OCNRES', 'ntiles', 'FIXgfs']
        for key in keys:
            localconf[key] = self.task_config[key]
 
-        # stage backgrounds
-        logger.info("Staging backgrounds")
-        FileHandler(self.get_bkg_dict(localconf)).sync()
+        localconf['ims_fcst_path'] = self.task_config['snow_bkg_path']
 
        # Read and render the IMS_OBS_LIST yaml
        logger.info(f"Reading {self.task_config.IMS_OBS_LIST}")
        prep_ims_config = parse_j2yaml(self.task_config.IMS_OBS_LIST, localconf)
        logger.debug(f"{self.task_config.IMS_OBS_LIST}:\n{pformat(prep_ims_config)}")
 
-        # copy the IMS obs files from COM_OBS to DATA/obs
+        # copy the IMS obs files from COMIN_OBS to DATA/obs
        logger.info("Copying IMS obs for CALCFIMSEXE")
        FileHandler(prep_ims_config.calcfims).sync()
@@ -116,9 +203,11 @@ def prepare_IMS(self) -> None:
        try:
            exe()
        except OSError:
-            raise OSError(f"Failed to execute {exe}")
-        except Exception:
-            raise WorkflowException(f"An error occured during execution of {exe}")
+            logger.exception(f"Failed to execute {exe}")
+            raise
+        except Exception as err:
+            logger.exception(f"An error occurred during execution of {exe}")
+            raise WorkflowException(f"An error occurred during execution of {exe}") from err
 
        # Ensure the snow depth IMS file is produced by the above executable
        input_file = f"IMSscf.{to_YMD(localconf.current_cycle)}.{localconf.CASE}_oro_data.nc"
@@ -140,121 +229,38 @@ def prepare_IMS(self) -> None:
            logger.debug(f"Executing {exe}")
            exe()
        except OSError:
-            raise OSError(f"Failed to execute {exe}")
-        except Exception:
-            raise WorkflowException(f"An error occured during execution of {exe}")
+            logger.exception(f"Failed to execute {exe}")
+            raise
+        except Exception as err:
+            logger.exception(f"An error occurred during execution of {exe}")
+            raise WorkflowException(f"An error occurred during execution of {exe}") from err
 
        # Ensure the IODA snow depth IMS file is produced by the IODA converter
-        # If so, copy to COM_OBS/
+        # If so, copy to DATA/obs/
        if not os.path.isfile(f"{os.path.join(localconf.DATA, output_file)}"):
            logger.exception(f"{self.task_config.IMS2IODACONV} failed to produce {output_file}")
            raise FileNotFoundError(f"{os.path.join(localconf.DATA, output_file)}")
        else:
-            logger.info(f"Copy {output_file} to {self.task_config.COM_OBS}")
+            logger.info(f"Copy {output_file} to {os.path.join(localconf.DATA, 'obs')}")
            FileHandler(prep_ims_config.ims2ioda).sync()
 
     @logit(logger)
-    def initialize(self) -> None:
-        """Initialize method for snow analysis
-        This method:
-        - creates artifacts in the DATA directory by copying fix files
-        - creates the JEDI LETKF yaml from the template
-        - stages backgrounds, observations and ensemble members
+    def execute(self, jedi_dict_key: str) -> None:
+        """Run JEDI executable
+
+        This method will run JEDI executables for the global snow analysis
 
         Parameters
         ----------
-        self : Analysis
-            Instance of the SnowAnalysis object
-        """
-
-        super().initialize()
-
-        # create a temporary dict of all keys needed in this method
-        localconf = AttrDict()
-        keys = ['PARMgfs', 'DATA', 'current_cycle', 'COM_OBS', 'COM_ATMOS_RESTART_PREV',
-                'OPREFIX', 'CASE', 'OCNRES', 'ntiles']
-        for key in keys:
-            localconf[key] = self.task_config[key]
-
-        # Make member directories in DATA for background
-        dirlist = []
-        for imem in range(1, SnowAnalysis.NMEM_SNOWENS + 1):
-            dirlist.append(os.path.join(localconf.DATA, 'bkg', f'mem{imem:03d}'))
-        FileHandler({'mkdir': dirlist}).sync()
-
-        # stage fix files
-        logger.info(f"Staging JEDI fix files from {self.task_config.JEDI_FIX_YAML}")
-        jedi_fix_list = parse_j2yaml(self.task_config.JEDI_FIX_YAML, self.task_config)
-        FileHandler(jedi_fix_list).sync()
-
-        # stage backgrounds
-        logger.info("Staging ensemble backgrounds")
-        FileHandler(self.get_ens_bkg_dict(localconf)).sync()
-
-        # stage GTS bufr2ioda mapping YAML files
-        logger.info(f"Staging GTS bufr2ioda mapping YAML files from {self.task_config.GTS_SNOW_STAGE_YAML}")
-        gts_mapping_list = parse_j2yaml(self.task_config.GTS_SNOW_STAGE_YAML, localconf)
-        FileHandler(gts_mapping_list).sync()
-
-        # Write out letkfoi YAML file
-        save_as_yaml(self.task_config.jedi_config, self.task_config.jedi_yaml)
-        logger.info(f"Wrote letkfoi YAML to: {self.task_config.jedi_yaml}")
-
-        # need output dir for diags and anl
-        logger.info("Create empty output [anl, diags] directories to receive output from executable")
-        newdirs = [
-            os.path.join(localconf.DATA, "anl"),
-            os.path.join(localconf.DATA, "diags"),
-        ]
-        FileHandler({'mkdir': newdirs}).sync()
+        jedi_dict_key
+            key specifying particular Jedi object in self.jedi_dict
 
-    @logit(logger)
-    def execute(self) -> None:
-        """Run a series of tasks to create Snow analysis
-        This method:
-        - creates an 2 member ensemble
-        - runs the JEDI LETKF executable to produce increments
-        - creates analysis from increments
-
-        Parameters
+        Returns
         ----------
-        self : Analysis
-            Instance of the SnowAnalysis object
+        None
        """
 
-        # create a temporary dict of all keys needed in this method
-        localconf = AttrDict()
-        keys = ['HOMEgfs', 'DATA', 'current_cycle',
-                'COM_ATMOS_RESTART_PREV', 'COM_SNOW_ANALYSIS', 'APREFIX',
-                'SNOWDEPTHVAR', 'BESTDDEV', 'CASE', 'OCNRES', 'ntiles',
-                'APRUN_SNOWANL', 'JEDIEXE', 'jedi_yaml', 'DOIAU', 'SNOW_WINDOW_BEGIN',
-                'APPLY_INCR_NML_TMPL', 'APPLY_INCR_EXE', 'APRUN_APPLY_INCR']
-        for key in keys:
-            localconf[key] = self.task_config[key]
-
-        logger.info("Creating ensemble")
-        self.create_ensemble(localconf.SNOWDEPTHVAR,
-                             localconf.BESTDDEV,
-                             AttrDict({key: localconf[key] for key in ['DATA', 'ntiles', 'current_cycle']}))
-
-        logger.info("Running JEDI LETKF")
-        exec_cmd = Executable(localconf.APRUN_SNOWANL)
-        exec_name = os.path.join(localconf.DATA, 'gdas.x')
-        exec_cmd.add_default_arg(exec_name)
-        exec_cmd.add_default_arg('fv3jedi')
-        exec_cmd.add_default_arg('localensembleda')
-        exec_cmd.add_default_arg(localconf.jedi_yaml)
-
-        try:
-            logger.debug(f"Executing {exec_cmd}")
-            exec_cmd()
-        except OSError:
-            raise OSError(f"Failed to execute {exec_cmd}")
-        except Exception:
-            raise WorkflowException(f"An error occured during execution of {exec_cmd}")
-
-        logger.info("Creating analysis from backgrounds and increments")
-        self.add_increments(localconf)
+        self.jedi_dict[jedi_dict_key].execute()
 
     @logit(logger)
     def finalize(self) -> None:
@@ -271,18 +277,41 @@ def finalize(self) -> None:
            Instance of the SnowAnalysis object
        """
 
-        logger.info("Create diagnostic tarball of diag*.nc4 files")
-        statfile = os.path.join(self.task_config.COM_SNOW_ANALYSIS, f"{self.task_config.APREFIX}snowstat.tgz")
-        self.tgz_diags(statfile, self.task_config.DATA)
-
-        logger.info("Copy full YAML to COM")
-        src = os.path.join(self.task_config['DATA'], f"{self.task_config.APREFIX}letkfoi.yaml")
-        dest = os.path.join(self.task_config.COM_CONF, f"{self.task_config.APREFIX}letkfoi.yaml")
-        yaml_copy = {
-            'mkdir': [self.task_config.COM_CONF],
-            'copy': [[src, dest]]
-        }
-        FileHandler(yaml_copy).sync()
+        # ---- tar up diags
+        # path of output tar statfile
+        snowstat = os.path.join(self.task_config.COMOUT_SNOW_ANALYSIS, f"{self.task_config.APREFIX}snowstat.tgz")
+
+        # get list of diag files to put in tarball
+        diags = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc'))
+
+        logger.info(f"Compressing {len(diags)} diag files to {snowstat}")
+
+        # gzip the files first
+        logger.debug(f"Gzipping {len(diags)} diag files")
+        for diagfile in diags:
+            with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out:
+                f_out.writelines(f_in)
+
+        # open tar file for writing
+        logger.debug(f"Creating tar file {snowstat} with {len(diags)} gzipped diag files")
+        with tarfile.open(snowstat, "w|gz") as archive:
+            for diagfile in diags:
+                diaggzip = f"{diagfile}.gz"
+                archive.add(diaggzip, arcname=os.path.basename(diaggzip))
+
+        # get list of yamls to copy to ROTDIR
+        yamls = glob.glob(os.path.join(self.task_config.DATA, '*snow*yaml'))
+
+        # copy full YAML from executable to ROTDIR
+        for src in yamls:
+            yaml_base = os.path.splitext(os.path.basename(src))[0]
+            dest_yaml_name = f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.{yaml_base}.yaml"
+            dest = os.path.join(self.task_config.COMOUT_CONF, dest_yaml_name)
+            logger.debug(f"Copying {src} to {dest}")
+            yaml_copy = {
+                'copy': [[src, dest]]
+            }
+            FileHandler(yaml_copy).sync()
 
        logger.info("Copy analysis to COM")
        bkgtimes = []
@@ -296,7 +325,7 @@ def finalize(self) -> None:
            for itile in range(1, self.task_config.ntiles + 1):
                filename = template.format(tilenum=itile)
                src = os.path.join(self.task_config.DATA, 'anl', filename)
-                dest = os.path.join(self.task_config.COM_SNOW_ANALYSIS, filename)
+                dest = os.path.join(self.task_config.COMOUT_SNOW_ANALYSIS, filename)
                anllist.append([src, dest])
        FileHandler({'copy': anllist}).sync()
@@ -306,214 +335,47 @@ def finalize(self) -> None:
            for itile in range(1, self.task_config.ntiles + 1):
                filename = template.format(tilenum=itile)
                src = os.path.join(self.task_config.DATA, 'anl', filename)
-                dest = os.path.join(self.task_config.COM_SNOW_ANALYSIS, filename)
+                dest = os.path.join(self.task_config.COMOUT_SNOW_ANALYSIS, filename)
                inclist.append([src, dest])
        FileHandler({'copy': inclist}).sync()
 
-    @staticmethod
-    @logit(logger)
-    def get_bkg_dict(config: Dict) -> Dict[str, List[str]]:
-        """Compile a dictionary of model background files to copy
-
-        This method constructs a dictionary of FV3 RESTART files (coupler, sfc_data)
-        that are needed for global snow DA and returns said dictionary for use by the FileHandler class.
-
-        Parameters
-        ----------
-        config: Dict
-            Dictionary of key-value pairs needed in this method
-            Should contain the following keys:
-            COM_ATMOS_RESTART_PREV
-            DATA
-            current_cycle
-            ntiles
-
-        Returns
-        ----------
-        bkg_dict: Dict
-            a dictionary containing the list of model background files to copy for FileHandler
-        """
-        # NOTE for now this is FV3 RESTART files and just assumed to be fh006
-
-        # get FV3 sfc_data RESTART files, this will be a lot simpler when using history files
-        rst_dir = os.path.join(config.COM_ATMOS_RESTART_PREV)  # for now, option later?
-        run_dir = os.path.join(config.DATA, 'bkg')
-
-        # Start accumulating list of background files to copy
-        bkglist = []
-
-        # snow DA needs coupler
-        basename = f'{to_fv3time(config.current_cycle)}.coupler.res'
-        bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)])
-
-        # snow DA only needs sfc_data
-        for ftype in ['sfc_data']:
-            template = f'{to_fv3time(config.current_cycle)}.{ftype}.tile{{tilenum}}.nc'
-            for itile in range(1, config.ntiles + 1):
-                basename = template.format(tilenum=itile)
-                bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)])
-
-        bkg_dict = {
-            'mkdir': [run_dir],
-            'copy': bkglist
-        }
-        return bkg_dict
-
-    @staticmethod
-    @logit(logger)
-    def get_ens_bkg_dict(config: Dict) -> Dict:
-        """Compile a dictionary of model background files to copy for the ensemble
-        Note that a "Fake" 2-member ensemble backgroud is being created by copying FV3 RESTART files (coupler, sfc_data)
-        from the deterministic background to DATA/bkg/mem001, 002.
-
-        Parameters
-        ----------
-        config: Dict
-            Dictionary of key-value pairs needed in this method
-            Should contain the following keys:
-            COM_ATMOS_RESTART_PREV
-            DATA
-            current_cycle
-            ntiles
-
-        Returns
-        ----------
-        bkg_dict: Dict
-            a dictionary containing the list of model background files to copy for FileHandler
-        """
-
-        dirlist = []
-        bkglist = []
-
-        # get FV3 sfc_data RESTART files; Note an ensemble is being created
-        rst_dir = os.path.join(config.COM_ATMOS_RESTART_PREV)
-
-        for imem in range(1, SnowAnalysis.NMEM_SNOWENS + 1):
-            memchar = f"mem{imem:03d}"
-
-            run_dir = os.path.join(config.DATA, 'bkg', memchar, 'RESTART')
-            dirlist.append(run_dir)
-
-            # Snow DA needs coupler
-            basename = f'{to_fv3time(config.current_cycle)}.coupler.res'
-            bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)])
-
-            # Snow DA only needs sfc_data
-            for ftype in ['sfc_data']:
-                template = f'{to_fv3time(config.current_cycle)}.{ftype}.tile{{tilenum}}.nc'
-                for itile in range(1, config.ntiles + 1):
-                    basename = template.format(tilenum=itile)
-                    bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)])
-
-        bkg_dict = {
-            'mkdir': dirlist,
-            'copy': bkglist
-        }
-
-        return bkg_dict
-
-    @staticmethod
-    @logit(logger)
-    def create_ensemble(vname: str, bestddev: float, config: Dict) -> None:
-        """Create a 2-member ensemble for Snow Depth analysis by perturbing snow depth with a prescribed variance.
-        Additionally, remove glacier locations
-
-        Parameters
-        ----------
-        vname : str
-            snow depth variable to perturb: "snodl"
-        bestddev : float
-            Background Error Standard Deviation to perturb around to create ensemble
-        config: Dict
-            Dictionary of key-value pairs needed in this method.  It must contain the following keys:
-            DATA
-            current_cycle
-            ntiles
-        """
-
-        # 2 ens members
-        offset = bestddev / np.sqrt(SnowAnalysis.NMEM_SNOWENS)
-
-        logger.info(f"Creating ensemble for LETKFOI by offsetting with {offset}")
-
-        workdir = os.path.join(config.DATA, 'bkg')
-
-        sign = [1, -1]
-        ens_dirs = ['mem001', 'mem002']
-
-        for (memchar, value) in zip(ens_dirs, sign):
-            logger.debug(f"creating ensemble member {memchar} with sign {value}")
-            for tt in range(1, config.ntiles + 1):
-                logger.debug(f"perturbing tile {tt}")
-                # open file
-                out_netcdf = os.path.join(workdir, memchar, 'RESTART', f"{to_fv3time(config.current_cycle)}.sfc_data.tile{tt}.nc")
-                logger.debug(f"creating member {out_netcdf}")
-                with Dataset(out_netcdf, "r+") as ncOut:
-                    slmsk_array = ncOut.variables['slmsk'][:]
-                    vtype_array = ncOut.variables['vtype'][:]
-                    slmsk_array[vtype_array == 15] = 0  # remove glacier locations
-                    var_array = ncOut.variables[vname][:]
-                    var_array[slmsk_array == 1] = var_array[slmsk_array == 1] + value * offset
-                    ncOut.variables[vname][0, :, :] = var_array[:]
-
-    @staticmethod
     @logit(logger)
-    def add_increments(config: Dict) -> None:
+    def add_increments(self) -> None:
        """Executes the program "apply_incr.exe" to create analysis "sfc_data" files by adding increments to backgrounds
 
        Parameters
        ----------
-        config: Dict
-            Dictionary of key-value pairs needed in this method
-            Should contain the following keys:
-            HOMEgfs
-            COM_ATMOS_RESTART_PREV
-            DATA
-            current_cycle
-            CASE
-            OCNRES
-            ntiles
-            APPLY_INCR_NML_TMPL
-            APPLY_INCR_EXE
-            APRUN_APPLY_INCR
-            DOIAU
-            SNOW_WINDOW_BEGIN
-
-        Raises
-        ------
-        OSError
-            Failure due to OS issues
-        WorkflowException
-            All other exceptions
+        self : Analysis
+            Instance of the SnowAnalysis object
        """
 
        # need backgrounds to create analysis from increments after LETKF
        logger.info("Copy backgrounds into anl/ directory for creating analysis from increments")
        bkgtimes = []
-        if config.DOIAU:
+        if self.task_config.DOIAU:
            # want analysis at beginning and middle of window
-            bkgtimes.append(config.SNOW_WINDOW_BEGIN)
-            bkgtimes.append(config.current_cycle)
+            bkgtimes.append(self.task_config.SNOW_WINDOW_BEGIN)
+            bkgtimes.append(self.task_config.current_cycle)
        anllist = []
        for bkgtime in bkgtimes:
            template = f'{to_fv3time(bkgtime)}.sfc_data.tile{{tilenum}}.nc'
-            for itile in range(1, config.ntiles + 1):
+            for itile in range(1, self.task_config.ntiles + 1):
                filename = template.format(tilenum=itile)
-                src = os.path.join(config.COM_ATMOS_RESTART_PREV, filename)
-                dest = os.path.join(config.DATA, "anl", filename)
+                src = os.path.join(self.task_config.COMIN_ATMOS_RESTART_PREV, filename)
+                dest = os.path.join(self.task_config.DATA, "anl", filename)
                anllist.append([src, dest])
        FileHandler({'copy': anllist}).sync()
 
-        if config.DOIAU:
+        if self.task_config.DOIAU:
            logger.info("Copying increments to beginning of window")
-            template_in = f'snowinc.{to_fv3time(config.current_cycle)}.sfc_data.tile{{tilenum}}.nc'
-            template_out = f'snowinc.{to_fv3time(config.SNOW_WINDOW_BEGIN)}.sfc_data.tile{{tilenum}}.nc'
+            template_in = f'snowinc.{to_fv3time(self.task_config.current_cycle)}.sfc_data.tile{{tilenum}}.nc'
+            template_out = f'snowinc.{to_fv3time(self.task_config.SNOW_WINDOW_BEGIN)}.sfc_data.tile{{tilenum}}.nc'
            inclist = []
-            for itile in range(1, config.ntiles + 1):
+            for itile in range(1, self.task_config.ntiles + 1):
                filename_in = template_in.format(tilenum=itile)
                filename_out = template_out.format(tilenum=itile)
-                src = os.path.join(config.DATA, 'anl', filename_in)
-                dest = os.path.join(config.DATA, 'anl', filename_out)
+                src = os.path.join(self.task_config.DATA, 'anl', filename_in)
+                dest = os.path.join(self.task_config.DATA, 'anl', filename_out)
                inclist.append([src, dest])
            FileHandler({'copy': inclist}).sync()
@@ -521,35 +383,37 @@ def add_increments(config: Dict) -> None:
        for bkgtime in bkgtimes:
            logger.info("Processing analysis valid: {bkgtime}")
            logger.info("Create namelist for APPLY_INCR_EXE")
-            nml_template = config.APPLY_INCR_NML_TMPL
+            nml_template = self.task_config.APPLY_INCR_NML_TMPL
            nml_config = {
                'current_cycle': bkgtime,
-                'CASE': config.CASE,
-                'DATA': config.DATA,
-                'HOMEgfs': config.HOMEgfs,
-                'OCNRES': config.OCNRES,
+                'CASE': self.task_config.CASE,
+                'DATA': self.task_config.DATA,
+                'HOMEgfs': self.task_config.HOMEgfs,
+                'OCNRES': self.task_config.OCNRES,
            }
            nml_data = Jinja(nml_template, nml_config).render
            logger.debug(f"apply_incr_nml:\n{nml_data}")
 
-            nml_file = os.path.join(config.DATA, "apply_incr_nml")
+            nml_file = os.path.join(self.task_config.DATA, "apply_incr_nml")
            with open(nml_file, "w") as fho:
                fho.write(nml_data)
 
            logger.info("Link APPLY_INCR_EXE into DATA/")
-            exe_src = config.APPLY_INCR_EXE
-            exe_dest = os.path.join(config.DATA, os.path.basename(exe_src))
+            exe_src = self.task_config.APPLY_INCR_EXE
+            exe_dest = os.path.join(self.task_config.DATA, os.path.basename(exe_src))
            if os.path.exists(exe_dest):
                rm_p(exe_dest)
            os.symlink(exe_src, exe_dest)
 
            # execute APPLY_INCR_EXE to create analysis files
-            exe = Executable(config.APRUN_APPLY_INCR)
-            exe.add_default_arg(os.path.join(config.DATA, os.path.basename(exe_src)))
+            exe = Executable(self.task_config.APRUN_APPLY_INCR)
+            exe.add_default_arg(os.path.join(self.task_config.DATA, os.path.basename(exe_src)))
            logger.info(f"Executing {exe}")
            try:
                exe()
            except OSError:
-                raise OSError(f"Failed to execute {exe}")
-            except Exception:
-                raise WorkflowException(f"An error occured during execution of {exe}")
+                logger.exception(f"Failed to execute {exe}")
+                raise
+            except Exception as err:
+                logger.exception(f"An error occurred during execution of {exe}")
+                raise WorkflowException(f"An error occurred during execution of {exe}") from err
local_dict['bkg_time'] = bkg_time - # task_config is everything that this task should need + # Extend task_config with local_dict self.task_config = AttrDict(**self.task_config, **local_dict) + # Create JEDI object dictionary + expected_keys = ['snowanlvar', 'esnowanlensmean'] + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) + @logit(logger) def initialize(self) -> None: - """Initialize method for snow ensemble analysis - This method: + """Initialize a global snow ensemble analysis + This method will initialize a global snow ensemble analysis. + This includes: + - initialize JEDI applications + - staging model backgrounds + - staging observation files + - staging FV3-JEDI fix files + - staging B error files + - creating output directories Parameters ---------- - self : Analysis - Instance of the SnowEnsAnalysis object - """ - - super().initialize() + None - # stage background and increment files - logger.info(f"Staging files from {self.task_config.SNOW_ENS_STAGE_TMPL}") - snow_stage_list = parse_j2yaml(self.task_config.SNOW_ENS_STAGE_TMPL, self.task_config) - FileHandler(snow_stage_list).sync() - - # stage orography files - logger.info(f"Staging orography files specified in {self.task_config.SNOW_OROG_STAGE_TMPL}") - snow_orog_stage_list = parse_j2yaml(self.task_config.SNOW_OROG_STAGE_TMPL, self.task_config) - FileHandler(snow_orog_stage_list).sync() - - # stage fix files for fv3-jedi + Returns + ---------- + None + """ + # initialize JEDI variational application + logger.info(f"Initializing JEDI variational DA application") + self.jedi_dict['snowanlvar'].initialize(self.task_config) + + # initialize ensemble mean computation + logger.info(f"Initializing JEDI ensemble mean application") + self.jedi_dict['esnowanlensmean'].initialize(self.task_config) + + # stage backgrounds + logger.info(f"Staging background files from {self.task_config.SNOW_ENS_STAGE_TMPL}") + bkg_staging_dict = parse_j2yaml(self.task_config.SNOW_ENS_STAGE_TMPL, self.task_config) + FileHandler(bkg_staging_dict).sync() + logger.debug(f"Background files:\n{pformat(bkg_staging_dict)}") + + # stage orography + logger.info(f"Staging orography files from {self.task_config.SNOW_OROG_STAGE_TMPL}") + orog_staging_dict = parse_j2yaml(self.task_config.SNOW_OROG_STAGE_TMPL, self.task_config) + FileHandler(orog_staging_dict).sync() + logger.debug(f"Orography files:\n{pformat(orog_staging_dict)}") + # note JEDI will try to read the orog files for each member, let's just symlink + logger.info("Linking orography files for each member") + oro_files = glob.glob(os.path.join(self.task_config.DATA, 'orog', 'ens', '*')) + for mem in range(1, self.task_config.NMEM_ENS + 1): + dest = os.path.join(self.task_config.DATA, 'bkg', f"mem{mem:03}") + for oro_file in oro_files: + os.symlink(oro_file, os.path.join(dest, os.path.basename(oro_file))) + # need to symlink orography files for the ensmean too + dest = os.path.join(self.task_config.DATA, 'bkg', 'ensmean') + for oro_file in oro_files: + os.symlink(oro_file, os.path.join(dest, os.path.basename(oro_file))) + + # stage observations + logger.info(f"Staging list of observation files generated from JEDI config") + obs_dict = self.jedi_dict['snowanlvar'].render_jcb(self.task_config, 'snow_obs_staging') + FileHandler(obs_dict).sync() + logger.debug(f"Observation files:\n{pformat(obs_dict)}") + + # stage GTS bufr2ioda mapping YAML files + logger.info(f"Staging GTS bufr2ioda mapping YAML files from {self.task_config.GTS_SNOW_STAGE_YAML}") + 
gts_mapping_list = parse_j2yaml(self.task_config.GTS_SNOW_STAGE_YAML, self.task_config) + FileHandler(gts_mapping_list).sync() + + # stage FV3-JEDI fix files logger.info(f"Staging JEDI fix files from {self.task_config.JEDI_FIX_YAML}") - jedi_fix_list = parse_j2yaml(self.task_config.JEDI_FIX_YAML, self.task_config) - FileHandler(jedi_fix_list).sync() - - # write land ensemble recentering YAML - save_as_yaml(self.task_config.jedi_config, self.task_config.jedi_yaml) - logger.info(f"Wrote recentering YAML to: {self.task_config.jedi_yaml}") - - # link recentering executable - # placeholder, currently already done by the analysis parent class - - # copy fregrid executable - fregrid_copy = {'copy': [[os.path.join(self.task_config.EXECgfs, 'fregrid'), os.path.join(self.task_config.DATA, 'fregrid.x')]]} - FileHandler(fregrid_copy).sync() + jedi_fix_dict = parse_j2yaml(self.task_config.JEDI_FIX_YAML, self.task_config) + FileHandler(jedi_fix_dict).sync() + logger.debug(f"JEDI fix files:\n{pformat(jedi_fix_dict)}") + + # staging B error files + logger.info("Stage files for static background error") + berror_staging_dict = parse_j2yaml(self.task_config.BERROR_STAGING_YAML, self.task_config) + FileHandler(berror_staging_dict).sync() + logger.debug(f"Background error files:\n{pformat(berror_staging_dict)}") + + # need output dir for diags and anl + logger.debug("Create empty output [anl, diags] directories to receive output from executable") + newdirs = [ + os.path.join(self.task_config.DATA, 'anl'), + os.path.join(self.task_config.DATA, 'diags'), + ] + FileHandler({'mkdir': newdirs}).sync() @logit(logger) - def genWeights(self) -> None: - """Create a modified land_frac file for use by fregrid - to interpolate the snow background from det to ensres + def prepare_IMS(self) -> None: + """Prepare the IMS data for a global snow analysis + + This method will prepare IMS data for a global snow analysis using JEDI. + This includes: + - staging model backgrounds + - processing raw IMS observation data and prepare for conversion to IODA + - creating IMS snowdepth data in IODA format. Parameters ---------- - self : Analysis - Instance of the SnowEnsAnalysis object - """ - - chdir(self.task_config.DATA) - - # loop through tiles - for tile in range(1, self.task_config.ntiles + 1): - # open the restart and get the vegetation type - rst = nc.Dataset(f"./bkg/det/{to_fv3time(self.task_config.bkg_time)}.sfc_data.tile{tile}.nc") - vtype = rst.variables['vtype'][:] - rst.close() - # open the oro data and get the land fraction - oro = nc.Dataset(f"./orog/det/{self.task_config.CASE}.mx{self.task_config.OCNRES}_oro_data.tile{tile}.nc") - land_frac = oro.variables['land_frac'][:] - oro.close() - # create an output file - ncfile = nc.Dataset(f"./orog/det/{self.task_config.CASE}.mx{self.task_config.OCNRES}_interp_weight.tile{tile}.nc", mode='w', format='NETCDF4') - case_int = int(self.task_config.CASE[1:]) - lon = ncfile.createDimension('lon', case_int) - lat = ncfile.createDimension('lat', case_int) - lsm_frac_out = ncfile.createVariable('lsm_frac', np.float32, ('lon', 'lat')) - # set the land fraction to 0 on glaciers to not interpolate that snow - glacier = 15 - land_frac[np.where(vtype[0, ...] 
== glacier)] = 0 - lsm_frac_out[:] = land_frac - # write out and close the file - ncfile.close() - - @logit(logger) - def genMask(self) -> None: - """Create a mask for use by JEDI - to mask out snow increments on non-LSM gridpoints + None - Parameters + Returns ---------- - self : Analysis - Instance of the SnowEnsAnalysis object + None """ - chdir(self.task_config.DATA) - - # loop through tiles - for tile in range(1, self.task_config.ntiles + 1): - # open the restart and get the vegetation type - rst = nc.Dataset(f"./bkg/mem001/{to_fv3time(self.task_config.bkg_time)}.sfc_data.tile{tile}.nc", mode="r+") - vtype = rst.variables['vtype'][:] - slmsk = rst.variables['slmsk'][:] - # slmsk(Time, yaxis_1, xaxis_1) - # set the mask to 3 on glaciers - glacier = 15 - slmsk[np.where(vtype == glacier)] = 3 - # write out and close the file - rst.variables['slmsk'][:] = slmsk - rst.close() - - @logit(logger) - def regridDetBkg(self) -> None: - """Run fregrid to regrid the deterministic snow background - to the ensemble resolution + # create a temporary dict of all keys needed in this method + localconf = AttrDict() + keys = ['DATA', 'current_cycle', 'COMIN_OBS', + 'OPREFIX', 'CASE', 'OCNRES', 'ntiles', 'FIXgfs'] + for key in keys: + localconf[key] = self.task_config[key] + + localconf['ims_fcst_path'] = self.task_config['snow_bkg_path'] + # Read and render the IMS_OBS_LIST yaml + logger.info(f"Reading {self.task_config.IMS_OBS_LIST}") + prep_ims_config = parse_j2yaml(self.task_config.IMS_OBS_LIST, localconf) + logger.debug(f"{self.task_config.IMS_OBS_LIST}:\n{pformat(prep_ims_config)}") + + # copy the IMS obs files from COMIN_OBS to DATA/obs + logger.info("Copying IMS obs for CALCFIMSEXE") + FileHandler(prep_ims_config.calcfims).sync() + + logger.info("Create namelist for CALCFIMSEXE") + nml_template = self.task_config.FIMS_NML_TMPL + nml_data = Jinja(nml_template, localconf).render + logger.debug(f"fims.nml:\n{nml_data}") + + nml_file = os.path.join(localconf.DATA, "fims.nml") + with open(nml_file, "w") as fho: + fho.write(nml_data) - Parameters - ---------- - self : Analysis - Instance of the SnowEnsAnalysis object - """ + logger.info("Link CALCFIMSEXE into DATA/") + exe_src = self.task_config.CALCFIMSEXE + exe_dest = os.path.join(localconf.DATA, os.path.basename(exe_src)) + if os.path.exists(exe_dest): + rm_p(exe_dest) + os.symlink(exe_src, exe_dest) - chdir(self.task_config.DATA) - - arg_list = [ - "--input_mosaic", f"./orog/det/{self.task_config.CASE}_mosaic.nc", - "--input_dir", f"./bkg/det/", - "--input_file", f"{to_fv3time(self.task_config.bkg_time)}.sfc_data", - "--scalar_field", f"snodl", - "--output_dir", f"./bkg/det_ensres/", - "--output_file", f"{to_fv3time(self.task_config.bkg_time)}.sfc_data", - "--output_mosaic", f"./orog/ens/{self.task_config.CASE_ENS}_mosaic.nc", - "--interp_method", f"conserve_order1", - "--weight_file", f"./orog/det/{self.task_config.CASE}.mx{self.task_config.OCNRES}_interp_weight", - "--weight_field", f"lsm_frac", - "--remap_file", f"./remap", - ] - fregrid_exe = os.path.join(self.task_config.DATA, 'fregrid.x') - exec_cmd = Executable(fregrid_exe) + # execute CALCFIMSEXE to calculate IMS snowdepth + exe = Executable(self.task_config.APRUN_CALCFIMS) + exe.add_default_arg(exe_dest) try: - logger.debug(f"Executing {exec_cmd}") - exec_cmd(*arg_list) + logger.debug(f"Executing {exe}") + exe() except OSError: - raise OSError(f"Failed to execute {exec_cmd}") - except Exception: - raise WorkflowException(f"An error occured during execution of
{exec_cmd}") + logger.exception(f"Failed to execute {exe}") + raise + except Exception as err: + logger.exception(f"An error occured during execution of {exe}") + raise WorkflowException(f"An error occured during execution of {exe}") from err - @logit(logger) - def regridDetInc(self) -> None: - """Run fregrid to regrid the deterministic snow increment - to the ensemble resolution + # Ensure the snow depth IMS file is produced by the above executable + input_file = f"IMSscf.{to_YMD(localconf.current_cycle)}.{localconf.CASE}_oro_data.nc" + if not os.path.isfile(f"{os.path.join(localconf.DATA, input_file)}"): + logger.exception(f"{self.task_config.CALCFIMSEXE} failed to produce {input_file}") + raise FileNotFoundError(f"{os.path.join(localconf.DATA, input_file)}") - Parameters - ---------- - self : Analysis - Instance of the SnowEnsAnalysis object - """ + # Execute imspy to create the IMS obs data in IODA format + logger.info("Create IMS obs data in IODA format") - chdir(self.task_config.DATA) - - arg_list = [ - "--input_mosaic", f"./orog/det/{self.task_config.CASE}_mosaic.nc", - "--input_dir", f"./inc/det/", - "--input_file", f"snowinc.{to_fv3time(self.task_config.bkg_time)}.sfc_data", - "--scalar_field", f"snodl", - "--output_dir", f"./inc/det_ensres/", - "--output_file", f"snowinc.{to_fv3time(self.task_config.bkg_time)}.sfc_data", - "--output_mosaic", f"./orog/ens/{self.task_config.CASE_ENS}_mosaic.nc", - "--interp_method", f"conserve_order1", - "--weight_file", f"./orog/det/{self.task_config.CASE}.mx{self.task_config.OCNRES}_interp_weight", - "--weight_field", f"lsm_frac", - "--remap_file", f"./remap", - ] - fregrid_exe = os.path.join(self.task_config.DATA, 'fregrid.x') - exec_cmd = Executable(fregrid_exe) + output_file = f"ims_snow_{to_YMDH(localconf.current_cycle)}.nc4" + if os.path.isfile(f"{os.path.join(localconf.DATA, output_file)}"): + rm_p(output_file) + + exe = Executable(self.task_config.IMS2IODACONV) + exe.add_default_arg(["-i", f"{os.path.join(localconf.DATA, input_file)}"]) + exe.add_default_arg(["-o", f"{os.path.join(localconf.DATA, output_file)}"]) try: - logger.debug(f"Executing {exec_cmd}") - exec_cmd(*arg_list) + logger.debug(f"Executing {exe}") + exe() except OSError: - raise OSError(f"Failed to execute {exec_cmd}") - except Exception: - raise WorkflowException(f"An error occured during execution of {exec_cmd}") + logger.exception(f"Failed to execute {exe}") + raise + except Exception as err: + logger.exception(f"An error occured during execution of {exe}") + raise WorkflowException(f"An error occured during execution of {exe}") from err + + # Ensure the IODA snow depth IMS file is produced by the IODA converter + # If so, copy to DATA/obs/ + if not os.path.isfile(f"{os.path.join(localconf.DATA, output_file)}"): + logger.exception(f"{self.task_config.IMS2IODACONV} failed to produce {output_file}") + raise FileNotFoundError(f"{os.path.join(localconf.DATA, output_file)}") + else: + logger.info(f"Copy {output_file} to {os.path.join(localconf.DATA, 'obs')}") + FileHandler(prep_ims_config.ims2ioda).sync() @logit(logger) - def recenterEns(self) -> None: - """Run recentering code to create an ensemble of snow increments - based on the deterministic increment, and the difference - between the determinstic and ensemble mean forecast + def execute(self, jedi_dict_key: str) -> None: + """Run JEDI executable + + This method will run JEDI executables for the global snow analysis Parameters ---------- - self : Analysis - Instance of the SnowEnsAnalysis object + jedi_dict_key + key 
@logit(logger) def finalize(self) -> None: - """Performs closing actions of the snow ensemble analysis task + """Performs closing actions of the Snow analysis task This method: - - copies the ensemble snow analyses to the proper locations - - copies the ensemble mean increment to COM + - tars and gzips the output diag files and places them in COM/ + - copies the YAML files generated during initialize to COM/ + - copies the analysis files to COM/ + - copies the increment files to COM/ Parameters ---------- self : Analysis Instance of the SnowEnsAnalysis object """ - # save files to COM - logger.info(f"Copying files described in {self.task_config.SNOW_ENS_FINALIZE_TMPL}") - snow_final_list = parse_j2yaml(self.task_config.SNOW_ENS_FINALIZE_TMPL, self.task_config) - FileHandler(snow_final_list).sync() + + # ---- tar up diags + # path of output tar statfile + snowstat = os.path.join(self.task_config.COMOUT_SNOW_ANALYSIS, f"{self.task_config.APREFIX}snowstat.tgz") + + # get list of diag files to put in tarball + diags = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc')) + + logger.info(f"Compressing {len(diags)} diag files to {snowstat}") + + # gzip the files first + logger.debug(f"Gzipping {len(diags)} diag files") + for diagfile in diags: + with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out: + f_out.writelines(f_in) + + # open tar file for writing + logger.debug(f"Creating tar file {snowstat} with {len(diags)} gzipped diag files") + with tarfile.open(snowstat, "w|gz") as archive: + for diagfile in diags: + diaggzip = f"{diagfile}.gz" + archive.add(diaggzip, arcname=os.path.basename(diaggzip)) + + # get list of yamls to copy to ROTDIR + yamls = glob.glob(os.path.join(self.task_config.DATA, '*snow*yaml')) + + # copy full YAML from executable to ROTDIR + for src in yamls: + yaml_base = os.path.splitext(os.path.basename(src))[0] + dest_yaml_name = f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.{yaml_base}.yaml" + dest = os.path.join(self.task_config.COMOUT_CONF, dest_yaml_name) + logger.debug(f"Copying {src} to {dest}") + yaml_copy = { + 'copy': [[src, dest]] + } + FileHandler(yaml_copy).sync() + + logger.info("Copy analysis to COM") + bkgtimes = [] + if self.task_config.DOIAU: + # need both beginning and middle of window + bkgtimes.append(self.task_config.SNOW_WINDOW_BEGIN) + bkgtimes.append(self.task_config.current_cycle) + anllist = [] + for mem in range(1, self.task_config.NMEM_ENS + 1): + for bkgtime in bkgtimes: + template = f'{to_fv3time(bkgtime)}.sfc_data.tile{{tilenum}}.nc' + for itile in range(1, self.task_config.ntiles + 1): + filename = template.format(tilenum=itile) + src = os.path.join(self.task_config.DATA, 'anl', f"mem{mem:03d}", filename) + COMOUT_SNOW_ANALYSIS = self.task_config.COMOUT_SNOW_ANALYSIS.replace('ensstat', f"mem{mem:03d}") + dest = os.path.join(COMOUT_SNOW_ANALYSIS, filename) + anllist.append([src, dest]) +
FileHandler({'copy': anllist}).sync() + + logger.info('Copy increments to COM') + template = f'snowinc.{to_fv3time(self.task_config.current_cycle)}.sfc_data.tile{{tilenum}}.nc' + inclist = [] + for itile in range(1, self.task_config.ntiles + 1): + filename = template.format(tilenum=itile) + src = os.path.join(self.task_config.DATA, 'anl', filename) + dest = os.path.join(self.task_config.COMOUT_SNOW_ANALYSIS, filename) + inclist.append([src, dest]) + FileHandler({'copy': inclist}).sync() @logit(logger) - def addEnsIncrements(self) -> None: - """Loop through all ensemble members and apply increment to create - a surface analysis for snow + def add_increments(self) -> None: + """Executes the program "apply_incr.exe" to create analysis "sfc_data" files by adding increments to backgrounds Parameters ---------- @@ -286,145 +374,69 @@ def addEnsIncrements(self) -> None: Instance of the SnowEnsAnalysis object """ - bkg_times = [] - # no matter what, we want to process the center of the window - bkg_times.append(self.task_config.current_cycle) - # if DOIAU, we need to copy the increment to be valid at the center of the window - # and compute the analysis there to restart the model if self.task_config.DOIAU: logger.info("Copying increments to beginning of window") - template_in = f'snowinc.{to_fv3time(self.task_config.SNOW_WINDOW_BEGIN)}.sfc_data.tile{{tilenum}}.nc' - template_out = f'snowinc.{to_fv3time(self.task_config.current_cycle)}.sfc_data.tile{{tilenum}}.nc' + template_in = f'snowinc.{to_fv3time(self.task_config.current_cycle)}.sfc_data.tile{{tilenum}}.nc' + template_out = f'snowinc.{to_fv3time(self.task_config.SNOW_WINDOW_BEGIN)}.sfc_data.tile{{tilenum}}.nc' inclist = [] - for itile in range(1, 7): + for itile in range(1, self.task_config.ntiles + 1): filename_in = template_in.format(tilenum=itile) filename_out = template_out.format(tilenum=itile) - src = os.path.join(self.task_config.DATA, 'inc', 'ensmean', filename_in) - dest = os.path.join(self.task_config.DATA, 'inc', 'ensmean', filename_out) + src = os.path.join(self.task_config.DATA, 'anl', filename_in) + dest = os.path.join(self.task_config.DATA, 'anl', filename_out) inclist.append([src, dest]) FileHandler({'copy': inclist}).sync() - # if running with IAU, we also need an analysis at the beginning of the window - bkg_times.append(self.task_config.SNOW_WINDOW_BEGIN) - - for bkg_time in bkg_times: - for mem in range(1, self.task_config.NMEM_ENS + 1): - # for now, just looping serially, should parallelize this eventually - logger.info(f"Now applying increment to member mem{mem:03}") - logger.info(f'{os.path.join(self.task_config.DATA, "anl", f"mem{mem:03}")}') - memdict = AttrDict( - { - 'HOMEgfs': self.task_config.HOMEgfs, - 'DATA': os.path.join(self.task_config.DATA, "anl", f"mem{mem:03}"), - 'DATAROOT': self.task_config.DATA, - 'current_cycle': bkg_time, - 'CASE_ENS': self.task_config.CASE_ENS, - 'OCNRES': self.task_config.OCNRES, - 'ntiles': self.task_config.ntiles, - 'ENS_APPLY_INCR_NML_TMPL': self.task_config.ENS_APPLY_INCR_NML_TMPL, - 'APPLY_INCR_EXE': self.task_config.APPLY_INCR_EXE, - 'APRUN_APPLY_INCR': self.task_config.APRUN_APPLY_INCR, - 'MYMEM': f"{mem:03}", - } - ) - self.add_increments(memdict) - - @staticmethod - @logit(logger) - def get_bkg_dict(config: Dict) -> Dict[str, List[str]]: - """Compile a dictionary of model background files to copy - - This method constructs a dictionary of FV3 RESTART files (coupler, sfc_data) - that are needed for global snow DA and returns said dictionary for use by the FileHandler class. 
- - Parameters - ---------- - config: Dict - Dictionary of key-value pairs needed in this method - Should contain the following keys: - COMIN_ATMOS_RESTART_PREV - DATA - current_cycle - ntiles - Returns - ---------- - bkg_dict: Dict - a dictionary containing the list of model background files to copy for FileHandler - """ - - bkg_dict = { - 'mkdir': [], - 'copy': [], - } - return bkg_dict - - @staticmethod - @logit(logger) - def add_increments(config: Dict) -> None: - """Executes the program "apply_incr.exe" to create analysis "sfc_data" files by adding increments to backgrounds - - Parameters - ---------- - config: Dict - Dictionary of key-value pairs needed in this method - Should contain the following keys: - HOMEgfs - DATA - DATAROOT - current_cycle - CASE - OCNRES - ntiles - APPLY_INCR_NML_TMPL - APPLY_INCR_EXE - APRUN_APPLY_INCR - - Raises - ------ - OSError - Failure due to OS issues - WorkflowException - All other exceptions - """ - os.chdir(config.DATA) - - logger.info("Create namelist for APPLY_INCR_EXE") - nml_template = config.ENS_APPLY_INCR_NML_TMPL - nml_data = Jinja(nml_template, config).render - logger.debug(f"apply_incr_nml:\n{nml_data}") - - nml_file = os.path.join(config.DATA, "apply_incr_nml") - with open(nml_file, "w") as fho: - fho.write(nml_data) - - logger.info("Link APPLY_INCR_EXE into DATA/") - exe_src = config.APPLY_INCR_EXE - exe_dest = os.path.join(config.DATA, os.path.basename(exe_src)) - if os.path.exists(exe_dest): - rm_p(exe_dest) - os.symlink(exe_src, exe_dest) - - # execute APPLY_INCR_EXE to create analysis files - exe = Executable(config.APRUN_APPLY_INCR) - exe.add_default_arg(os.path.join(config.DATA, os.path.basename(exe_src))) - logger.info(f"Executing {exe}") - try: - exe() - except OSError: - raise OSError(f"Failed to execute {exe}") - except Exception: - raise WorkflowException(f"An error occured during execution of {exe}") - - def get_obs_dict(self) -> Dict[str, Any]: - obs_dict = { - 'mkdir': [], - 'copy': [], - } - return obs_dict - - def get_bias_dict(self) -> Dict[str, Any]: - bias_dict = { - 'mkdir': [], - 'copy': [], - } - return bias_dict + bkgtimes = [] + if self.task_config.DOIAU: + # need both beginning and middle of window + bkgtimes.append(self.task_config.SNOW_WINDOW_BEGIN) + bkgtimes.append(self.task_config.current_cycle) + + # loop over members + # TODO, make this better, or rewrite code to run in parallel + for mem in range(1, self.task_config.NMEM_ENS + 1): + logger.info(f"Processing member mem{mem:03d}") + # loop over times to apply increments + for bkgtime in bkgtimes: + logger.info(f"Processing analysis valid: {bkgtime}") + logger.info("Create namelist for APPLY_INCR_EXE") + nml_template = self.task_config.ENS_APPLY_INCR_NML_TMPL + nml_config = { + 'current_cycle': bkgtime, + 'CASE': self.task_config.CASE, + 'DATA': self.task_config.DATA, + 'HOMEgfs': self.task_config.HOMEgfs, + 'OCNRES': self.task_config.OCNRES, + 'MYMEM': f"{mem:03d}", + 'CASE_ENS': self.task_config.CASE_ENS, + } + nml_data = Jinja(nml_template, nml_config).render + logger.debug(f"apply_incr_nml:\n{nml_data}") + + nml_file = os.path.join(self.task_config.DATA, "apply_incr_nml") + if os.path.exists(nml_file): + rm_p(nml_file) + with open(nml_file, "w") as fho: + fho.write(nml_data) + + logger.info("Link APPLY_INCR_EXE into DATA/") + exe_src = self.task_config.APPLY_INCR_EXE + exe_dest = os.path.join(self.task_config.DATA, os.path.basename(exe_src)) + if os.path.exists(exe_dest): + rm_p(exe_dest) + os.symlink(exe_src, exe_dest) + + # execute APPLY_INCR_EXE to 
create analysis files + exe = Executable(self.task_config.APRUN_APPLY_INCR) + exe.add_default_arg(exe_dest) + logger.info(f"Executing {exe}") + try: + logger.debug(f"Executing {exe}") + exe() + except OSError: + logger.exception(f"Failed to execute {exe}") + raise + except Exception as err: + logger.exception(f"An error occurred during execution of {exe}") + raise WorkflowException(f"An error occurred during execution of {exe}") from err diff --git a/versions/fix.ver b/versions/fix.ver index 4739ce778a..991e0ce13a 100644 --- a/versions/fix.ver +++ b/versions/fix.ver @@ -13,6 +13,7 @@ export gdas_soca_ver=20240802 export gdas_gsibec_ver=20240416 export gdas_obs_ver=20240213 export gdas_aero_ver=20240806 +export gdas_snow_ver=20241210 export glwu_ver=20220805 export gsi_ver=20240208 export lut_ver=20220805 diff --git a/workflow/applications/gfs_cycled.py b/workflow/applications/gfs_cycled.py index e11f708aa6..543d7a9d8c 100644 --- a/workflow/applications/gfs_cycled.py +++ b/workflow/applications/gfs_cycled.py @@ -138,7 +138,7 @@ def _get_app_configs(self, run): if options['do_jedisnowda']: configs += ['snowanl'] if options['do_hybvar']: - configs += ['esnowrecen'] + configs += ['esnowanl'] if options['do_mos']: configs += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep', @@ -316,7 +316,7 @@ def get_task_names(self): task_names[run] += ['eobs', 'eupd'] task_names[run].append('echgres') if 'gdas' in run else 0 task_names[run] += ['ediag'] if options['lobsdiag_forenkf'] else ['eomg'] - task_names[run].append('esnowrecen') if options['do_jedisnowda'] and 'gdas' in run else 0 + task_names[run].append('esnowanl') if options['do_jedisnowda'] and 'gdas' in run else 0 task_names[run] += ['stage_ic', 'ecen', 'esfc', 'efcs', 'epos', 'earc', 'cleanup'] diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index 59b0951d44..54870b79cc 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -586,23 +586,23 @@ def snowanl(self): task = rocoto.create_task(task_dict) return task - def esnowrecen(self): + def esnowanl(self): deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf","")}_snowanl'} - deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': f'{self.run}_epmn', 'offset': f"-{timedelta_to_HMS(self._base['interval_gdas'])}"} deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'task', 'name': f"{self.run.replace('enkf', '')}_prep"} + deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) - resources = self.get_resource('esnowrecen') - task_name = f'{self.run}_esnowrecen' + resources = self.get_resource('esnowanl') + task_name = f'{self.run}_esnowanl' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, 'envars': self.envars, 'cycledef': self.run.replace('enkf', ''), - 'command': f'{self.HOMEgfs}/jobs/rocoto/esnowrecen.sh', + 'command': f'{self.HOMEgfs}/jobs/rocoto/esnowanl.sh', 'job_name': f'{self.pslot}_{task_name}_@H', 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', 'maxtries': '&MAXTRIES;' @@ -2731,7 +2731,7 @@ def esfc(self): dep_dict = {'type': 'task', 'name': f'{self.run}_eupd'} deps.append(rocoto.add_dependency(dep_dict)) if self.options['do_jedisnowda']: - dep_dict = {'type': 'task', 'name': f'{self.run}_esnowrecen'} + dep_dict = {'type': 'task', 'name': f'{self.run}_esnowanl'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
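Because the rename touches env files, app configs, rocoto wiring, and job scripts alike, it is worth sweeping the tree for stragglers before merging. The following throwaway sketch is not part of this patch; the directory names are assumed relative to HOMEgfs:

# Hypothetical rename audit, illustrative only: report files that still
# reference the retired esnowrecen task name.
from pathlib import Path

def stale_refs(root='.', needle='esnowrecen'):
    hits = []
    for sub in ('env', 'jobs', 'parm', 'scripts', 'ush', 'workflow'):
        base = Path(root, sub)
        if not base.is_dir():
            continue  # tolerate partial checkouts
        for path in base.rglob('*'):
            if path.is_file() and needle in path.read_text(errors='ignore'):
                hits.append(str(path))
    return hits

if __name__ == '__main__':
    print('stale esnowrecen references:', stale_refs() or 'none')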
diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index c0496a4996..d9c769ffbe 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -20,7 +20,7 @@ class Tasks: 'eobs', 'eomg', 'epos', 'esfc', 'eupd', 'atmensanlinit', 'atmensanlobs', 'atmensanlsol', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal', 'aeroanlinit', 'aeroanlvar', 'aeroanlfinal', 'aeroanlgenb', - 'snowanl', 'esnowrecen', + 'snowanl', 'esnowanl', 'fcst', 'atmanlupp', 'atmanlprod', 'atmupp', 'goesupp', 'atmos_prod', 'ocean_prod', 'ice_prod',
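Finally, to see how the renamed pieces compose at run time, here is a plausible ex-script driver for the esnowanl job. The method ordering and the Logger/cast_strdict_as_dtypedict conventions are assumptions modeled on other pygfs ex-scripts rather than code contained in this patch:

#!/usr/bin/env python3
# Hypothetical exglobal driver sketch for the esnowanl job (illustrative only).
import os
from wxflow import Logger, cast_strdict_as_dtypedict
from pygfs.task.snowens_analysis import SnowEnsAnalysis

logger = Logger(level=os.environ.get('LOGGING_LEVEL', 'DEBUG'), colored_log=True)

if __name__ == '__main__':
    config = cast_strdict_as_dtypedict(os.environ)  # job environment -> task config
    SnowEnsAnl = SnowEnsAnalysis(config)
    SnowEnsAnl.initialize()
    SnowEnsAnl.prepare_IMS()               # IMS obs -> IODA (uses APRUN_CALCFIMS)
    SnowEnsAnl.execute('esnowanlensmean')  # ensemble-mean background
    SnowEnsAnl.execute('snowanlvar')       # JEDI variational snow analysis
    SnowEnsAnl.add_increments()            # apply_incr.exe for each member
    SnowEnsAnl.finalize()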