Skip to content

Commit

Permalink
Improve worker launch thread-safety
Browse files Browse the repository at this point in the history
The `launched_q` array is filled concurrently by the launcher tasks for each
worker. Now it's wrapped inside `Base.Lockable` for thread-safety.
  • Loading branch information
JamesWrigley committed Jan 22, 2025
1 parent 91fe613 commit 877be4f
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)

# References to launched workers, filled when each worker is fully initialized and
# has connected to all nodes.
launched_q = Int[] # Asynchronously filled by the launch method
launched_q = Base.Lockable(Int[]) # Asynchronously filled by the launch method

# The `launch` method should add an object of type WorkerConfig for every
# worker launched. It provides information required on how to connect
Expand Down Expand Up @@ -522,7 +522,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
remote_do(set_valid_processes, pid, all_w)
end

sort!(launched_q)
sort!(@lock launched_q launched_q[])
end

function set_valid_processes(plist::Array{Int})
Expand Down Expand Up @@ -551,7 +551,7 @@ default_addprocs_params() = Dict{Symbol,Any}(

function setup_launched_worker(manager, wconfig, launched_q)
pid = create_worker(manager, wconfig)
push!(launched_q, pid)
@lock launched_q push!(launched_q[], pid)

# When starting workers on remote multi-core hosts, `launch` can (optionally) start only one
# process on the remote machine, with a request to start additional workers of the
Expand Down Expand Up @@ -589,7 +589,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
Threads.@spawn Threads.threadpool() begin
pid = create_worker(manager, wconfig)
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
@lock launched_q push!(launched_q[], pid)
end
end
end
Expand Down

0 comments on commit 877be4f

Please sign in to comment.