Improve the current SDA workflow to support North American runs with 6400 sites #3340
base: develop
Changes from 10 commits
@@ -0,0 +1,5 @@
#!/bin/bash -l
module load R/4.1.2
echo "require (PEcAnAssimSequential)
qsub_analysis('@FOLDER_PATH@')
" | R --no-save
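This new job script is a template: judging from the `gsub()` call in `qsub_analysis_submission` later in this diff, the `@FOLDER_PATH@` token is filled in per submission folder. A minimal sketch of that rendering step (the folder path here is a hypothetical example):

```r
# Sketch of how the template is instantiated per folder; mirrors the
# readLines()/gsub()/writeLines() sequence in qsub_analysis_submission.
jobsh <- readLines("analysis_qsub.job")            # the template above
folder.path <- "/tmp/qsub_analysis/From_1_to_200"  # assumed example path
jobsh <- gsub("@FOLDER_PATH@", folder.path, jobsh, fixed = TRUE)
writeLines(jobsh, file.path(folder.path, "job.sh"))
```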
@@ -32,6 +32,8 @@ Imports:
    stringr
Suggests:
    corrplot,
    doSNOW,
    foreach,
    ggrepel,
    emdbook,
    glue,
@@ -58,9 +58,18 @@ analysis_sda_block <- function (settings, block.list.all, X, obs.mean, obs.cov,

  # parallel for loop over each block.
  PEcAn.logger::logger.info(paste0("Running MCMC for ", length(block.list.all[[t]]), " blocks"))
  if (!is.null(settings$state.data.assimilation$qsub_analysis)) { # e.g. qsub_analysis <- list(cores = 28)
    if ("try-error" %in% class(try(block.list.all[[t]] <- qsub_analysis_submission(block.list = block.list.all[[t]],
                                                                                   outdir = settings$outdir,
                                                                                   cores = as.numeric(settings$state.data.assimilation$qsub_analysis$cores))))) {
      PEcAn.logger::logger.severe("Something went wrong within the qsub_analysis_submission function.")
      return(0)
    }
  } else {
    if ("try-error" %in% class(try(block.list.all[[t]] <- furrr::future_map(block.list.all[[t]], MCMC_block_function, .progress = TRUE)))) {
      PEcAn.logger::logger.severe("Something went wrong within the MCMC_block_function function.")
      return(0)
    }
  }
  PEcAn.logger::logger.info("Completed!")
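The qsub branch is switched on from the settings object; a minimal sketch of the assumed configuration, with the structure inferred from the code above (the XML tag names are not confirmed by this diff and should be treated as assumptions):

```r
# Assumed pecan.xml structure (inferred, not confirmed by this diff):
# <state.data.assimilation>
#   <qsub_analysis>
#     <cores>28</cores>
#   </qsub_analysis>
# </state.data.assimilation>
settings$state.data.assimilation$qsub_analysis <- list(cores = 28)
# When qsub_analysis is absent (NULL), analysis_sda_block falls back to
# the original furrr::future_map() path.
```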
@@ -632,4 +641,117 @@ block.2.vector <- function (block.list, X, H) {
              Pf = Pf,
              mu.a = mu.a,
              Pa = Pa))
}
##' This function splits a large SDA analysis (MCMC) run into separate `qsub` jobs,
##' including job creation, submission, and assembly of the results.
##' @title qsub_analysis_submission
##' @param block.list list: MCMC configuration lists for the block SDA analysis.
##' @param outdir character: SDA output path.
##' @param job.per.folder numeric: number of jobs per folder.

[Review comment] This logic seems backwards to me. Generally with an HPC or cloud queueing system you want to specify the number of cores and/or nodes that you have, and then divide jobs across them.
[Author reply] Based on folder numbers.

##' @param cores numeric: number of CPUs used for parallel computation. Default is NULL.
##' @export
##'
qsub_analysis_submission <- function(block.list, outdir, job.per.folder = 200, cores = NULL) {
  L <- length(block.list)
  # calculate the folder number based on settings.
  folder.num <- ceiling(L/job.per.folder)
  # if we have previous outputs, remove them.
  if (file.exists(file.path(outdir, "qsub_analysis"))) {
    unlink(file.path(outdir, "qsub_analysis"), recursive = TRUE)
  }
  # create new folder.
  dir.create(file.path(outdir, "qsub_analysis"))
  # loop over sub-folders.
  folder.paths <- job.ids <- c()
  PEcAn.logger::logger.info(paste("Submitting", folder.num, "jobs."))
  for (i in 1:folder.num) {

[Review comment] Some systems will penalize you if you do many submissions in parallel, or will only run two and then wait for those to be done; the more jobs you submit, the lower your priority in the queue. To overcome some of this I added the
    # create a folder for each set of job runs.
    # calculate start and end index for the current folder.
    head.num <- (i-1)*job.per.folder + 1

[Review comment] Are all jobs expected to take the same amount of time? If not (e.g. could one block have 1 site while another block has 1000 sites?), can we estimate which are expected to be longer or shorter so that we can load balance a bit more intelligently than doing so uniformly?
[Author reply] Should be balanced by number of sites within each block.
    if (i*job.per.folder > L) {
      tail.num <- L
    } else {
      tail.num <- i*job.per.folder
    }
    # name and create the folder.
    folder.name <- paste0("From_", head.num, "_to_", tail.num)
    folder.path <- file.path(outdir, "qsub_analysis", folder.name)
    folder.paths <- c(folder.paths, folder.path)
    dir.create(folder.path)
    # save the corresponding block list to the folder.
    blocks <- block.list[head.num:tail.num]
    save(blocks, file = file.path(folder.path, "block.Rdata"))

[Review comment] I find it hard to tell what the structure of
    # create job file.
    jobsh <- readLines(con = system.file("analysis_qsub.job", package = "PEcAn.ModelName"), n = -1, warn = FALSE)

[Review comment] Why is the generic template model being hard-coded here? Do we really want to create a package dependency of the SDA module on the generic template model?
    jobsh <- gsub("@FOLDER_PATH@", folder.path, jobsh)
    writeLines(jobsh, con = file.path(folder.path, "job.sh"))
    # qsub command.
    qsub <- "qsub -l h_rt=48:00:00 -l buyin -pe omp 28 -V -N @NAME@ -o @STDOUT@ -e @STDERR@ -S /bin/bash"

[Review comment] qsub settings are system and user specific, and thus need to be read from the
[Author reply] See remote execution. Also a lot of this code seems to be in the PEcAn.remote package, for example
    qsub <- gsub("@NAME@", paste0("Job-", i), qsub)
    qsub <- gsub("@STDOUT@", file.path(folder.path, "stdout.log"), qsub)
    qsub <- gsub("@STDERR@", file.path(folder.path, "stderr.log"), qsub)
    qsub <- strsplit(qsub, " (?=([^\"']*\"[^\"']*\")*[^\"']*$)", perl = TRUE)
    cmd <- qsub[[1]]
    # system2 expects a single command plus an args vector.
    out <- system2(cmd[1], c(cmd[-1], file.path(folder.path, "job.sh")), stdout = TRUE, stderr = TRUE)
    # grab job ids for later job completion detection.
    job.ids <- c(job.ids, PEcAn.remote::qsub_get_jobid(
      out = out[length(out)],
      qsub.jobid = settings$host$qsub.jobid,
      stop.on.error = TRUE))
  }
  # check results.
  PEcAn.logger::logger.info("Checking results.")
  # wait until all submitted jobs have finished.
  while (length(job.ids) > 0) {
    Sys.sleep(60)
    completed_jobs <- job.ids %>% purrr::map(function(id) {
      if (PEcAn.remote::qsub_run_finished(
        run = id,
        host = host,
        qstat = qstat)) {
        return(id)
      }
    }) %>% unlist()
    job.ids <- job.ids[which(!job.ids %in% completed_jobs)]
  }
  # assemble results.
  PEcAn.logger::logger.info("Assembling results.")
  analysis <- c()
  for (path in folder.paths) {
    res_env <- new.env()
    load(file.path(path, "results.Rdata"), envir = res_env)
    analysis <- c(analysis, res_env$results)
  }
  return(analysis)
}
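The `From_X_to_Y` folder bookkeeping above is plain integer batching. A small standalone sketch of the same index arithmetic (the helper name is hypothetical, not part of the PR):

```r
# Hypothetical helper reproducing the head.num/tail.num arithmetic above:
# split L jobs into folders of at most job.per.folder each.
batch_indices <- function(L, job.per.folder) {
  folder.num <- ceiling(L / job.per.folder)
  lapply(seq_len(folder.num), function(i) {
    head.num <- (i - 1) * job.per.folder + 1
    tail.num <- min(i * job.per.folder, L)
    c(head = head.num, tail = tail.num)
  })
}

# e.g. 450 blocks at 200 jobs per folder yields three folders:
# From_1_to_200, From_201_to_400, From_401_to_450.
idx <- batch_indices(450, 200)
stopifnot(idx[[3]]["head"] == 401, idx[[3]]["tail"] == 450)
```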
##' This function executes `foreach` parallel MCMC sampling given generated MCMC configuration lists.
##' @title qsub_analysis
##' @param folder.path character: path where the `block.Rdata` file is stored.
##' @param cores numeric: number of CPUs used for parallel computation. Default is NULL.
##' @importFrom foreach %dopar%
##' @export
qsub_analysis <- function(folder.path, cores = NULL) {
  # load file (creates `blocks` in the current environment).
  load(file.path(folder.path, "block.Rdata"))
  # initialize parallel backend.
  if (is.null(cores)) {
    cores <- parallel::detectCores()
  }
  cl <- parallel::makeCluster(cores)
  doSNOW::registerDoSNOW(cl)
  # progress bar.
  pb <- utils::txtProgressBar(min = 1, max = length(blocks), style = 3)
  progress <- function(n) utils::setTxtProgressBar(pb, n)
  opts <- list(progress = progress)
  # parallel computation.
  l <- NULL # fix GitHub check issue.
  results <- foreach::foreach(l = blocks, .packages = c("Kendall", "purrr"), .options.snow = opts) %dopar% {
    MCMC_block_function(l)
  }

[Review comment] Naive question: How closely equivalent is this to
[Author reply] So, I haven't used this parallel function before. The reason that I used

  # wrap up results.
  parallel::stopCluster(cl)
  save(results, file = file.path(folder.path, "results.Rdata"))
}
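The review question above is truncated, but it appears to ask how this backend compares to the futures-based `furrr::future_map()` call used in `analysis_sda_block`. A base-R sketch of the same pattern (toy workload standing in for `MCMC_block_function`; assumes only the `parallel` package) shows that the loop is an ordinary parallel lapply, with doSNOW's `.options.snow` progress hook being the main practical addition:

```r
# Base-R equivalent of the foreach/%dopar% loop above, for illustration only.
# parLapply preserves input order, like foreach and furrr::future_map, but
# has no built-in progress hook (which is what .options.snow supplies).
slow_square <- function(x) { Sys.sleep(0.01); x^2 }  # toy stand-in for MCMC_block_function
blocks <- as.list(1:8)
cl <- parallel::makeCluster(2)
results <- parallel::parLapply(cl, blocks, slow_square)
parallel::stopCluster(cl)
stopifnot(identical(unlist(results), (1:8)^2))
```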
[Review comment] This is very server specific; in the pecan.xml we have a section for qsub that allows you to specify the modules that need to be loaded. See https://pecanproject.github.io/pecan-documentation/master/xml-core-config.html#xml-host for an example.
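Following that suggestion, one possible shape for reading the host-specific qsub template from settings rather than hard-coding cluster flags (`settings$host$qsub` and its `@NAME@`/`@STDOUT@`/`@STDERR@` placeholders follow the PEcAn host XML documented at the link above; the fallback string is illustrative only):

```r
# Sketch: pull the user/system-specific qsub command from the <host>
# section of pecan.xml instead of hard-coding flags like h_rt, buyin, omp.
qsub_cmd <- settings$host$qsub
if (is.null(qsub_cmd)) {
  # illustrative fallback only; real defaults live in PEcAn.remote
  qsub_cmd <- "qsub -N @NAME@ -o @STDOUT@ -e @STDERR@ -S /bin/bash"
}
qsub_cmd <- gsub("@NAME@", "Job-1", qsub_cmd)
```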