Skip to content

Commit

Permalink
Keep most 'client oriented' tasks in the same threadpool
Browse files Browse the repository at this point in the history
This is to avoid them accidentally running in another (potentially busy)
threadpool.
  • Loading branch information
JamesWrigley authored and IanButterworth committed Jan 21, 2025
1 parent 56329d5 commit 90041ca
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 16 deletions.
22 changes: 11 additions & 11 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
else
w.ct_time = time()
if myid() > w.id
t = Threads.@spawn exec_conn_func(w)
t = Threads.@spawn Threads.threadpool() exec_conn_func(w)
else
# route request via node 1
t = Threads.@spawn remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
t = Threads.@spawn Threads.threadpool() remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
errormonitor(t)
wait_for_conn(w)
Expand Down Expand Up @@ -194,7 +194,7 @@ function wait_for_conn(w)
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

T = Threads.@spawn begin
T = Threads.@spawn Threads.threadpool() begin
sleep($timeout)
lock(w.c_state) do
notify(w.c_state; all=true)
Expand Down Expand Up @@ -329,7 +329,7 @@ function read_worker_host_port(io::IO)
leader = String[]
try
while ntries > 0
readtask = Threads.@spawn readline(io)
readtask = Threads.@spawn Threads.threadpool() readline(io)
yield()
while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
sleep(0.05)
Expand Down Expand Up @@ -496,13 +496,13 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
# call manager's `launch` is a separate task. This allows the master
# process initiate the connection setup process as and when workers come
# online
t_launch = Threads.@spawn launch(manager, params, launched, launch_ntfy)
t_launch = Threads.@spawn Threads.threadpool() launch(manager, params, launched, launch_ntfy)

@sync begin
while true
if isempty(launched)
istaskdone(t_launch) && break
Threads.@spawn begin
Threads.@spawn Threads.threadpool() begin
sleep(1)
notify(launch_ntfy)
end
Expand All @@ -512,7 +512,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
if !isempty(launched)
wconfig = popfirst!(launched)
let wconfig=wconfig
Threads.@spawn setup_launched_worker(manager, wconfig, launched_q)
Threads.@spawn Threads.threadpool() setup_launched_worker(manager, wconfig, launched_q)
end
end
end
Expand Down Expand Up @@ -592,7 +592,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
wconfig.port = port

let wconfig=wconfig
Threads.@spawn begin
Threads.@spawn Threads.threadpool() begin

Check warning on line 595 in src/cluster.jl

View check run for this annotation

Codecov / codecov/patch

src/cluster.jl#L595

Added line #L595 was not covered by tests
pid = create_worker(manager, wconfig)
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
Expand Down Expand Up @@ -1050,13 +1050,13 @@ function rmprocs(pids...; waitfor=typemax(Int))

pids = vcat(pids...)
if waitfor == 0
t = Threads.@spawn _rmprocs(pids, typemax(Int))
t = Threads.@spawn Threads.threadpool() _rmprocs(pids, typemax(Int))

Check warning on line 1053 in src/cluster.jl

View check run for this annotation

Codecov / codecov/patch

src/cluster.jl#L1053

Added line #L1053 was not covered by tests
yield()
return t
else
_rmprocs(pids, waitfor)
# return a dummy task object that user code can wait on.
return Threads.@spawn nothing
return Threads.@spawn Threads.threadpool() nothing
end
end

Expand Down Expand Up @@ -1239,7 +1239,7 @@ function interrupt(pids::AbstractVector=workers())
@assert myid() == 1
@sync begin
for pid in pids
Threads.@spawn interrupt(pid)
Threads.@spawn Threads.threadpool() interrupt(pid)

Check warning on line 1242 in src/cluster.jl

View check run for this annotation

Codecov / codecov/patch

src/cluster.jl#L1242

Added line #L1242 was not covered by tests
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
# execute locally last as we do not want local execution to block serialization
# of the request to remote nodes.
for _ in 1:run_locally
Threads.@spawn Core.eval(m, ex)
Threads.@spawn Threads.threadpool() Core.eval(m, ex)
end
end
nothing
Expand Down Expand Up @@ -275,7 +275,7 @@ function preduce(reducer, f, R)
end

function pfor(f, R)
t = Threads.@spawn @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
t = Threads.@spawn Threads.threadpool() @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
@spawnat :any f(R, first(c), last(c))
end
errormonitor(t)
Expand Down
4 changes: 2 additions & 2 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
# Wait for all launches to complete.
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
let machine=machine, cnt=cnt
Threads.@spawn try
Threads.@spawn Threads.threadpool() try

Check warning on line 181 in src/managers.jl

View check run for this annotation

Codecov / codecov/patch

src/managers.jl#L181

Added line #L181 was not covered by tests
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
Expand Down Expand Up @@ -744,7 +744,7 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou
# First, try sending `exit()` to the remote over the usual control channels
remote_do(exit, pid)

timer_task = Threads.@spawn begin
timer_task = Threads.@spawn Threads.threadpool() begin
sleep(exit_timeout)

# Check to see if our child exited, and if not, send an actual kill signal
Expand Down
2 changes: 1 addition & 1 deletion src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ function process_worker(rr)
msg = (remoteref_id(rr), myid())

# Needs to acquire a lock on the del_msg queue
T = Threads.@spawn begin
T = Threads.@spawn Threads.threadpool() begin
publish_del_msg!($w, $msg)
end
Base.errormonitor(T)
Expand Down

0 comments on commit 90041ca

Please sign in to comment.