Merge pull request #136 from marius311/workercmd
Make ElasticManager more convenient
kescobo authored Jul 16, 2020
2 parents ed9dc32 + d4237a4 commit 9d727f7
Showing 5 changed files with 81 additions and 29 deletions.
1 change: 1 addition & 0 deletions Project.toml
@@ -5,6 +5,7 @@ version = "0.4.0"
[deps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[compat]
43 changes: 17 additions & 26 deletions README.md
@@ -19,7 +19,7 @@ You can also write your own custom cluster manager; see the instructions in the

### Slurm: a simple example

```julia
using ClusterManagers

# Arguments to the Slurm srun(1) command can be given as keyword
@@ -52,7 +52,7 @@ end

### SGE - a simple interactive example

```julia
julia> using ClusterManagers

julia> ClusterManagers.addprocs_sge(5)
@@ -79,7 +79,7 @@ julia> From worker 2: compute-6

Some clusters require the user to specify a list of required resources. For example, it may be necessary to specify how much memory will be needed by the job - see this [issue](https://github.com/JuliaLang/julia/issues/10390).

```julia
julia> using ClusterManagers

julia> addprocs_sge(5,res_list="h_vmem=4G,tmem=4G")
@@ -136,36 +136,27 @@ The `ElasticManager` is useful in scenarios where we want to dynamically add workers to a cluster.
It achieves this by listening on a known port on the master. The launched workers connect to this
port and publish their own host/port information for other workers to connect to.

##### Usage

On the master, you need to create an instance of `ElasticManager`. The constructors defined are:
```julia
ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
ElasticManager(port) = ElasticManager(;port=port)
ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port)
ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie)
```

On the worker, you need to call `ClusterManagers.elastic_worker` with the addr/port that the master
is listening on and the same cookie. `elastic_worker` is defined as:
```
ClusterManagers.elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true)
```
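
For example, each worker can be launched locally as a shell one-liner, or, if you want a usable REPL on the worker, as a background task with output kept local. A minimal sketch of both forms; the cookie `"foobar"` and the address/port are illustrative and must match the master's:

```julia
using ClusterManagers

# Dedicated worker process: blocks, and redirects the worker's stdout to the master.
# Typically launched as:
#   julia -e 'using ClusterManagers; ClusterManagers.elastic_worker("foobar","127.0.0.1",9009)' &
ClusterManagers.elastic_worker("foobar", "127.0.0.1", 9009)

# REPL-friendly alternative: run the worker loop as a background task and
# keep printed output local instead of forwarding it to the master.
# @async ClusterManagers.elastic_worker("foobar", "127.0.0.1", 9009; stdout_to_master=false)
```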

On Linux and Mac, you can set `addr=:auto` to automatically use the host's private IP address on the local network, which will allow other workers on this network to connect. You can also use `port=0` to let the OS choose a random free port for you (some systems may not support this). Once created, printing the `ElasticManager` object prints the command which you can run on workers to connect them to the master, e.g.:

```julia
julia> em = ElasticManager(addr=:auto, port=0)
ElasticManager:
  Active workers : []
  Number of workers to be added : 0
  Terminated workers : []
  Worker connect command :
    /home/user/bin/julia --project=/home/user/myproject/Project.toml -e 'using ClusterManagers; ClusterManagers.elastic_worker("4cOSyaYpgSl6BC0C","127.0.1.1",36275)'
```

By default, the printed command uses the absolute path to the current Julia executable and activates the same project as the current session. You can change either of these defaults by passing `printing_kwargs=(absolute_exename=false, same_project=false)` to the first form of the `ElasticManager` constructor.
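
For instance, a sketch of constructing a manager whose printed connect command uses a plain `julia` executable and omits the `--project` flag (the cookie value is illustrative):

```julia
using ClusterManagers

# The "Worker connect command" shown when printing `em` will now start with
# just `julia` rather than an absolute path plus a --project flag.
em = ElasticManager(cookie="foobar",
                    printing_kwargs=(absolute_exename=false, same_project=false))
```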

Once workers are connected, you can print the `em` object again to see them added to the list of active workers.
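
Since `ElasticManager` is an ordinary `Distributed` cluster manager, connected workers can then be used like any others. A minimal sketch, assuming at least one worker has connected:

```julia
using Distributed

@everywhere println("hello from process ", myid())  # runs on the master and every worker
fetch(@spawnat :any myid())                         # run an expression on one of the workers
```
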
1 change: 1 addition & 0 deletions src/ClusterManagers.jl
@@ -2,6 +2,7 @@ module ClusterManagers

using Distributed
using Sockets
using Pkg

export launch, manage, kill, init_worker, connect
import Distributed: launch, manage, kill, init_worker, connect
47 changes: 45 additions & 2 deletions src/elastic.jl
@@ -12,14 +12,19 @@ struct ElasticManager <: ClusterManager
    terminated::Set{Int}   # terminated worker ids
    topology::Symbol
    sockname
    printing_kwargs

    function ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
        Distributed.init_multi()
        cookie !== nothing && cluster_cookie(cookie)

        if addr == :auto
            addr = get_private_ip()
        end

        l_sock = listen(addr, port)

        lman = new(Dict{Int, WorkerConfig}(), Channel{TCPSocket}(typemax(Int)), Set{Int}(), topology, getsockname(l_sock), printing_kwargs)

        @async begin
            while true
@@ -113,6 +118,9 @@ function Base.show(io::IO, mgr::ElasticManager)
    seek(iob, position(iob)-1)
    println(iob, "]")

    println(iob, "  Worker connect command : ")
    print(iob, "    ", get_connect_cmd(mgr; mgr.printing_kwargs...))

    print(io, String(take!(iob)))
end

@@ -125,3 +133,38 @@ function elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true)
    stdout_to_master && redirect_stdout(c)
    start_worker(c, cookie)
end


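# Helper for addr=:auto: asks the OS for the host's private IP address (Linux and macOS only).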
function get_private_ip()
    if Sys.islinux()
        cmd = `hostname --ip-address`
    elseif Sys.isapple()
        cmd = `ipconfig getifaddr en0`
    else
        error("`addr=:auto` is only supported on Linux and Mac")
    end
    try
        return IPv4(first(split(strip(read(cmd, String)))))
    catch err
        error("""Failed to automatically get host's IP address (output below). Please specify `addr=` explicitly.
            \$ $(repr(cmd))
            $err
            """)
    end
end

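# Builds the shell command a worker can run to connect to this manager; printed by Base.show above.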
function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true)

    ip = string(em.sockname[1])
    port = convert(Int, em.sockname[2])
    cookie = cluster_cookie()
    exename = absolute_exename ? joinpath(Sys.BINDIR, Base.julia_exename()) : "julia"
    project = same_project ? ("--project=$(Pkg.API.Context().env.project_file)",) : ()

    join([
        exename,
        project...,
        "-e 'using ClusterManagers; ClusterManagers.elastic_worker(\"$cookie\",\"$ip\",$port)'"
    ], " ")

end
18 changes: 17 additions & 1 deletion test/runtests.jl
@@ -1,2 +1,18 @@
using Test
using ClusterManagers

TIMEOUT = 10.0

@testset "ElasticManager" begin

    em = ElasticManager(addr=:auto, port=0)

    # launch a worker process using the manager's printed connect command
    run(`sh -c $(ClusterManagers.get_connect_cmd(em))`, wait=false)

    # wait at most TIMEOUT seconds for it to connect
    @test :ok == timedwait(TIMEOUT) do
        length(em.active) == 1
    end

end
