
Make ElasticManager more convenient #136

Merged · 2 commits · Jul 16, 2020
Changes from 1 commit
1 change: 1 addition & 0 deletions Project.toml
@@ -5,6 +5,7 @@ version = "0.4.0"
[deps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
+Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[compat]
43 changes: 17 additions & 26 deletions README.md
@@ -19,7 +19,7 @@ You can also write your own custom cluster manager; see the instructions in the

### Slurm: a simple example

-```jl
+```julia
using ClusterManagers

# Arguments to the Slurm srun(1) command can be given as keyword
@@ -52,7 +52,7 @@ end

### SGE - a simple interactive example

-```jl
+```julia
julia> using ClusterManagers

julia> ClusterManagers.addprocs_sge(5)
@@ -79,7 +79,7 @@ julia> From worker 2: compute-6

Some clusters require the user to specify a list of required resources. For example, it may be necessary to specify how much memory will be needed by the job - see this [issue](https://github.com/JuliaLang/julia/issues/10390).

-```jl
+```julia
julia> using ClusterManagers

julia> addprocs_sge(5,res_list="h_vmem=4G,tmem=4G")
@@ -136,36 +136,27 @@ The `ElasticManager` is useful in scenarios where we want to dynamically add workers
It achieves this by listening on a known port on the master. The launched workers connect to this
port and publish their own host/port information for other workers to connect to.
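For orientation, here is a minimal end-to-end sketch of that flow (an editorial illustration, not part of this PR; the cookie and address are arbitrary, and the signatures are the ones shown in this diff):

```julia
# Master process: start listening for incoming workers on a known port.
using ClusterManagers
em = ElasticManager(port=9009, cookie="foobar")

# Worker process (started separately, possibly on another machine):
# connect to the master's known port with the same cookie; the worker
# then publishes its own host/port so other workers can connect to it.
using ClusterManagers
ClusterManagers.elastic_worker("foobar", "127.0.0.1", 9009)
```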

-##### Usage

On the master, you need to instantiate an instance of `ElasticManager`. The constructors defined are:
-```jl
-ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all)
+```julia
+ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
ElasticManager(port) = ElasticManager(;port=port)
ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port)
ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie)
```

-On the worker, you need to call `ClusterManagers.elastic_worker` with the addr/port that the master
-is listening on and the same cookie. `elastic_worker` is defined as:
-```
-ClusterManagers.elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true)
-```
-
-For example, on the master:
+On Linux, you can set `addr=:auto` to automatically use the host's private IP address on the local network, which will allow other workers on this network to connect. You can also use `port=0` to let the OS choose a random free port for you (some systems may not support this). Once created, printing the `ElasticManager` object prints the command which you can run on workers to connect them to the master, e.g.:

-```jl
-using ClusterManagers
-em=ElasticManager(cookie="foobar")
+```julia
+julia> em = ElasticManager(addr=:auto, port=0)
+ElasticManager:
+  Active workers : []
+  Number of workers to be added  : 0
+  Terminated workers : []
+  Worker connect command :
+    /home/user/bin/julia --project=/home/user/myproject/Project.toml -e 'using ClusterManagers; ClusterManagers.elastic_worker("4cOSyaYpgSl6BC0C","127.0.1.1",36275)'
```

-and launch each worker locally as
-`echo "using ClusterManagers; ClusterManagers.elastic_worker(\"foobar\")" | julia &`
-
-or if you want a REPL on the worker, you can start a julia process normally and manually enter
-```jl
-using ClusterManagers
-@schedule ClusterManagers.elastic_worker("foobar", "addr_of_master", port_of_master; stdout_to_master=false)
-```
+By default, the printed command uses the absolute path to the current Julia executable and activates the same project as the current session. You can change either of these defaults by passing `printing_kwargs=(absolute_exename=false, same_project=false)` to the first form of the `ElasticManager` constructor.

-The above will yield back the REPL prompt and also display any printed output locally.
+Once workers are connected, you can print the `em` object again to see them added to the list of active workers.
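To make those printing options concrete, here is a sketch (editorial; the session and cookie are illustrative, not from this PR): with both options disabled, the printed connect command uses the bare `julia` name and omits the `--project` flag.

```julia
# Sketch: with absolute_exename=false and same_project=false, the printed
# connect command reduces to something like:
#   julia -e 'using ClusterManagers; ClusterManagers.elastic_worker("<cookie>","127.0.0.1",9009)'
em = ElasticManager(port=9009, printing_kwargs=(absolute_exename=false, same_project=false))
```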
1 change: 1 addition & 0 deletions src/ClusterManagers.jl
@@ -2,6 +2,7 @@ module ClusterManagers

using Distributed
using Sockets
+using Pkg

export launch, manage, kill, init_worker, connect
import Distributed: launch, manage, kill, init_worker, connect
37 changes: 35 additions & 2 deletions src/elastic.jl
@@ -12,14 +12,19 @@ struct ElasticManager <: ClusterManager
terminated::Set{Int} # terminated worker ids
topology::Symbol
sockname
+    printing_kwargs

-    function ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all)
+    function ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
**Collaborator:** Seems like there are only two allowed `printing_kwargs` here; a tuple seems like a weird way to do this.

**Contributor Author:** I just didn't want to balloon the number of keyword arguments for `ElasticManager`. Also, `ElasticManager(..., printing_kwargs=(same_project=true,))` implies that `same_project=true` applies just to printing, whereas `ElasticManager(..., same_project=true)` might not. But I'm happy to change it however you see fit.
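To illustrate the distinction drawn in this comment (a sketch; only the first form exists in this PR, the second is the hypothetical alternative):

```julia
# Form in this PR: the NamedTuple scopes the options to printing only.
em = ElasticManager(addr=:auto, port=0, printing_kwargs=(same_project=true,))

# Hypothetical alternative discussed above: as a top-level keyword, the
# scope of `same_project` would be ambiguous (printing only, or more?).
# em = ElasticManager(addr=:auto, port=0, same_project=true)
```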

Distributed.init_multi()
cookie !== nothing && cluster_cookie(cookie)

+        if addr == :auto
+            addr = get_private_ip()
+        end

l_sock = listen(addr, port)

-        lman = new(Dict{Int, WorkerConfig}(), Channel{TCPSocket}(typemax(Int)), Set{Int}(), topology, getsockname(l_sock))
+        lman = new(Dict{Int, WorkerConfig}(), Channel{TCPSocket}(typemax(Int)), Set{Int}(), topology, getsockname(l_sock), printing_kwargs)

@async begin
while true
@@ -113,6 +118,9 @@ function Base.show(io::IO, mgr::ElasticManager)
seek(iob, position(iob)-1)
println(iob, "]")

+    println(iob, "  Worker connect command : ")
+    print(iob, "    ", get_connect_cmd(mgr; mgr.printing_kwargs...))

print(io, String(take!(iob)))
end

@@ -125,3 +133,28 @@ function elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true)
stdout_to_master && redirect_stdout(c)
start_worker(c, cookie)
end


+function get_private_ip()
+    if Sys.islinux()
+        IPv4(strip(read(`hostname --ip-address`, String)))
+    else
+        error("addr=:auto is only supported on Linux")
+    end
+end

+function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true)
+
+    ip = string(em.sockname[1])
+    port = convert(Int, em.sockname[2])
+    cookie = cluster_cookie()
+    exename = absolute_exename ? joinpath(Sys.BINDIR, Base.julia_exename()) : "julia"
+    project = same_project ? ("--project=$(Pkg.API.Context().env.project_file)",) : ()
**Collaborator:** Not sure this one function is sufficient reason to take a dependency on Pkg. And how sure are we that that implementation is stable?

**Contributor Author:** Thanks for looking this over. I'm not sure I understand the issue with taking a dependency: everyone's Julia installation will always include Pkg, and its load time is tiny. Re stability, I can say at least that `Pkg.API.Context().env.project_file` seems to be the way Pkg itself looks up the current package, e.g. to print it when you do `Pkg.status()`.

**Collaborator:** The dependency is not so much about loading time, and more about maintenance burden. The reason I ask about stability is that, if it's not exported or part of the stable API, then updates are free to break it without consideration. Given that this package doesn't have great (or any, really) tests yet, it may be particularly important that we don't add too much maintenance burden.

That said, I'm not the one to approve/reject; I just wanted to flag it.

**Collaborator:** As of Julia 1.4 there is also `Pkg.project().name`. For what versions will `Pkg.API.Context().env.project_file` work?

**Contributor Author (@marius311, May 12, 2020):**

$ for ver in 1.0 1.1 1.2 1.3 1.4; do docker run --rm julia:$ver julia -e "using Pkg; @show Pkg.API.Context().env.project_file"; done
(((Pkg.API).Context()).env).project_file = "/root/.julia/environments/v1.0/Project.toml"
(((Pkg.API).Context()).env).project_file = "/root/.julia/environments/v1.1/Project.toml"
(Pkg.API.Context()).env.project_file = "/root/.julia/environments/v1.2/Project.toml"
(Pkg.API.Context()).env.project_file = "/root/.julia/environments/v1.3/Project.toml"
(Pkg.API.Context()).env.project_file = "/root/.julia/environments/v1.4/Project.toml"

Everything back to the lowest version supported by this package (1.0) looks good, as does nightly, which I just checked by hand.
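For completeness, a sketch of a Pkg-free alternative that was not part of this PR: `Base.active_project()` returns the active project file's path without taking the Pkg dependency (assuming, as appears to hold on current Julia versions, that it matches `Pkg.API.Context().env.project_file`):

```julia
# Sketch: derive the --project flag without depending on Pkg.
# Base.active_project() returns the path of the active Project.toml
# (or nothing if there is no active project).
project_file = Base.active_project()
project = project_file === nothing ? () : ("--project=$project_file",)
```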


+    join([
+        exename,
+        project...,
+        "-e 'using ClusterManagers; ClusterManagers.elastic_worker(\"$cookie\",\"$ip\",$port)'"
+    ], " ")
+
+end