Improve the current SDA workflow to support the North American runs with 6400 sites. #3340

Open
wants to merge 51 commits into base: develop
Changes from 10 commits (51 commits in total)
81d6c05
add the template for the MCMC qsub job.
Jul 22, 2024
39a886b
added namespaces
Jul 22, 2024
cead5d5
added qsub parallel analysis functions.
Jul 22, 2024
6ce1b39
Merge branch 'develop' of https://github.com/DongchenZ/pecan into dev…
Jul 22, 2024
dc06316
include qsub analysis part into the main function.
Jul 23, 2024
7dc12d3
Update namespace.
Jul 23, 2024
a538129
Update dependencies.
Jul 23, 2024
4c172a1
Update dependencies.
Jul 23, 2024
c7d0ca9
Update namespace.
Jul 23, 2024
f6f1c2a
Apply suggestions.
Jul 23, 2024
cc025a4
Update dependency.
Jul 24, 2024
fecb781
Update dependency.
Jul 24, 2024
b407404
Pull from main branch.
Sep 18, 2024
e9edbd6
Removing the job file template.
Sep 18, 2024
9346ed3
automatic add-up.
Sep 18, 2024
08b93dc
Update the qsub option to submit analysis jobs.
Sep 18, 2024
3b1a250
Using foreach to create Y and R.
Sep 18, 2024
0fc424a
Resolve dimension issue.
Sep 18, 2024
b29173f
Add functions for initializing qsub job submissions during the genera…
Sep 18, 2024
bb8b142
Create a new workflow solely for the North American SDA runs.
Sep 18, 2024
75f02eb
Add qsub functions for parallel SDA workflows.
Sep 18, 2024
b7f72b4
Upgrade the weight calculation function and fix some bugs.
Sep 18, 2024
70d6a90
Update the script for matching the current progress with NA SDA runs.
Sep 18, 2024
12fa426
Rd file.
Sep 18, 2024
cd6360e
Add Rd files.
Sep 18, 2024
98d8046
Remove hard-coded path.
Sep 18, 2024
1870d98
Adding SDA workflow for NA SDA runs.
Sep 18, 2024
abdc1f8
Update documentation associated with the SDA batch job submission fea…
Sep 19, 2024
0050826
Update doc.
Sep 19, 2024
5f1216e
Update dependency.
Sep 19, 2024
be9b047
Update dependency.
Sep 19, 2024
b51cc40
Update job completion detection.
Sep 20, 2024
8cc437e
Add the verbose argument to the function of checking qsub job complet…
Sep 20, 2024
ab56e79
Remove the library call from the functions.
Sep 20, 2024
bb71ff8
Add qsub statements to the creation of the settings in case you are u…
Sep 20, 2024
9ab9b08
Relocate qsub functions for the analysis part and optimize the functi…
Sep 20, 2024
4c0131d
Update the roxygen structure.
Sep 20, 2024
296b0d6
Update dependency.
Sep 20, 2024
daf3558
Update and fix namespaces.
Sep 20, 2024
86a3d16
Fix GitHub checks.
Sep 20, 2024
c43cba4
Fix function usage.
Sep 20, 2024
a683263
Replace read function.
Sep 20, 2024
e8ac7bc
Update modules/assim.sequential/R/Analysis_sda_block.R
infotroph Sep 21, 2024
0224572
Update modules/assim.sequential/DESCRIPTION
infotroph Sep 21, 2024
76f2b72
whitespace
infotroph Sep 21, 2024
345530f
Update roxygen text.
Sep 23, 2024
fcc3804
Revert "Update roxygen text."
Sep 23, 2024
9fa90a8
Merge branch 'PecanProject:develop' into develop
DongchenZ Sep 30, 2024
382e38d
Merge branch 'develop' of https://github.com/DongchenZ/pecan into dev…
Sep 30, 2024
2cdfea8
Update roxygen.
Sep 30, 2024
59c348d
Revert the changes.
Sep 30, 2024
2 changes: 2 additions & 0 deletions docker/depends/pecan_package_dependencies.csv
@@ -43,6 +43,7 @@
"doParallel","*","modules/data.atmosphere","Suggests",FALSE
"doParallel","*","modules/data.remote","Imports",FALSE
"doSNOW","*","base/remote","Suggests",FALSE
"doSNOW","*","modules/assim.sequential","Suggests",FALSE
"dplR","*","modules/data.land","Imports",FALSE
"dplyr","*","base/qaqc","Imports",FALSE
"dplyr","*","base/remote","Imports",FALSE
@@ -62,6 +63,7 @@
"ellipse","*","modules/assim.batch","Imports",FALSE
"emdbook","*","modules/assim.sequential","Suggests",FALSE
"foreach","*","base/remote","Imports",FALSE
"foreach","*","modules/assim.sequential","Suggests",FALSE
"foreach","*","modules/data.atmosphere","Suggests",FALSE
"foreach","*","modules/data.remote","Imports",FALSE
"fs","*","base/db","Imports",FALSE
5 changes: 5 additions & 0 deletions models/template/inst/analysis_qsub.job
@@ -0,0 +1,5 @@
#!/bin/bash -l
module load R/4.1.2
Member:
This is very server specific; in pecan.xml we have a section for qsub that allows you to specify the modules that need to be loaded. See https://pecanproject.github.io/pecan-documentation/master/xml-core-config.html#xml-host for an example.
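For context, a rough sketch of the kind of pecan.xml host block this comment points to. The qsub, qsub.jobid, and qstat entries mirror the settings paths and placeholders used elsewhere in this PR; treat the values (and the exact tag used for module loading) as assumptions to be checked against the linked documentation rather than as a definitive configuration.

<host>
  <name>cluster.example.edu</name>
  <qsub>qsub -l h_rt=48:00:00 -l buyin -pe omp 28 -V -N @NAME@ -o @STDOUT@ -e @STDERR@ -S /bin/bash</qsub>
  <qsub.jobid>Your job ([0-9]+) .*</qsub.jobid>
  <qstat>qstat -j @JOBID@ &amp;> /dev/null || echo DONE</qstat>
</host>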

echo "require (PEcAnAssimSequential)
qsub_analysis('@FOLDER_PATH@')
" | R --no-save
2 changes: 2 additions & 0 deletions modules/assim.sequential/DESCRIPTION
@@ -32,6 +32,8 @@ Imports:
stringr
Suggests:
corrplot,
doSNOW,
foreach,
ggrepel,
emdbook,
glue,
3 changes: 3 additions & 0 deletions modules/assim.sequential/NAMESPACE
@@ -44,6 +44,8 @@ export(post.analysis.multisite.ggplot)
export(postana.bias.plotting.sda)
export(postana.bias.plotting.sda.corr)
export(postana.timeser.plotting.sda)
export(qsub_analysis)
export(qsub_analysis_submission)
export(rescaling_stateVars)
export(rwtmnorm)
export(sample_met)
@@ -61,6 +63,7 @@ import(furrr)
import(lubridate)
import(nimble)
importFrom(dplyr,"%>%")
importFrom(foreach,"%dopar%")
infotroph marked this conversation as resolved.
importFrom(lubridate,"%m+%")
importFrom(magrittr,"%>%")
importFrom(rlang,.data)
128 changes: 125 additions & 3 deletions modules/assim.sequential/R/Analysis_sda_block.R
@@ -58,9 +58,18 @@ analysis_sda_block <- function (settings, block.list.all, X, obs.mean, obs.cov,

#parallel for loop over each block.
PEcAn.logger::logger.info(paste0("Running MCMC ", "for ", length(block.list.all[[t]]), " blocks"))
if ("try-error" %in% class(try(block.list.all[[t]] <- furrr::future_map(block.list.all[[t]], MCMC_block_function, .progress = T)))) {
PEcAn.logger::logger.severe("Something wrong within the MCMC_block_function function.")
return(0)
if (!is.null(settings$state.data.assimilation$qsub_analysis)) {# qsub_analysis <- list(cores = 28)
if ("try-error" %in% class(try(block.list.all[[t]] <- qsub_analysis_submission(block.list = block.list.all[[t]],
outdir = settings$outdir,
cores = as.numeric(settings$state.data.assimilation$qsub_analysis$cores))))) {
PEcAn.logger::logger.severe("Something went wrong within the qsub_analysis_submission function.")
return(0)
}
} else {
if ("try-error" %in% class(try(block.list.all[[t]] <- furrr::future_map(block.list.all[[t]], MCMC_block_function, .progress = T)))) {
PEcAn.logger::logger.severe("Something wrong within the MCMC_block_function function.")
return(0)
}
}
PEcAn.logger::logger.info("Completed!")
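For readers trying this branch out, a minimal sketch of how the new path gets switched on from R; the list shape follows the inline comment in the condition above, and the corresponding pecan.xml tag is an assumption rather than something defined in this PR.

# non-NULL enables the qsub-based analysis; leaving it NULL keeps the furrr::future_map path
settings$state.data.assimilation$qsub_analysis <- list(cores = 28)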

@@ -632,4 +641,117 @@ block.2.vector <- function (block.list, X, H) {
Pf = Pf,
mu.a = mu.a,
Pa = Pa))
}

##' This function splits large SDA analysis (MCMC) runs into separate `qsub` jobs,
##' including job creation, submission, and assembly of the results.
##' @title qsub_analysis_submission
##' @param block.list list: MCMC configuration lists for the block SDA analysis.
##' @param outdir character: SDA output path.
##' @param job.per.folder numeric: number of blocks handled per folder; each folder is submitted as one qsub job. Default is 200.
Member:
This logic seems backwards to me. Generally with an HPC or cloud queueing system you want to specify the number of cores and/or nodes that you have, and then divide jobs across them.

Contributor Author:
based on folder numbers.
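A minimal sketch of the core-count-driven split the reviewer describes, offered only as an alternative to the fixed job.per.folder logic below; n.jobs is a hypothetical user choice, not a parameter of this PR.

# divide the blocks across a fixed number of qsub jobs instead of a fixed number of blocks per job
n.jobs <- 28
chunks <- split(block.list, cut(seq_along(block.list), breaks = n.jobs, labels = FALSE))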

##' @param cores numeric: number of CPUs used for parallel computation. Default is NULL.
##' @export
##'
qsub_analysis_submission <- function(block.list, outdir, job.per.folder = 200, cores = NULL) {
L <- length(block.list)
# calculate proper folder number based on settings.
folder.num <- ceiling(L/job.per.folder)
# create folder.
# if we have previous outputs, remove them.
if (file.exists(file.path(outdir, "qsub_analysis"))) {
unlink(file.path(outdir, "qsub_analysis"), recursive = T)
}
# create new folder.
dir.create(file.path(outdir, "qsub_analysis"))
# loop over sub-folders.
folder.paths <- job.ids <- c()
PEcAn.logger::logger.info(paste("Submitting", folder.num, "jobs."))
for (i in 1:folder.num) {
Member:
Some systems will penalize you if you do many submissions in parallel, or will only run two at a time and make you wait for those to finish; the more jobs you submit, the lower your priority in the queue. To overcome some of this I added the modellauncher, which creates a text file with the command to execute as its first line, followed by the list of folders in which that command needs to run.
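For concreteness, the launcher file described here (first line: the command to run; remaining lines: the folders to run it in) would look roughly like the sketch below; the file name and paths are illustrative, with folder names following the naming used later in this function.

./job.sh
<outdir>/qsub_analysis/From_1_to_200
<outdir>/qsub_analysis/From_201_to_400
<outdir>/qsub_analysis/From_401_to_600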

# create folder for each set of job runs.
# calculate start and end index for the current folder.
head.num <- (i-1)*job.per.folder + 1
Member:
Are all jobs expected to take the same amount of time? If not (e.g. could one block have 1 site while another block has 1000 sites?), can we estimate which are expected to be longer or shorter so that we can load balance a bit more intelligently than doing so uniformly?

Contributor Author:
They should be balanced by the number of sites within each block.

if (i*job.per.folder > L) {
tail.num <- L
} else {
tail.num <- i*job.per.folder
}
# naming and creating folder.
folder.name <- paste0("From_", head.num, "_to_", tail.num)
folder.path <- file.path(outdir, "qsub_analysis", folder.name)
folder.paths <- c(folder.paths, folder.path)
dir.create(folder.path)
# save corresponding block list to the folder.
blocks <- block.list[head.num:tail.num]
save(blocks, file = file.path(folder.path, "block.Rdata"))
Member:
I find it hard to tell what the structure of blocks actually is, but is this something that could be saved as a csv dataframe or something similar to avoid Rdata? At the minimum I'd recommend switching to RDS, which is more transparent about what objects are being reloaded.
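A minimal sketch of the RDS variant suggested here, reusing the object and folder names from the surrounding code; illustrative only, not part of the PR.

# in qsub_analysis_submission(), write the blocks with an explicit file:
saveRDS(blocks, file = file.path(folder.path, "block.rds"))
# in qsub_analysis(), read them back without relying on load()'s implicit object name:
blocks <- readRDS(file.path(folder.path, "block.rds"))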

# create job file.
jobsh <- readLines(con = system.file("analysis_qsub.job", package = "PEcAn.ModelName"), n = -1, warn=FALSE)
Member:
Why is the generic template model being hard-coded here? Do we really want to create a package dependency of the SDA module on the generic template model?

jobsh <- gsub("@FOLDER_PATH@", folder.path, jobsh)
writeLines(jobsh, con = file.path(folder.path, "job.sh"))
# qsub command.
qsub <- "qsub -l h_rt=48:00:00 -l buyin -pe omp 28 -V -N @NAME@ -o @STDOUT@ -e @STDERR@ -S /bin/bash"
Member:
qsub settings are system and user specific, and thus need to be read from the settings object, not hard-coded. Also, I'd STRONGLY recommend setting up a single array-style qsub over submitting multiple jobs in a loop.
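One way the hard-coded command could be replaced, sketched under the assumption that the submission template is stored in settings$host$qsub (as it is for model runs) and that a settings object is available inside this function; the placeholders match the ones already substituted below.

# pull the host-specific submission template from the settings object instead of hard-coding it
qsub <- settings$host$qsub
if (is.null(qsub)) {
  PEcAn.logger::logger.severe("No qsub command found in settings$host$qsub.")
}
qsub <- gsub("@NAME@", paste0("Job-", i), qsub)
qsub <- gsub("@STDOUT@", file.path(folder.path, "stdout.log"), qsub)
qsub <- gsub("@STDERR@", file.path(folder.path, "stderr.log"), qsub)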

Member:
See remote execution. Also, a lot of this code seems to already exist in the PEcAn.remote package, for example start_qsub.

qsub <- gsub("@NAME@", paste0("Job-", i), qsub)
qsub <- gsub("@STDOUT@", file.path(folder.path, "stdout.log"), qsub)
qsub <- gsub("@STDERR@", file.path(folder.path, "stderr.log"), qsub)
qsub <- strsplit(qsub, " (?=([^\"']*\"[^\"']*\")*[^\"']*$)", perl = TRUE)
cmd <- qsub[[1]]
out <- system2(cmd[1], args = c(cmd[-1], file.path(folder.path, "job.sh")), stdout = TRUE, stderr = TRUE)
# grab job ids for future job completion detection.
job.ids <- c(job.ids, PEcAn.remote::qsub_get_jobid(
out = out[length(out)],
qsub.jobid = settings$host$qsub.jobid,
stop.on.error = TRUE))
}
# checking results.
PEcAn.logger::logger.info("Checking results.")
# if remaining number of jobs larger than 0.
while (length(job.ids) > 0) {
Sys.sleep(60)
completed_jobs <- job.ids %>% purrr::map(function(id) {
if (PEcAn.remote::qsub_run_finished(
run = id,
host = host,
qstat = qstat)) {
return(id)
}
}) %>% unlist()
job.ids <- job.ids[which(!job.ids %in% completed_jobs)]
}
# assemble results.
PEcAn.logger::logger.info("Assembling results.")
analysis <- c()
for (path in folder.paths) {
res_env <- new.env()
load(file.path(path, "results.Rdata"), envir = res_env)
analysis <- c(analysis, res_env$results)
}
return(analysis)
}

##' This function executes the block MCMC sampling in parallel via `foreach`, given previously generated MCMC configuration lists.
##' @title qsub_analysis
##' @param folder.path character: path where the `block.Rdata` file is stored.
##' @param cores numeric: number of CPUs used for parallel computation. Default is NULL.
##' @importFrom foreach %dopar%
##' @export
qsub_analysis <- function(folder.path, cores = NULL) {
# load file.
load(file.path(folder.path, "block.Rdata"))
# initialize parallel.
if (is.null(cores)) {
cores <- parallel::detectCores()
}
cl <- parallel::makeCluster(cores)
doSNOW::registerDoSNOW(cl)
# progress bar
pb <- utils::txtProgressBar(min=1, max=length(blocks), style=3)
progress <- function(n) utils::setTxtProgressBar(pb, n)
opts <- list(progress=progress)
# parallel computation.
l <- NULL # fix GitHub check issue.
results <- foreach::foreach(l = blocks, .packages=c("Kendall", "purrr"), .options.snow=opts) %dopar% {
MCMC_block_function(l)
}
Member:
Naive question: How closely equivalent is this to parallel::parLapply(cl, blocks, MCMC_block_function)? Would it be worth considering that approach since it only uses existing dependencies?

Contributor Author:
So, I haven't used this parallel function before. The reason I used foreach is that it manages memory well with large data inputs. But I can try this function out to see whether it performs similarly to the foreach package.
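For comparison, a rough sketch of the parLapply variant mentioned above; it assumes MCMC_block_function and its package dependencies can be loaded on the workers, and whether it matches foreach's memory behavior on large inputs would need the testing the author mentions.

cores <- parallel::detectCores()
cl <- parallel::makeCluster(cores)
# load the packages the worker function relies on, mirroring .packages in the foreach call
invisible(parallel::clusterEvalQ(cl, { library(Kendall); library(purrr) }))
# the function object itself is shipped to the workers as the FUN argument
results <- parallel::parLapply(cl, blocks, MCMC_block_function)
parallel::stopCluster(cl)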

# wrap results.
parallel::stopCluster(cl)
save(results, file = file.path(folder.path, "results.Rdata"))
}
16 changes: 16 additions & 0 deletions modules/assim.sequential/man/qsub_analysis.Rd

Some generated files are not rendered by default.

26 changes: 26 additions & 0 deletions modules/assim.sequential/man/qsub_analysis_submission.Rd

Some generated files are not rendered by default.
