Skip to content

Commit

Permalink
add throttle keyword to LSFManager
Browse files Browse the repository at this point in the history
  • Loading branch information
bjarthur committed Aug 23, 2020
1 parent b39bf48 commit 582ec89
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Support for different job queue systems commonly used on compute clusters.

| Job queue system | Command to add processors |
| ---------------- | ------------------------- |
| Load Sharing Facility (LSF) | `addprocs_lsf(np::Integer; bsub_flags=``, ssh_cmd=``)` or `addprocs(LSFManager(np, bsub_flags, ssh_cmd, retry_delays))` |
| Load Sharing Facility (LSF) | `addprocs_lsf(np::Integer; bsub_flags=``, ssh_cmd=``)` or `addprocs(LSFManager(np, bsub_flags, ssh_cmd, retry_delays, throttle))` |
| Sun Grid Engine | `addprocs_sge(np::Integer, queue="")` or `addprocs(SGEManager(np, queue))` |
| SGE via qrsh | `addprocs_qrsh(np::Integer, queue="")` or `addprocs(QRSHManager(np, queue))` |
| PBS | `addprocs_pbs(np::Integer, queue="")` or `addprocs(PBSManager(np, queue))` |
Expand Down
55 changes: 30 additions & 25 deletions src/lsf.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ struct LSFManager <: ClusterManager
bsub_flags::Cmd
ssh_cmd::Cmd
retry_delays
throttle::Integer
end

struct LSFException <: Exception
Expand All @@ -23,6 +24,26 @@ function bpeek(manager, jobid, iarray)
end
end

"""
    _launch(manager, launched, c, jobid, iarray)

Build the `WorkerConfig` for array element `iarray` of LSF job `jobid`, append it to
`launched`, and notify the condition `c`.

The worker output is obtained through `bpeek`, retried according to
`manager.retry_delays` for as long as the raised exception's message contains
`Not yet started`. The output is scanned line by line for the first
`julia_worker:<port>#<host>` announcement, whose captures populate `config.port`
and `config.host`. `config.userdata` is set to the command naming this array element.
"""
function _launch(manager, launched, c, jobid, iarray)
    config = WorkerConfig()

    # Retry only when the exception carries a textual `msg` saying the job has not
    # started. Guarding the field access keeps unrelated exception types (which may
    # have no `msg` field) from triggering a secondary error inside the check that
    # would mask the original failure; they simply propagate unchanged.
    notstarted(s, e) = isdefined(e, :msg) && e.msg isa AbstractString &&
                       occursin("Not yet started", e.msg)
    peek = retry(() -> bpeek(manager, jobid, iarray);
                 delays=manager.retry_delays,
                 check=notstarted)
    io = peek()

    port_host_regex = r"julia_worker:([0-9]+)#([0-9.]+)"
    # NOTE(review): if no line matches, host and port stay unset and the config is
    # still pushed below — confirm whether callers rely on that.
    for line in eachline(io)
        found = match(port_host_regex, line)
        isnothing(found) && continue
        config.host = found.captures[2]
        config.port = parse(Int, found.captures[1])
        break  # only the first announcement is used
    end
    config.userdata = `$jobid\[$iarray\]`

    push!(launched, config)
    return notify(c)
end

function launch(manager::LSFManager, params::Dict, launched::Array, c::Condition)
try
dir = params[:dir]
Expand All @@ -40,28 +61,9 @@ function launch(manager::LSFManager, params::Dict, launched::Array, c::Condition
m = match(r"Job <([0-9]+)> is submitted", line)
jobid = m.captures[1]

port_host_regex = r"julia_worker:([0-9]+)#([0-9.]+)"
@sync for i in 1:np
@async begin
config = WorkerConfig()

io = retry(()->bpeek(manager, jobid, i),
delays=manager.retry_delays,
check=(s,e)->occursin("Not yet started", e.msg))()
for line in eachline(io)
m = match(port_host_regex, line)
isnothing(m) && continue
config.host = m.captures[2]
config.port = parse(Int, m.captures[1])
break
end
config.userdata = `$jobid\[$i\]`

push!(launched, config)
notify(c)
@info i
end
end
asyncmap((i)->_launch(manager, launched, c, jobid, i),
1:np;
ntasks=manager.throttle)

catch e
println("Error launching workers")
Expand All @@ -80,19 +82,21 @@ kill(manager::LSFManager, id::Int64, config::WorkerConfig) = remote_do(exit, id)
retry_delays=ExponentialBackOff(n=10,
first_delay=1, max_delay=512,
factor=2),
throttle::Integer=np,
params...) =
Launch `np` workers on a cluster managed by IBM's Platform Load Sharing
Facility. `bsub_flags` can be used to pass flags to `bsub` that are specific
to your cluster or workflow needs. `ssh_cmd` can be used to launch workers
from a machine other than the cluster head node (e.g. your personal workstation).
`retry_delays` is a vector of numbers specifying in seconds how long to
repeatedly wait for a worker to start.
repeatedly wait for a worker to start. `throttle` specifies how many workers
to launch at once.
# Examples
```
addprocs_lsf(1000; ssh_cmd=`ssh login`)
addprocs_lsf(1000; ssh_cmd=`ssh login`, throttle=10)
```
"""
addprocs_lsf(np::Integer;
Expand All @@ -101,5 +105,6 @@ addprocs_lsf(np::Integer;
retry_delays=ExponentialBackOff(n=10,
first_delay=1, max_delay=512,
factor=2),
throttle::Integer=np,
params...) =
addprocs(LSFManager(np, bsub_flags, ssh_cmd, retry_delays); params...)
addprocs(LSFManager(np, bsub_flags, ssh_cmd, retry_delays, throttle); params...)

0 comments on commit 582ec89

Please sign in to comment.