
Commit

Merge pull request #135 from bjarthur/bja/lsf
refactor LSFManager to use job arrays
bjarthur authored Sep 12, 2020
2 parents 9d727f7 + 2b96e2c commit dde400e
Showing 3 changed files with 109 additions and 16 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -6,7 +6,7 @@ Support for different job queue systems commonly used on compute clusters.

| Job queue system | Command to add processors |
| ---------------- | ------------------------- |
| Load Sharing Facility (LSF) | `addprocs_lsf(np::Integer, flags=``)` or `addprocs(LSFManager(np, flags))` |
| Load Sharing Facility (LSF) | `addprocs_lsf(np::Integer; bsub_flags=``, ssh_cmd=``)` or `addprocs(LSFManager(np, bsub_flags, ssh_cmd, retry_delays, throttle))` |
| Sun Grid Engine | `addprocs_sge(np::Integer, queue="")` or `addprocs(SGEManager(np, queue))` |
| SGE via qrsh | `addprocs_qrsh(np::Integer, queue="")` or `addprocs(QRSHManager(np, queue))` |
| PBS | `addprocs_pbs(np::Integer, queue="")` or `addprocs(PBSManager(np, queue))` |
@@ -111,8 +111,8 @@ command to bypass the filesystem and captures STDOUT directly.

### Load Sharing Facility (LSF)

`LSFManager` supports IBM's scheduler. Similar to `QRSHManager` in that it
uses the `-I` (i.e. interactive) flag to `bsub`.
`LSFManager` supports IBM's scheduler. See the `addprocs_lsf` docstring
for more information.
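
For illustration, a minimal call might look like the following; the queue name is a placeholder, and `bsub` must be reachable from the session (directly or via `ssh_cmd`):

```julia
using ClusterManagers

# `-q short` is a placeholder; pass whatever bsub flags your site requires.
p = addprocs_lsf(4; bsub_flags=`-q short`)
```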

### Using `LocalAffinityManager` (for pinning local workers to specific cores)

91 changes: 80 additions & 11 deletions src/lsf.jl
@@ -3,6 +3,45 @@ export LSFManager, addprocs_lsf
struct LSFManager <: ClusterManager
np::Integer
bsub_flags::Cmd
ssh_cmd::Cmd
retry_delays
throttle::Integer
end
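
For reference, the manager that `addprocs_lsf` builds can also be constructed directly. A sketch with placeholder values; the field order follows the struct above, and `ExponentialBackOff` is the Base iterator used as the default for `retry_delays`:

```julia
using ClusterManagers

mgr = LSFManager(8,                          # np
                 `-q normal`,                # bsub_flags (placeholder queue)
                 ``,                         # ssh_cmd (empty: run bsub locally)
                 ExponentialBackOff(n=10),   # retry_delays
                 8)                          # throttle
```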

struct LSFException <: Exception
msg
end

function bpeek(manager, jobid, iarray)
old_stderr = stderr
rd,_ = redirect_stderr()
try
io = open(`$(manager.ssh_cmd) bpeek $(jobid)\[$iarray\]`)
success(io) || throw(LSFException(String(readavailable(rd))))
return io
finally
redirect_stderr(old_stderr)
end
end
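
`bpeek` shells out to LSF's `bpeek` for a single element of the job array and, if that fails, raises an `LSFException` carrying whatever the command wrote to stderr. A purely hypothetical call (the helper is internal to the module, the job id is made up, and a real LSF cluster is required):

```julia
using ClusterManagers

mgr = LSFManager(1, ``, ``, ExponentialBackOff(n=10), 1)  # placeholder manager
io  = ClusterManagers.bpeek(mgr, "12345", 1)              # stream output of array element 1
foreach(println, eachline(io))                            # includes the julia_worker banner
```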

function _launch(manager, launched, c, jobid, iarray)
config = WorkerConfig()

io = retry(()->bpeek(manager, jobid, iarray),
delays=manager.retry_delays,
check=(s,e)->occursin("Not yet started", e.msg))()
port_host_regex = r"julia_worker:([0-9]+)#([0-9.]+)"
for line in eachline(io)
mm = match(port_host_regex, line)
isnothing(mm) && continue
config.host = mm.captures[2]
config.port = parse(Int, mm.captures[1])
break
end
config.userdata = `$jobid\[$iarray\]`

push!(launched, config)
notify(c)
end
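
`_launch` retries `bpeek` until output appears, then scans it for the standard worker banner to recover the host and port. The parsing step in isolation, using a made-up sample line:

```julia
# Sample of the banner a worker prints on startup (values invented for illustration).
line = "julia_worker:9009#10.1.2.3"
m = match(r"julia_worker:([0-9]+)#([0-9.]+)", line)
host = m.captures[2]               # "10.1.2.3"
port = parse(Int, m.captures[1])   # 9009
@assert (host, port) == ("10.1.2.3", 9009)
```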

function launch(manager::LSFManager, params::Dict, launched::Array, c::Condition)
@@ -16,16 +55,15 @@ function launch(manager::LSFManager, params::Dict, launched::Array, c::Condition)
jobname = `julia-$(getpid())`

cmd = `$exename $exeflags $(worker_arg())`
bsub_cmd = `bsub -I $(manager.bsub_flags) -cwd $dir -J $jobname "$cmd"`
bsub_cmd = `$(manager.ssh_cmd) bsub $(manager.bsub_flags) -cwd $dir -J $(jobname)\[1-$np\] "$cmd"`

stream_proc = [open(bsub_cmd) for i in 1:np]
line = open(readline, bsub_cmd)
m = match(r"Job <([0-9]+)> is submitted", line)
jobid = m.captures[1]

for i in 1:np
config = WorkerConfig()
config.io = stream_proc[i]
push!(launched, config)
notify(c)
end
asyncmap((i)->_launch(manager, launched, c, jobid, i),
1:np;
ntasks=manager.throttle)

catch e
println("Error launching workers")
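
The reworked `launch` submits one job array (`-J jobname[1-np]`), pulls the numeric job id out of `bsub`'s acknowledgement line, and then connects to the elements with a throttled `asyncmap`. Both pieces in isolation, with an invented acknowledgement line:

```julia
# 1. Job-id extraction from a bsub-style acknowledgement (sample text, not real output).
ack = "Job <8675309> is submitted to default queue <normal>."
jobid = match(r"Job <([0-9]+)> is submitted", ack).captures[1]
@assert jobid == "8675309"

# 2. Throttled fan-out: at most `ntasks` of these tasks run concurrently.
asyncmap(i -> string("array element ", i), 1:8; ntasks=3)
```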
@@ -35,7 +73,38 @@ end

manage(manager::LSFManager, id::Int64, config::WorkerConfig, op::Symbol) = nothing

kill(manager::LSFManager, id::Int64, config::WorkerConfig) = kill(config.io)
kill(manager::LSFManager, id::Int64, config::WorkerConfig) = remote_do(exit, id)
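
Because workers in a job array no longer have a per-worker pipe to close, `kill` now asks the worker itself to shut down via `remote_do`. The same call on an ordinary local worker, just to show the mechanism:

```julia
using Distributed

p = addprocs(1)[1]      # a local worker; no LSF needed for this illustration
remote_do(exit, p)      # asynchronous: asks worker `p` to call exit()
```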

"""
addprocs_lsf(np::Integer;
bsub_flags::Cmd=``,
ssh_cmd::Cmd=``,
retry_delays=ExponentialBackOff(n=10,
first_delay=1, max_delay=512,
factor=2),
throttle::Integer=np,
params...) =
Launch `np` workers on a cluster managed by IBM's Platform Load Sharing
Facility. `bsub_flags` can be used to pass flags to `bsub` that are specific
to your cluster or workflow needs. `ssh_cmd` can be used to launch workers
from a machine other than the cluster head node (e.g. your personal workstation).
`retry_delays` is a vector of numbers specifying in seconds how long to
repeatedly wait for a worker to start. `throttle` specifies how many workers
to launch at once.
# Examples
addprocs_lsf(np::Integer, bsub_flags::Cmd=``; params...) =
addprocs(LSFManager(np, bsub_flags); params...)
```
addprocs_lsf(1000; ssh_cmd=`ssh login`, throttle=10)
```
"""
addprocs_lsf(np::Integer;
bsub_flags::Cmd=``,
ssh_cmd::Cmd=``,
retry_delays=ExponentialBackOff(n=10,
first_delay=1, max_delay=512,
factor=2),
throttle::Integer=np,
params...) =
addprocs(LSFManager(np, bsub_flags, ssh_cmd, retry_delays, throttle); params...)
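
Per the docstring above, the keyword arguments can be tuned for large or slow submissions; the numbers below are illustrative rather than recommendations:

```julia
using ClusterManagers

p = addprocs_lsf(1000;
                 ssh_cmd=`ssh login`,     # run bsub on the head node (here named `login`) over ssh
                 retry_delays=ExponentialBackOff(n=20, first_delay=2,
                                                 max_delay=1024, factor=2),
                 throttle=10)
```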
28 changes: 26 additions & 2 deletions test/runtests.jl
@@ -1,9 +1,9 @@
using Test
using ClusterManagers

TIMEOUT = 10.
using Distributed

@testset "ElasticManager" begin
TIMEOUT = 10.

em = ElasticManager(addr=:auto, port=0)

@@ -15,4 +15,28 @@ TIMEOUT = 10.
length(em.active) == 1
end

wait(rmprocs(workers()))
end


@static if Sys.iswindows()
windows_which(command) = `powershell.exe -Command Get-Command $command`
is_lsf_installed() = success(windows_which("bsub.exe"))
else
is_lsf_installed() = success(`which bsub`)
end

if is_lsf_installed()

@testset "LSFManager" begin
p = addprocs_lsf(1, bsub_flags=`-P scicompsoft`)
@test nprocs() == 2
@test workers() == p
@test fetch(@spawnat :any myid()) == p[1]
@test remotecall_fetch(+,p[1],1,1) == 2
rmprocs(p)
@test nprocs() == 1
@test workers() == [1]
end

end
